From c31825f7df19056ad16789f754f7e32f36d35a23 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Thu, 26 Aug 2010 18:17:20 +0200 Subject: [PATCH] =?UTF-8?q?Show=20list=20of=20pending=20acquires=20in=20?= =?UTF-8?q?=E2=80=9Cgnt-debug=20locks=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is accomplished by keeping a list of waiting threads instead of just their number inside the lock-internal condition. A few other tweaks to the output format are also made. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/locking.py | 48 +++++++++++---- man/gnt-debug.sgml | 6 ++ scripts/gnt-debug | 9 ++- test/ganeti.locking_unittest.py | 102 ++++++++++++++++++++++++++++---- 4 files changed, 139 insertions(+), 26 deletions(-) diff --git a/lib/locking.py b/lib/locking.py index 2aeb7d980..057cc84f4 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -38,6 +38,10 @@ from ganeti import utils from ganeti import compat +_EXCLUSIVE_TEXT = "exclusive" +_SHARED_TEXT = "shared" + + def ssynchronized(mylock, shared=0): """Shared Synchronization decorator. @@ -343,7 +347,7 @@ class PipeCondition(_BaseCondition): """ __slots__ = [ - "_nwaiters", + "_waiters", "_single_condition", ] @@ -354,7 +358,7 @@ class PipeCondition(_BaseCondition): """ _BaseCondition.__init__(self, lock) - self._nwaiters = 0 + self._waiters = set() self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): @@ -368,15 +372,14 @@ class PipeCondition(_BaseCondition): # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - my_condition = self._single_condition + cond = self._single_condition - assert self._nwaiters >= 0 - self._nwaiters += 1 + self._waiters.add(threading.currentThread()) try: - my_condition.wait(timeout) + cond.wait(timeout) finally: - assert self._nwaiters > 0 - self._nwaiters -= 1 + self._check_owned() + self._waiters.remove(threading.currentThread()) def notifyAll(self): # pylint: disable-msg=C0103 """Notify all currently waiting threads. @@ -386,13 +389,21 @@ class PipeCondition(_BaseCondition): self._single_condition.notifyAll() self._single_condition = self._single_condition_class(self._lock) + def get_waiting(self): + """Returns a list of all waiting threads. + + """ + self._check_owned() + + return self._waiters + def has_waiting(self): """Returns whether there are active waiters. """ self._check_owned() - return bool(self._nwaiters) + return bool(self._waiters) class SharedLock(object): @@ -479,9 +490,9 @@ class SharedLock(object): info.append("deleted") assert not (self.__exc or self.__shr) elif self.__exc: - info.append("exclusive") + info.append(_EXCLUSIVE_TEXT) elif self.__shr: - info.append("shared") + info.append(_SHARED_TEXT) else: info.append(None) elif fname == "owner": @@ -495,6 +506,21 @@ class SharedLock(object): info.append([i.getName() for i in owner]) else: info.append(None) + elif fname == "pending": + data = [] + + for cond in self.__pending: + if cond in (self.__active_shr_c, self.__inactive_shr_c): + mode = _SHARED_TEXT + else: + mode = _EXCLUSIVE_TEXT + + # This function should be fast as it runs with the lock held. Hence + # not using utils.NiceSort. + data.append((mode, sorted([i.getName() + for i in cond.get_waiting()]))) + + info.append(data) else: raise errors.OpExecError("Invalid query field '%s'" % fname) diff --git a/man/gnt-debug.sgml b/man/gnt-debug.sgml index 2d2e99349..c117226cc 100644 --- a/man/gnt-debug.sgml +++ b/man/gnt-debug.sgml @@ -240,6 +240,12 @@ <simpara>Current lock owner(s)</simpara> </listitem> </varlistentry> + <varlistentry> + <term>pending</term> + <listitem> + <simpara>Threads waiting for the lock</simpara> + </listitem> + </varlistentry> </variablelist> </para> diff --git a/scripts/gnt-debug b/scripts/gnt-debug index 8fea3a911..e54a34a65 100755 --- a/scripts/gnt-debug +++ b/scripts/gnt-debug @@ -44,6 +44,7 @@ _LIST_LOCKS_DEF_FIELDS = [ "name", "mode", "owner", + "pending", ] @@ -423,6 +424,7 @@ def ListLocks(opts, args): # pylint: disable-msg=W0613 "name": "Name", "mode": "Mode", "owner": "Owner", + "pending": "Pending", } else: headers = None @@ -436,10 +438,13 @@ def ListLocks(opts, args): # pylint: disable-msg=W0613 for idx, field in enumerate(selected_fields): val = row[idx] - if field in ("mode", "owner") and val is None: + if field in ("mode", "owner", "pending") and not val: val = "-" elif field == "owner": - val = utils.CommaJoin(val) + val = ",".join(val) + elif field == "pending": + val = utils.CommaJoin("%s:%s" % (mode, ",".join(threads)) + for mode, threads in val) row[idx] = str(val) diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index c1ea3130b..273e81725 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -166,20 +166,22 @@ class TestPipeCondition(_ConditionTestCase): self._testNotification() def _TestWait(self, fn): - self._addThread(target=fn) - self._addThread(target=fn) - self._addThread(target=fn) + threads = [ + self._addThread(target=fn), + self._addThread(target=fn), + self._addThread(target=fn), + ] # Wait for threads to be waiting - self.assertEqual(self.done.get(True, 1), "A") - self.assertEqual(self.done.get(True, 1), "A") - self.assertEqual(self.done.get(True, 1), "A") + for _ in threads: + self.assertEqual(self.done.get(True, 1), "A") self.assertRaises(Queue.Empty, self.done.get_nowait) self.cond.acquire() - self.assertEqual(self.cond._nwaiters, 3) - # This new thread can"t acquire the lock, and thus call wait, before we + self.assertEqual(len(self.cond._waiters), 3) + self.assertEqual(self.cond._waiters, set(threads)) + # This new thread can't acquire the lock, and thus call wait, before we # release it self._addThread(target=fn) self.cond.notifyAll() @@ -1438,6 +1440,9 @@ class TestLockMonitor(_ThreadedTestCase): self.assertEqual(len(self.lm._locks), len(locks)) + self.assertEqual(len(self.lm.QueryLocks(["name"], False)), + 100) + # Delete all locks del locks[:] @@ -1484,8 +1489,10 @@ class TestLockMonitor(_ThreadedTestCase): self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True) # Check query result - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), - [[name, None, None] for name in utils.NiceSort(expnames)]) + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"], + False), + [[name, None, None, []] + for name in utils.NiceSort(expnames)]) # Test exclusive acquire for tlock in locks[::4]: @@ -1493,10 +1500,12 @@ class TestLockMonitor(_ThreadedTestCase): try: def _GetExpResult(name): if tlock.name == name: - return [name, "exclusive", [threading.currentThread().getName()]] - return [name, None, None] + return [name, "exclusive", [threading.currentThread().getName()], + []] + return [name, None, None, []] - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", + "pending"], False), [_GetExpResult(name) for name in utils.NiceSort(expnames)]) finally: @@ -1591,6 +1600,73 @@ class TestLockMonitor(_ThreadedTestCase): [[lock.name, "deleted", None]]) self.assertEqual(len(self.lm._locks), 1) + def testPending(self): + def _Acquire(lock, shared, prev, next): + prev.wait() + + lock.acquire(shared=shared, test_notify=next.set) + try: + pass + finally: + lock.release() + + lock = locking.SharedLock("ExcLock", monitor=self.lm) + + for shared in [0, 1]: + lock.acquire() + try: + self.assertEqual(len(self.lm._locks), 1) + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, "exclusive", + [threading.currentThread().getName()]]]) + + threads = [] + + first = threading.Event() + prev = first + + for i in range(5): + ev = threading.Event() + threads.append(self._addThread(target=_Acquire, + args=(lock, shared, prev, ev))) + prev = ev + + # Start acquires + first.set() + + # Wait for last acquire to start waiting + prev.wait() + + # NOTE: This works only because QueryLocks will acquire the + # lock-internal lock again and won't be able to get the information + # until it has the lock. By then the acquire should be registered in + # SharedLock.__pending (otherwise it's a bug). + + # All acquires are waiting now + if shared: + pending = [("shared", sorted([t.getName() for t in threads]))] + else: + pending = [("exclusive", [t.getName()]) for t in threads] + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", + "pending"], False), + [[lock.name, "exclusive", + [threading.currentThread().getName()], + pending]]) + + self.assertEqual(len(self.lm._locks), 1) + finally: + lock.release() + + self._waitThreads() + + # No pending acquires + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"], + False), + [[lock.name, None, None, []]]) + + self.assertEqual(len(self.lm._locks), 1) + if __name__ == '__main__': testutils.GanetiTestProgram() -- GitLab