Commit 8f5c488d authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Add support for job priority to opcodes and job queue objects

This allows clients to submit opcodes with a priority. Except for being
tracked by the job queue, it is not yet used by any code.

Unittests for jqueue._QueuedOpCode and jqueue._QueuedJob are provided for
the first time.
Signed-off-by: default avatarMichael Hanselmann <>
Reviewed-by: default avatarIustin Pop <>
parent e5d8774b
...@@ -95,7 +95,7 @@ class _QueuedOpCode(object): ...@@ -95,7 +95,7 @@ class _QueuedOpCode(object):
@ivar stop_timestamp: timestamp for the end of the execution @ivar stop_timestamp: timestamp for the end of the execution
""" """
__slots__ = ["input", "status", "result", "log", __slots__ = ["input", "status", "result", "log", "priority",
"start_timestamp", "exec_timestamp", "end_timestamp", "start_timestamp", "exec_timestamp", "end_timestamp",
"__weakref__"] "__weakref__"]
...@@ -114,6 +114,9 @@ class _QueuedOpCode(object): ...@@ -114,6 +114,9 @@ class _QueuedOpCode(object):
self.exec_timestamp = None self.exec_timestamp = None
self.end_timestamp = None self.end_timestamp = None
# Get initial priority (it might change during the lifetime of this opcode)
self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
@classmethod @classmethod
def Restore(cls, state): def Restore(cls, state):
"""Restore the _QueuedOpCode from the serialized form. """Restore the _QueuedOpCode from the serialized form.
...@@ -132,6 +135,7 @@ class _QueuedOpCode(object): ...@@ -132,6 +135,7 @@ class _QueuedOpCode(object):
obj.start_timestamp = state.get("start_timestamp", None) obj.start_timestamp = state.get("start_timestamp", None)
obj.exec_timestamp = state.get("exec_timestamp", None) obj.exec_timestamp = state.get("exec_timestamp", None)
obj.end_timestamp = state.get("end_timestamp", None) obj.end_timestamp = state.get("end_timestamp", None)
obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
return obj return obj
def Serialize(self): def Serialize(self):
...@@ -149,6 +153,7 @@ class _QueuedOpCode(object): ...@@ -149,6 +153,7 @@ class _QueuedOpCode(object):
"start_timestamp": self.start_timestamp, "start_timestamp": self.start_timestamp,
"exec_timestamp": self.exec_timestamp, "exec_timestamp": self.exec_timestamp,
"end_timestamp": self.end_timestamp, "end_timestamp": self.end_timestamp,
"priority": self.priority,
} }
...@@ -302,6 +307,24 @@ class _QueuedJob(object): ...@@ -302,6 +307,24 @@ class _QueuedJob(object):
return status return status
def CalcPriority(self):
"""Gets the current priority for this job.
Only unfinished opcodes are considered. When all are done, the default
priority is used.
@rtype: int
priorities = [op.priority for op in self.ops
if op.status not in constants.OPS_FINALIZED]
if not priorities:
# All opcodes are done, assume default priority
return constants.OP_PRIO_DEFAULT
return min(priorities)
def GetLogEntries(self, newer_than): def GetLogEntries(self, newer_than):
"""Selectively returns the log entries. """Selectively returns the log entries.
...@@ -117,10 +117,11 @@ class OpCode(BaseOpCode): ...@@ -117,10 +117,11 @@ class OpCode(BaseOpCode):
children of this class. children of this class.
@ivar dry_run: Whether the LU should be run in dry-run mode, i.e. just @ivar dry_run: Whether the LU should be run in dry-run mode, i.e. just
the check steps the check steps
@ivar priority: Opcode priority for queue
""" """
__slots__ = ["dry_run", "debug_level"] __slots__ = ["dry_run", "debug_level", "priority"]
def __getstate__(self): def __getstate__(self):
"""Specialized getstate for opcodes. """Specialized getstate for opcodes.
...@@ -32,6 +32,8 @@ from ganeti import constants ...@@ -32,6 +32,8 @@ from ganeti import constants
from ganeti import utils from ganeti import utils
from ganeti import errors from ganeti import errors
from ganeti import jqueue from ganeti import jqueue
from ganeti import opcodes
from ganeti import compat
import testutils import testutils
...@@ -239,5 +241,103 @@ class TestEncodeOpError(unittest.TestCase): ...@@ -239,5 +241,103 @@ class TestEncodeOpError(unittest.TestCase):
self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr) self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
class TestQueuedOpCode(unittest.TestCase):
def testDefaults(self):
def _Check(op):
self.assertFalse(hasattr(op.input, "dry_run"))
self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
self.assert_(op.start_timestamp is None)
self.assert_(op.exec_timestamp is None)
self.assert_(op.end_timestamp is None)
self.assert_(op.result is None)
self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
self.assertEqual(op1.Serialize(), op2.Serialize())
def testPriority(self):
def _Check(op):
assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
"Default priority equals high priority; test can't work"
self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
op1 = jqueue._QueuedOpCode(inpop)
op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
self.assertEqual(op1.Serialize(), op2.Serialize())
class TestQueuedJob(unittest.TestCase):
def testDefaults(self):
job_id = 4260
ops = [
def _Check(job):
self.assertEqual(, job_id)
self.assertEqual(job.log_serial, 0)
self.assert_(job.start_timestamp is None)
self.assert_(job.end_timestamp is None)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertEqual(len(job.ops), len(ops))
self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
for (inp, op) in zip(ops, job.ops)))
job1 = jqueue._QueuedJob(None, job_id, ops)
job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
self.assertEqual(job1.Serialize(), job2.Serialize())
def testPriority(self):
job_id = 4283
ops = [
def _Check(job):
self.assertEqual(, job_id)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
job = jqueue._QueuedJob(None, job_id, ops)
self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
# Increase first
job.ops[0].priority -= 1
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
# Mark opcode as finished
job.ops[0].status = constants.OP_STATUS_SUCCESS
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
# Increase second
job.ops[1].priority -= 10
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
# Test increasing first
job.ops[0].status = constants.OP_STATUS_RUNNING
job.ops[0].priority -= 19
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
if __name__ == "__main__": if __name__ == "__main__":
testutils.GanetiTestProgram() testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment