diff --git a/lib/jqueue.py b/lib/jqueue.py index 84f612045a9568c390c6ed68fe6f4c3b32f44838..7addefe50c5fe4f9c925c6b6621dba7e3c6c3578 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1447,28 +1447,39 @@ class _JobDependencyManager: " not one of '%s' as required" % (dep_job_id, status, utils.CommaJoin(dep_status))) - @locking.ssynchronized(_LOCK) + def _RemoveEmptyWaitersUnlocked(self): + """Remove all jobs without actual waiters. + + """ + for job_id in [job_id for (job_id, waiters) in self._waiters.items() + if not waiters]: + del self._waiters[job_id] + def NotifyWaiters(self, job_id): """Notifies all jobs waiting for a certain job ID. + @attention: Do not call until L{CheckAndRegister} returned a status other + than C{WAITDEP} for C{job_id}, or behaviour is undefined @type job_id: string @param job_id: Job ID """ assert ht.TString(job_id) - jobs = self._waiters.pop(job_id, None) + self._lock.acquire() + try: + self._RemoveEmptyWaitersUnlocked() + + jobs = self._waiters.pop(job_id, None) + finally: + self._lock.release() + if jobs: # Re-add jobs to workerpool logging.debug("Re-adding %s jobs which were waiting for job %s", len(jobs), job_id) self._enqueue_fn(jobs) - # Remove all jobs without actual waiters - for job_id in [job_id for (job_id, waiters) in self._waiters.items() - if not waiters]: - del self._waiters[job_id] - def _RequireOpenQueue(fn): """Decorator for "public" functions. diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index cd0bacdb510c2daaac148b4724003c97258784e5..ea5e8f4be522be31494e358b3546631a87336fe6 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -1917,6 +1917,9 @@ class TestJobDependencyManager(unittest.TestCase): return result def _Enqueue(self, jobs): + self.assertFalse(self.jdm._lock.is_owned(), + msg=("Must not own manager lock while re-adding jobs" + " (potential deadlock)")) self._queue.append(jobs) def testNotFinalizedThenCancel(self):