diff --git a/lib/jqueue.py b/lib/jqueue.py index c2d1a2b341c7af662bff74cec62d7a9deb034c64..c0b6cf825038fe44342466c43400c4f84a0720b4 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -176,7 +176,7 @@ class _QueuedJob(object): """ # pylint: disable-msg=W0212 - __slots__ = ["queue", "id", "ops", "log_serial", + __slots__ = ["queue", "id", "ops", "log_serial", "current_op", "received_timestamp", "start_timestamp", "end_timestamp", "__weakref__"] @@ -203,6 +203,8 @@ class _QueuedJob(object): self.start_timestamp = None self.end_timestamp = None + self.current_op = None + def __repr__(self): status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), "id=%s" % self.id, @@ -237,6 +239,8 @@ class _QueuedJob(object): obj.log_serial = max(obj.log_serial, log_entry[0]) obj.ops.append(op) + obj.current_op = None + return obj def Serialize(self): @@ -734,6 +738,211 @@ def _EncodeOpError(err): return errors.EncodeException(to_encode) +class _JobProcessor(object): + def __init__(self, queue, opexec_fn, job): + """Initializes this class. + + """ + self.queue = queue + self.opexec_fn = opexec_fn + self.job = job + + @staticmethod + def _FindNextOpcode(job): + """Locates the next opcode to run. + + @type job: L{_QueuedJob} + @param job: Job object + + """ + # Create some sort of a cache to speed up locating next opcode for future + # lookups + # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for + # pending and one for processed ops. + if job.current_op is None: + job.current_op = enumerate(job.ops) + + # Find next opcode to run + while True: + try: + (idx, op) = job.current_op.next() + except StopIteration: + raise errors.ProgrammerError("Called for a finished job") + + if op.status == constants.OP_STATUS_RUNNING: + # Found an opcode already marked as running + raise errors.ProgrammerError("Called for job marked as running") + + log_prefix = "Op %s/%s" % (idx + 1, len(job.ops)) + summary = op.input.Summary() + + if op.status == constants.OP_STATUS_CANCELED: + # Cancelled jobs are handled by the caller + assert not compat.any(i.status != constants.OP_STATUS_CANCELED + for i in job.ops[idx:]) + + elif op.status in constants.OPS_FINALIZED: + # This is a job that was partially completed before master daemon + # shutdown, so it can be expected that some opcodes are already + # completed successfully (if any did error out, then the whole job + # should have been aborted and not resubmitted for processing). + logging.info("%s: opcode %s already processed, skipping", + log_prefix, summary) + continue + + return (idx, op, log_prefix, summary) + + @staticmethod + def _MarkWaitlock(job, op): + """Marks an opcode as waiting for locks. + + The job's start timestamp is also set if necessary. + + @type job: L{_QueuedJob} + @param job: Job object + @type op: L{_QueuedOpCode} + @param op: Opcode object + + """ + assert op in job.ops + + op.status = constants.OP_STATUS_WAITLOCK + op.result = None + op.start_timestamp = TimeStampNow() + + if job.start_timestamp is None: + job.start_timestamp = op.start_timestamp + + def _ExecOpCodeUnlocked(self, log_prefix, op, summary): + """Processes one opcode and returns the result.
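+ + @type log_prefix: string + @param log_prefix: Prefix for log messages + @type op: L{_QueuedOpCode} + @param op: Opcode to be executed + @type summary: string + @param summary: Opcode summary + @rtype: tuple + @return: Tuple of opcode status and opcode result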
+ + """ + assert op.status == constants.OP_STATUS_WAITLOCK + + try: + # Make sure not to hold queue lock while calling ExecOpCode + result = self.opexec_fn(op.input, + _OpExecCallbacks(self.queue, self.job, op)) + except CancelJob: + logging.exception("%s: Canceling job", log_prefix) + assert op.status == constants.OP_STATUS_CANCELING + return (constants.OP_STATUS_CANCELING, None) + except Exception, err: # pylint: disable-msg=W0703 + logging.exception("%s: Caught exception in %s", log_prefix, summary) + return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) + else: + logging.debug("%s: %s successful", log_prefix, summary) + return (constants.OP_STATUS_SUCCESS, result) + + def __call__(self): + """Continues execution of a job. + + @rtype: bool + @return: True if job is finished, False if processor needs to be called + again + + """ + queue = self.queue + job = self.job + + logging.debug("Processing job %s", job.id) + + queue.acquire(shared=1) + try: + opcount = len(job.ops) + + (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job) + + # Consistency check + assert compat.all(i.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_CANCELED) + for i in job.ops[opidx:]) + + assert op.status in (constants.OP_STATUS_QUEUED, + constants.OP_STATUS_WAITLOCK, + constants.OP_STATUS_CANCELED) + + if op.status != constants.OP_STATUS_CANCELED: + # Prepare to start opcode + self._MarkWaitlock(job, op) + + assert op.status == constants.OP_STATUS_WAITLOCK + assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK + + # Write to disk + queue.UpdateJobUnlocked(job) + + logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary) + + queue.release() + try: + (op_status, op_result) = \ + self._ExecOpCodeUnlocked(log_prefix, op, op_summary) + finally: + queue.acquire(shared=1) + + # Finalize opcode + op.end_timestamp = TimeStampNow() + op.status = op_status + op.result = op_result + + if op.status == constants.OP_STATUS_CANCELING: + assert not compat.any(i.status != constants.OP_STATUS_CANCELING + for i in job.ops[opidx:]) + else: + assert op.status in constants.OPS_FINALIZED + + # Ensure all opcodes so far have been successful + assert (opidx == 0 or + compat.all(i.status == constants.OP_STATUS_SUCCESS + for i in job.ops[:opidx])) + + if op.status == constants.OP_STATUS_SUCCESS: + finalize = False + + elif op.status == constants.OP_STATUS_ERROR: + # Ensure failed opcode has an exception as its result + assert errors.GetEncodedError(job.ops[opidx].result) + + to_encode = errors.OpExecError("Preceding opcode failed") + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + _EncodeOpError(to_encode)) + finalize = True + + # Consistency check + assert compat.all(i.status == constants.OP_STATUS_ERROR and + errors.GetEncodedError(i.result) + for i in job.ops[opidx:]) + + elif op.status == constants.OP_STATUS_CANCELING: + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + finalize = True + + elif op.status == constants.OP_STATUS_CANCELED: + finalize = True + + else: + raise errors.ProgrammerError("Unknown status '%s'" % op.status) + + # Finalizing or last opcode? + if finalize or opidx == (opcount - 1): + # All opcodes have been run, finalize job + job.end_timestamp = TimeStampNow() + + # Write to disk. If the job status is final, this is the final write + # allowed. Once the file has been written, it can be archived anytime. 
+ queue.UpdateJobUnlocked(job) + + if finalize or opidx == (opcount - 1): + logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) + return True + + return False + finally: + queue.release() + + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. @@ -741,125 +950,23 @@ class _JobQueueWorker(workerpool.BaseWorker): def RunTask(self, job): # pylint: disable-msg=W0221 """Job executor. - This functions processes a job. It is closely tied to the _QueuedJob and - _QueuedOpCode classes. + This function processes a job. It is closely tied to the L{_QueuedJob} and + L{_QueuedOpCode} classes. @type job: L{_QueuedJob} @param job: the job to be processed """ + queue = job.queue + assert queue == self.pool.queue + self.SetTaskName("Job%s" % job.id) - logging.info("Processing job %s", job.id) - proc = mcpu.Processor(self.pool.queue.context, job.id) - queue = job.queue - try: - try: - count = len(job.ops) - for idx, op in enumerate(job.ops): - op_summary = op.input.Summary() - if op.status == constants.OP_STATUS_SUCCESS: - # this is a job that was partially completed before master - # daemon shutdown, so it can be expected that some opcodes - # are already completed successfully (if any did error - # out, then the whole job should have been aborted and not - # resubmitted for processing) - logging.info("Op %s/%s: opcode %s already processed, skipping", - idx + 1, count, op_summary) - continue - try: - logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, - op_summary) - - queue.acquire(shared=1) - try: - if op.status == constants.OP_STATUS_CANCELED: - logging.debug("Canceling opcode") - raise CancelJob() - assert op.status == constants.OP_STATUS_QUEUED - logging.debug("Opcode %s/%s waiting for locks", - idx + 1, count) - op.status = constants.OP_STATUS_WAITLOCK - op.result = None - op.start_timestamp = TimeStampNow() - if idx == 0: # first opcode - job.start_timestamp = op.start_timestamp - queue.UpdateJobUnlocked(job) - - input_opcode = op.input - finally: - queue.release() - - # Make sure not to hold queue lock while calling ExecOpCode - result = proc.ExecOpCode(input_opcode, - _OpExecCallbacks(queue, job, op)) - - queue.acquire(shared=1) - try: - logging.debug("Opcode %s/%s succeeded", idx + 1, count) - op.status = constants.OP_STATUS_SUCCESS - op.result = result - op.end_timestamp = TimeStampNow() - if idx == count - 1: - job.end_timestamp = TimeStampNow() - - # Consistency check - assert compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops) - - queue.UpdateJobUnlocked(job) - finally: - queue.release() - - logging.info("Op %s/%s: Successfully finished opcode %s", - idx + 1, count, op_summary) - except CancelJob: - # Will be handled further up - raise - except Exception, err: - queue.acquire(shared=1) - try: - try: - logging.debug("Opcode %s/%s failed", idx + 1, count) - op.status = constants.OP_STATUS_ERROR - op.result = _EncodeOpError(err) - op.end_timestamp = TimeStampNow() - logging.info("Op %s/%s: Error in opcode %s: %s", - idx + 1, count, op_summary, err) - - to_encode = errors.OpExecError("Preceding opcode failed") - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - _EncodeOpError(to_encode)) - - # Consistency check - assert compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops[:idx]) - assert compat.all(i.status == constants.OP_STATUS_ERROR and - errors.GetEncodedError(i.result) - for i in job.ops[idx:]) - finally: - job.end_timestamp = TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - queue.release() -
raise - - except CancelJob: - queue.acquire(shared=1) - try: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - job.end_timestamp = TimeStampNow() - queue.UpdateJobUnlocked(job) - finally: - queue.release() - except errors.GenericError, err: - logging.exception("Ganeti exception") - except: - logging.exception("Unhandled exception") - finally: - status = job.CalcStatus() - logging.info("Finished job %s, status = %s", job.id, status) + proc = mcpu.Processor(queue.context, job.id) + + if not _JobProcessor(queue, proc.ExecOpCode, job)(): + # Schedule again + raise workerpool.DeferTask() class _JobQueueWorkerPool(workerpool.WorkerPool): diff --git a/lib/opcodes.py b/lib/opcodes.py index 7a2ccf80c6f5b61592f7cfc4a6d387cb17f5f92a..71c01c3e11e5ac6c9378ce8117ce4e8ddc46b66e 100644 --- a/lib/opcodes.py +++ b/lib/opcodes.py @@ -802,6 +802,18 @@ class OpTestJobqueue(OpCode): ] +class OpTestDummy(OpCode): + """Utility opcode used by unittests. + + """ + OP_ID = "OP_TEST_DUMMY" + __slots__ = [ + "result", + "messages", + "fail", + ] + + OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values() if (isinstance(v, type) and issubclass(v, OpCode) and hasattr(v, "OP_ID"))]) diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 2a7b61c3b41f214010a95bec33359048d1772ef9..c83157509a1e053f0937a982d902b22602a15d7e 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -347,5 +347,505 @@ class TestQueuedJob(unittest.TestCase): self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20) +class _FakeQueueForProc: + def __init__(self): + self._acquired = False + + def IsAcquired(self): + return self._acquired + + def acquire(self, shared=0): + assert shared == 1 + self._acquired = True + + def release(self): + assert self._acquired + self._acquired = False + + def UpdateJobUnlocked(self, job, replicate=None): + # TODO: Ensure job is updated at the correct places + pass + + +class _FakeExecOpCodeForProc: + def __init__(self, before_start, after_start): + self._before_start = before_start + self._after_start = after_start + + def __call__(self, op, cbs): + assert isinstance(op, opcodes.OpTestDummy) + + if self._before_start: + self._before_start() + + cbs.NotifyStart() + + if self._after_start: + self._after_start(op, cbs) + + if op.fail: + raise errors.OpExecError("Error requested (%s)" % op.result) + + return op.result + + +class TestJobProcessor(unittest.TestCase): + def _CreateJob(self, queue, job_id, ops): + job = jqueue._QueuedJob(queue, job_id, ops) + self.assertFalse(job.start_timestamp) + self.assertFalse(job.end_timestamp) + self.assertEqual(len(ops), len(job.ops)) + self.assert_(compat.all(op.input == inp + for (op, inp) in zip(job.ops, ops))) + self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]]) + return job + + def _GenericCheckJob(self, job): + assert compat.all(isinstance(op.input, opcodes.OpTestDummy) + for op in job.ops) + + self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]), + [[op.start_timestamp for op in job.ops], + [op.exec_timestamp for op in job.ops], + [op.end_timestamp for op in job.ops]]) + self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]), + [job.received_timestamp, + job.start_timestamp, + job.end_timestamp]) + self.assert_(job.start_timestamp) + self.assert_(job.end_timestamp) + self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp) + + def testSuccess(self): + queue = _FakeQueueForProc() + + for (job_id, opcount) in [(25351, 
1), (6637, 3), + (24644, 10), (32207, 100)]: + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(opcount)] + + # Create job + job = self._CreateJob(queue, job_id, ops) + + def _BeforeStart(): + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + + def _AfterStart(op, cbs): + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + + # Job is running, cancelling shouldn't be possible + (success, _) = job.Cancel() + self.assertFalse(success) + + opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart) + + for idx in range(len(ops)): + result = jqueue._JobProcessor(queue, opexec, job)() + if idx == len(ops) - 1: + # Last opcode + self.assert_(result) + else: + self.assertFalse(result) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) + self.assertEqual(job.GetInfo(["opresult"]), + [[op.input.result for op in job.ops]]) + self.assertEqual(job.GetInfo(["opstatus"]), + [len(job.ops) * [constants.OP_STATUS_SUCCESS]]) + self.assert_(compat.all(op.start_timestamp and op.end_timestamp + for op in job.ops)) + + self._GenericCheckJob(job) + + # Finished jobs can't be processed any further + self.assertRaises(errors.ProgrammerError, + jqueue._JobProcessor(queue, opexec, job)) + + def testOpcodeError(self): + queue = _FakeQueueForProc() + + testdata = [ + (17077, 1, 0, 0), + (1782, 5, 2, 2), + (18179, 10, 9, 9), + (4744, 10, 3, 8), + (23816, 100, 39, 45), + ] + + for (job_id, opcount, failfrom, failto) in testdata: + # Prepare opcodes + ops = [opcodes.OpTestDummy(result="Res%s" % i, + fail=(failfrom <= i and + i <= failto)) + for i in range(opcount)] + + # Create job + job = self._CreateJob(queue, job_id, ops) + + opexec = _FakeExecOpCodeForProc(None, None) + + for idx in range(len(ops)): + result = jqueue._JobProcessor(queue, opexec, job)() + + if idx in (failfrom, len(ops) - 1): + # Last opcode + self.assert_(result) + break + + self.assertFalse(result) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + # Check job status + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR) + self.assertEqual(job.GetInfo(["id"]), [job_id]) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR]) + + # Check opcode status + data = zip(job.ops, + job.GetInfo(["opstatus"])[0], + job.GetInfo(["opresult"])[0]) + + for idx, (op, opstatus, opresult) in enumerate(data): + if idx < failfrom: + assert not op.input.fail + self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS) + self.assertEqual(opresult, op.input.result) + elif idx <= failto: + assert op.input.fail + self.assertEqual(opstatus, constants.OP_STATUS_ERROR) + self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult) + else: + assert not op.input.fail + self.assertEqual(opstatus, constants.OP_STATUS_ERROR) + self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult) + + self.assert_(compat.all(op.start_timestamp and op.end_timestamp + for op in job.ops[:failfrom])) + + self._GenericCheckJob(job) + + # Finished jobs can't be processed any further + self.assertRaises(errors.ProgrammerError, + jqueue._JobProcessor(queue, opexec, job)) + + def testCancelWhileInQueue(self): + queue = _FakeQueueForProc() + + ops = 
[opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(5)] + + # Create job + job_id = 17045 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + # Mark as cancelled + (success, _) = job.Cancel() + self.assert_(success) + + self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED + for op in job.ops)) + + opexec = _FakeExecOpCodeForProc(None, None) + jqueue._JobProcessor(queue, opexec, job)() + + # Check result + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED]) + self.assertFalse(job.start_timestamp) + self.assert_(job.end_timestamp) + self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp + for op in job.ops)) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [[constants.OP_STATUS_CANCELED for _ in job.ops], + ["Job canceled by request" for _ in job.ops]]) + + def testCancelWhileWaitlock(self): + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(5)] + + # Create job + job_id = 11009 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + def _BeforeStart(): + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + + # Mark as cancelled + (success, _) = job.Cancel() + self.assert_(success) + + self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING + for op in job.ops)) + + def _AfterStart(op, cbs): + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + + opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart) + + jqueue._JobProcessor(queue, opexec, job)() + + # Check result + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED]) + self.assert_(job.start_timestamp) + self.assert_(job.end_timestamp) + self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp + for op in job.ops)) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [[constants.OP_STATUS_CANCELED for _ in job.ops], + ["Job canceled by request" for _ in job.ops]]) + + def testCancelWhileRunning(self): + # Tests canceling a job with finished opcodes and more, unprocessed ones + queue = _FakeQueueForProc() + + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(3)] + + # Create job + job_id = 28492 + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + opexec = _FakeExecOpCodeForProc(None, None) + + # Run one opcode + self.assertFalse(jqueue._JobProcessor(queue, opexec, job)()) + + # Job goes back to queued + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertEqual(job.GetInfo(["opstatus", "opresult"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_QUEUED, + constants.OP_STATUS_QUEUED], + ["Res0", None, None]]) + + # Mark as cancelled + (success, _) = job.Cancel() + self.assert_(success) + + # Try processing another opcode (this will actually cancel the job) + self.assert_(jqueue._JobProcessor(queue, opexec, job)()) + + # Check result + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) + self.assertEqual(job.GetInfo(["id"]), [job_id]) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED]) + self.assertEqual(job.GetInfo(["opstatus", 
"opresult"]), + [[constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_CANCELED, + constants.OP_STATUS_CANCELED], + ["Res0", "Job canceled by request", + "Job canceled by request"]]) + + def testPartiallyRun(self): + # Tests calling the processor on a job that's been partially run before the + # program was restarted + queue = _FakeQueueForProc() + + opexec = _FakeExecOpCodeForProc(None, None) + + for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]: + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(10)] + + # Create job + job = self._CreateJob(queue, job_id, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + for _ in range(successcount): + self.assertFalse(jqueue._JobProcessor(queue, opexec, job)()) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertEqual(job.GetInfo(["opstatus"]), + [[constants.OP_STATUS_SUCCESS + for _ in range(successcount)] + + [constants.OP_STATUS_QUEUED + for _ in range(len(ops) - successcount)]]) + + self.assert_(job.current_op) + + # Serialize and restore (simulates program restart) + newjob = jqueue._QueuedJob.Restore(queue, job.Serialize()) + self.assertFalse(newjob.current_op) + self._TestPartial(newjob, successcount) + + def _TestPartial(self, job, successcount): + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp) + + queue = _FakeQueueForProc() + opexec = _FakeExecOpCodeForProc(None, None) + + for remaining in reversed(range(len(job.ops) - successcount)): + result = jqueue._JobProcessor(queue, opexec, job)() + + if remaining == 0: + # Last opcode + self.assert_(result) + break + + self.assertFalse(result) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) + self.assertEqual(job.GetInfo(["opresult"]), + [[op.input.result for op in job.ops]]) + self.assertEqual(job.GetInfo(["opstatus"]), + [[constants.OP_STATUS_SUCCESS for _ in job.ops]]) + self.assert_(compat.all(op.start_timestamp and op.end_timestamp + for op in job.ops)) + + self._GenericCheckJob(job) + + # Finished jobs can't be processed any further + self.assertRaises(errors.ProgrammerError, + jqueue._JobProcessor(queue, opexec, job)) + + # ... 
also after being restored + job2 = jqueue._QueuedJob.Restore(queue, job.Serialize()) + self.assertRaises(errors.ProgrammerError, + jqueue._JobProcessor(queue, opexec, job2)) + + def testProcessorOnRunningJob(self): + ops = [opcodes.OpTestDummy(result="result", fail=False)] + + queue = _FakeQueueForProc() + opexec = _FakeExecOpCodeForProc(None, None) + + # Create job + job = self._CreateJob(queue, 9571, ops) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + job.ops[0].status = constants.OP_STATUS_RUNNING + + assert len(job.ops) == 1 + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + + # Calling on running job must fail + self.assertRaises(errors.ProgrammerError, + jqueue._JobProcessor(queue, opexec, job)) + + def testLogMessages(self): + # Tests the "Feedback" callback function + queue = _FakeQueueForProc() + + messages = { + 1: [ + (None, "Hello"), + (None, "World"), + (constants.ELOG_MESSAGE, "there"), + ], + 4: [ + (constants.ELOG_JQUEUE_TEST, (1, 2, 3)), + (constants.ELOG_JQUEUE_TEST, ("other", "type")), + ], + } + ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False, + messages=messages.get(i, [])) + for i in range(5)] + + # Create job + job = self._CreateJob(queue, 29386, ops) + + def _BeforeStart(): + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + + def _AfterStart(op, cbs): + self.assertFalse(queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + + self.assertRaises(AssertionError, cbs.Feedback, + "too", "many", "arguments") + + for (log_type, msg) in op.messages: + if log_type: + cbs.Feedback(log_type, msg) + else: + cbs.Feedback(msg) + + opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart) + + for remaining in reversed(range(len(job.ops))): + result = jqueue._JobProcessor(queue, opexec, job)() + + if remaining == 0: + # Last opcode + self.assert_(result) + break + + self.assertFalse(result) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) + self.assertEqual(job.GetInfo(["opresult"]), + [[op.input.result for op in job.ops]]) + + logmsgcount = sum(len(m) for m in messages.values()) + + self._CheckLogMessages(job, logmsgcount) + + # Serialize and restore (simulates program restart) + newjob = jqueue._QueuedJob.Restore(queue, job.Serialize()) + self._CheckLogMessages(newjob, logmsgcount) + + # Check each message + prevserial = -1 + for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]): + for (serial, timestamp, log_type, msg) in oplog: + (exptype, expmsg) = messages.get(idx).pop(0) + if exptype: + self.assertEqual(log_type, exptype) + else: + self.assertEqual(log_type, constants.ELOG_MESSAGE) + self.assertEqual(expmsg, msg) + self.assert_(serial > prevserial) + prevserial = serial + + def _CheckLogMessages(self, job, count): + # Check serial + self.assertEqual(job.log_serial, count) + + # No filter + self.assertEqual(job.GetLogEntries(None), + [entry for entries in job.GetInfo(["oplog"])[0] if entries + for entry in entries]) + + # Filter with serial + assert count > 3 + self.assert_(job.GetLogEntries(3)) + self.assertEqual(job.GetLogEntries(3), + [entry for entries in job.GetInfo(["oplog"])[0] if entries + for entry in entries][3:]) + + # No log message after highest serial + self.assertFalse(job.GetLogEntries(count)) + self.assertFalse(job.GetLogEntries(count + 3)) + + if __name__ == "__main__": testutils.GanetiTestProgram()
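Usage note (illustrative only, not part of the patch): _JobProcessor executes at most one opcode per call and returns True only once the whole job has been finalized; _JobQueueWorker.RunTask therefore re-schedules itself via workerpool.DeferTask() until that point. A minimal sketch of that driving loop, assuming a queue, an opcode-execution callable and a _QueuedJob as used in the unittests above (the helper name is hypothetical):

  from ganeti import jqueue

  def _RunJobToCompletion(queue, opexec_fn, job):
    # Call the processor repeatedly; each call runs at most one opcode.
    # A False return value means more opcodes are still pending.
    while not jqueue._JobProcessor(queue, opexec_fn, job)():
      pass
    return job.CalcStatus()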