From ec5c88dcd52735701222871017daccab6983b701 Mon Sep 17 00:00:00 2001
From: Iustin Pop <iustin@google.com>
Date: Mon, 6 Oct 2008 14:11:02 +0000
Subject: [PATCH] Start implementation of parallel burnin

This patch introduces a simple framework for executing jobs in parallel
in burnin (the ExecJobSet function) and the "--parallel" command-line
flag.
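
The new helper boils down to a submit-all-then-poll pattern; as a rough
standalone sketch (function and variable names here are illustrative,
only cli.GetClient/cli.SendJob/cli.PollJob are the real calls used in
the diff below):

    from ganeti import cli

    def RunJobSet(jobs, feedback_fn):
      """Submit all jobs first, then wait for each one in turn."""
      cl = cli.GetClient()
      # Submitting everything up front lets the master process the jobs
      # concurrently; each "job" is a list of opcodes.
      job_ids = [cli.SendJob(job, cl=cl) for job in jobs]
      # PollJob blocks until the job finishes and raises OpExecError if
      # the job failed.
      return [cli.PollJob(jid, cl=cl, feedback_fn=feedback_fn)
              for jid in job_ids]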

The patch also changes instance creation to run in parallel when the
above flag is given. Error handling/instance removal is currently flaky
with this option if there are errors during instance creation.

We also modify burnin to reuse a single client.
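
In practice the new mode would be enabled roughly like this (a hedged
example: apart from -p/--parallel, the option names and values are
placeholders standing in for the usual burnin invocation):

    ./tools/burnin --parallel -o <os-name> <instance1> <instance2>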

Reviewed-by: imsnah
---
 tools/burnin | 38 +++++++++++++++++++++++++++++++++++---
 1 file changed, 35 insertions(+), 3 deletions(-)

diff --git a/tools/burnin b/tools/burnin
index f5b9b1a53..e0fbd1d9b 100755
--- a/tools/burnin
+++ b/tools/burnin
@@ -69,6 +69,7 @@ class Burner(object):
     self.instances = []
     self.to_rem = []
     self.opts = None
+    self.cl = cli.GetClient()
     self.ParseOptions()
     self.GetState()
 
@@ -90,7 +91,25 @@ class Burner(object):
   def ExecOp(self, op):
     """Execute an opcode and manage the exec buffer."""
     self.ClearFeedbackBuf()
-    return cli.SubmitOpCode(op, feedback_fn=self.Feedback)
+    return cli.SubmitOpCode(op, feedback_fn=self.Feedback, cl=self.cl)
+
+  def ExecJobSet(self, jobs):
+    """Execute a set of jobs and return once all are done.
+
+    The method will return the list of results if all jobs are
+    successful. Otherwise, OpExecError will be raised from within
+    cli.py.
+
+    """
+    self.ClearFeedbackBuf()
+    job_ids = [cli.SendJob(job, cl=self.cl) for job in jobs]
+    Log("- Submitted job IDs %s" % ", ".join(job_ids))
+    results = []
+    for jid in job_ids:
+      Log("- Waiting for job %s" % jid)
+      results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+
+    return results
 
   def ParseOptions(self):
     """Parses the command line options.
@@ -149,6 +168,10 @@ class Burner(object):
                       help="Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be used")
+    parser.add_option("-p", "--parallel", default=False, action="store_true",
+                      dest="parallel",
+                      help="Enable parallelization of some operations in"
+                      " order to speed burnin or to test granular locking")
 
     options, args = parser.parse_args()
     if len(args) < 1 or options.os is None:
@@ -206,6 +229,7 @@ class Burner(object):
     mytor = izip(cycle(self.nodes),
                  islice(cycle(self.nodes), 1, None),
                  self.instances)
+    jobset = []
     for pnode, snode, instance in mytor:
       if self.opts.iallocator:
         pnode = snode = None
@@ -240,8 +264,16 @@ class Burner(object):
                                     hvm_nic_type=constants.HT_HVM_NIC_RTL8139,
                                     hvm_disk_type=constants.HT_HVM_DEV_IOEMU)
 
-      self.ExecOp(op)
-      self.to_rem.append(instance)
+      if self.opts.parallel:
+        jobset.append([op])
+        # FIXME: here we should not append to to_rem unconditionally,
+        # but only when the job is successful
+        self.to_rem.append(instance)
+      else:
+        self.ExecOp(op)
+        self.to_rem.append(instance)
+    if self.opts.parallel:
+      self.ExecJobSet(jobset)
 
   def ReplaceDisks1D8(self):
     """Replace disks on primary and secondary for drbd8."""
-- 
GitLab