Skip to content
Snippets Groups Projects
Commit 9e49dfc5 authored by Michael Hanselmann
Browse files

jqueue: Fix bug when cancelling jobs


If a job was cancelled while it was waiting for locks, an assertion
would've failed. This patch fixes the problem and provides a unit
test to check for this situation.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent 320d1daf
No related branches found
No related tags found
No related merge requests found
...@@ -921,7 +921,14 @@ class _JobProcessor(object): ...@@ -921,7 +921,14 @@ class _JobProcessor(object):
except mcpu.LockAcquireTimeout: except mcpu.LockAcquireTimeout:
assert timeout is not None, "Received timeout for blocking acquire" assert timeout is not None, "Received timeout for blocking acquire"
logging.debug("Couldn't acquire locks in %0.6fs", timeout) logging.debug("Couldn't acquire locks in %0.6fs", timeout)
assert op.status == constants.OP_STATUS_WAITLOCK
assert op.status in (constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING)
# Was job cancelled while we were waiting for the lock?
if op.status == constants.OP_STATUS_CANCELING:
return (constants.OP_STATUS_CANCELING, None)
return (constants.OP_STATUS_QUEUED, None) return (constants.OP_STATUS_QUEUED, None)
except CancelJob: except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix) logging.exception("%s: Canceling job", opctx.log_prefix)
......
...@@ -690,6 +690,50 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): ...@@ -690,6 +690,50 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
[[constants.OP_STATUS_CANCELED for _ in job.ops], [[constants.OP_STATUS_CANCELED for _ in job.ops],
["Job canceled by request" for _ in job.ops]]) ["Job canceled by request" for _ in job.ops]])
def testCancelWhileWaitlockWithTimeout(self):
    """Cancel a job while it is waiting for locks and the acquire times out.

    The ``_BeforeStart`` hook cancels the job while it is in the waitlock
    state and then raises ``mcpu.LockAcquireTimeout``. The job must end up
    cancelled, and the opcode must never be executed.
    """
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
        # The queue must not be held and the job must be waiting for locks
        # at the moment the cancellation is requested.
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # Mark as cancelled
        (success, _) = job.Cancel()
        self.assertTrue(success)

        # Cancelling flips every opcode to "canceling", not only the
        # current one.
        self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                   for op in job.ops))

        # Fake an acquire attempt timing out
        raise mcpu.LockAcquireTimeout()

    def _AfterStart(op, cbs):
        # The timeout is raised in _BeforeStart, so this hook must never run.
        self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)

    # A single processor run is expected to return a true value here.
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)

    # Not every opcode may carry both timestamps after the cancellation.
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
def testCancelWhileRunning(self): def testCancelWhileRunning(self):
# Tests canceling a job with finished opcodes and more, unprocessed ones # Tests canceling a job with finished opcodes and more, unprocessed ones
queue = _FakeQueueForProc() queue = _FakeQueueForProc()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment