Commit 6a373640 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Implement submitting jobs from logical units



The design details can be seen in the design document
(doc/design-lu-generated-jobs.rst).
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarRené Nussbaumer <rn@google.com>
parent 5e9bcdf4
......@@ -74,7 +74,28 @@ def _SupportsOob(cfg, node):
return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
# End types
class ResultWithJobs:
"""Data container for LU results with jobs.
Instances of this class returned from L{LogicalUnit.Exec} will be recognized
by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
contained in the C{jobs} attribute and include the job IDs in the opcode
result.
"""
def __init__(self, jobs, **kwargs):
"""Initializes this class.
Additional return values can be specified as keyword arguments.
@type jobs: list of lists of L{opcode.OpCode}
@param jobs: A list of lists of opcode objects
"""
self.jobs = jobs
self.other = kwargs
class LogicalUnit(object):
"""Logical Unit base class.
......
......@@ -529,6 +529,9 @@ DISK_TRANSFER_CONNECT_TIMEOUT = 60
# Disk index separator
DISK_SEPARATOR = _autoconf.DISK_SEPARATOR
#: Key for job IDs in opcode result
JOB_IDS_KEY = "jobs"
# runparts results
(RUNPARTS_SKIP,
RUNPARTS_RUN,
......
......@@ -540,6 +540,15 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
# Cancel here if we were asked to
self._CheckCancel()
def SubmitManyJobs(self, jobs):
"""Submits jobs for processing.
See L{JobQueue.SubmitManyJobs}.
"""
# Locking is done in job queue
return self._queue.SubmitManyJobs(jobs)
class _JobChangesChecker(object):
def __init__(self, fields, prev_job_info, prev_log_serial):
......
......@@ -144,6 +144,14 @@ class OpExecCbBase: # pylint: disable-msg=W0232
"""
def SubmitManyJobs(self, jobs):
"""Submits jobs for processing.
See L{jqueue.JobQueue.SubmitManyJobs}.
"""
raise NotImplementedError
def _LUNameForOpName(opname):
"""Computes the LU name for a given OpCode name.
......@@ -209,6 +217,24 @@ class Processor(object):
return acquired
def _ProcessResult(self, result):
"""
"""
if isinstance(result, cmdlib.ResultWithJobs):
# Submit jobs
job_submission = self._cbs.SubmitManyJobs(result.jobs)
# Build dictionary
result = result.other
assert constants.JOB_IDS_KEY not in result, \
"Key '%s' found in additional return values" % constants.JOB_IDS_KEY
result[constants.JOB_IDS_KEY] = job_submission
return result
def _ExecLU(self, lu):
"""Logical Unit execution sequence.
......@@ -229,7 +255,7 @@ class Processor(object):
return lu.dry_run_result
try:
result = lu.Exec(self.Log)
result = self._ProcessResult(lu.Exec(self.Log))
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
self.Log, result)
......
......@@ -1423,6 +1423,7 @@ class OpTestDummy(OpCode):
("result", ht.NoDefault, ht.NoType, None),
("messages", ht.NoDefault, ht.NoType, None),
("fail", ht.NoDefault, ht.NoType, None),
("submit_jobs", None, ht.NoType, None),
]
WITH_LU = False
......
......@@ -428,6 +428,9 @@ class _FakeQueueForProc:
def __init__(self):
self._acquired = False
self._updates = []
self._submitted = []
self._submit_count = itertools.count(1000)
def IsAcquired(self):
return self._acquired
......@@ -435,6 +438,9 @@ class _FakeQueueForProc:
def GetNextUpdate(self):
return self._updates.pop(0)
def GetNextSubmittedJob(self):
return self._submitted.pop(0)
def acquire(self, shared=0):
assert shared == 1
self._acquired = True
......@@ -447,6 +453,12 @@ class _FakeQueueForProc:
assert self._acquired, "Lock not acquired while updating job"
self._updates.append((job, bool(replicate)))
def SubmitManyJobs(self, jobs):
assert not self._acquired, "Lock acquired while submitting jobs"
job_ids = [self._submit_count.next() for _ in jobs]
self._submitted.extend(zip(job_ids, jobs))
return job_ids
class _FakeExecOpCodeForProc:
def __init__(self, queue, before_start, after_start):
......@@ -473,6 +485,9 @@ class _FakeExecOpCodeForProc:
if op.fail:
raise errors.OpExecError("Error requested (%s)" % op.result)
if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
return cbs.SubmitManyJobs(op.submit_jobs)
return op.result
......@@ -1065,6 +1080,90 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(job.GetLogEntries(count))
self.assertFalse(job.GetLogEntries(count + 3))
def testSubmitManyJobs(self):
queue = _FakeQueueForProc()
job_id = 15656
ops = [
opcodes.OpTestDummy(result="Res0", fail=False,
submit_jobs=[]),
opcodes.OpTestDummy(result="Res1", fail=False,
submit_jobs=[
[opcodes.OpTestDummy(result="r1j0", fail=False)],
]),
opcodes.OpTestDummy(result="Res2", fail=False,
submit_jobs=[
[opcodes.OpTestDummy(result="r2j0o0", fail=False),
opcodes.OpTestDummy(result="r2j0o1", fail=False),
opcodes.OpTestDummy(result="r2j0o2", fail=False),
opcodes.OpTestDummy(result="r2j0o3", fail=False)],
[opcodes.OpTestDummy(result="r2j1", fail=False)],
[opcodes.OpTestDummy(result="r2j3o0", fail=False),
opcodes.OpTestDummy(result="r2j3o1", fail=False)],
]),
]
# Create job
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
self.assert_(result)
else:
self.assertFalse(result)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertRaises(IndexError, queue.GetNextUpdate)
for idx, submitted_ops in enumerate(job_ops
for op in ops
for job_ops in op.submit_jobs):
self.assertEqual(queue.GetNextSubmittedJob(),
(1000 + idx, submitted_ops))
self.assertRaises(IndexError, queue.GetNextSubmittedJob)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
[[[], [1000], [1001, 1002, 1003]]])
self.assertEqual(job.GetInfo(["opstatus"]),
[len(job.ops) * [constants.OP_STATUS_SUCCESS]])
self._GenericCheckJob(job)
# Finished jobs can't be processed any further
self.assertRaises(errors.ProgrammerError,
jqueue._JobProcessor(queue, opexec, job))
class _FakeTimeoutStrategy:
def __init__(self, timeouts):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment