diff --git a/lib/jqueue.py b/lib/jqueue.py index d96094ea8d56aa1920e126e36ede00748e7c7648..3ba41afff5cb9385dd6fd065d52c739046921048 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -37,13 +37,36 @@ JOBQUEUE_THREADS = 5 class _QueuedOpCode(object): """Encasulates an opcode object. - Access must be synchronized by using an external lock. + Access is synchronized by the '_lock' attribute. """ def __init__(self, op): self.input = op self.status = constants.OP_STATUS_QUEUED self.result = None + self._lock = threading.Lock() + + @utils.LockedMethod + def SetStatus(self, status, result): + """Update the opcode status and result. + + """ + self.status = status + self.result = result + + @utils.LockedMethod + def GetStatus(self): + """Get the opcode status. + + """ + return self.status + + @utils.LockedMethod + def GetResult(self): + """Get the opcode result. + + """ + return self.result class _QueuedJob(object): @@ -64,21 +87,22 @@ class _QueuedJob(object): # to use it. self._ops = [_QueuedOpCode(op) for op in ops] - def _GetStatusUnlocked(self): + def GetStatus(self): status = constants.JOB_STATUS_QUEUED all_success = True for op in self._ops: - if op.status == constants.OP_STATUS_SUCCESS: + op_status = op.GetStatus() + if op_status == constants.OP_STATUS_SUCCESS: continue all_success = False - if op.status == constants.OP_STATUS_QUEUED: + if op_status == constants.OP_STATUS_QUEUED: pass - elif op.status == constants.OP_STATUS_ERROR: + elif op_status == constants.OP_STATUS_ERROR: status = constants.JOB_STATUS_ERROR - elif op.status == constants.OP_STATUS_RUNNING: + elif op_status == constants.OP_STATUS_RUNNING: status = constants.JOB_STATUS_RUNNING if all_success: @@ -86,13 +110,6 @@ class _QueuedJob(object): return status - def GetStatus(self): - self._lock.acquire() - try: - return self._GetStatusUnlocked() - finally: - self._lock.release() - def Run(self, proc): """Job executor. @@ -107,31 +124,17 @@ class _QueuedJob(object): count = len(self._ops) for idx, op in enumerate(self._ops): try: - self._lock.acquire() - try: - logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) - op.status = constants.OP_STATUS_RUNNING - finally: - self._lock.release() + logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) + op.SetStatus(constants.OP_STATUS_RUNNING, None) result = proc.ExecOpCode(op.input) - self._lock.acquire() - try: - logging.debug("Op %s/%s: Successfully finished %s", - idx + 1, count, op) - op.status = constants.OP_STATUS_SUCCESS - op.result = result - finally: - self._lock.release() + op.SetStatus(constants.OP_STATUS_SUCCESS, result) + logging.debug("Op %s/%s: Successfully finished %s", + idx + 1, count, op) except Exception, err: - self._lock.acquire() - try: - logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) - op.status = constants.OP_STATUS_ERROR - op.result = str(err) - finally: - self._lock.release() + op.SetStatus(constants.OP_STATUS_ERROR, str(err)) + logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) raise except errors.GenericError, err: @@ -227,7 +230,7 @@ class JobQueue: row.append(job.GetStatus()) elif fname == "result": # TODO - row.append(map(lambda op: op.result, job._ops)) + row.append([op.GetResult() for op in job._ops]) else: raise errors.OpExecError("Invalid job query field '%s'" % fname) return row