diff --git a/lib/jqueue.py b/lib/jqueue.py index 1a0c20d2eabbf95b86fd41159c4e00f1fcdaa867..93bc4eebfc79280b4738ab68785dab706d63540f 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -895,13 +895,28 @@ class _JobProcessor(object): """ assert op in job.ops + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK) + + update = False - op.status = constants.OP_STATUS_WAITLOCK op.result = None - op.start_timestamp = TimeStampNow() + + if op.status == constants.OP_STATUS_QUEUED: + op.status = constants.OP_STATUS_WAITLOCK + update = True + + if op.start_timestamp is None: + op.start_timestamp = TimeStampNow() + update = True if job.start_timestamp is None: job.start_timestamp = op.start_timestamp + update = True + + assert op.status == constants.OP_STATUS_WAITLOCK + + return update def _ExecOpCodeUnlocked(self, opctx): """Processes one opcode and returns the result. @@ -929,7 +944,8 @@ class _JobProcessor(object): if op.status == constants.OP_STATUS_CANCELING: return (constants.OP_STATUS_CANCELING, None) - return (constants.OP_STATUS_QUEUED, None) + # Stay in waitlock while trying to re-acquire lock + return (constants.OP_STATUS_WAITLOCK, None) except CancelJob: logging.exception("%s: Canceling job", opctx.log_prefix) assert op.status == constants.OP_STATUS_CANCELING @@ -964,6 +980,7 @@ class _JobProcessor(object): # Is a previous opcode still pending? if job.cur_opctx: opctx = job.cur_opctx + job.cur_opctx = None else: if __debug__ and _nextop_fn: _nextop_fn() @@ -974,7 +991,7 @@ class _JobProcessor(object): # Consistency check assert compat.all(i.status in (constants.OP_STATUS_QUEUED, constants.OP_STATUS_CANCELED) - for i in job.ops[opctx.index:]) + for i in job.ops[opctx.index + 1:]) assert op.status in (constants.OP_STATUS_QUEUED, constants.OP_STATUS_WAITLOCK, @@ -985,13 +1002,13 @@ class _JobProcessor(object): if op.status != constants.OP_STATUS_CANCELED: # Prepare to start opcode - self._MarkWaitlock(job, op) + if self._MarkWaitlock(job, op): + # Write to disk + queue.UpdateJobUnlocked(job) assert op.status == constants.OP_STATUS_WAITLOCK assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK - - # Write to disk - queue.UpdateJobUnlocked(job) + assert job.start_timestamp and op.start_timestamp logging.info("%s: opcode %s waiting for locks", opctx.log_prefix, opctx.summary) @@ -1005,7 +1022,7 @@ class _JobProcessor(object): op.status = op_status op.result = op_result - if op.status == constants.OP_STATUS_QUEUED: + if op.status == constants.OP_STATUS_WAITLOCK: # Couldn't get locks in time assert not op.end_timestamp else: @@ -1018,10 +1035,12 @@ class _JobProcessor(object): else: assert op.status in constants.OPS_FINALIZED - if op.status == constants.OP_STATUS_QUEUED: + if op.status == constants.OP_STATUS_WAITLOCK: finalize = False - opctx.CheckPriorityIncrease() + if opctx.CheckPriorityIncrease(): + # Priority was changed, need to update on-disk file + queue.UpdateJobUnlocked(job) # Keep around for another round job.cur_opctx = opctx @@ -1030,9 +1049,7 @@ class _JobProcessor(object): op.priority >= constants.OP_PRIO_HIGHEST) # In no case must the status be finalized here - assert job.CalcStatus() == constants.JOB_STATUS_QUEUED - - queue.UpdateJobUnlocked(job) + assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK else: # Ensure all opcodes so far have been successful diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 0939e08a08c7633c975a86f242a5e3b14a91cc38..02ea67f5fccb0175298e413ee6977fe22f66baa3 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -427,10 +427,14 @@ class TestQueuedJob(unittest.TestCase): class _FakeQueueForProc: def __init__(self): self._acquired = False + self._updates = [] def IsAcquired(self): return self._acquired + def GetNextUpdate(self): + return self._updates.pop(0) + def acquire(self, shared=0): assert shared == 1 self._acquired = True @@ -439,18 +443,21 @@ class _FakeQueueForProc: assert self._acquired self._acquired = False - def UpdateJobUnlocked(self, job, replicate=None): - # TODO: Ensure job is updated at the correct places - pass + def UpdateJobUnlocked(self, job, replicate=True): + assert self._acquired, "Lock not acquired while updating job" + self._updates.append((job, bool(replicate))) class _FakeExecOpCodeForProc: - def __init__(self, before_start, after_start): + def __init__(self, queue, before_start, after_start): + self._queue = queue self._before_start = before_start self._after_start = after_start def __call__(self, op, cbs, timeout=None, priority=None): assert isinstance(op, opcodes.OpTestDummy) + assert not self._queue.IsAcquired(), \ + "Queue lock not released when executing opcode" if self._before_start: self._before_start(timeout, priority) @@ -460,6 +467,9 @@ class _FakeExecOpCodeForProc: if self._after_start: self._after_start(op, cbs) + # Check again after the callbacks + assert not self._queue.IsAcquired() + if op.fail: raise errors.OpExecError("Error requested (%s)" % op.result) @@ -507,21 +517,31 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): job = self._CreateJob(queue, job_id, ops) def _BeforeStart(timeout, priority): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + self.assertFalse(job.cur_opctx) def _AfterStart(op, cbs): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + self.assertFalse(job.cur_opctx) # Job is running, cancelling shouldn't be possible (success, _) = job.Cancel() self.assertFalse(success) - opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart) + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) for idx in range(len(ops)): + self.assertRaises(IndexError, queue.GetNextUpdate) result = jqueue._JobProcessor(queue, opexec, job)() + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) if idx == len(ops) - 1: # Last opcode self.assert_(result) @@ -532,6 +552,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assert_(job.start_timestamp) self.assertFalse(job.end_timestamp) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) self.assertEqual(job.GetInfo(["opresult"]), @@ -568,10 +590,18 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): # Create job job = self._CreateJob(queue, job_id, ops) - opexec = _FakeExecOpCodeForProc(None, None) + opexec = _FakeExecOpCodeForProc(queue, None, None) for idx in range(len(ops)): + self.assertRaises(IndexError, queue.GetNextUpdate) result = jqueue._JobProcessor(queue, opexec, job)() + # queued to waitlock + self.assertEqual(queue.GetNextUpdate(), (job, True)) + # waitlock to running + self.assertEqual(queue.GetNextUpdate(), (job, True)) + # Opcode result + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) if idx in (failfrom, len(ops) - 1): # Last opcode @@ -582,6 +612,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertRaises(IndexError, queue.GetNextUpdate) + # Check job status self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR) self.assertEqual(job.GetInfo(["id"]), [job_id]) @@ -631,10 +663,12 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): (success, _) = job.Cancel() self.assert_(success) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED for op in job.ops)) - opexec = _FakeExecOpCodeForProc(None, None) + opexec = _FakeExecOpCodeForProc(queue, None, None) jqueue._JobProcessor(queue, opexec, job)() # Check result @@ -661,6 +695,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) def _BeforeStart(timeout, priority): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -670,14 +706,20 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING for op in job.ops)) + self.assertRaises(IndexError, queue.GetNextUpdate) def _AfterStart(op, cbs): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) - opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart) + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) - jqueue._JobProcessor(queue, opexec, job)() + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assert_(jqueue._JobProcessor(queue, opexec, job)()) + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) # Check result self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) @@ -719,7 +761,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): def _AfterStart(op, cbs): self.fail("Should not reach this") - opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart) + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) self.assert_(jqueue._JobProcessor(queue, opexec, job)()) @@ -747,7 +789,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - opexec = _FakeExecOpCodeForProc(None, None) + opexec = _FakeExecOpCodeForProc(queue, None, None) # Run one opcode self.assertFalse(jqueue._JobProcessor(queue, opexec, job)()) @@ -783,7 +825,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): # program was restarted queue = _FakeQueueForProc() - opexec = _FakeExecOpCodeForProc(None, None) + opexec = _FakeExecOpCodeForProc(queue, None, None) for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]: ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) @@ -816,7 +858,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp) queue = _FakeQueueForProc() - opexec = _FakeExecOpCodeForProc(None, None) + opexec = _FakeExecOpCodeForProc(queue, None, None) for remaining in reversed(range(len(job.ops) - successcount)): result = jqueue._JobProcessor(queue, opexec, job)() @@ -854,7 +896,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): ops = [opcodes.OpTestDummy(result="result", fail=False)] queue = _FakeQueueForProc() - opexec = _FakeExecOpCodeForProc(None, None) + opexec = _FakeExecOpCodeForProc(queue, None, None) # Create job job = self._CreateJob(queue, 9571, ops) @@ -894,10 +936,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): job = self._CreateJob(queue, 29386, ops) def _BeforeStart(timeout, priority): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) def _AfterStart(op, cbs): + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) @@ -905,15 +951,22 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): "too", "many", "arguments") for (log_type, msg) in op.messages: + self.assertRaises(IndexError, queue.GetNextUpdate) if log_type: cbs.Feedback(log_type, msg) else: cbs.Feedback(msg) + # Check for job update without replication + self.assertEqual(queue.GetNextUpdate(), (job, False)) + self.assertRaises(IndexError, queue.GetNextUpdate) - opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart) + opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) for remaining in reversed(range(len(job.ops))): + self.assertRaises(IndexError, queue.GetNextUpdate) result = jqueue._JobProcessor(queue, opexec, job)() + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) if remaining == 0: # Last opcode @@ -924,6 +977,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) self.assertEqual(job.GetInfo(["opresult"]), [[op.input.result for op in job.ops]]) @@ -996,12 +1051,19 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.retries = 0 self.prev_tsop = None self.prev_prio = None + self.prev_status = None + self.lock_acq_prio = None self.gave_lock = None self.done_lock_before_blocking = False def _BeforeStart(self, timeout, priority): job = self.job + # If status has changed, job must've been written + if self.prev_status != self.job.ops[self.curop].status: + self.assertEqual(self.queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, self.queue.GetNextUpdate) + self.assertFalse(self.queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -1012,6 +1074,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(priority, job.ops[self.curop].priority) self.gave_lock = True + self.lock_acq_prio = priority if (self.curop == 3 and job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3): @@ -1035,6 +1098,10 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): def _AfterStart(self, op, cbs): job = self.job + # Setting to "running" requires an update + self.assertEqual(self.queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, self.queue.GetNextUpdate) + self.assertFalse(self.queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) @@ -1045,6 +1112,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): def _NextOpcode(self): self.curop = self.opcounter.next() self.prev_prio = self.job.ops[self.curop].priority + self.prev_status = self.job.ops[self.curop].status def _NewTimeoutStrategy(self): job = self.job @@ -1110,30 +1178,62 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.opcounter = itertools.count(0) - opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart) + opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, + self._AfterStart) tsf = self._NewTimeoutStrategy self.assertFalse(self.done_lock_before_blocking) - for i in itertools.count(0): + while True: proc = jqueue._JobProcessor(self.queue, opexec, job, _timeout_strategy_factory=tsf) + self.assertRaises(IndexError, self.queue.GetNextUpdate) + + if self.curop is not None: + self.prev_status = self.job.ops[self.curop].status + + self.lock_acq_prio = None + result = proc(_nextop_fn=self._NextOpcode) + assert self.curop is not None + + if result or self.gave_lock: + # Got lock and/or job is done, result must've been written + self.assertFalse(job.cur_opctx) + self.assertEqual(self.queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, self.queue.GetNextUpdate) + self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority) + self.assert_(job.ops[self.curop].exec_timestamp) + if result: self.assertFalse(job.cur_opctx) break self.assertFalse(result) + if self.curop == 0: + self.assertEqual(job.ops[self.curop].start_timestamp, + job.start_timestamp) + if self.gave_lock: - self.assertFalse(job.cur_opctx) + # Opcode finished, but job not yet done + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) else: + # Did not get locks self.assert_(job.cur_opctx) self.assertEqual(job.cur_opctx._timeout_strategy._fn, self.timeout_strategy.NextAttempt) + self.assertFalse(job.ops[self.curop].exec_timestamp) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + + # If priority has changed since acquiring locks, the job must've been + # updated + if self.lock_acq_prio != job.ops[self.curop].priority: + self.assertEqual(self.queue.GetNextUpdate(), (job, True)) + + self.assertRaises(IndexError, self.queue.GetNextUpdate) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) self.assert_(job.start_timestamp) self.assertFalse(job.end_timestamp) @@ -1142,6 +1242,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(self.opcounter.next(), len(job.ops)) self.assert_(self.done_lock_before_blocking) + self.assertRaises(IndexError, self.queue.GetNextUpdate) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) self.assertEqual(job.GetInfo(["opresult"]),