Commit 37d76f1e authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Fix deadlock between job queue and dependency manager



When an opcode is about to be processed its dependencies are
evaluated using “_JobDependencyManager.CheckAndRegister”. Due
to its nature that function requires a lock on the manager's
internal structures. All of this happens while the job queue
lock is held in shared mode (required for the job processor).

When a job has been processed any pending dependencies are re-added
to the job workerpool. Before this patch that would require
the manager's lock and then, for adding the jobs, the job queue
lock. Since this is in reverse order it will lead to deadlocks.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 1d4930b9
......@@ -1439,28 +1439,39 @@ class _JobDependencyManager:
" not one of '%s' as required" %
(dep_job_id, status, utils.CommaJoin(dep_status)))
@locking.ssynchronized(_LOCK)
def _RemoveEmptyWaitersUnlocked(self):
"""Remove all jobs without actual waiters.
"""
for job_id in [job_id for (job_id, waiters) in self._waiters.items()
if not waiters]:
del self._waiters[job_id]
def NotifyWaiters(self, job_id):
"""Notifies all jobs waiting for a certain job ID.
@important: Do not call until L{CheckAndRegister} returned a status other
than L{self.WAIT} for C{job_id}, or behaviour is undefined
@type job_id: string
@param job_id: Job ID
"""
assert ht.TString(job_id)
jobs = self._waiters.pop(job_id, None)
self._lock.acquire()
try:
self._RemoveEmptyWaitersUnlocked()
jobs = self._waiters.pop(job_id, None)
finally:
self._lock.release()
if jobs:
# Re-add jobs to workerpool
logging.debug("Re-adding %s jobs which were waiting for job %s",
len(jobs), job_id)
self._enqueue_fn(jobs)
# Remove all jobs without actual waiters
for job_id in [job_id for (job_id, waiters) in self._waiters.items()
if not waiters]:
del self._waiters[job_id]
def _RequireOpenQueue(fn):
"""Decorator for "public" functions.
......
......@@ -1874,6 +1874,9 @@ class TestJobDependencyManager(unittest.TestCase):
return result
def _Enqueue(self, jobs):
self.assertFalse(self.jdm._lock.is_owned(),
msg=("Must not own manager lock while re-adding jobs"
" (potential deadlock)"))
self._queue.append(jobs)
def testNotFinalizedThenCancel(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment