diff --git a/lib/jqueue.py b/lib/jqueue.py index 7411f4665dfb45aa246bd1063cdfa756747a916d..e2b4cf4fd3f109843d5ae4db07ccd660f5cdf886 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -916,7 +916,7 @@ class _JobProcessor(object): # Make sure not to hold queue lock while calling ExecOpCode result = self.opexec_fn(op.input, _OpExecCallbacks(self.queue, self.job, op), - timeout=timeout) + timeout=timeout, priority=op.priority) except mcpu.LockAcquireTimeout: assert timeout is not None, "Received timeout for blocking acquire" logging.debug("Couldn't acquire locks in %0.6fs", timeout) diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index ba56ae5686e80e14ecdc081376bb492ab060d183..dddf7c9f44ad306e9b0fe544b079546f6ae3026a 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -449,11 +449,11 @@ class _FakeExecOpCodeForProc: self._before_start = before_start self._after_start = after_start - def __call__(self, op, cbs, timeout=None): + def __call__(self, op, cbs, timeout=None, priority=None): assert isinstance(op, opcodes.OpTestDummy) if self._before_start: - self._before_start(timeout) + self._before_start(timeout, priority) cbs.NotifyStart() @@ -506,7 +506,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): # Create job job = self._CreateJob(queue, job_id, ops) - def _BeforeStart(_): + def _BeforeStart(timeout, priority): self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -660,7 +660,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - def _BeforeStart(_): + def _BeforeStart(timeout, priority): self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -849,7 +849,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): # Create job job = self._CreateJob(queue, 29386, ops) - def _BeforeStart(_): + def _BeforeStart(timeout, priority): self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -955,7 +955,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.gave_lock = None self.done_lock_before_blocking = False - def _BeforeStart(self, timeout): + def _BeforeStart(self, timeout, priority): job = self.job self.assertFalse(self.queue.IsAcquired()) @@ -965,6 +965,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.assert_(timeout is None or isinstance(timeout, (int, float))) self.assertEqual(timeout, ts.last_timeout) + self.assertEqual(priority, job.ops[self.curop].priority) self.gave_lock = True