diff --git a/lib/jqueue.py b/lib/jqueue.py index d5ea3cb79b81d5bc7e8d36a5079227d6605d9724..84f612045a9568c390c6ed68fe6f4c3b32f44838 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1237,6 +1237,29 @@ class _JobProcessor(object): queue.release() +def _EvaluateJobProcessorResult(depmgr, job, result): + """Looks at a result from L{_JobProcessor} for a job. + + To be used in a L{_JobQueueWorker}. + + """ + if result == _JobProcessor.FINISHED: + # Notify waiting jobs + depmgr.NotifyWaiters(job.id) + + elif result == _JobProcessor.DEFER: + # Schedule again + raise workerpool.DeferTask(priority=job.CalcPriority()) + + elif result == _JobProcessor.WAITDEP: + # No-op, dependency manager will re-schedule + pass + + else: + raise errors.ProgrammerError("Job processor returned unknown status %s" % + (result, )) + + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. @@ -1277,23 +1300,8 @@ class _JobQueueWorker(workerpool.BaseWorker): wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, proc.ExecOpCode) - result = _JobProcessor(queue, wrap_execop_fn, job)() - - if result == _JobProcessor.FINISHED: - # Notify waiting jobs - queue.depmgr.NotifyWaiters(job.id) - - elif result == _JobProcessor.DEFER: - # Schedule again - raise workerpool.DeferTask(priority=job.CalcPriority()) - - elif result == _JobProcessor.WAITDEP: - # No-op, dependency manager will re-schedule - pass - - else: - raise errors.ProgrammerError("Job processor returned unknown status %s" % - (result, )) + _EvaluateJobProcessorResult(queue.depmgr, job, + _JobProcessor(queue, wrap_execop_fn, job)()) @staticmethod def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs): diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 4669320def2e6256a8a3132287073a9ffe2bb66c..cd0bacdb510c2daaac148b4724003c97258784e5 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -38,6 +38,7 @@ from ganeti import opcodes from ganeti import compat from ganeti import mcpu from ganeti import query +from ganeti import workerpool import testutils @@ -1625,6 +1626,43 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertRaises(IndexError, queue.GetNextUpdate) +class TestEvaluateJobProcessorResult(unittest.TestCase): + def testFinished(self): + depmgr = _FakeDependencyManager() + job = _IdOnlyFakeJob(30953) + jqueue._EvaluateJobProcessorResult(depmgr, job, + jqueue._JobProcessor.FINISHED) + self.assertEqual(depmgr.GetNextNotification(), job.id) + self.assertRaises(IndexError, depmgr.GetNextNotification) + + def testDefer(self): + depmgr = _FakeDependencyManager() + job = _IdOnlyFakeJob(11326, priority=5463) + try: + jqueue._EvaluateJobProcessorResult(depmgr, job, + jqueue._JobProcessor.DEFER) + except workerpool.DeferTask, err: + self.assertEqual(err.priority, 5463) + else: + self.fail("Didn't raise exception") + self.assertRaises(IndexError, depmgr.GetNextNotification) + + def testWaitdep(self): + depmgr = _FakeDependencyManager() + job = _IdOnlyFakeJob(21317) + jqueue._EvaluateJobProcessorResult(depmgr, job, + jqueue._JobProcessor.WAITDEP) + self.assertRaises(IndexError, depmgr.GetNextNotification) + + def testOther(self): + depmgr = _FakeDependencyManager() + job = _IdOnlyFakeJob(5813) + self.assertRaises(errors.ProgrammerError, + jqueue._EvaluateJobProcessorResult, + depmgr, job, "Other result") + self.assertRaises(IndexError, depmgr.GetNextNotification) + + class _FakeTimeoutStrategy: def __init__(self, timeouts): self.timeouts = timeouts