From c723c1630a69167ff29c42f7f866bbb8b589cffd Mon Sep 17 00:00:00 2001
From: Iustin Pop <iustin@google.com>
Date: Fri, 23 Jan 2009 12:36:18 +0000
Subject: [PATCH] Rework the execution model in burnin

This patch significantly changes the execution model in burnin:
  - for all runs, (almost) all instance modifications in a single Burn*
    procedure are done as part of one job; for example, add disk, stop,
    remove disk and start are no longer executed as four separate jobs
    but as a single job consisting of four opcodes
  - for parallel runs, all Burn* procedures except the rename (which
    uses a single target name) run in parallel; before, only instance
    creation was done in parallel
  - due to the single-job and parallel execution, the logging messages
    no longer happen synchronously with the execution, so they should
    be read as a description of the submitted operations rather than as
    an actual execution log

The end result is that burnin now properly tests multi-opcode jobs and
also exercises all opcodes (except rename) under parallel execution.
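
For illustration, the per-instance pattern now looks roughly like the
sketch below (a simplification of the new ExecOrQueue/CommitQueue
helpers in the patch; error handling omitted):

    # build all opcodes for one instance, then submit them as one job
    for instance in self.instances:
      op1 = self.StopInstanceOp(instance)   # OpShutdownInstance
      op2 = self.StartInstanceOp(instance)  # OpStartupInstance
      # serial mode: runs [op1, op2] as a single job immediately;
      # parallel mode: appends ((op1, op2), instance) to queued_ops
      self.ExecOrQueue(instance, op1, op2)
    # parallel mode only: submit all queued jobs, then poll each one
    self.CommitQueue()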

Note: On a test cluster, parallelization reduces burnin time from 23m to
15m.

Reviewed-by: ultrotter
---
 tools/burnin | 269 ++++++++++++++++++++++++++++-----------------------
 1 file changed, 150 insertions(+), 119 deletions(-)

diff --git a/tools/burnin b/tools/burnin
index fa8776284..c8d76f978 100755
--- a/tools/burnin
+++ b/tools/burnin
@@ -29,12 +29,10 @@ import optparse
 import time
 import socket
 import urllib
-import errno
 from itertools import izip, islice, cycle
 from cStringIO import StringIO
 
 from ganeti import opcodes
-from ganeti import mcpu
 from ganeti import constants
 from ganeti import cli
 from ganeti import errors
@@ -106,6 +104,7 @@ class Burner(object):
     self.nodes = []
     self.instances = []
     self.to_rem = []
+    self.queued_ops = []
     self.opts = None
     self.ParseOptions()
     self.cl = cli.GetClient()
@@ -126,25 +125,52 @@ class Burner(object):
     if self.opts.verbose:
       Log(msg, indent=3)
 
-  def ExecOp(self, op):
+  def ExecOp(self, *ops):
+    """Execute one or more opcodes and manage the exec buffer.
+
+    @return: if only one opcode has been passed, we return its result;
+        otherwise we return the list of results
+
+    """
+    job_id = cli.SendJob(ops, cl=self.cl)
+    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
+    if len(ops) == 1:
+      return results[0]
+    else:
+      return results
+
+  def ExecOrQueue(self, name, *ops):
     """Execute an opcode and manage the exec buffer."""
-    self.ClearFeedbackBuf()
-    return cli.SubmitOpCode(op, feedback_fn=self.Feedback, cl=self.cl)
+    if self.opts.parallel:
+      self.queued_ops.append((ops, name))
+    else:
+      return self.ExecOp(*ops)
+
+  def CommitQueue(self):
+    """Execute all submitted opcodes in case of parallel burnin"""
+    if not self.opts.parallel:
+      return
+
+    try:
+      results = self.ExecJobSet(self.queued_ops)
+    finally:
+      self.queued_ops = []
+    return results
 
   def ExecJobSet(self, jobs):
     """Execute a set of jobs and return once all are done.
 
     The method will return the list of results, if all jobs are
-    successfull. Otherwise, OpExecError will be raised from within
+    successful. Otherwise, OpExecError will be raised from within
     cli.py.
 
     """
     self.ClearFeedbackBuf()
-    job_ids = [cli.SendJob(job, cl=self.cl) for job in jobs]
-    Log("Submitted job IDs %s" % ", ".join(job_ids), indent=1)
+    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
+    Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
     results = []
-    for jid in job_ids:
-      Log("Waiting for job %s" % jid, indent=2)
+    for jid, (_, iname) in zip(job_ids, jobs):
+      Log("waiting for job %s for %s" % (jid, iname), indent=2)
       results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
 
     return results
@@ -316,7 +342,7 @@ class Burner(object):
     if self.opts.os not in os_set:
       Err("OS '%s' not found" % self.opts.os)
 
-  def CreateInstances(self):
+  def BurnCreateInstances(self):
     """Create the given instances.
 
     """
@@ -324,7 +350,6 @@ class Burner(object):
     mytor = izip(cycle(self.nodes),
                  islice(cycle(self.nodes), 1, None),
                  self.instances)
-    jobset = []
 
     Log("Creating instances")
     for pnode, snode, instance in mytor:
@@ -359,21 +384,15 @@ class Burner(object):
                                     hvparams=self.hvp,
                                     )
 
-      if self.opts.parallel:
-        jobset.append([op])
-        # FIXME: here we should not append to to_rem uncoditionally,
-        # but only when the job is successful
-        self.to_rem.append(instance)
-      else:
-        self.ExecOp(op)
-        self.to_rem.append(instance)
-    if self.opts.parallel:
-      self.ExecJobSet(jobset)
+      self.ExecOrQueue(instance, op)
+      self.to_rem.append(instance)
+
+    self.CommitQueue()
 
     for instance in self.instances:
       self._CheckInstanceAlive(instance)
 
-  def GrowDisks(self):
+  def BurnGrowDisks(self):
     """Grow both the os and the swap disks by the requested amount, if any."""
     Log("Growing disks")
     for instance in self.instances:
@@ -383,21 +402,25 @@ class Burner(object):
           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                   amount=growth, wait_for_sync=True)
           Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
-          self.ExecOp(op)
+          self.ExecOrQueue(instance, op)
+    self.CommitQueue()
 
-  def ReplaceDisks1D8(self):
+  def BurnReplaceDisks1D8(self):
     """Replace disks on primary and secondary for drbd8."""
     Log("Replacing disks on the same nodes")
     for instance in self.instances:
       Log("instance %s" % instance, indent=1)
+      ops = []
       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
         op = opcodes.OpReplaceDisks(instance_name=instance,
                                     mode=mode,
                                     disks=[i for i in range(self.disk_count)])
         Log("run %s" % mode, indent=2)
-        self.ExecOp(op)
+        ops.append(op)
+      self.ExecOrQueue(instance, *ops)
+    self.CommitQueue()
 
-  def ReplaceDisks2(self):
+  def BurnReplaceDisks2(self):
     """Replace secondary node."""
     Log("Changing the secondary node")
     mode = constants.REPLACE_DISK_CHG
@@ -417,9 +440,10 @@ class Burner(object):
                                   iallocator=self.opts.iallocator,
                                   disks=[i for i in range(self.disk_count)])
       Log("run %s %s" % (mode, msg), indent=2)
-      self.ExecOp(op)
+      self.ExecOrQueue(instance, op)
+    self.CommitQueue()
 
-  def Failover(self):
+  def BurnFailover(self):
     """Failover the instances."""
     Log("Failing over instances")
     for instance in self.instances:
@@ -427,26 +451,26 @@ class Burner(object):
       op = opcodes.OpFailoverInstance(instance_name=instance,
                                       ignore_consistency=False)
 
-      self.ExecOp(op)
+      self.ExecOrQueue(instance, op)
+    self.CommitQueue()
     for instance in self.instances:
       self._CheckInstanceAlive(instance)
 
-  def Migrate(self):
+  def BurnMigrate(self):
     """Migrate the instances."""
     Log("Migrating instances")
     for instance in self.instances:
       Log("instance %s" % instance, indent=1)
-      op = opcodes.OpMigrateInstance(instance_name=instance, live=True,
-                                     cleanup=False)
+      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
+                                      cleanup=False)
 
-      Log("migration", indent=2)
-      self.ExecOp(op)
-      op = opcodes.OpMigrateInstance(instance_name=instance, live=True,
-                                     cleanup=True)
-      Log("migration cleanup", indent=2)
-      self.ExecOp(op)
+      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
+                                      cleanup=True)
+      Log("migration and migration cleanup", indent=2)
+      self.ExecOrQueue(instance, op1, op2)
+    self.CommitQueue()
 
-  def ImportExport(self):
+  def BurnImportExport(self):
     """Export the instance, delete it, and import it back.
 
     """
@@ -458,6 +482,11 @@ class Burner(object):
 
     for pnode, snode, enode, instance in mytor:
       Log("instance %s" % instance, indent=1)
+      # read the full name of the instance
+      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
+                                        names=[instance])
+      full_name = self.ExecOp(nam_op)[0][0]
+
       if self.opts.iallocator:
         pnode = snode = None
         import_log_msg = ("import from %s"
@@ -476,9 +505,6 @@ class Burner(object):
                                            shutdown=True)
       rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                         ignore_failures=True)
-      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
-                                           names=[instance])
-      full_name = self.ExecOp(nam_op)[0][0]
       imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
       imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                         disks = [ {"size": size}
@@ -503,125 +529,130 @@ class Burner(object):
       erem_op = opcodes.OpRemoveExport(instance_name=instance)
 
       Log("export to node %s" % enode, indent=2)
-      self.ExecOp(exp_op)
       Log("remove instance", indent=2)
-      self.ExecOp(rem_op)
-      self.to_rem.remove(instance)
       Log(import_log_msg, indent=2)
-      self.ExecOp(imp_op)
       Log("remove export", indent=2)
-      self.ExecOp(erem_op)
-
-      self.to_rem.append(instance)
+      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
 
+    self.CommitQueue()
     for instance in self.instances:
       self._CheckInstanceAlive(instance)
 
-  def StopInstance(self, instance):
+  def StopInstanceOp(self, instance):
     """Stop given instance."""
-    op = opcodes.OpShutdownInstance(instance_name=instance)
-    Log("shutdown", indent=2)
-    self.ExecOp(op)
+    return opcodes.OpShutdownInstance(instance_name=instance)
 
-  def StartInstance(self, instance):
+  def StartInstanceOp(self, instance):
     """Start given instance."""
-    op = opcodes.OpStartupInstance(instance_name=instance, force=False)
-    Log("startup", indent=2)
-    self.ExecOp(op)
+    return opcodes.OpStartupInstance(instance_name=instance, force=False)
 
-  def RenameInstance(self, instance, instance_new):
+  def RenameInstanceOp(self, instance, instance_new):
     """Rename instance."""
-    op = opcodes.OpRenameInstance(instance_name=instance,
-                                  new_name=instance_new)
-    Log("rename to %s" % instance_new, indent=2)
-    self.ExecOp(op)
+    return opcodes.OpRenameInstance(instance_name=instance,
+                                    new_name=instance_new)
 
-  def StopStart(self):
+  def BurnStopStart(self):
     """Stop/start the instances."""
     Log("Stopping and starting instances")
     for instance in self.instances:
       Log("instance %s" % instance, indent=1)
-      self.StopInstance(instance)
-      self.StartInstance(instance)
+      op1 = self.StopInstanceOp(instance)
+      op2 = self.StartInstanceOp(instance)
+      self.ExecOrQueue(instance, op1, op2)
+
+    self.CommitQueue()
 
     for instance in self.instances:
       self._CheckInstanceAlive(instance)
 
-  def Remove(self):
+  def BurnRemove(self):
     """Remove the instances."""
     Log("Removing instances")
     for instance in self.to_rem:
       Log("instance %s" % instance, indent=1)
       op = opcodes.OpRemoveInstance(instance_name=instance,
                                     ignore_failures=True)
-      self.ExecOp(op)
+      self.ExecOrQueue(instance, op)
+
+    self.CommitQueue()
+
+  def BurnRename(self):
+    """Rename the instances.
 
-  def Rename(self):
-    """Rename the instances."""
+    Note that this function will not execute in parallel, since we
+    only have one target for rename.
+
+    """
     Log("Renaming instances")
     rename = self.opts.rename
     for instance in self.instances:
       Log("instance %s" % instance, indent=1)
-      self.StopInstance(instance)
-      self.RenameInstance(instance, rename)
-      self.StartInstance(rename)
+      op_stop = self.StopInstanceOp(instance)
+      op_rename1 = self.RenameInstanceOp(instance, rename)
+      op_rename2 = self.RenameInstanceOp(rename, instance)
+      op_start1 = self.StartInstanceOp(rename)
+      op_start2 = self.StartInstanceOp(instance)
+      self.ExecOp(op_stop, op_rename1, op_start1)
       self._CheckInstanceAlive(rename)
-      self.StopInstance(rename)
-      self.RenameInstance(rename, instance)
-      self.StartInstance(instance)
-
-    for instance in self.instances:
+      self.ExecOp(op_stop, op_rename2, op_start2)
       self._CheckInstanceAlive(instance)
 
-  def Reinstall(self):
+  def BurnReinstall(self):
     """Reinstall the instances."""
     Log("Reinstalling instances")
     for instance in self.instances:
       Log("instance %s" % instance, indent=1)
-      self.StopInstance(instance)
-      op = opcodes.OpReinstallInstance(instance_name=instance)
+      op1 = self.StopInstanceOp(instance)
+      op2 = opcodes.OpReinstallInstance(instance_name=instance)
       Log("reinstall without passing the OS", indent=2)
-      self.ExecOp(op)
-      op = opcodes.OpReinstallInstance(instance_name=instance,
-                                       os_type=self.opts.os)
+      op3 = opcodes.OpReinstallInstance(instance_name=instance,
+                                        os_type=self.opts.os)
       Log("reinstall specifying the OS", indent=2)
-      self.ExecOp(op)
-      self.StartInstance(instance)
+      op4 = self.StartInstanceOp(instance)
+      self.ExecOrQueue(instance, op1, op2, op3, op4)
+
+    self.CommitQueue()
+
     for instance in self.instances:
       self._CheckInstanceAlive(instance)
 
-  def Reboot(self):
+  def BurnReboot(self):
     """Reboot the instances."""
     Log("Rebooting instances")
     for instance in self.instances:
       Log("instance %s" % instance, indent=1)
+      ops = []
       for reboot_type in constants.REBOOT_TYPES:
         op = opcodes.OpRebootInstance(instance_name=instance,
                                       reboot_type=reboot_type,
                                       ignore_secondaries=False)
         Log("reboot with type '%s'" % reboot_type, indent=2)
-        self.ExecOp(op)
-        self._CheckInstanceAlive(instance)
+        ops.append(op)
+      self.ExecOrQueue(instance, *ops)
+
+    self.CommitQueue()
+
+    for instance in self.instances:
+      self._CheckInstanceAlive(instance)
 
-  def ActivateDisks(self):
+  def BurnActivateDisks(self):
     """Activate and deactivate disks of the instances."""
     Log("Activating/deactivating disks")
     for instance in self.instances:
       Log("instance %s" % instance, indent=1)
+      op_start = self.StartInstanceOp(instance)
       op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
       op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
+      op_stop = self.StopInstanceOp(instance)
       Log("activate disks when online", indent=2)
-      self.ExecOp(op_act)
-      self.StopInstance(instance)
       Log("activate disks when offline", indent=2)
-      self.ExecOp(op_act)
       Log("deactivate disks (when offline)", indent=2)
-      self.ExecOp(op_deact)
-      self.StartInstance(instance)
+      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
+    self.CommitQueue()
     for instance in self.instances:
       self._CheckInstanceAlive(instance)
 
-  def AddRemoveDisks(self):
+  def BurnAddRemoveDisks(self):
     """Add and remove an extra disk for the instances."""
     Log("Adding and removing disks")
     for instance in self.instances:
@@ -631,16 +662,16 @@ class Burner(object):
         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
       op_rem = opcodes.OpSetInstanceParams(\
         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
+      op_stop = self.StopInstanceOp(instance)
+      op_start = self.StartInstanceOp(instance)
       Log("adding a disk", indent=2)
-      self.ExecOp(op_add)
-      self.StopInstance(instance)
       Log("removing last disk", indent=2)
-      self.ExecOp(op_rem)
-      self.StartInstance(instance)
+      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
+    self.CommitQueue()
     for instance in self.instances:
       self._CheckInstanceAlive(instance)
 
-  def AddRemoveNICs(self):
+  def BurnAddRemoveNICs(self):
     """Add and remove an extra NIC for the instances."""
     Log("Adding and removing NICs")
     for instance in self.instances:
@@ -650,9 +681,9 @@ class Burner(object):
       op_rem = opcodes.OpSetInstanceParams(\
         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
       Log("adding a NIC", indent=2)
-      self.ExecOp(op_add)
       Log("removing last NIC", indent=2)
-      self.ExecOp(op_rem)
+      self.ExecOrQueue(instance, op_add, op_rem)
+    self.CommitQueue()
 
   def _CheckInstanceAlive(self, instance):
     """Check if an instance is alive by doing http checks.
@@ -670,7 +701,7 @@ class Burner(object):
     while time.time() < end_time and url is None:
       try:
         url = self.url_opener.open("http://%s/hostname.txt" % instance)
-      except IOError, err:
+      except IOError:
         # here we can have connection refused, no route to host, etc.
         time.sleep(1)
     if url is None:
@@ -701,48 +732,48 @@ class Burner(object):
 
     has_err = True
     try:
-      self.CreateInstances()
+      self.BurnCreateInstances()
       if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
-        self.ReplaceDisks1D8()
+        self.BurnReplaceDisks1D8()
       if (opts.do_replace2 and len(self.nodes) > 2 and
           opts.disk_template in constants.DTS_NET_MIRROR) :
-        self.ReplaceDisks2()
+        self.BurnReplaceDisks2()
 
       if (opts.disk_template != constants.DT_DISKLESS and
           utils.any(self.disk_growth, lambda n: n > 0)):
-        self.GrowDisks()
+        self.BurnGrowDisks()
 
       if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
-        self.Failover()
+        self.BurnFailover()
 
       if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
-        self.Migrate()
+        self.BurnMigrate()
 
       if (opts.do_importexport and
           opts.disk_template not in (constants.DT_DISKLESS,
                                      constants.DT_FILE)):
-        self.ImportExport()
+        self.BurnImportExport()
 
       if opts.do_reinstall:
-        self.Reinstall()
+        self.BurnReinstall()
 
       if opts.do_reboot:
-        self.Reboot()
+        self.BurnReboot()
 
       if opts.do_addremove_disks:
-        self.AddRemoveDisks()
+        self.BurnAddRemoveDisks()
 
       if opts.do_addremove_nics:
-        self.AddRemoveNICs()
+        self.BurnAddRemoveNICs()
 
       if opts.do_activate_disks:
-        self.ActivateDisks()
+        self.BurnActivateDisks()
 
       if opts.rename:
-        self.Rename()
+        self.BurnRename()
 
       if opts.do_startstop:
-        self.StopStart()
+        self.BurnStopStart()
 
       has_err = False
     finally:
@@ -751,7 +782,7 @@ class Burner(object):
         Log(self.GetFeedbackBuf())
         Log("\n\n")
       if not self.opts.keep_instances:
-        self.Remove()
+        self.BurnRemove()
 
     return 0
 
-- 
GitLab