Commit 73ff3118 authored by Iustin Pop's avatar Iustin Pop
Browse files

burnin: Implement retryable operations



Some burnin steps are idempotent: e.g. reinstalling an instance (from
burning p.o.v.) can be done multiple times without any side-effects that
would affect later burnin steps. As such, failing the whole burnin
process due a reinstall failure is undesirable.

This patch modifies burnin by marking each opcode (in case of individual
execution) and job set retryable or not. Retryable actions will be
retried up to a number of times, after which we give up and return
failure.

One side-effect is that in case of full-failure in retryable job sets we
lose the original exception (but we do log its string format), so we
have a little bit less information in this case.
Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 699d856f
......@@ -41,11 +41,16 @@ from ganeti import utils
USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
MAX_RETRIES = 3
class InstanceDown(Exception):
"""The checked instance was not up"""
class BurninFailure(Exception):
"""Failure detected during burning"""
def Usage():
"""Shows program usage information and exits the program."""
......@@ -106,6 +111,9 @@ class Burner(object):
self.to_rem = []
self.queued_ops = []
self.opts = None
self.queue_retry = False
self.disk_count = self.disk_growth = self.disk_size = None
self.hvp = self.bep = None
self.ParseOptions()
self.cl = cli.GetClient()
self.GetState()
......@@ -125,7 +133,39 @@ class Burner(object):
if self.opts.verbose:
Log(msg, indent=3)
def ExecOp(self, *ops):
def MaybeRetry(self, retry_count, msg, fn, *args):
"""Possibly retry a given function execution.
@type retry_count: int
@param retry_count: retry counter:
- 0: non-retryable action
- 1: last retry for a retryable action
- MAX_RETRIES: original try for a retryable action
@type msg: str
@param msg: the kind of the operation
@type fn: callable
@param fn: the function to be called
"""
try:
val = fn(*args)
if retry_count > 0 and retry_count < MAX_RETRIES:
Log("Idempotent %s succeeded after %d retries" %
(msg, MAX_RETRIES - retry_count))
return val
except Exception, err:
if retry_count == 0:
Log("Non-idempotent %s failed, aborting" % (msg, ))
raise
elif retry_count == 1:
Log("Idempotent %s repeated failure, aborting" % (msg, ))
raise
else:
Log("Idempotent %s failed, retry #%d/%d: %s" %
(msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
self.MaybeRetry(retry_count - 1, msg, fn, *args)
def _ExecOp(self, *ops):
"""Execute one or more opcodes and manage the exec buffer.
@result: if only opcode has been passed, we return its result;
......@@ -139,20 +179,48 @@ class Burner(object):
else:
return results
def ExecOp(self, retry, *ops):
"""Execute one or more opcodes and manage the exec buffer.
@result: if only opcode has been passed, we return its result;
otherwise we return the list of results
"""
if retry:
rval = MAX_RETRIES
else:
rval = 0
return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
def ExecOrQueue(self, name, *ops):
"""Execute an opcode and manage the exec buffer."""
if self.opts.parallel:
self.queued_ops.append((ops, name))
else:
return self.ExecOp(*ops)
return self.ExecOp(self.queue_retry, *ops)
def StartBatch(self, retry):
"""Start a new batch of jobs.
@param retry: whether this is a retryable batch
"""
self.queued_ops = []
self.queue_retry = retry
def CommitQueue(self):
"""Execute all submitted opcodes in case of parallel burnin"""
if not self.opts.parallel:
return
if self.queue_retry:
rval = MAX_RETRIES
else:
rval = 0
try:
results = self.ExecJobSet(self.queued_ops)
results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
self.queued_ops)
finally:
self.queued_ops = []
return results
......@@ -171,8 +239,12 @@ class Burner(object):
results = []
for jid, (_, iname) in zip(job_ids, jobs):
Log("waiting for job %s for %s" % (jid, iname), indent=2)
results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
try:
results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
except Exception, err:
Log("Job for %s failed: %s" % (iname, err))
if len(results) != len(jobs):
raise BurninFailure()
return results
def ParseOptions(self):
......@@ -325,14 +397,14 @@ class Burner(object):
try:
op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
names=names, use_locking=True)
result = self.ExecOp(op)
result = self.ExecOp(True, op)
except errors.GenericError, err:
err_code, msg = cli.FormatError(err)
Err(msg, exit_code=err_code)
self.nodes = [data[0] for data in result if not (data[1] or data[2])]
result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
names=[]))
op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
result = self.ExecOp(True, op_diagos)
if not result:
Err("Can't get the OS list")
......@@ -347,6 +419,7 @@ class Burner(object):
"""Create the given instances.
"""
self.StartBatch(False)
self.to_rem = []
mytor = izip(cycle(self.nodes),
islice(cycle(self.nodes), 1, None),
......@@ -396,6 +469,7 @@ class Burner(object):
def BurnGrowDisks(self):
"""Grow both the os and the swap disks by the requested amount, if any."""
Log("Growing disks")
self.StartBatch(False)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
for idx, growth in enumerate(self.disk_growth):
......@@ -409,6 +483,7 @@ class Burner(object):
def BurnReplaceDisks1D8(self):
"""Replace disks on primary and secondary for drbd8."""
Log("Replacing disks on the same nodes")
self.StartBatch(True)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
ops = []
......@@ -424,6 +499,7 @@ class Burner(object):
def BurnReplaceDisks2(self):
"""Replace secondary node."""
Log("Changing the secondary node")
self.StartBatch(True)
mode = constants.REPLACE_DISK_CHG
mytor = izip(islice(cycle(self.nodes), 2, None),
......@@ -447,6 +523,7 @@ class Burner(object):
def BurnFailover(self):
"""Failover the instances."""
Log("Failing over instances")
self.StartBatch(False)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
op = opcodes.OpFailoverInstance(instance_name=instance,
......@@ -460,6 +537,7 @@ class Burner(object):
def BurnMigrate(self):
"""Migrate the instances."""
Log("Migrating instances")
self.StartBatch(False)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
......@@ -476,6 +554,7 @@ class Burner(object):
"""
Log("Exporting and re-importing instances")
self.StartBatch(False)
mytor = izip(cycle(self.nodes),
islice(cycle(self.nodes), 1, None),
islice(cycle(self.nodes), 2, None),
......@@ -486,7 +565,7 @@ class Burner(object):
# read the full name of the instance
nam_op = opcodes.OpQueryInstances(output_fields=["name"],
names=[instance], use_locking=True)
full_name = self.ExecOp(nam_op)[0][0]
full_name = self.ExecOp(False, nam_op)[0][0]
if self.opts.iallocator:
pnode = snode = None
......@@ -555,6 +634,7 @@ class Burner(object):
def BurnStopStart(self):
"""Stop/start the instances."""
Log("Stopping and starting instances")
self.StartBatch(True)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
op1 = self.StopInstanceOp(instance)
......@@ -568,6 +648,7 @@ class Burner(object):
def BurnRemove(self):
"""Remove the instances."""
self.StartBatch(False)
Log("Removing instances")
for instance in self.to_rem:
Log("instance %s" % instance, indent=1)
......@@ -594,14 +675,15 @@ class Burner(object):
op_rename2 = self.RenameInstanceOp(rename, instance)
op_start1 = self.StartInstanceOp(rename)
op_start2 = self.StartInstanceOp(instance)
self.ExecOp(op_stop1, op_rename1, op_start1)
self.ExecOp(False, op_stop1, op_rename1, op_start1)
self._CheckInstanceAlive(rename)
self.ExecOp(op_stop2, op_rename2, op_start2)
self.ExecOp(False, op_stop2, op_rename2, op_start2)
self._CheckInstanceAlive(instance)
def BurnReinstall(self):
"""Reinstall the instances."""
Log("Reinstalling instances")
self.StartBatch(True)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
op1 = self.StopInstanceOp(instance)
......@@ -621,6 +703,7 @@ class Burner(object):
def BurnReboot(self):
"""Reboot the instances."""
Log("Rebooting instances")
self.StartBatch(True)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
ops = []
......@@ -640,6 +723,7 @@ class Burner(object):
def BurnActivateDisks(self):
"""Activate and deactivate disks of the instances."""
Log("Activating/deactivating disks")
self.StartBatch(True)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
op_start = self.StartInstanceOp(instance)
......@@ -657,6 +741,7 @@ class Burner(object):
def BurnAddRemoveDisks(self):
"""Add and remove an extra disk for the instances."""
Log("Adding and removing disks")
self.StartBatch(False)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
op_add = opcodes.OpSetInstanceParams(\
......@@ -676,6 +761,7 @@ class Burner(object):
def BurnAddRemoveNICs(self):
"""Add and remove an extra NIC for the instances."""
Log("Adding and removing NICs")
self.StartBatch(False)
for instance in self.instances:
Log("instance %s" % instance, indent=1)
op_add = opcodes.OpSetInstanceParams(\
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment