Commit 307149a8 authored by Iustin Pop's avatar Iustin Pop
Browse files

Switch _QueuedOpCode to have their own lock

Right now, the queued opcode doesn't have a lock, and instead relies on
the parent QueuedJob's lock.

This is not good for logging feedback, so it's better to have a lock for
each queuedopcode.

Reviewed-by: ultrotter
parent 7996a135
......@@ -37,13 +37,36 @@ JOBQUEUE_THREADS = 5
class _QueuedOpCode(object):
"""Encasulates an opcode object.
Access must be synchronized by using an external lock.
Access is synchronized by the '_lock' attribute.
"""
def __init__(self, op):
self.input = op
self.status = constants.OP_STATUS_QUEUED
self.result = None
self._lock = threading.Lock()
@utils.LockedMethod
def SetStatus(self, status, result):
"""Update the opcode status and result.
"""
self.status = status
self.result = result
@utils.LockedMethod
def GetStatus(self):
"""Get the opcode status.
"""
return self.status
@utils.LockedMethod
def GetResult(self):
"""Get the opcode result.
"""
return self.result
class _QueuedJob(object):
......@@ -64,21 +87,22 @@ class _QueuedJob(object):
# to use it.
self._ops = [_QueuedOpCode(op) for op in ops]
def _GetStatusUnlocked(self):
def GetStatus(self):
status = constants.JOB_STATUS_QUEUED
all_success = True
for op in self._ops:
if op.status == constants.OP_STATUS_SUCCESS:
op_status = op.GetStatus()
if op_status == constants.OP_STATUS_SUCCESS:
continue
all_success = False
if op.status == constants.OP_STATUS_QUEUED:
if op_status == constants.OP_STATUS_QUEUED:
pass
elif op.status == constants.OP_STATUS_ERROR:
elif op_status == constants.OP_STATUS_ERROR:
status = constants.JOB_STATUS_ERROR
elif op.status == constants.OP_STATUS_RUNNING:
elif op_status == constants.OP_STATUS_RUNNING:
status = constants.JOB_STATUS_RUNNING
if all_success:
......@@ -86,13 +110,6 @@ class _QueuedJob(object):
return status
def GetStatus(self):
self._lock.acquire()
try:
return self._GetStatusUnlocked()
finally:
self._lock.release()
def Run(self, proc):
"""Job executor.
......@@ -107,31 +124,17 @@ class _QueuedJob(object):
count = len(self._ops)
for idx, op in enumerate(self._ops):
try:
self._lock.acquire()
try:
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
op.status = constants.OP_STATUS_RUNNING
finally:
self._lock.release()
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
op.SetStatus(constants.OP_STATUS_RUNNING, None)
result = proc.ExecOpCode(op.input)
self._lock.acquire()
try:
logging.debug("Op %s/%s: Successfully finished %s",
idx + 1, count, op)
op.status = constants.OP_STATUS_SUCCESS
op.result = result
finally:
self._lock.release()
op.SetStatus(constants.OP_STATUS_SUCCESS, result)
logging.debug("Op %s/%s: Successfully finished %s",
idx + 1, count, op)
except Exception, err:
self._lock.acquire()
try:
logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
op.status = constants.OP_STATUS_ERROR
op.result = str(err)
finally:
self._lock.release()
op.SetStatus(constants.OP_STATUS_ERROR, str(err))
logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
raise
except errors.GenericError, err:
......@@ -227,7 +230,7 @@ class JobQueue:
row.append(job.GetStatus())
elif fname == "result":
# TODO
row.append(map(lambda op: op.result, job._ops))
row.append([op.GetResult() for op in job._ops])
else:
raise errors.OpExecError("Invalid job query field '%s'" % fname)
return row
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment