diff --git a/lib/jqueue.py b/lib/jqueue.py index 77512a98455ff7df2ceb8b923c77e5d34d1eae34..870559d72f098ad1a16ced54a92f399536c3b128 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -429,7 +429,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. """ - self._queue.acquire() + self._queue.acquire(shared=1) try: assert self._op.status in (constants.OP_STATUS_WAITLOCK, constants.OP_STATUS_CANCELING) @@ -446,7 +446,7 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): finally: self._queue.release() - @locking.ssynchronized(_big_jqueue_lock) + @locking.ssynchronized(_big_jqueue_lock, shared=1) def _AppendFeedback(self, timestamp, log_type, log_msg): """Internal feedback append function, with locks @@ -626,7 +626,7 @@ class _JobQueueWorker(workerpool.BaseWorker): logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, op_summary) - queue.acquire() + queue.acquire(shared=1) try: if op.status == constants.OP_STATUS_CANCELED: raise CancelJob() @@ -646,7 +646,7 @@ class _JobQueueWorker(workerpool.BaseWorker): result = proc.ExecOpCode(input_opcode, _OpExecCallbacks(queue, job, op)) - queue.acquire() + queue.acquire(shared=1) try: op.status = constants.OP_STATUS_SUCCESS op.result = result @@ -661,7 +661,7 @@ class _JobQueueWorker(workerpool.BaseWorker): # Will be handled further up raise except Exception, err: - queue.acquire() + queue.acquire(shared=1) try: try: op.status = constants.OP_STATUS_ERROR @@ -679,7 +679,7 @@ class _JobQueueWorker(workerpool.BaseWorker): raise except CancelJob: - queue.acquire() + queue.acquire(shared=1) try: job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, "Job canceled by request") @@ -690,7 +690,7 @@ class _JobQueueWorker(workerpool.BaseWorker): except: logging.exception("Unhandled exception") finally: - queue.acquire() + queue.acquire(shared=1) try: try: job.lock_status = None