Commit df5a5730 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Factorize checking job processor's result



This allows for more unittesting.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 1b5150ab
......@@ -1237,6 +1237,29 @@ class _JobProcessor(object):
queue.release()
def _EvaluateJobProcessorResult(depmgr, job, result):
"""Looks at a result from L{_JobProcessor} for a job.
To be used in a L{_JobQueueWorker}.
"""
if result == _JobProcessor.FINISHED:
# Notify waiting jobs
depmgr.NotifyWaiters(job.id)
elif result == _JobProcessor.DEFER:
# Schedule again
raise workerpool.DeferTask(priority=job.CalcPriority())
elif result == _JobProcessor.WAITDEP:
# No-op, dependency manager will re-schedule
pass
else:
raise errors.ProgrammerError("Job processor returned unknown status %s" %
(result, ))
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
......@@ -1277,23 +1300,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
proc.ExecOpCode)
result = _JobProcessor(queue, wrap_execop_fn, job)()
if result == _JobProcessor.FINISHED:
# Notify waiting jobs
queue.depmgr.NotifyWaiters(job.id)
elif result == _JobProcessor.DEFER:
# Schedule again
raise workerpool.DeferTask(priority=job.CalcPriority())
elif result == _JobProcessor.WAITDEP:
# No-op, dependency manager will re-schedule
pass
else:
raise errors.ProgrammerError("Job processor returned unknown status %s" %
(result, ))
_EvaluateJobProcessorResult(queue.depmgr, job,
_JobProcessor(queue, wrap_execop_fn, job)())
@staticmethod
def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
......
......@@ -38,6 +38,7 @@ from ganeti import opcodes
from ganeti import compat
from ganeti import mcpu
from ganeti import query
from ganeti import workerpool
import testutils
......@@ -1625,6 +1626,43 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate)
class TestEvaluateJobProcessorResult(unittest.TestCase):
def testFinished(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(30953)
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.FINISHED)
self.assertEqual(depmgr.GetNextNotification(), job.id)
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testDefer(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(11326, priority=5463)
try:
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.DEFER)
except workerpool.DeferTask, err:
self.assertEqual(err.priority, 5463)
else:
self.fail("Didn't raise exception")
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testWaitdep(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(21317)
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.WAITDEP)
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testOther(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(5813)
self.assertRaises(errors.ProgrammerError,
jqueue._EvaluateJobProcessorResult,
depmgr, job, "Other result")
self.assertRaises(IndexError, depmgr.GetNextNotification)
class _FakeTimeoutStrategy:
def __init__(self, timeouts):
self.timeouts = timeouts
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment