From 307149a81f1c37747ca33807c08df5d4d4d8ab52 Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Thu, 10 Jul 2008 12:29:07 +0000 Subject: [PATCH] Switch _QueuedOpCode to have their own lock Right now, the queued opcode doesn't have a lock, and instead relies on the parent QueuedJob's lock. This is not good for logging feedback, so it's better to have a lock for each queuedopcode. Reviewed-by: ultrotter --- lib/jqueue.py | 73 +++++++++++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index d96094ea8..3ba41afff 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -37,13 +37,36 @@ JOBQUEUE_THREADS = 5 class _QueuedOpCode(object): """Encasulates an opcode object. - Access must be synchronized by using an external lock. + Access is synchronized by the '_lock' attribute. """ def __init__(self, op): self.input = op self.status = constants.OP_STATUS_QUEUED self.result = None + self._lock = threading.Lock() + + @utils.LockedMethod + def SetStatus(self, status, result): + """Update the opcode status and result. + + """ + self.status = status + self.result = result + + @utils.LockedMethod + def GetStatus(self): + """Get the opcode status. + + """ + return self.status + + @utils.LockedMethod + def GetResult(self): + """Get the opcode result. + + """ + return self.result class _QueuedJob(object): @@ -64,21 +87,22 @@ class _QueuedJob(object): # to use it. self._ops = [_QueuedOpCode(op) for op in ops] - def _GetStatusUnlocked(self): + def GetStatus(self): status = constants.JOB_STATUS_QUEUED all_success = True for op in self._ops: - if op.status == constants.OP_STATUS_SUCCESS: + op_status = op.GetStatus() + if op_status == constants.OP_STATUS_SUCCESS: continue all_success = False - if op.status == constants.OP_STATUS_QUEUED: + if op_status == constants.OP_STATUS_QUEUED: pass - elif op.status == constants.OP_STATUS_ERROR: + elif op_status == constants.OP_STATUS_ERROR: status = constants.JOB_STATUS_ERROR - elif op.status == constants.OP_STATUS_RUNNING: + elif op_status == constants.OP_STATUS_RUNNING: status = constants.JOB_STATUS_RUNNING if all_success: @@ -86,13 +110,6 @@ class _QueuedJob(object): return status - def GetStatus(self): - self._lock.acquire() - try: - return self._GetStatusUnlocked() - finally: - self._lock.release() - def Run(self, proc): """Job executor. @@ -107,31 +124,17 @@ class _QueuedJob(object): count = len(self._ops) for idx, op in enumerate(self._ops): try: - self._lock.acquire() - try: - logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) - op.status = constants.OP_STATUS_RUNNING - finally: - self._lock.release() + logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) + op.SetStatus(constants.OP_STATUS_RUNNING, None) result = proc.ExecOpCode(op.input) - self._lock.acquire() - try: - logging.debug("Op %s/%s: Successfully finished %s", - idx + 1, count, op) - op.status = constants.OP_STATUS_SUCCESS - op.result = result - finally: - self._lock.release() + op.SetStatus(constants.OP_STATUS_SUCCESS, result) + logging.debug("Op %s/%s: Successfully finished %s", + idx + 1, count, op) except Exception, err: - self._lock.acquire() - try: - logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) - op.status = constants.OP_STATUS_ERROR - op.result = str(err) - finally: - self._lock.release() + op.SetStatus(constants.OP_STATUS_ERROR, str(err)) + logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) raise except errors.GenericError, err: @@ -227,7 +230,7 @@ class JobQueue: row.append(job.GetStatus()) elif fname == "result": # TODO - row.append(map(lambda op: op.result, job._ops)) + row.append([op.GetResult() for op in job._ops]) else: raise errors.OpExecError("Invalid job query field '%s'" % fname) return row -- GitLab