Commit be760ba8 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Change model from per-job to per-opcode processing



In order to support priorities, the processing of jobs needs to be
changed. Instead of processing jobs as a whole, the code is changed to
process one opcode at a time and then return to the queue. See the
Ganeti 2.3 design document for details.

This patch does not yet use priorities for acquiring locks.

The enclosed unittests increase the test coverage of jqueue.py from
about 34% to 58%. Please note that they also test some parts not added
by this patch, but testing them became possible only with
infrastructure added by this patch. For the first time, many
implications and assumptions of the job queue are codified in these
unittests.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 7b5c4a69
......@@ -176,7 +176,7 @@ class _QueuedJob(object):
"""
# pylint: disable-msg=W0212
__slots__ = ["queue", "id", "ops", "log_serial",
__slots__ = ["queue", "id", "ops", "log_serial", "current_op",
"received_timestamp", "start_timestamp", "end_timestamp",
"__weakref__"]
......@@ -203,6 +203,8 @@ class _QueuedJob(object):
self.start_timestamp = None
self.end_timestamp = None
self.current_op = None
def __repr__(self):
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
"id=%s" % self.id,
......@@ -237,6 +239,8 @@ class _QueuedJob(object):
obj.log_serial = max(obj.log_serial, log_entry[0])
obj.ops.append(op)
obj.current_op = None
return obj
def Serialize(self):
......@@ -734,6 +738,211 @@ def _EncodeOpError(err):
return errors.EncodeException(to_encode)
class _JobProcessor(object):
  """Executes a job's opcodes one at a time.

  Instances are callable; each call runs at most one pending opcode of the
  job and reports whether the job is finished, so the caller can re-schedule
  the job between opcodes.

  """
  def __init__(self, queue, opexec_fn, job):
    """Initializes this class.

    @param queue: Job queue; must provide C{acquire}/C{release} and
      C{UpdateJobUnlocked}
    @param opexec_fn: Function executing a single opcode
      (e.g. L{mcpu.Processor.ExecOpCode})
    @type job: L{_QueuedJob}
    @param job: Job to be processed

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job

  @staticmethod
  def _FindNextOpcode(job):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @return: Tuple of (opcode index, opcode, log prefix, opcode summary)
    @raise errors.ProgrammerError: if all opcodes were already processed or
      an opcode is still marked as running

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.current_op is None:
      job.current_op = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.current_op.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
      summary = op.input.Summary()

      if op.status == constants.OP_STATUS_CANCELED:
        # Cancelled jobs are handled by the caller
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
                              for i in job.ops[idx:])

      elif op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     log_prefix, summary)
        continue

      return (idx, op, log_prefix, summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops

    op.status = constants.OP_STATUS_WAITLOCK
    op.result = None
    op.start_timestamp = TimeStampNow()

    # First opcode to start also defines the job's start timestamp
    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp

  def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
    """Processes one opcode and returns the result.

    Must be called without the queue lock held (hence "Unlocked").

    @return: Tuple of (new opcode status, opcode result)

    """
    assert op.status == constants.OP_STATUS_WAITLOCK

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op))
    except CancelJob:
      logging.exception("%s: Canceling job", log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s", log_prefix, summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful", log_prefix, summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self):
    """Continues execution of a job.

    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    # Queue lock is taken in shared mode; opcode execution happens with the
    # lock released (see below)
    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELED)
                        for i in job.ops[opidx:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELED)

      if op.status != constants.OP_STATUS_CANCELED:
        # Prepare to start opcode
        self._MarkWaitlock(job, op)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

        # Write to disk
        queue.UpdateJobUnlocked(job)

        logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary)

        # Release lock while executing; re-acquired unconditionally afterwards
        queue.release()
        try:
          (op_status, op_result) = \
            self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
        finally:
          queue.acquire(shared=1)

        # Finalize opcode
        op.end_timestamp = TimeStampNow()
        op.status = op_status
        op.result = op_result

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opidx:])
        else:
          assert op.status in constants.OPS_FINALIZED

      # Ensure all opcodes so far have been successful
      assert (opidx == 0 or
              compat.all(i.status == constants.OP_STATUS_SUCCESS
                         for i in job.ops[:opidx]))

      if op.status == constants.OP_STATUS_SUCCESS:
        finalize = False

      elif op.status == constants.OP_STATUS_ERROR:
        # Ensure failed opcode has an exception as its result
        assert errors.GetEncodedError(job.ops[opidx].result)

        # All remaining opcodes fail with the same "preceding opcode" error
        to_encode = errors.OpExecError("Preceding opcode failed")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        finalize = True

        # Consistency check
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
                          errors.GetEncodedError(i.result)
                          for i in job.ops[opidx:])

      elif op.status == constants.OP_STATUS_CANCELING:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
        finalize = True

      elif op.status == constants.OP_STATUS_CANCELED:
        finalize = True

      else:
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)

      # Finalizing or last opcode?
      if finalize or opidx == (opcount - 1):
        # All opcodes have been run, finalize job
        job.end_timestamp = TimeStampNow()

      # Write to disk. If the job status is final, this is the final write
      # allowed. Once the file has been written, it can be archived anytime.
      queue.UpdateJobUnlocked(job)

      if finalize or opidx == (opcount - 1):
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
        return True

      return False
    finally:
      queue.release()
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
......@@ -741,125 +950,23 @@ class _JobQueueWorker(workerpool.BaseWorker):
def RunTask(self, job): # pylint: disable-msg=W0221
"""Job executor.
This functions processes a job. It is closely tied to the _QueuedJob and
_QueuedOpCode classes.
This functions processes a job. It is closely tied to the L{_QueuedJob} and
L{_QueuedOpCode} classes.
@type job: L{_QueuedJob}
@param job: the job to be processed
"""
queue = job.queue
assert queue == self.pool.queue
self.SetTaskName("Job%s" % job.id)
logging.info("Processing job %s", job.id)
proc = mcpu.Processor(self.pool.queue.context, job.id)
queue = job.queue
try:
try:
count = len(job.ops)
for idx, op in enumerate(job.ops):
op_summary = op.input.Summary()
if op.status == constants.OP_STATUS_SUCCESS:
# this is a job that was partially completed before master
# daemon shutdown, so it can be expected that some opcodes
# are already completed successfully (if any did error
# out, then the whole job should have been aborted and not
# resubmitted for processing)
logging.info("Op %s/%s: opcode %s already processed, skipping",
idx + 1, count, op_summary)
continue
try:
logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
op_summary)
queue.acquire(shared=1)
try:
if op.status == constants.OP_STATUS_CANCELED:
logging.debug("Canceling opcode")
raise CancelJob()
assert op.status == constants.OP_STATUS_QUEUED
logging.debug("Opcode %s/%s waiting for locks",
idx + 1, count)
op.status = constants.OP_STATUS_WAITLOCK
op.result = None
op.start_timestamp = TimeStampNow()
if idx == 0: # first opcode
job.start_timestamp = op.start_timestamp
queue.UpdateJobUnlocked(job)
input_opcode = op.input
finally:
queue.release()
# Make sure not to hold queue lock while calling ExecOpCode
result = proc.ExecOpCode(input_opcode,
_OpExecCallbacks(queue, job, op))
queue.acquire(shared=1)
try:
logging.debug("Opcode %s/%s succeeded", idx + 1, count)
op.status = constants.OP_STATUS_SUCCESS
op.result = result
op.end_timestamp = TimeStampNow()
if idx == count - 1:
job.end_timestamp = TimeStampNow()
# Consistency check
assert compat.all(i.status == constants.OP_STATUS_SUCCESS
for i in job.ops)
queue.UpdateJobUnlocked(job)
finally:
queue.release()
logging.info("Op %s/%s: Successfully finished opcode %s",
idx + 1, count, op_summary)
except CancelJob:
# Will be handled further up
raise
except Exception, err:
queue.acquire(shared=1)
try:
try:
logging.debug("Opcode %s/%s failed", idx + 1, count)
op.status = constants.OP_STATUS_ERROR
op.result = _EncodeOpError(err)
op.end_timestamp = TimeStampNow()
logging.info("Op %s/%s: Error in opcode %s: %s",
idx + 1, count, op_summary, err)
to_encode = errors.OpExecError("Preceding opcode failed")
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
_EncodeOpError(to_encode))
# Consistency check
assert compat.all(i.status == constants.OP_STATUS_SUCCESS
for i in job.ops[:idx])
assert compat.all(i.status == constants.OP_STATUS_ERROR and
errors.GetEncodedError(i.result)
for i in job.ops[idx:])
finally:
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
finally:
queue.release()
raise
except CancelJob:
queue.acquire(shared=1)
try:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
finally:
queue.release()
except errors.GenericError, err:
logging.exception("Ganeti exception")
except:
logging.exception("Unhandled exception")
finally:
status = job.CalcStatus()
logging.info("Finished job %s, status = %s", job.id, status)
proc = mcpu.Processor(queue.context, job.id)
if not _JobProcessor(queue, proc.ExecOpCode, job)():
# Schedule again
raise workerpool.DeferTask()
class _JobQueueWorkerPool(workerpool.WorkerPool):
......
......@@ -802,6 +802,18 @@ class OpTestJobqueue(OpCode):
]
class OpTestDummy(OpCode):
  """Utility opcode used by unittests.

  """
  OP_ID = "OP_TEST_DUMMY"
  __slots__ = [
    # Value returned as the opcode's result on success
    "result",
    # NOTE(review): not used in the code visible here; presumably log
    # messages emitted during fake execution -- confirm against tests
    "messages",
    # If True, execution raises an error instead of returning "result"
    "fail",
    ]
# Mapping from opcode ID (e.g. "OP_TEST_DUMMY") to the OpCode subclass
# implementing it, built by scanning module globals for classes deriving
# from OpCode that declare an OP_ID attribute
OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values()
                   if (isinstance(v, type) and issubclass(v, OpCode) and
                       hasattr(v, "OP_ID"))])
......@@ -347,5 +347,505 @@ class TestQueuedJob(unittest.TestCase):
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
class _FakeQueueForProc:
def __init__(self):
self._acquired = False
def IsAcquired(self):
return self._acquired
def acquire(self, shared=0):
assert shared == 1
self._acquired = True
def release(self):
assert self._acquired
self._acquired = False
def UpdateJobUnlocked(self, job, replicate=None):
# TODO: Ensure job is updated at the correct places
pass
class _FakeExecOpCodeForProc:
  """Test double replacing the real opcode execution function.

  Runs optional hooks before and after the C{NotifyStart} callback, then
  either raises (when the dummy opcode requests failure) or returns the
  opcode's canned result.

  """
  def __init__(self, before_start, after_start):
    # Hooks may be None, in which case they're skipped
    (self._pre_hook, self._post_hook) = (before_start, after_start)

  def __call__(self, op, cbs):
    assert isinstance(op, opcodes.OpTestDummy)

    if self._pre_hook:
      self._pre_hook()

    cbs.NotifyStart()

    if self._post_hook:
      self._post_hook(op, cbs)

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    return op.result
class TestJobProcessor(unittest.TestCase):
  def _CreateJob(self, queue, job_id, ops):
    """Creates a L{jqueue._QueuedJob} and verifies its initial state.

    Checks that timestamps are unset and that the job's opcodes wrap the
    given input opcodes in order.

    @return: the newly created job

    """
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assert_(compat.all(op.input == inp
                            for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])

    return job
  def _GenericCheckJob(self, job):
    """Checks a finished job's timestamp bookkeeping.

    Verifies that C{GetInfo} reports the per-opcode and per-job timestamps
    consistently and that the job's start timestamp equals that of its
    first opcode.

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])

    self.assert_(job.start_timestamp)
    self.assert_(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
  def testSuccess(self):
    """Tests jobs whose opcodes all succeed, at various opcode counts."""
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      # Hooks close over "job"; they are only invoked synchronously within
      # this same loop iteration, so the late binding is safe
      def _BeforeStart():
        # Lock must have been released before opcode execution starts
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      def _AfterStart(op, cbs):
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)

      # One processor call per opcode; only the last one finishes the job
      for idx in range(len(ops)):
        result = jqueue._JobProcessor(queue, opexec, job)()

        if idx == len(ops) - 1:
          # Last opcode
          self.assert_(result)
        else:
          self.assertFalse(result)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assert_(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                              for op in job.ops))

      self._GenericCheckJob(job)

      # Finished jobs can't be processed any further
      self.assertRaises(errors.ProgrammerError,
                        jqueue._JobProcessor(queue, opexec, job))
  def testOpcodeError(self):
    """Tests jobs where a range of opcodes [failfrom, failto] fails.

    Opcodes before the failure must succeed; the failing opcode and all
    following ones must end up in error status.

    """
    queue = _FakeQueueForProc()

    # (job ID, opcode count, index of first failing op, index of last)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i and
                                       i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      opexec = _FakeExecOpCodeForProc(None, None)

      for idx in range(len(ops)):
        result = jqueue._JobProcessor(queue, opexec, job)()

        # Processing stops either at the first failure or the last opcode
        if idx in (failfrom, len(ops) - 1):
          # Last opcode
          self.assert_(result)
          break

        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Opcodes before the failure ran successfully
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          # Opcodes which were requested to fail
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Opcodes after the failure never ran, but are marked as errors too
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                              for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Finished jobs can't be processed any further
      self.assertRaises(errors.ProgrammerError,
                        jqueue._JobProcessor(queue, opexec, job))
def testCancelWhileInQueue(self):
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(5)]
# Create job
job_id = 17045
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Mark as cancelled
(success, _) = job.Cancel()
self.assert_(success)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
for op in job.ops))
opexec = _FakeExecOpCodeForProc(None, None)
jqueue._JobProcessor(queue, opexec, job)()
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assertFalse(job.start_timestamp)