From be760ba897f4b081bb4a4051957a3ffccf3af858 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Mon, 20 Sep 2010 14:40:31 +0200
Subject: [PATCH] jqueue: Change model from per-job to per-opcode processing

In order to support priorities, the processing of jobs needs to be
changed. Instead of processing jobs as a whole, the code is changed to
process one opcode at a time and then return to the queue. See the
Ganeti 2.3 design document for details.

This patch does not yet use priorities for acquiring locks.

The enclosed unittests increase the test coverage of jqueue.py from
about 34% to 58%. Please note that they also test some parts not added
by this patch, but testing those parts only became possible with
infrastructure added by this patch. For the first time, many
implications and assumptions for the job queue are codified in these
unittests.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/jqueue.py                  | 333 ++++++++++++++--------
 lib/opcodes.py                 |  12 +
 test/ganeti.jqueue_unittest.py | 500 +++++++++++++++++++++++++++++++++
 3 files changed, 732 insertions(+), 113 deletions(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index c2d1a2b34..c0b6cf825 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -176,7 +176,7 @@ class _QueuedJob(object):
 
   """
   # pylint: disable-msg=W0212
-  __slots__ = ["queue", "id", "ops", "log_serial",
+  __slots__ = ["queue", "id", "ops", "log_serial", "current_op",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__"]
 
@@ -203,6 +203,8 @@ class _QueuedJob(object):
     self.start_timestamp = None
     self.end_timestamp = None
 
+    self.current_op = None
+
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
               "id=%s" % self.id,
@@ -237,6 +239,8 @@ class _QueuedJob(object):
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
+    obj.current_op = None
+
     return obj
 
   def Serialize(self):
@@ -734,6 +738,211 @@ def _EncodeOpError(err):
   return errors.EncodeException(to_encode)
 
 
+class _JobProcessor(object):
+  def __init__(self, queue, opexec_fn, job):
+    """Initializes this class.
+
+    """
+    self.queue = queue
+    self.opexec_fn = opexec_fn
+    self.job = job
+
+  @staticmethod
+  def _FindNextOpcode(job):
+    """Locates the next opcode to run.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+
+    """
+    # Create some sort of a cache to speed up locating next opcode for future
+    # lookups
+    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
+    # pending and one for processed ops.
+    if job.current_op is None:
+      job.current_op = enumerate(job.ops)
+
+    # Find next opcode to run
+    while True:
+      try:
+        (idx, op) = job.current_op.next()
+      except StopIteration:
+        raise errors.ProgrammerError("Called for a finished job")
+
+      if op.status == constants.OP_STATUS_RUNNING:
+        # Found an opcode already marked as running
+        raise errors.ProgrammerError("Called for job marked as running")
+
+      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
+      summary = op.input.Summary()
+
+      if op.status == constants.OP_STATUS_CANCELED:
+        # Canceled jobs are handled by the caller
+        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
+                              for i in job.ops[idx:])
+
+      elif op.status in constants.OPS_FINALIZED:
+        # This is a job that was partially completed before master daemon
+        # shutdown, so it can be expected that some opcodes are already
+        # completed successfully (if any did error out, then the whole job
+        # should have been aborted and not resubmitted for processing).
+        logging.info("%s: opcode %s already processed, skipping",
+                     log_prefix, summary)
+        continue
+
+      return (idx, op, log_prefix, summary)
+
+  @staticmethod
+  def _MarkWaitlock(job, op):
+    """Marks an opcode as waiting for locks.
+
+    The job's start timestamp is also set if necessary.
+
+    @type job: L{_QueuedJob}
+    @param job: Job object
+    @type op: L{_QueuedOpCode}
+    @param op: Opcode object
+
+    """
+    assert op in job.ops
+
+    op.status = constants.OP_STATUS_WAITLOCK
+    op.result = None
+    op.start_timestamp = TimeStampNow()
+
+    if job.start_timestamp is None:
+      job.start_timestamp = op.start_timestamp
+
+  def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
+    """Processes one opcode and returns the result.
+
+    """
+    assert op.status == constants.OP_STATUS_WAITLOCK
+
+    try:
+      # Make sure not to hold queue lock while calling ExecOpCode
+      result = self.opexec_fn(op.input,
+                              _OpExecCallbacks(self.queue, self.job, op))
+    except CancelJob:
+      logging.exception("%s: Canceling job", log_prefix)
+      assert op.status == constants.OP_STATUS_CANCELING
+      return (constants.OP_STATUS_CANCELING, None)
+    except Exception, err: # pylint: disable-msg=W0703
+      logging.exception("%s: Caught exception in %s", log_prefix, summary)
+      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
+    else:
+      logging.debug("%s: %s successful", log_prefix, summary)
+      return (constants.OP_STATUS_SUCCESS, result)
+
+  def __call__(self):
+    """Continues execution of a job.
+
+    @rtype: bool
+    @return: True if job is finished, False if processor needs to be called
+             again
+
+    """
+    queue = self.queue
+    job = self.job
+
+    logging.debug("Processing job %s", job.id)
+
+    queue.acquire(shared=1)
+    try:
+      opcount = len(job.ops)
+
+      (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)
+
+      # Consistency check
+      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
+                                     constants.OP_STATUS_CANCELED)
+                        for i in job.ops[opidx:])
+
+      assert op.status in (constants.OP_STATUS_QUEUED,
+                           constants.OP_STATUS_WAITLOCK,
+                           constants.OP_STATUS_CANCELED)
+
+      if op.status != constants.OP_STATUS_CANCELED:
+        # Prepare to start opcode
+        self._MarkWaitlock(job, op)
+
+        assert op.status == constants.OP_STATUS_WAITLOCK
+        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
+
+        # Write to disk
+        queue.UpdateJobUnlocked(job)
+
+        logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary)
+
+        queue.release()
+        try:
+          (op_status, op_result) = \
+            self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
+        finally:
+          queue.acquire(shared=1)
+
+        # Finalize opcode
+        op.end_timestamp = TimeStampNow()
+        op.status = op_status
+        op.result = op_result
+
+        if op.status == constants.OP_STATUS_CANCELING:
+          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
+                                for i in job.ops[opidx:])
+        else:
+          assert op.status in constants.OPS_FINALIZED
+
+      # Ensure all opcodes so far have been successful
+      assert (opidx == 0 or
+              compat.all(i.status == constants.OP_STATUS_SUCCESS
+                         for i in job.ops[:opidx]))
+
+      if op.status == constants.OP_STATUS_SUCCESS:
+        finalize = False
+
+      elif op.status == constants.OP_STATUS_ERROR:
+        # Ensure failed opcode has an exception as its result
+        assert errors.GetEncodedError(job.ops[opidx].result)
+
+        to_encode = errors.OpExecError("Preceding opcode failed")
+        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                              _EncodeOpError(to_encode))
+        finalize = True
+
+        # Consistency check
+        assert compat.all(i.status == constants.OP_STATUS_ERROR and
+                          errors.GetEncodedError(i.result)
+                          for i in job.ops[opidx:])
+
+      elif op.status == constants.OP_STATUS_CANCELING:
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                              "Job canceled by request")
+        finalize = True
+
+      elif op.status == constants.OP_STATUS_CANCELED:
+        finalize = True
+
+      else:
+        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
+
+      # Finalizing or last opcode?
+      if finalize or opidx == (opcount - 1):
+        # All opcodes have been run, finalize job
+        job.end_timestamp = TimeStampNow()
+
+      # Write to disk. If the job status is final, this is the final write
+      # allowed. Once the file has been written, it can be archived anytime.
+      queue.UpdateJobUnlocked(job)
+
+      if finalize or opidx == (opcount - 1):
+        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
+        return True
+
+      return False
+    finally:
+      queue.release()
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
@@ -741,125 +950,23 @@ class _JobQueueWorker(workerpool.BaseWorker):
   def RunTask(self, job): # pylint: disable-msg=W0221
     """Job executor.
 
-    This functions processes a job. It is closely tied to the _QueuedJob and
-    _QueuedOpCode classes.
+    This function processes a job. It is closely tied to the L{_QueuedJob} and
+    L{_QueuedOpCode} classes.
 
     @type job: L{_QueuedJob}
     @param job: the job to be processed
 
     """
+    queue = job.queue
+    assert queue == self.pool.queue
+
     self.SetTaskName("Job%s" % job.id)
 
-    logging.info("Processing job %s", job.id)
-    proc = mcpu.Processor(self.pool.queue.context, job.id)
-    queue = job.queue
-    try:
-      try:
-        count = len(job.ops)
-        for idx, op in enumerate(job.ops):
-          op_summary = op.input.Summary()
-          if op.status == constants.OP_STATUS_SUCCESS:
-            # this is a job that was partially completed before master
-            # daemon shutdown, so it can be expected that some opcodes
-            # are already completed successfully (if any did error
-            # out, then the whole job should have been aborted and not
-            # resubmitted for processing)
-            logging.info("Op %s/%s: opcode %s already processed, skipping",
-                         idx + 1, count, op_summary)
-            continue
-          try:
-            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
-                         op_summary)
-
-            queue.acquire(shared=1)
-            try:
-              if op.status == constants.OP_STATUS_CANCELED:
-                logging.debug("Canceling opcode")
-                raise CancelJob()
-              assert op.status == constants.OP_STATUS_QUEUED
-              logging.debug("Opcode %s/%s waiting for locks",
-                            idx + 1, count)
-              op.status = constants.OP_STATUS_WAITLOCK
-              op.result = None
-              op.start_timestamp = TimeStampNow()
-              if idx == 0: # first opcode
-                job.start_timestamp = op.start_timestamp
-              queue.UpdateJobUnlocked(job)
-
-              input_opcode = op.input
-            finally:
-              queue.release()
-
-            # Make sure not to hold queue lock while calling ExecOpCode
-            result = proc.ExecOpCode(input_opcode,
-                                     _OpExecCallbacks(queue, job, op))
-
-            queue.acquire(shared=1)
-            try:
-              logging.debug("Opcode %s/%s succeeded", idx + 1, count)
-              op.status = constants.OP_STATUS_SUCCESS
-              op.result = result
-              op.end_timestamp = TimeStampNow()
-              if idx == count - 1:
-                job.end_timestamp = TimeStampNow()
-
-                # Consistency check
-                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
-                                  for i in job.ops)
-
-              queue.UpdateJobUnlocked(job)
-            finally:
-              queue.release()
-
-            logging.info("Op %s/%s: Successfully finished opcode %s",
-                         idx + 1, count, op_summary)
-          except CancelJob:
-            # Will be handled further up
-            raise
-          except Exception, err:
-            queue.acquire(shared=1)
-            try:
-              try:
-                logging.debug("Opcode %s/%s failed", idx + 1, count)
-                op.status = constants.OP_STATUS_ERROR
-                op.result = _EncodeOpError(err)
-                op.end_timestamp = TimeStampNow()
-                logging.info("Op %s/%s: Error in opcode %s: %s",
-                             idx + 1, count, op_summary, err)
-
-                to_encode = errors.OpExecError("Preceding opcode failed")
-                job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                                      _EncodeOpError(to_encode))
-
-                # Consistency check
-                assert compat.all(i.status == constants.OP_STATUS_SUCCESS
-                                  for i in job.ops[:idx])
-                assert compat.all(i.status == constants.OP_STATUS_ERROR and
-                                  errors.GetEncodedError(i.result)
-                                  for i in job.ops[idx:])
-              finally:
-                job.end_timestamp = TimeStampNow()
-                queue.UpdateJobUnlocked(job)
-            finally:
-              queue.release()
-            raise
-
-      except CancelJob:
-        queue.acquire(shared=1)
-        try:
-          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
-                                "Job canceled by request")
-          job.end_timestamp = TimeStampNow()
-          queue.UpdateJobUnlocked(job)
-        finally:
-          queue.release()
-      except errors.GenericError, err:
-        logging.exception("Ganeti exception")
-      except:
-        logging.exception("Unhandled exception")
-    finally:
-      status = job.CalcStatus()
-      logging.info("Finished job %s, status = %s", job.id, status)
+    proc = mcpu.Processor(queue.context, job.id)
+
+    if not _JobProcessor(queue, proc.ExecOpCode, job)():
+      # Schedule again
+      raise workerpool.DeferTask()
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 7a2ccf80c..71c01c3e1 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -802,6 +802,18 @@ class OpTestJobqueue(OpCode):
     ]
 
 
+class OpTestDummy(OpCode):
+  """Utility opcode used by unittests.
+
+  """
+  OP_ID = "OP_TEST_DUMMY"
+  __slots__ = [
+    "result",
+    "messages",
+    "fail",
+    ]
+
+
 OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values()
                    if (isinstance(v, type) and issubclass(v, OpCode) and
                        hasattr(v, "OP_ID"))])
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index 2a7b61c3b..c83157509 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -347,5 +347,505 @@ class TestQueuedJob(unittest.TestCase):
     self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
 
 
+class _FakeQueueForProc:
+  def __init__(self):
+    self._acquired = False
+
+  def IsAcquired(self):
+    return self._acquired
+
+  def acquire(self, shared=0):
+    assert shared == 1
+    self._acquired = True
+
+  def release(self):
+    assert self._acquired
+    self._acquired = False
+
+  def UpdateJobUnlocked(self, job, replicate=None):
+    # TODO: Ensure job is updated at the correct places
+    pass
+
+
+class _FakeExecOpCodeForProc:
+  def __init__(self, before_start, after_start):
+    self._before_start = before_start
+    self._after_start = after_start
+
+  def __call__(self, op, cbs):
+    assert isinstance(op, opcodes.OpTestDummy)
+
+    if self._before_start:
+      self._before_start()
+
+    cbs.NotifyStart()
+
+    if self._after_start:
+      self._after_start(op, cbs)
+
+    if op.fail:
+      raise errors.OpExecError("Error requested (%s)" % op.result)
+
+    return op.result
+
+
+class TestJobProcessor(unittest.TestCase):
+  def _CreateJob(self, queue, job_id, ops):
+    job = jqueue._QueuedJob(queue, job_id, ops)
+    self.assertFalse(job.start_timestamp)
+    self.assertFalse(job.end_timestamp)
+    self.assertEqual(len(ops), len(job.ops))
+    self.assert_(compat.all(op.input == inp
+                            for (op, inp) in zip(job.ops, ops)))
+    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
+    return job
+
+  def _GenericCheckJob(self, job):
+    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
+                      for op in job.ops)
+
+    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
+                     [[op.start_timestamp for op in job.ops],
+                      [op.exec_timestamp for op in job.ops],
+                      [op.end_timestamp for op in job.ops]])
+    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
+                     [job.received_timestamp,
+                      job.start_timestamp,
+                      job.end_timestamp])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
+
+  def testSuccess(self):
+    queue = _FakeQueueForProc()
+
+    for (job_id, opcount) in [(25351, 1), (6637, 3),
+                              (24644, 10), (32207, 100)]:
+      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+             for i in range(opcount)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      def _BeforeStart():
+        self.assertFalse(queue.IsAcquired())
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      def _AfterStart(op, cbs):
+        self.assertFalse(queue.IsAcquired())
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+        # Job is running, cancelling shouldn't be possible
+        (success, _) = job.Cancel()
+        self.assertFalse(success)
+
+      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+
+      for idx in range(len(ops)):
+        result = jqueue._JobProcessor(queue, opexec, job)()
+        if idx == len(ops) - 1:
+          # Last opcode
+          self.assert_(result)
+        else:
+          self.assertFalse(result)
+
+          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+          self.assert_(job.start_timestamp)
+          self.assertFalse(job.end_timestamp)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+      self.assertEqual(job.GetInfo(["opresult"]),
+                       [[op.input.result for op in job.ops]])
+      self.assertEqual(job.GetInfo(["opstatus"]),
+                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                              for op in job.ops))
+
+      self._GenericCheckJob(job)
+
+      # Finished jobs can't be processed any further
+      self.assertRaises(errors.ProgrammerError,
+                        jqueue._JobProcessor(queue, opexec, job))
+
+  def testOpcodeError(self):
+    queue = _FakeQueueForProc()
+
+    testdata = [
+      (17077, 1, 0, 0),
+      (1782, 5, 2, 2),
+      (18179, 10, 9, 9),
+      (4744, 10, 3, 8),
+      (23816, 100, 39, 45),
+      ]
+
+    for (job_id, opcount, failfrom, failto) in testdata:
+      # Prepare opcodes
+      ops = [opcodes.OpTestDummy(result="Res%s" % i,
+                                 fail=(failfrom <= i and
+                                       i <= failto))
+             for i in range(opcount)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      opexec = _FakeExecOpCodeForProc(None, None)
+
+      for idx in range(len(ops)):
+        result = jqueue._JobProcessor(queue, opexec, job)()
+
+        if idx in (failfrom, len(ops) - 1):
+          # Last opcode
+          self.assert_(result)
+          break
+
+        self.assertFalse(result)
+
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+      # Check job status
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
+      self.assertEqual(job.GetInfo(["id"]), [job_id])
+      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
+
+      # Check opcode status
+      data = zip(job.ops,
+                 job.GetInfo(["opstatus"])[0],
+                 job.GetInfo(["opresult"])[0])
+
+      for idx, (op, opstatus, opresult) in enumerate(data):
+        if idx < failfrom:
+          assert not op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
+          self.assertEqual(opresult, op.input.result)
+        elif idx <= failto:
+          assert op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
+          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
+        else:
+          assert not op.input.fail
+          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
+          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
+
+      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                              for op in job.ops[:failfrom]))
+
+      self._GenericCheckJob(job)
+
+      # Finished jobs can't be processed any further
+      self.assertRaises(errors.ProgrammerError,
+                        jqueue._JobProcessor(queue, opexec, job))
+
+  def testCancelWhileInQueue(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 17045
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    # Mark as cancelled
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
+                            for op in job.ops))
+
+    opexec = _FakeExecOpCodeForProc(None, None)
+    jqueue._JobProcessor(queue, opexec, job)()
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertFalse(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileWaitlock(self):
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(5)]
+
+    # Create job
+    job_id = 11009
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    def _BeforeStart():
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+      # Mark as cancelled
+      (success, _) = job.Cancel()
+      self.assert_(success)
+
+      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
+                              for op in job.ops))
+
+    def _AfterStart(op, cbs):
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+
+    jqueue._JobProcessor(queue, opexec, job)()
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assert_(job.start_timestamp)
+    self.assert_(job.end_timestamp)
+    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
+                                for op in job.ops))
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
+                      ["Job canceled by request" for _ in job.ops]])
+
+  def testCancelWhileRunning(self):
+    # Tests canceling a job with finished opcodes and more, unprocessed ones
+    queue = _FakeQueueForProc()
+
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(3)]
+
+    # Create job
+    job_id = 28492
+    job = self._CreateJob(queue, job_id, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    opexec = _FakeExecOpCodeForProc(None, None)
+
+    # Run one opcode
+    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Job goes back to queued
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_QUEUED,
+                       constants.OP_STATUS_QUEUED],
+                      ["Res0", None, None]])
+
+    # Mark as cancelled
+    (success, _) = job.Cancel()
+    self.assert_(success)
+
+    # Try processing another opcode (this will actually cancel the job)
+    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
+
+    # Check result
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
+    self.assertEqual(job.GetInfo(["id"]), [job_id])
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
+    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
+                     [[constants.OP_STATUS_SUCCESS,
+                       constants.OP_STATUS_CANCELED,
+                       constants.OP_STATUS_CANCELED],
+                      ["Res0", "Job canceled by request",
+                       "Job canceled by request"]])
+
+  def testPartiallyRun(self):
+    # Tests calling the processor on a job that's been partially run before the
+    # program was restarted
+    queue = _FakeQueueForProc()
+
+    opexec = _FakeExecOpCodeForProc(None, None)
+
+    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
+      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+             for i in range(10)]
+
+      # Create job
+      job = self._CreateJob(queue, job_id, ops)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+      for _ in range(successcount):
+        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assertEqual(job.GetInfo(["opstatus"]),
+                       [[constants.OP_STATUS_SUCCESS
+                         for _ in range(successcount)] +
+                        [constants.OP_STATUS_QUEUED
+                         for _ in range(len(ops) - successcount)]])
+
+      self.assert_(job.current_op)
+
+      # Serialize and restore (simulates program restart)
+      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+      self.assertFalse(newjob.current_op)
+      self._TestPartial(newjob, successcount)
+
+  def _TestPartial(self, job, successcount):
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
+
+    queue = _FakeQueueForProc()
+    opexec = _FakeExecOpCodeForProc(None, None)
+
+    for remaining in reversed(range(len(job.ops) - successcount)):
+      result = jqueue._JobProcessor(queue, opexec, job)()
+
+      if remaining == 0:
+        # Last opcode
+        self.assert_(result)
+        break
+
+      self.assertFalse(result)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
+    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                            for op in job.ops))
+
+    self._GenericCheckJob(job)
+
+    # Finished jobs can't be processed any further
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job))
+
+    # ... also after being restored
+    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job2))
+
+  def testProcessorOnRunningJob(self):
+    ops = [opcodes.OpTestDummy(result="result", fail=False)]
+
+    queue = _FakeQueueForProc()
+    opexec = _FakeExecOpCodeForProc(None, None)
+
+    # Create job
+    job = self._CreateJob(queue, 9571, ops)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    job.ops[0].status = constants.OP_STATUS_RUNNING
+
+    assert len(job.ops) == 1
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    # Calling on running job must fail
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job))
+
+  def testLogMessages(self):
+    # Tests the "Feedback" callback function
+    queue = _FakeQueueForProc()
+
+    messages = {
+      1: [
+        (None, "Hello"),
+        (None, "World"),
+        (constants.ELOG_MESSAGE, "there"),
+        ],
+      4: [
+        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
+        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
+        ],
+      }
+    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
+                               messages=messages.get(i, []))
+           for i in range(5)]
+
+    # Create job
+    job = self._CreateJob(queue, 29386, ops)
+
+    def _BeforeStart():
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    def _AfterStart(op, cbs):
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+      self.assertRaises(AssertionError, cbs.Feedback,
+                        "too", "many", "arguments")
+
+      for (log_type, msg) in op.messages:
+        if log_type:
+          cbs.Feedback(log_type, msg)
+        else:
+          cbs.Feedback(msg)
+
+    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
+
+    for remaining in reversed(range(len(job.ops))):
+      result = jqueue._JobProcessor(queue, opexec, job)()
+
+      if remaining == 0:
+        # Last opcode
+        self.assert_(result)
+        break
+
+      self.assertFalse(result)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+
+    logmsgcount = sum(len(m) for m in messages.values())
+
+    self._CheckLogMessages(job, logmsgcount)
+
+    # Serialize and restore (simulates program restart)
+    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
+    self._CheckLogMessages(newjob, logmsgcount)
+
+    # Check each message
+    prevserial = -1
+    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
+      for (serial, timestamp, log_type, msg) in oplog:
+        (exptype, expmsg) = messages.get(idx).pop(0)
+        if exptype:
+          self.assertEqual(log_type, exptype)
+        else:
+          self.assertEqual(log_type, constants.ELOG_MESSAGE)
+        self.assertEqual(expmsg, msg)
+        self.assert_(serial > prevserial)
+        prevserial = serial
+
+  def _CheckLogMessages(self, job, count):
+    # Check serial
+    self.assertEqual(job.log_serial, count)
+
+    # No filter
+    self.assertEqual(job.GetLogEntries(None),
+                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
+                      for entry in entries])
+
+    # Filter with serial
+    assert count > 3
+    self.assert_(job.GetLogEntries(3))
+    self.assertEqual(job.GetLogEntries(3),
+                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
+                      for entry in entries][3:])
+
+    # No log message after highest serial
+    self.assertFalse(job.GetLogEntries(count))
+    self.assertFalse(job.GetLogEntries(count + 3))
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
-- 
GitLab