From 8f5c488dd1bce2e4794adcf77598db0fde1c895f Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Thu, 2 Sep 2010 18:04:50 +0200
Subject: [PATCH] Add support for job priority to opcodes and job queue objects

This allows clients to submit opcodes with a priority. Except for being
tracked by the job queue, it is not yet used by any code.

Unittests for jqueue._QueuedOpCode and jqueue._QueuedJob are provided for
the first time.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/jqueue.py                  |  25 ++++++++-
 lib/opcodes.py                 |   3 +-
 test/ganeti.jqueue_unittest.py | 100 +++++++++++++++++++++++++++++++++
 3 files changed, 126 insertions(+), 2 deletions(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index fcd42e27f..fbcb19675 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -95,7 +95,7 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
-  __slots__ = ["input", "status", "result", "log",
+  __slots__ = ["input", "status", "result", "log", "priority",
                "start_timestamp", "exec_timestamp", "end_timestamp",
                "__weakref__"]
 
@@ -114,6 +114,9 @@ class _QueuedOpCode(object):
     self.exec_timestamp = None
     self.end_timestamp = None
 
+    # Get initial priority (it might change during the lifetime of this opcode)
+    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
+
   @classmethod
   def Restore(cls, state):
     """Restore the _QueuedOpCode from the serialized form.
@@ -132,6 +135,7 @@ class _QueuedOpCode(object):
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.exec_timestamp = state.get("exec_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
     return obj
 
   def Serialize(self):
@@ -149,6 +153,7 @@ class _QueuedOpCode(object):
       "start_timestamp": self.start_timestamp,
       "exec_timestamp": self.exec_timestamp,
       "end_timestamp": self.end_timestamp,
+      "priority": self.priority,
       }
 
 
@@ -302,6 +307,24 @@ class _QueuedJob(object):
 
     return status
 
+  def CalcPriority(self):
+    """Gets the current priority for this job.
+
+    Only unfinished opcodes are considered. When all are done, the default
+    priority is used.
+
+    @rtype: int
+
+    """
+    priorities = [op.priority for op in self.ops
+                  if op.status not in constants.OPS_FINALIZED]
+
+    if not priorities:
+      # All opcodes are done, assume default priority
+      return constants.OP_PRIO_DEFAULT
+
+    return min(priorities)
+
   def GetLogEntries(self, newer_than):
     """Selectively returns the log entries.
 
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 34ed717f2..0f633b67f 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -117,10 +117,11 @@ class OpCode(BaseOpCode):
                children of this class.
   @ivar dry_run: Whether the LU should be run in dry-run mode, i.e. just
                  the check steps
+  @ivar priority: Opcode priority for queue
 
   """
   OP_ID = "OP_ABSTRACT"
-  __slots__ = ["dry_run", "debug_level"]
+  __slots__ = ["dry_run", "debug_level", "priority"]
 
   def __getstate__(self):
     """Specialized getstate for opcodes.
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index e6746eec5..ab6ce2225 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -32,6 +32,8 @@ from ganeti import constants
 from ganeti import utils
 from ganeti import errors
 from ganeti import jqueue
+from ganeti import opcodes
+from ganeti import compat
 
 import testutils
 
@@ -239,5 +241,103 @@ class TestEncodeOpError(unittest.TestCase):
     self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
 
 
+class TestQueuedOpCode(unittest.TestCase):
+  def testDefaults(self):
+    def _Check(op):
+      self.assertFalse(hasattr(op.input, "dry_run"))
+      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
+      self.assertFalse(op.log)
+      self.assert_(op.start_timestamp is None)
+      self.assert_(op.exec_timestamp is None)
+      self.assert_(op.end_timestamp is None)
+      self.assert_(op.result is None)
+      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
+
+    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
+    _Check(op1)
+    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
+    _Check(op2)
+    self.assertEqual(op1.Serialize(), op2.Serialize())
+
+  def testPriority(self):
+    def _Check(op):
+      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
+             "Default priority equals high priority; test can't work"
+      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
+      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
+
+    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
+    op1 = jqueue._QueuedOpCode(inpop)
+    _Check(op1)
+    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
+    _Check(op2)
+    self.assertEqual(op1.Serialize(), op2.Serialize())
+
+
+class TestQueuedJob(unittest.TestCase):
+  def testDefaults(self):
+    job_id = 4260
+    ops = [
+      opcodes.OpGetTags(),
+      opcodes.OpTestDelay(),
+      ]
+
+    def _Check(job):
+      self.assertEqual(job.id, job_id)
+      self.assertEqual(job.log_serial, 0)
+      self.assert_(job.received_timestamp)
+      self.assert_(job.start_timestamp is None)
+      self.assert_(job.end_timestamp is None)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+      self.assert_(repr(job).startswith("<"))
+      self.assertEqual(len(job.ops), len(ops))
+      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
+                              for (inp, op) in zip(ops, job.ops)))
+
+    job1 = jqueue._QueuedJob(None, job_id, ops)
+    _Check(job1)
+    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
+    _Check(job2)
+    self.assertEqual(job1.Serialize(), job2.Serialize())
+
+  def testPriority(self):
+    job_id = 4283
+    ops = [
+      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
+      opcodes.OpTestDelay(),
+      ]
+
+    def _Check(job):
+      self.assertEqual(job.id, job_id)
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(repr(job).startswith("<"))
+
+    job = jqueue._QueuedJob(None, job_id, ops)
+    _Check(job)
+    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
+                            for op in job.ops))
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+
+    # Increase first
+    job.ops[0].priority -= 1
+    _Check(job)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
+
+    # Mark opcode as finished
+    job.ops[0].status = constants.OP_STATUS_SUCCESS
+    _Check(job)
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
+
+    # Increase second
+    job.ops[1].priority -= 10
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
+
+    # Test increasing first
+    job.ops[0].status = constants.OP_STATUS_RUNNING
+    job.ops[0].priority -= 19
+    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
-- 
GitLab