Skip to content
Snippets Groups Projects
Commit 3c0d60d0 authored by Guido Trotter's avatar Guido Trotter
Browse files

Share the jqueue lock on job-local changes


We can share the jqueue lock when we do per-job updates. These only
conflict with updates/checks on the same job from another thread (eg.
CancelJob, ArchiveJob, which keep the lock unshared, since they are less
frequent).

Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 9bf5e01f
No related branches found
No related tags found
No related merge requests found
......@@ -429,7 +429,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
"""
self._queue.acquire()
self._queue.acquire(shared=1)
try:
assert self._op.status in (constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING)
......@@ -446,7 +446,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
finally:
self._queue.release()
@locking.ssynchronized(_big_jqueue_lock)
@locking.ssynchronized(_big_jqueue_lock, shared=1)
def _AppendFeedback(self, timestamp, log_type, log_msg):
"""Internal feedback append function, with locks
......@@ -626,7 +626,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
op_summary)
queue.acquire()
queue.acquire(shared=1)
try:
if op.status == constants.OP_STATUS_CANCELED:
raise CancelJob()
......@@ -646,7 +646,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
result = proc.ExecOpCode(input_opcode,
_OpExecCallbacks(queue, job, op))
queue.acquire()
queue.acquire(shared=1)
try:
op.status = constants.OP_STATUS_SUCCESS
op.result = result
......@@ -661,7 +661,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
# Will be handled further up
raise
except Exception, err:
queue.acquire()
queue.acquire(shared=1)
try:
try:
op.status = constants.OP_STATUS_ERROR
......@@ -679,7 +679,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
raise
except CancelJob:
queue.acquire()
queue.acquire(shared=1)
try:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
......@@ -690,7 +690,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
except:
logging.exception("Unhandled exception")
finally:
queue.acquire()
queue.acquire(shared=1)
try:
try:
job.lock_status = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment