From 37d76f1e44a914f67a9a2a85f2261fae7243a734 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Mon, 19 Dec 2011 16:26:55 +0100 Subject: [PATCH] jqueue: Fix deadlock between job queue and dependency manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When an opcode is about to be processed its dependencies are evaluated using “_JobDependencyManager.CheckAndRegister”. Due to its nature that function requires a lock on the manager's internal structures. All of this happens while the job queue lock is held in shared mode (required for the job processor). When a job has been processed any pending dependencies are re-added to the job workerpool. Before this patch that would require the manager's lock and then, for adding the jobs, the job queue lock. Since this is in reverse order it will lead to deadlocks. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/jqueue.py | 25 ++++++++++++++++++------- test/ganeti.jqueue_unittest.py | 3 +++ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index d5ea3cb79..cc06b702c 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1439,28 +1439,39 @@ class _JobDependencyManager: " not one of '%s' as required" % (dep_job_id, status, utils.CommaJoin(dep_status))) - @locking.ssynchronized(_LOCK) + def _RemoveEmptyWaitersUnlocked(self): + """Remove all jobs without actual waiters. + + """ + for job_id in [job_id for (job_id, waiters) in self._waiters.items() + if not waiters]: + del self._waiters[job_id] + def NotifyWaiters(self, job_id): """Notifies all jobs waiting for a certain job ID. 
+ @important: Do not call until L{CheckAndRegister} returned a status other + than L{self.WAIT} for C{job_id}, or behaviour is undefined @type job_id: string @param job_id: Job ID """ assert ht.TString(job_id) - jobs = self._waiters.pop(job_id, None) + self._lock.acquire() + try: + self._RemoveEmptyWaitersUnlocked() + + jobs = self._waiters.pop(job_id, None) + finally: + self._lock.release() + if jobs: # Re-add jobs to workerpool logging.debug("Re-adding %s jobs which were waiting for job %s", len(jobs), job_id) self._enqueue_fn(jobs) - # Remove all jobs without actual waiters - for job_id in [job_id for (job_id, waiters) in self._waiters.items() - if not waiters]: - del self._waiters[job_id] - def _RequireOpenQueue(fn): """Decorator for "public" functions. diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 45c298450..f2ffc8a4e 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -1874,6 +1874,9 @@ class TestJobDependencyManager(unittest.TestCase): return result def _Enqueue(self, jobs): + self.assertFalse(self.jdm._lock.is_owned(), + msg=("Must not own manager lock while re-adding jobs" + " (potential deadlock)")) self._queue.append(jobs) def testNotFinalizedThenCancel(self): -- GitLab