From b80cc51813254199e0b30e2bbe354374110cbd17 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Mon, 20 Sep 2010 19:31:36 +0200 Subject: [PATCH] jqueue: Introduce per-opcode context object MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is better to group per-opcode data. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: RenΓ© Nussbaumer <rn@google.com> --- lib/jqueue.py | 55 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index 4dd0b5364..c7fd4c7d7 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -745,6 +745,17 @@ def _EncodeOpError(err): return errors.EncodeException(to_encode) +class _OpExecContext: + def __init__(self, op, index, log_prefix): + """Initializes this class. + + """ + self.op = op + self.index = index + self.log_prefix = log_prefix + self.summary = op.input.Summary() + + class _JobProcessor(object): def __init__(self, queue, opexec_fn, job): """Initializes this class. @@ -780,8 +791,7 @@ class _JobProcessor(object): # Found an opcode already marked as running raise errors.ProgrammerError("Called for job marked as running") - log_prefix = "Op %s/%s" % (idx + 1, len(job.ops)) - summary = op.input.Summary() + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops))) if op.status == constants.OP_STATUS_CANCELED: # Cancelled jobs are handled by the caller @@ -794,10 +804,10 @@ class _JobProcessor(object): # completed successfully (if any did error out, then the whole job # should have been aborted and not resubmitted for processing). logging.info("%s: opcode %s already processed, skipping", - log_prefix, summary) + opctx.log_prefix, opctx.summary) continue - return (idx, op, log_prefix, summary) + return opctx @staticmethod def _MarkWaitlock(job, op): @@ -820,10 +830,12 @@ class _JobProcessor(object): if job.start_timestamp is None: job.start_timestamp = op.start_timestamp - def _ExecOpCodeUnlocked(self, log_prefix, op, summary): + def _ExecOpCodeUnlocked(self, opctx): """Processes one opcode and returns the result. """ + op = opctx.op + assert op.status == constants.OP_STATUS_WAITLOCK try: @@ -831,14 +843,16 @@ class _JobProcessor(object): result = self.opexec_fn(op.input, _OpExecCallbacks(self.queue, self.job, op)) except CancelJob: - logging.exception("%s: Canceling job", log_prefix) + logging.exception("%s: Canceling job", opctx.log_prefix) assert op.status == constants.OP_STATUS_CANCELING return (constants.OP_STATUS_CANCELING, None) except Exception, err: # pylint: disable-msg=W0703 - logging.exception("%s: Caught exception in %s", log_prefix, summary) + logging.exception("%s: Caught exception in %s", + opctx.log_prefix, opctx.summary) return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) else: - logging.debug("%s: %s successful", log_prefix, summary) + logging.debug("%s: %s successful", + opctx.log_prefix, opctx.summary) return (constants.OP_STATUS_SUCCESS, result) def __call__(self): @@ -858,12 +872,13 @@ class _JobProcessor(object): try: opcount = len(job.ops) - (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job) + opctx = self._FindNextOpcode(job) + op = opctx.op # Consistency check assert compat.all(i.status in (constants.OP_STATUS_QUEUED, constants.OP_STATUS_CANCELED) - for i in job.ops[opidx:]) + for i in job.ops[opctx.index:]) assert op.status in (constants.OP_STATUS_QUEUED, constants.OP_STATUS_WAITLOCK, @@ -879,12 +894,12 @@ class _JobProcessor(object): # Write to disk queue.UpdateJobUnlocked(job) - logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary) + logging.info("%s: opcode %s waiting for locks", + opctx.log_prefix, opctx.summary) queue.release() try: - (op_status, op_result) = \ - self._ExecOpCodeUnlocked(log_prefix, op, op_summary) + (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) finally: queue.acquire(shared=1) @@ -895,21 +910,21 @@ class _JobProcessor(object): if op.status == constants.OP_STATUS_CANCELING: assert not compat.any(i.status != constants.OP_STATUS_CANCELING - for i in job.ops[opidx:]) + for i in job.ops[opctx.index:]) else: assert op.status in constants.OPS_FINALIZED # Ensure all opcodes so far have been successful - assert (opidx == 0 or + assert (opctx.index == 0 or compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops[:opidx])) + for i in job.ops[:opctx.index])) if op.status == constants.OP_STATUS_SUCCESS: finalize = False elif op.status == constants.OP_STATUS_ERROR: # Ensure failed opcode has an exception as its result - assert errors.GetEncodedError(job.ops[opidx].result) + assert errors.GetEncodedError(job.ops[opctx.index].result) to_encode = errors.OpExecError("Preceding opcode failed") job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, @@ -919,7 +934,7 @@ class _JobProcessor(object): # Consistency check assert compat.all(i.status == constants.OP_STATUS_ERROR and errors.GetEncodedError(i.result) - for i in job.ops[opidx:]) + for i in job.ops[opctx.index:]) elif op.status == constants.OP_STATUS_CANCELING: job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, @@ -933,7 +948,7 @@ class _JobProcessor(object): raise errors.ProgrammerError("Unknown status '%s'" % op.status) # Finalizing or last opcode? - if finalize or opidx == (opcount - 1): + if finalize or opctx.index == (opcount - 1): # All opcodes have been run, finalize job job.end_timestamp = TimeStampNow() @@ -941,7 +956,7 @@ class _JobProcessor(object): # allowed. Once the file has been written, it can be archived anytime. queue.UpdateJobUnlocked(job) - if finalize or opidx == (opcount - 1): + if finalize or opctx.index == (opcount - 1): logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) return True -- GitLab