diff --git a/lib/jqueue.py b/lib/jqueue.py
index 3319a0a5ffb7255251727793f89b8bcdf80d73b3..d1b3b521541d423d0fbe901a6e39a40a94918134 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -482,6 +482,52 @@ class _QueuedJob(object):
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
+  def ChangePriority(self, priority):
+    """Changes the job priority.
+
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+    changed = False
+    for op in self.ops:
+      if (op.status == constants.OP_STATUS_RUNNING or
+          op.status in constants.OPS_FINALIZED):
+        assert not changed, \
+               ("Found opcode for which priority should not be changed after"
+                " priority has been changed for previous opcodes")
+        continue
+
+      assert op.status in (constants.OP_STATUS_QUEUED,
+                           constants.OP_STATUS_WAITING)
+
+      changed = True
+
+      # Note: this also changes the on-disk priority ("op.priority" is only in
+      # memory)
+      op.input.priority = priority
+      op.priority = priority
+
+    if changed:
+      return (True, ("Priorities of pending opcodes for job %s have been"
+                     " changed to %s" % (self.id, priority)))
+    else:
+      return (False, "Job %s had no pending opcodes" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -2356,6 +2402,39 @@ class JobQueue(object):
 
     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
 
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
+
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
+    @raise errors.GenericError: If the priority is not one of
+      C{constants.OP_PRIO_SUBMIT_VALID}
+
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
+
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
+
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
   def _ModifyJobUnlocked(self, job_id, mod_fn):
     """Modifies a job.
 
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index a5b19ad3e82cc9432c987e75b93ee8fe9dc91b6a..34cb71b895e5c20db3761cc77bc744b0445f3e30 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -29,6 +29,7 @@ import shutil
 import errno
 import itertools
 import random
+import operator
 
 from ganeti import constants
 from ganeti import utils
@@ -281,7 +282,7 @@ class TestQueuedOpCode(unittest.TestCase):
 
 
 class TestQueuedJob(unittest.TestCase):
-  def test(self):
+  def testNoOpCodes(self):
     self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                       None, 1, [], False)
 
@@ -371,6 +372,181 @@ class TestQueuedJob(unittest.TestCase):
     job.ops[0].priority -= 19
     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
 
+  def _JobForPriority(self, job_id):
+    ops = [
+      opcodes.OpTagsGet(),
+      opcodes.OpTestDelay(),
+      opcodes.OpTagsGet(),
+      opcodes.OpTestDelay(),
+      ]
+
+    job = jqueue._QueuedJob(None, job_id, ops, True)
+
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+
+    return job
+
+  def testChangePriorityAllQueued(self):
+    job = self._JobForPriority(24984)
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
+                               for op in job.ops))
+    result = job.ChangePriority(-10)
+    self.assertEqual(job.CalcPriority(), -10)
+    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
+    self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
+    self.assertEqual(result,
+                     (True, ("Priorities of pending opcodes for job 24984 have"
+                             " been changed to -10")))
+
+  def testChangePriorityAllFinished(self):
+    job = self._JobForPriority(16405)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx > 2:
+        op.status = constants.OP_STATUS_ERROR
+      else:
+        op.status = constants.OP_STATUS_SUCCESS
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(-10)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_ERROR,
+      ])
+    self.assertEqual(result, (False, "Job 16405 is finished"))
+
+  def testChangePriorityCancelling(self):
+    job = self._JobForPriority(31572)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx > 1:
+        op.status = constants.OP_STATUS_CANCELING
+      else:
+        op.status = constants.OP_STATUS_SUCCESS
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(5)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_CANCELING,
+      constants.OP_STATUS_CANCELING,
+      ])
+    self.assertEqual(result, (False, "Job 31572 is cancelling"))
+
+  def testChangePriorityFirstRunning(self):
+    job = self._JobForPriority(1716215889)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx == 0:
+        op.status = constants.OP_STATUS_RUNNING
+      else:
+        op.status = constants.OP_STATUS_QUEUED
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(7)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
+    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
+                     [None, 7, 7, 7])
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_RUNNING,
+      constants.OP_STATUS_QUEUED,
+      constants.OP_STATUS_QUEUED,
+      constants.OP_STATUS_QUEUED,
+      ])
+    self.assertEqual(result,
+                     (True, ("Priorities of pending opcodes for job"
+                             " 1716215889 have been changed to 7")))
+
+  def testChangePriorityLastRunning(self):
+    job = self._JobForPriority(1308)
+
+    for (idx, op) in enumerate(job.ops):
+      if idx == (len(job.ops) - 1):
+        op.status = constants.OP_STATUS_RUNNING
+      else:
+        op.status = constants.OP_STATUS_SUCCESS
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    result = job.ChangePriority(-3)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                               for op in job.ops))
+    self.assertFalse(compat.any(hasattr(op.input, "priority")
+                                for op in job.ops))
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_RUNNING,
+      ])
+    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
+
+  def testChangePrioritySecondOpcodeRunning(self):
+    job = self._JobForPriority(27701)
+
+    self.assertEqual(len(job.ops), 4)
+    job.ops[0].status = constants.OP_STATUS_SUCCESS
+    job.ops[1].status = constants.OP_STATUS_RUNNING
+    job.ops[2].status = constants.OP_STATUS_QUEUED
+    job.ops[3].status = constants.OP_STATUS_QUEUED
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+    result = job.ChangePriority(-19)
+    self.assertEqual(job.CalcPriority(), -19)
+    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
+                      -19, -19])
+    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
+                     [None, None, -19, -19])
+    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
+      constants.OP_STATUS_SUCCESS,
+      constants.OP_STATUS_RUNNING,
+      constants.OP_STATUS_QUEUED,
+      constants.OP_STATUS_QUEUED,
+      ])
+    self.assertEqual(result,
+                     (True, ("Priorities of pending opcodes for job"
+                             " 27701 have been changed to -19")))
+
+  def testChangePriorityWithInconsistentJob(self):
+    job = self._JobForPriority(30097)
+
+    self.assertEqual(len(job.ops), 4)
+
+    # This job is invalid (as it has two opcodes marked as running) and makes
+    # the call fail because an unprocessed opcode precedes a running one (which
+    # should never happen in reality)
+    job.ops[0].status = constants.OP_STATUS_SUCCESS
+    job.ops[1].status = constants.OP_STATUS_RUNNING
+    job.ops[2].status = constants.OP_STATUS_QUEUED
+    job.ops[3].status = constants.OP_STATUS_RUNNING
+
+    self.assertRaises(AssertionError, job.ChangePriority, 19)
+
   def testCalcStatus(self):
     def _Queued(ops):
       # The default status is "queued"
@@ -2105,6 +2281,98 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
 
+class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
+  def setUp(self):
+    self.queue = _FakeQueueForProc()
+    self.opexecprio = []
+
+  def _BeforeStart(self, timeout, priority):
+    self.assertFalse(self.queue.IsAcquired())
+    self.opexecprio.append(priority)
+
+  def testChangePriorityWhileRunning(self):
+    # Tests changing the priority on a job while it has finished opcodes
+    # (successful) and more, unprocessed ones
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(3)]
+
+    # Create job
+    job_id = 3499
+    job = self._CreateJob(self.queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
+
+    # Run first opcode
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", None, None]])
+
+    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
+    self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+    # Change priority
+    self.assertEqual(job.ChangePriority(-10),
+                     (True,
+                      ("Priorities of pending opcodes for job 3499 have"
+                       " been changed to -10")))
+    self.assertEqual(job.CalcPriority(), -10)
+
+    # Process second opcode
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.DEFER)
+
+    self.assertEqual(self.opexecprio.pop(0), -10)
+    self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+    # Check status
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.CalcPriority(), -10)
+    self.assertEqual(job.GetInfo(["id"]), [job_id])
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", "Res1", None]])
+
+    # Change priority once more
+    self.assertEqual(job.ChangePriority(5),
+                     (True,
+                      ("Priorities of pending opcodes for job 3499 have"
+                       " been changed to 5")))
+    self.assertEqual(job.CalcPriority(), 5)
+
+    # Process third opcode
+    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
+                     jqueue._JobProcessor.FINISHED)
+
+    self.assertEqual(self.opexecprio.pop(0), 5)
+    self.assertRaises(IndexError, self.opexecprio.pop, 0)
+
+    # Check status
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+    self.assertEqual(job.GetInfo(["id"]), [job_id])
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_SUCCESS],
+                      ["Res0", "Res1", "Res2"]])
+    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
+                     [constants.OP_PRIO_DEFAULT, -10, 5])
+
+
 class _IdOnlyFakeJob:
   def __init__(self, job_id, priority=NotImplemented):
     self.id = str(job_id)