Commit ebb2a2a3 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Improve jqueue unittests



- Verify job file updates
- Ensure queue lock is released while executing opcode
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent e7441f80
......@@ -427,10 +427,14 @@ class TestQueuedJob(unittest.TestCase):
class _FakeQueueForProc:
def __init__(self):
self._acquired = False
self._updates = []
def IsAcquired(self):
return self._acquired
def GetNextUpdate(self):
return self._updates.pop(0)
def acquire(self, shared=0):
assert shared == 1
self._acquired = True
......@@ -439,18 +443,21 @@ class _FakeQueueForProc:
assert self._acquired
self._acquired = False
def UpdateJobUnlocked(self, job, replicate=None):
# TODO: Ensure job is updated at the correct places
pass
def UpdateJobUnlocked(self, job, replicate=True):
assert self._acquired, "Lock not acquired while updating job"
self._updates.append((job, bool(replicate)))
class _FakeExecOpCodeForProc:
def __init__(self, before_start, after_start):
def __init__(self, queue, before_start, after_start):
self._queue = queue
self._before_start = before_start
self._after_start = after_start
def __call__(self, op, cbs, timeout=None, priority=None):
assert isinstance(op, opcodes.OpTestDummy)
assert not self._queue.IsAcquired(), \
"Queue lock not released when executing opcode"
if self._before_start:
self._before_start(timeout, priority)
......@@ -460,6 +467,9 @@ class _FakeExecOpCodeForProc:
if self._after_start:
self._after_start(op, cbs)
# Check again after the callbacks
assert not self._queue.IsAcquired()
if op.fail:
raise errors.OpExecError("Error requested (%s)" % op.result)
......@@ -507,10 +517,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
......@@ -518,10 +533,13 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
self.assert_(result)
......@@ -532,6 +550,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
......@@ -568,10 +588,18 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Create job
job = self._CreateJob(queue, job_id, ops)
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
# queued to waitlock
self.assertEqual(queue.GetNextUpdate(), (job, True))
# waitlock to running
self.assertEqual(queue.GetNextUpdate(), (job, True))
# Opcode result
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx in (failfrom, len(ops) - 1):
# Last opcode
......@@ -582,6 +610,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
# Check job status
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
self.assertEqual(job.GetInfo(["id"]), [job_id])
......@@ -631,10 +661,12 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
(success, _) = job.Cancel()
self.assert_(success)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
for op in job.ops))
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
jqueue._JobProcessor(queue, opexec, job)()
# Check result
......@@ -661,6 +693,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
......@@ -670,14 +704,20 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
for op in job.ops))
self.assertRaises(IndexError, queue.GetNextUpdate)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
jqueue._JobProcessor(queue, opexec, job)()
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......@@ -719,7 +759,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _AfterStart(op, cbs):
self.fail("Should not reach this")
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
......@@ -747,7 +787,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Run one opcode
self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
......@@ -783,7 +823,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# program was restarted
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
......@@ -816,7 +856,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
for remaining in reversed(range(len(job.ops) - successcount)):
result = jqueue._JobProcessor(queue, opexec, job)()
......@@ -854,7 +894,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
ops = [opcodes.OpTestDummy(result="result", fail=False)]
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Create job
job = self._CreateJob(queue, 9571, ops)
......@@ -894,10 +934,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
job = self._CreateJob(queue, 29386, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
......@@ -905,15 +949,22 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
"too", "many", "arguments")
for (log_type, msg) in op.messages:
self.assertRaises(IndexError, queue.GetNextUpdate)
if log_type:
cbs.Feedback(log_type, msg)
else:
cbs.Feedback(msg)
# Check for job update without replication
self.assertEqual(queue.GetNextUpdate(), (job, False))
self.assertRaises(IndexError, queue.GetNextUpdate)
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for remaining in reversed(range(len(job.ops))):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if remaining == 0:
# Last opcode
......@@ -924,6 +975,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["opresult"]),
[[op.input.result for op in job.ops]])
......@@ -1002,6 +1055,8 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
def _BeforeStart(self, timeout, priority):
job = self.job
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
......@@ -1035,6 +1090,8 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
def _AfterStart(self, op, cbs):
job = self.job
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
......@@ -1110,7 +1167,8 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.opcounter = itertools.count(0)
opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
self._AfterStart)
tsf = self._NewTimeoutStrategy
self.assertFalse(self.done_lock_before_blocking)
......@@ -1119,7 +1177,10 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
proc = jqueue._JobProcessor(self.queue, opexec, job,
_timeout_strategy_factory=tsf)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
result = proc(_nextop_fn=self._NextOpcode)
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
if result:
self.assertFalse(job.cur_opctx)
break
......@@ -1142,6 +1203,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(self.opcounter.next(), len(job.ops))
self.assert_(self.done_lock_before_blocking)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment