Commit ec5c88dc authored by Iustin Pop's avatar Iustin Pop
Browse files

Start implementation of parallel burnin

This patch introduces a simple framework for executing jobs in parallel
in burnin (the ExecJobSet function) and the "--parallel" command line
flag.

The patch also changes the instance creation to run in parallel when the
above flag is given. Error handling/instance removal is currently flacky
with this options if there are errors in the instance creation.

We also modify burnin to reuse a single client.

Reviewed-by: imsnah
parent e0ec0ff6
...@@ -69,6 +69,7 @@ class Burner(object): ...@@ -69,6 +69,7 @@ class Burner(object):
self.instances = [] self.instances = []
self.to_rem = [] self.to_rem = []
self.opts = None self.opts = None
self.cl = cli.GetClient()
self.ParseOptions() self.ParseOptions()
self.GetState() self.GetState()
...@@ -90,7 +91,25 @@ class Burner(object): ...@@ -90,7 +91,25 @@ class Burner(object):
def ExecOp(self, op): def ExecOp(self, op):
"""Execute an opcode and manage the exec buffer.""" """Execute an opcode and manage the exec buffer."""
self.ClearFeedbackBuf() self.ClearFeedbackBuf()
return cli.SubmitOpCode(op, feedback_fn=self.Feedback) return cli.SubmitOpCode(op, feedback_fn=self.Feedback, cl=self.cl)
def ExecJobSet(self, jobs):
"""Execute a set of jobs and return once all are done.
The method will return the list of results, if all jobs are
successfull. Otherwise, OpExecError will be raised from within
cli.py.
"""
self.ClearFeedbackBuf()
job_ids = [cli.SendJob(job, cl=self.cl) for job in jobs]
Log("- Submitted job IDs %s" % ", ".join(job_ids))
results = []
for jid in job_ids:
Log("- Waiting for job %s" % jid)
results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
return results
def ParseOptions(self): def ParseOptions(self):
"""Parses the command line options. """Parses the command line options.
...@@ -149,6 +168,10 @@ class Burner(object): ...@@ -149,6 +168,10 @@ class Burner(object):
help="Perform the allocation using an iallocator" help="Perform the allocation using an iallocator"
" instead of fixed node spread (node restrictions no" " instead of fixed node spread (node restrictions no"
" longer apply, therefore -n/--nodes must not be used") " longer apply, therefore -n/--nodes must not be used")
parser.add_option("-p", "--parallel", default=False, action="store_true",
dest="parallel",
help="Enable parallelization of some operations in"
" order to speed burnin or to test granular locking")
options, args = parser.parse_args() options, args = parser.parse_args()
if len(args) < 1 or options.os is None: if len(args) < 1 or options.os is None:
...@@ -206,6 +229,7 @@ class Burner(object): ...@@ -206,6 +229,7 @@ class Burner(object):
mytor = izip(cycle(self.nodes), mytor = izip(cycle(self.nodes),
islice(cycle(self.nodes), 1, None), islice(cycle(self.nodes), 1, None),
self.instances) self.instances)
jobset = []
for pnode, snode, instance in mytor: for pnode, snode, instance in mytor:
if self.opts.iallocator: if self.opts.iallocator:
pnode = snode = None pnode = snode = None
...@@ -240,8 +264,16 @@ class Burner(object): ...@@ -240,8 +264,16 @@ class Burner(object):
hvm_nic_type=constants.HT_HVM_NIC_RTL8139, hvm_nic_type=constants.HT_HVM_NIC_RTL8139,
hvm_disk_type=constants.HT_HVM_DEV_IOEMU) hvm_disk_type=constants.HT_HVM_DEV_IOEMU)
self.ExecOp(op) if self.opts.parallel:
self.to_rem.append(instance) jobset.append([op])
# FIXME: here we should not append to to_rem uncoditionally,
# but only when the job is successful
self.to_rem.append(instance)
else:
self.ExecOp(op)
self.to_rem.append(instance)
if self.opts.parallel:
self.ExecJobSet(jobset)
def ReplaceDisks1D8(self): def ReplaceDisks1D8(self):
"""Replace disks on primary and secondary for drbd8.""" """Replace disks on primary and secondary for drbd8."""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment