Commit 66bd7445 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Fix potential race condition when cancelling queued jobs



When a job was cancelled, its status would be changed and the file
written again. Since this was a final status, the job file could be
moved anytime for archival. If the job was still in the queue, however,
it would be processed (not fully, just updating the “end_timestamp”
attribute) and written again. This was bad as it could leave the same
job in two different files.

With this patch the processor is changed to return early for finished
jobs. Cancelling a queued job will finalize it right away. Unittests are
updated.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 8785b71b
......@@ -426,6 +426,12 @@ class _QueuedJob(object):
op.result = result
not_marked = False
def Finalize(self):
"""Marks the job as finalized.
"""
self.end_timestamp = TimeStampNow()
def Cancel(self):
"""Marks job as canceled/-ing if possible.
......@@ -439,6 +445,7 @@ class _QueuedJob(object):
if status == constants.JOB_STATUS_QUEUED:
self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
self.Finalize()
return (True, "Job %s canceled" % self.id)
elif status == constants.JOB_STATUS_WAITLOCK:
......@@ -866,21 +873,15 @@ class _JobProcessor(object):
opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
timeout_strategy_factory)
if op.status == constants.OP_STATUS_CANCELED:
# Cancelled jobs are handled by the caller
assert not compat.any(i.status != constants.OP_STATUS_CANCELED
for i in job.ops[idx:])
elif op.status in constants.OPS_FINALIZED:
# This is a job that was partially completed before master daemon
# shutdown, so it can be expected that some opcodes are already
# completed successfully (if any did error out, then the whole job
# should have been aborted and not resubmitted for processing).
logging.info("%s: opcode %s already processed, skipping",
opctx.log_prefix, opctx.summary)
continue
if op.status not in constants.OPS_FINALIZED:
return opctx
return opctx
# This is a job that was partially completed before master daemon
# shutdown, so it can be expected that some opcodes are already
# completed successfully (if any did error out, then the whole job
# should have been aborted and not resubmitted for processing).
logging.info("%s: opcode %s already processed, skipping",
opctx.log_prefix, opctx.summary)
@staticmethod
def _MarkWaitlock(job, op):
......@@ -977,6 +978,10 @@ class _JobProcessor(object):
try:
opcount = len(job.ops)
# Don't do anything for finalized jobs
if job.CalcStatus() in constants.JOBS_FINALIZED:
return True
# Is a previous opcode still pending?
if job.cur_opctx:
opctx = job.cur_opctx
......@@ -990,20 +995,17 @@ class _JobProcessor(object):
# Consistency check
assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
constants.OP_STATUS_CANCELING)
for i in job.ops[opctx.index + 1:])
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED)
constants.OP_STATUS_CANCELING)
assert (op.priority <= constants.OP_PRIO_LOWEST and
op.priority >= constants.OP_PRIO_HIGHEST)
if op.status not in (constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELED):
if op.status != constants.OP_STATUS_CANCELING:
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK)
......@@ -1088,22 +1090,22 @@ class _JobProcessor(object):
"Job canceled by request")
finalize = True
elif op.status == constants.OP_STATUS_CANCELED:
finalize = True
else:
raise errors.ProgrammerError("Unknown status '%s'" % op.status)
# Finalizing or last opcode?
if finalize or opctx.index == (opcount - 1):
if opctx.index == (opcount - 1):
# Finalize on last opcode
finalize = True
if finalize:
# All opcodes have been run, finalize job
job.end_timestamp = TimeStampNow()
job.Finalize()
# Write to disk. If the job status is final, this is the final write
# allowed. Once the file has been written, it can be archived anytime.
queue.UpdateJobUnlocked(job)
if finalize or opctx.index == (opcount - 1):
if finalize:
logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
return True
......@@ -1775,6 +1777,10 @@ class JobQueue(object):
@param replicate: whether to replicate the change to remote nodes
"""
if __debug__:
finalized = job.CalcStatus() in constants.JOBS_FINALIZED
assert (finalized ^ (job.end_timestamp is None))
filename = self._GetJobPath(job.id)
data = serializer.DumpJson(job.Serialize(), indent=False)
logging.debug("Writing job %s to %s", job.id, filename)
......@@ -1832,6 +1838,8 @@ class JobQueue(object):
(success, msg) = job.Cancel()
if success:
# If the job was finalized (e.g. cancelled), this is the final write
# allowed. The job can be archived anytime.
self.UpdateJobUnlocked(job)
return (success, msg)
......
......@@ -565,9 +565,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._GenericCheckJob(job)
# Finished jobs can't be processed any further
self.assertRaises(errors.ProgrammerError,
jqueue._JobProcessor(queue, opexec, job))
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertRaises(IndexError, queue.GetNextUpdate)
def testOpcodeError(self):
queue = _FakeQueueForProc()
......@@ -643,9 +643,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._GenericCheckJob(job)
# Finished jobs can't be processed any further
self.assertRaises(errors.ProgrammerError,
jqueue._JobProcessor(queue, opexec, job))
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertRaises(IndexError, queue.GetNextUpdate)
def testCancelWhileInQueue(self):
queue = _FakeQueueForProc()
......@@ -665,9 +665,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(job.start_timestamp)
self.assertTrue(job.end_timestamp)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
for op in job.ops))
# Serialize to check for differences
before_proc = job.Serialize()
# Simulate processor called in workerpool
opexec = _FakeExecOpCodeForProc(queue, None, None)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
......@@ -675,13 +681,17 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assertFalse(job.start_timestamp)
self.assert_(job.end_timestamp)
self.assertTrue(job.end_timestamp)
self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
for op in job.ops))
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_CANCELED for _ in job.ops],
["Job canceled by request" for _ in job.ops]])
# Must not have changed or written
self.assertEqual(before_proc, job.Serialize())
self.assertRaises(IndexError, queue.GetNextUpdate)
def testCancelWhileWaitlockInQueue(self):
queue = _FakeQueueForProc()
......@@ -903,6 +913,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
for remaining in reversed(range(len(job.ops) - successcount)):
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if remaining == 0:
# Last opcode
......@@ -913,6 +927,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
......@@ -924,14 +939,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._GenericCheckJob(job)
# Finished jobs can't be processed any further
self.assertRaises(errors.ProgrammerError,
jqueue._JobProcessor(queue, opexec, job))
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertRaises(IndexError, queue.GetNextUpdate)
# ... also after being restored
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
self.assertRaises(errors.ProgrammerError,
jqueue._JobProcessor(queue, opexec, job2))
self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
self.assertRaises(IndexError, queue.GetNextUpdate)
def testProcessorOnRunningJob(self):
ops = [opcodes.OpTestDummy(result="result", fail=False)]
......@@ -1293,9 +1308,9 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assert_(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops))
# Finished jobs can't be processed any further
self.assertRaises(errors.ProgrammerError,
jqueue._JobProcessor(self.queue, opexec, job))
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
self.assertRaises(IndexError, self.queue.GetNextUpdate)
if __name__ == "__main__":
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment