diff --git a/tools/burnin b/tools/burnin index 1ba5f7b4848387044d4b0bec428ed3c3c62dd1fd..3d2ad71b2819b9fb7d6e8c717ff080e1c24efabc 100755 --- a/tools/burnin +++ b/tools/burnin @@ -41,11 +41,16 @@ from ganeti import utils USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...") +MAX_RETRIES = 3 class InstanceDown(Exception): """The checked instance was not up""" +class BurninFailure(Exception): + """Failure detected during burning""" + + def Usage(): """Shows program usage information and exits the program.""" @@ -106,6 +111,9 @@ class Burner(object): self.to_rem = [] self.queued_ops = [] self.opts = None + self.queue_retry = False + self.disk_count = self.disk_growth = self.disk_size = None + self.hvp = self.bep = None self.ParseOptions() self.cl = cli.GetClient() self.GetState() @@ -125,7 +133,39 @@ class Burner(object): if self.opts.verbose: Log(msg, indent=3) - def ExecOp(self, *ops): + def MaybeRetry(self, retry_count, msg, fn, *args): + """Possibly retry a given function execution. + + @type retry_count: int + @param retry_count: retry counter: + - 0: non-retryable action + - 1: last retry for a retryable action + - MAX_RETRIES: original try for a retryable action + @type msg: str + @param msg: the kind of the operation + @type fn: callable + @param fn: the function to be called + + """ + try: + val = fn(*args) + if retry_count > 0 and retry_count < MAX_RETRIES: + Log("Idempotent %s succeeded after %d retries" % + (msg, MAX_RETRIES - retry_count)) + return val + except Exception, err: + if retry_count == 0: + Log("Non-idempotent %s failed, aborting" % (msg, )) + raise + elif retry_count == 1: + Log("Idempotent %s repeated failure, aborting" % (msg, )) + raise + else: + Log("Idempotent %s failed, retry #%d/%d: %s" % + (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)) + self.MaybeRetry(retry_count - 1, msg, fn, *args) + + def _ExecOp(self, *ops): """Execute one or more opcodes and manage the exec buffer. @result: if only opcode has been passed, we return its result; @@ -139,20 +179,48 @@ class Burner(object): else: return results + def ExecOp(self, retry, *ops): + """Execute one or more opcodes and manage the exec buffer. + + @result: if only opcode has been passed, we return its result; + otherwise we return the list of results + + """ + if retry: + rval = MAX_RETRIES + else: + rval = 0 + return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops) + def ExecOrQueue(self, name, *ops): """Execute an opcode and manage the exec buffer.""" if self.opts.parallel: self.queued_ops.append((ops, name)) else: - return self.ExecOp(*ops) + return self.ExecOp(self.queue_retry, *ops) + + def StartBatch(self, retry): + """Start a new batch of jobs. + + @param retry: whether this is a retryable batch + + """ + self.queued_ops = [] + self.queue_retry = retry def CommitQueue(self): """Execute all submitted opcodes in case of parallel burnin""" if not self.opts.parallel: return + if self.queue_retry: + rval = MAX_RETRIES + else: + rval = 0 + try: - results = self.ExecJobSet(self.queued_ops) + results = self.MaybeRetry(rval, "jobset", self.ExecJobSet, + self.queued_ops) finally: self.queued_ops = [] return results @@ -171,8 +239,12 @@ class Burner(object): results = [] for jid, (_, iname) in zip(job_ids, jobs): Log("waiting for job %s for %s" % (jid, iname), indent=2) - results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)) - + try: + results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)) + except Exception, err: + Log("Job for %s failed: %s" % (iname, err)) + if len(results) != len(jobs): + raise BurninFailure() return results def ParseOptions(self): @@ -325,14 +397,14 @@ class Burner(object): try: op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"], names=names, use_locking=True) - result = self.ExecOp(op) + result = self.ExecOp(True, op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) Err(msg, exit_code=err_code) self.nodes = [data[0] for data in result if not (data[1] or data[2])] - result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"], - names=[])) + op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[]) + result = self.ExecOp(True, op_diagos) if not result: Err("Can't get the OS list") @@ -347,6 +419,7 @@ class Burner(object): """Create the given instances. """ + self.StartBatch(False) self.to_rem = [] mytor = izip(cycle(self.nodes), islice(cycle(self.nodes), 1, None), @@ -396,6 +469,7 @@ class Burner(object): def BurnGrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" Log("Growing disks") + self.StartBatch(False) for instance in self.instances: Log("instance %s" % instance, indent=1) for idx, growth in enumerate(self.disk_growth): @@ -409,6 +483,7 @@ class Burner(object): def BurnReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" Log("Replacing disks on the same nodes") + self.StartBatch(True) for instance in self.instances: Log("instance %s" % instance, indent=1) ops = [] @@ -424,6 +499,7 @@ class Burner(object): def BurnReplaceDisks2(self): """Replace secondary node.""" Log("Changing the secondary node") + self.StartBatch(True) mode = constants.REPLACE_DISK_CHG mytor = izip(islice(cycle(self.nodes), 2, None), @@ -447,6 +523,7 @@ class Burner(object): def BurnFailover(self): """Failover the instances.""" Log("Failing over instances") + self.StartBatch(False) for instance in self.instances: Log("instance %s" % instance, indent=1) op = opcodes.OpFailoverInstance(instance_name=instance, @@ -460,6 +537,7 @@ class Burner(object): def BurnMigrate(self): """Migrate the instances.""" Log("Migrating instances") + self.StartBatch(False) for instance in self.instances: Log("instance %s" % instance, indent=1) op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True, @@ -476,6 +554,7 @@ class Burner(object): """ Log("Exporting and re-importing instances") + self.StartBatch(False) mytor = izip(cycle(self.nodes), islice(cycle(self.nodes), 1, None), islice(cycle(self.nodes), 2, None), @@ -486,7 +565,7 @@ class Burner(object): # read the full name of the instance nam_op = opcodes.OpQueryInstances(output_fields=["name"], names=[instance], use_locking=True) - full_name = self.ExecOp(nam_op)[0][0] + full_name = self.ExecOp(False, nam_op)[0][0] if self.opts.iallocator: pnode = snode = None @@ -555,6 +634,7 @@ class Burner(object): def BurnStopStart(self): """Stop/start the instances.""" Log("Stopping and starting instances") + self.StartBatch(True) for instance in self.instances: Log("instance %s" % instance, indent=1) op1 = self.StopInstanceOp(instance) @@ -568,6 +648,7 @@ class Burner(object): def BurnRemove(self): """Remove the instances.""" + self.StartBatch(False) Log("Removing instances") for instance in self.to_rem: Log("instance %s" % instance, indent=1) @@ -594,14 +675,15 @@ class Burner(object): op_rename2 = self.RenameInstanceOp(rename, instance) op_start1 = self.StartInstanceOp(rename) op_start2 = self.StartInstanceOp(instance) - self.ExecOp(op_stop1, op_rename1, op_start1) + self.ExecOp(False, op_stop1, op_rename1, op_start1) self._CheckInstanceAlive(rename) - self.ExecOp(op_stop2, op_rename2, op_start2) + self.ExecOp(False, op_stop2, op_rename2, op_start2) self._CheckInstanceAlive(instance) def BurnReinstall(self): """Reinstall the instances.""" Log("Reinstalling instances") + self.StartBatch(True) for instance in self.instances: Log("instance %s" % instance, indent=1) op1 = self.StopInstanceOp(instance) @@ -621,6 +703,7 @@ class Burner(object): def BurnReboot(self): """Reboot the instances.""" Log("Rebooting instances") + self.StartBatch(True) for instance in self.instances: Log("instance %s" % instance, indent=1) ops = [] @@ -640,6 +723,7 @@ class Burner(object): def BurnActivateDisks(self): """Activate and deactivate disks of the instances.""" Log("Activating/deactivating disks") + self.StartBatch(True) for instance in self.instances: Log("instance %s" % instance, indent=1) op_start = self.StartInstanceOp(instance) @@ -657,6 +741,7 @@ class Burner(object): def BurnAddRemoveDisks(self): """Add and remove an extra disk for the instances.""" Log("Adding and removing disks") + self.StartBatch(False) for instance in self.instances: Log("instance %s" % instance, indent=1) op_add = opcodes.OpSetInstanceParams(\ @@ -676,6 +761,7 @@ class Burner(object): def BurnAddRemoveNICs(self): """Add and remove an extra NIC for the instances.""" Log("Adding and removing NICs") + self.StartBatch(False) for instance in self.instances: Log("instance %s" % instance, indent=1) op_add = opcodes.OpSetInstanceParams(\