Commit 4679547e authored by Michael Hanselmann
Browse files

jqueue: Allow changing of job priority



This is due to a feature request. Sometimes one wants to change the
priority of a job after it has been submitted, e.g. after submitting an
important job only to later notice many other pending jobs which will be
processed first. Priority changes only take effect at the next lock
acquisition or when the job is re-scheduled.

The design is very similar to how jobs are cancelled.

Unit tests for “_QueuedJob.ChangePriority” are included.

Also rename “TestQueuedJob.test” to “TestQueuedJob.testNoOpCodes”.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>
parent 99fb250b
......@@ -482,6 +482,52 @@ class _QueuedJob(object):
logging.debug("Job %s is no longer waiting in the queue", self.id)
return (False, "Job %s is no longer waiting in the queue" % self.id)
def ChangePriority(self, priority):
  """Sets a new priority on the opcodes of this job that are still pending.

  Jobs that are already finalized or are being cancelled are left untouched.
  Otherwise every opcode that is neither running nor finalized receives the
  new priority, both on the in-memory opcode wrapper and on the opcode input
  (the latter is what changes the on-disk priority).

  @type priority: int
  @param priority: New priority
  @rtype: tuple; (bool, string)
  @return: Boolean describing whether job's priority was successfully changed
    and a text message

  """
  job_status = self.CalcStatus()

  if job_status in constants.JOBS_FINALIZED:
    return (False, "Job %s is finished" % self.id)

  if job_status == constants.JOB_STATUS_CANCELING:
    return (False, "Job %s is cancelling" % self.id)

  assert job_status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

  modified = False

  for opcode in self.ops:
    is_pending = not (opcode.status == constants.OP_STATUS_RUNNING or
                      opcode.status in constants.OPS_FINALIZED)

    if is_pending:
      assert opcode.status in (constants.OP_STATUS_QUEUED,
                               constants.OP_STATUS_WAITING)

      modified = True

      # "opcode.priority" exists in memory only; writing to the opcode input
      # is what changes the on-disk priority as well
      opcode.input.priority = priority
      opcode.priority = priority
    else:
      # Running or finalized opcodes must all come before the pending ones
      assert not modified, \
        ("Found opcode for which priority should not be changed after"
         " priority has been changed for previous opcodes")

  if not modified:
    return (False, "Job %s had no pending opcodes" % self.id)

  return (True, ("Priorities of pending opcodes for job %s have been"
                 " changed to %s" % (self.id, priority)))
class _OpExecCallbacks(mcpu.OpExecCbBase):
def __init__(self, queue, job, op):
......@@ -2356,6 +2402,37 @@ class JobQueue(object):
return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def ChangeJobPriority(self, job_id, priority):
  """Changes the priority of a job's pending opcodes.

  The requested priority is checked against
  L{constants.OP_PRIO_SUBMIT_VALID}. If the job's priority could be changed,
  the worker pool is asked to re-prioritize the corresponding task; a job
  that is currently not in the worker pool is only logged.

  @type job_id: int
  @param job_id: ID of the job whose priority should be changed
  @type priority: int
  @param priority: New priority
  @raise errors.GenericError: If the priority is not among the values in
    L{constants.OP_PRIO_SUBMIT_VALID}
  @return: Whatever L{_ModifyJobUnlocked} returns for this modification

  """
  logging.info("Changing priority of job %s to %s", job_id, priority)

  if priority not in constants.OP_PRIO_SUBMIT_VALID:
    valid_text = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
    raise errors.GenericError("Invalid priority %s, allowed are %s" %
                              (priority, valid_text))

  def _UpdatePriority(job):
    (changed, message) = job.ChangePriority(priority)

    if changed:
      # Pass the job's recalculated priority on to the worker pool
      try:
        self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
      except workerpool.NoSuchTask:
        logging.debug("Job %s is not in workerpool at this time", job.id)

    return (changed, message)

  return self._ModifyJobUnlocked(job_id, _UpdatePriority)
def _ModifyJobUnlocked(self, job_id, mod_fn):
"""Modifies a job.
......
......@@ -29,6 +29,7 @@ import shutil
import errno
import itertools
import random
import operator
from ganeti import constants
from ganeti import utils
......@@ -281,7 +282,7 @@ class TestQueuedOpCode(unittest.TestCase):
class TestQueuedJob(unittest.TestCase):
def test(self):
def testNoOpCodes(self):
  # Constructing a job from an empty opcode list must be rejected
  no_ops = []
  self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                    None, 1, no_ops, False)
......@@ -371,6 +372,181 @@ class TestQueuedJob(unittest.TestCase):
job.ops[0].priority -= 19
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
def _JobForPriority(self, job_id):
  # Builds a four-opcode job for the priority tests and checks that it
  # starts out with the default priority everywhere
  job = jqueue._QueuedJob(None, job_id, [
    opcodes.OpTagsGet(),
    opcodes.OpTestDelay(),
    opcodes.OpTagsGet(),
    opcodes.OpTestDelay(),
    ], True)

  default_prio = constants.OP_PRIO_DEFAULT

  self.assertTrue(compat.all(op.priority == default_prio
                             for op in job.ops))
  self.assertEqual(job.CalcPriority(), default_prio)

  # None of the opcode inputs may carry a "priority" attribute yet
  self.assertFalse(compat.any(hasattr(op.input, "priority")
                              for op in job.ops))

  return job
def testChangePriorityAllQueued(self):
  # With every opcode still queued, the new priority must reach all of them
  new_prio = -10
  job = self._JobForPriority(24984)

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
  self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                             for op in job.ops))

  outcome = job.ChangePriority(new_prio)

  self.assertEqual(job.CalcPriority(), new_prio)
  self.assertTrue(compat.all(op.priority == new_prio for op in job.ops))
  self.assertTrue(compat.all(op.input.priority == new_prio
                             for op in job.ops))
  self.assertEqual(outcome,
                   (True, ("Priorities of pending opcodes for job 24984 have"
                           " been changed to -10")))
def testChangePriorityAllFinished(self):
  # All opcodes are finalized (three successes followed by one error), so
  # ChangePriority must refuse and leave priorities and statuses untouched
  job = self._JobForPriority(16405)

  for (idx, op) in enumerate(job.ops):
    if idx > 2:
      op.status = constants.OP_STATUS_ERROR
    else:
      op.status = constants.OP_STATUS_SUCCESS

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

  result = job.ChangePriority(-10)

  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
  self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                             for op in job.ops))
  self.assertFalse(compat.any(hasattr(op.input, "priority")
                              for op in job.ops))
  # list() keeps the comparison valid where map() returns an iterator
  # (Python 3) instead of a list (Python 2)
  self.assertEqual(list(map(operator.attrgetter("status"), job.ops)), [
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_ERROR,
    ])
  self.assertEqual(result, (False, "Job 16405 is finished"))
def testChangePriorityCancelling(self):
  # The first two opcodes succeeded and the remaining ones are being
  # cancelled; ChangePriority must refuse and change nothing
  job = self._JobForPriority(31572)

  for (idx, op) in enumerate(job.ops):
    if idx > 1:
      op.status = constants.OP_STATUS_CANCELING
    else:
      op.status = constants.OP_STATUS_SUCCESS

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

  result = job.ChangePriority(5)

  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
  self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                             for op in job.ops))
  self.assertFalse(compat.any(hasattr(op.input, "priority")
                              for op in job.ops))
  # list() keeps the comparison valid where map() returns an iterator
  # (Python 3) instead of a list (Python 2)
  self.assertEqual(list(map(operator.attrgetter("status"), job.ops)), [
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_CANCELING,
    constants.OP_STATUS_CANCELING,
    ])
  self.assertEqual(result, (False, "Job 31572 is cancelling"))
def testChangePriorityFirstRunning(self):
  # The first opcode is running and the rest are queued: only the queued
  # opcodes get the new priority, and the job's calculated priority is
  # expected to remain at the default
  job = self._JobForPriority(1716215889)

  for (idx, op) in enumerate(job.ops):
    if idx == 0:
      op.status = constants.OP_STATUS_RUNNING
    else:
      op.status = constants.OP_STATUS_QUEUED

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

  result = job.ChangePriority(7)

  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
  # list() keeps the comparisons valid where map() returns an iterator
  # (Python 3) instead of a list (Python 2)
  self.assertEqual(list(map(operator.attrgetter("priority"), job.ops)),
                   [constants.OP_PRIO_DEFAULT, 7, 7, 7])
  self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
                   [None, 7, 7, 7])
  self.assertEqual(list(map(operator.attrgetter("status"), job.ops)), [
    constants.OP_STATUS_RUNNING,
    constants.OP_STATUS_QUEUED,
    constants.OP_STATUS_QUEUED,
    constants.OP_STATUS_QUEUED,
    ])
  self.assertEqual(result,
                   (True, ("Priorities of pending opcodes for job"
                           " 1716215889 have been changed to 7")))
def testChangePriorityLastRunning(self):
  # Only the last opcode is still running and all others succeeded, so
  # there is no pending opcode left whose priority could be changed
  job = self._JobForPriority(1308)

  for (idx, op) in enumerate(job.ops):
    if idx == (len(job.ops) - 1):
      op.status = constants.OP_STATUS_RUNNING
    else:
      op.status = constants.OP_STATUS_SUCCESS

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

  result = job.ChangePriority(-3)

  self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
  self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                             for op in job.ops))
  self.assertFalse(compat.any(hasattr(op.input, "priority")
                              for op in job.ops))
  # list() keeps the comparison valid where map() returns an iterator
  # (Python 3) instead of a list (Python 2)
  self.assertEqual(list(map(operator.attrgetter("status"), job.ops)), [
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_RUNNING,
    ])
  self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
def testChangePrioritySecondOpcodeRunning(self):
  # First opcode succeeded, second is running, last two are queued: only
  # the two queued opcodes may receive the new priority
  job = self._JobForPriority(27701)

  self.assertEqual(len(job.ops), 4)

  job.ops[0].status = constants.OP_STATUS_SUCCESS
  job.ops[1].status = constants.OP_STATUS_RUNNING
  job.ops[2].status = constants.OP_STATUS_QUEUED
  job.ops[3].status = constants.OP_STATUS_QUEUED

  self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

  result = job.ChangePriority(-19)

  self.assertEqual(job.CalcPriority(), -19)
  # list() keeps the comparisons valid where map() returns an iterator
  # (Python 3) instead of a list (Python 2)
  self.assertEqual(list(map(operator.attrgetter("priority"), job.ops)),
                   [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
                    -19, -19])
  self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
                   [None, None, -19, -19])
  self.assertEqual(list(map(operator.attrgetter("status"), job.ops)), [
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_RUNNING,
    constants.OP_STATUS_QUEUED,
    constants.OP_STATUS_QUEUED,
    ])
  self.assertEqual(result,
                   (True, ("Priorities of pending opcodes for job"
                           " 27701 have been changed to -19")))
def testChangePriorityWithInconsistentJob(self):
  job = self._JobForPriority(30097)

  self.assertEqual(len(job.ops), 4)

  # This job is invalid (as it has two opcodes marked as running) and makes
  # the call fail because an unprocessed opcode precedes a running one (which
  # should never happen in reality)
  inconsistent_statuses = [
    constants.OP_STATUS_SUCCESS,
    constants.OP_STATUS_RUNNING,
    constants.OP_STATUS_QUEUED,
    constants.OP_STATUS_RUNNING,
    ]
  for (op, status) in zip(job.ops, inconsistent_statuses):
    op.status = status

  self.assertRaises(AssertionError, job.ChangePriority, 19)
def testCalcStatus(self):
def _Queued(ops):
# The default status is "queued"
......@@ -2105,6 +2281,98 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, self.queue.GetNextUpdate)
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
  """Tests changing a job's priority between runs of the job processor.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Priorities seen by L{_BeforeStart}, in call order
    self.opexecprio = []

  def _BeforeStart(self, timeout, priority):
    # Callback handed to the fake opcode executor; records the priority it
    # is called with and checks that the queue is not acquired at that time
    self.assertFalse(self.queue.IsAcquired())
    self.opexecprio.append(priority)

  def testChangePriorityWhileRunning(self):
    # Tests changing the priority on a job while it has finished opcodes
    # (successful) and more, unprocessed ones
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 3499
    job = self._CreateJob(self.queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)

    # Run first opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Exactly one opcode was run so far, with the default priority
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Change priority
    self.assertEqual(job.ChangePriority(-10),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to -10")))
    self.assertEqual(job.CalcPriority(), -10)

    # Process second opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    self.assertEqual(self.opexecprio.pop(0), -10)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Check status
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", "Res1", None]])

    # Change priority once more
    self.assertEqual(job.ChangePriority(5),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to 5")))
    self.assertEqual(job.CalcPriority(), 5)

    # Process third opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    self.assertEqual(self.opexecprio.pop(0), 5)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Check status
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS],
                      ["Res0", "Res1", "Res2"]])
    # Each opcode keeps the priority that was in effect when it was still
    # pending; list() keeps the comparison valid where map() returns an
    # iterator (Python 3) instead of a list (Python 2)
    self.assertEqual(list(map(operator.attrgetter("priority"), job.ops)),
                     [constants.OP_PRIO_DEFAULT, -10, 5])
class _IdOnlyFakeJob:
def __init__(self, job_id, priority=NotImplemented):
self.id = str(job_id)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment