Commit 44b4eddc authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Make lock monitor more versatile



With this change it'll be possible to register other lock information
providers. One usecase for this are job dependencies, which can be shown
in the output of “gnt-debug locks”, too.

The lock monitor is changed to accept more than one return value from
the function providing the information. Unfortunately it's hard to keep
weak references to bound methods, so that I settled on keeping a weak
reference on the object instead (see note in docstring).
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent ee041788
......@@ -434,9 +434,10 @@ class SharedLock(object):
# Register with lock monitor
if monitor:
logging.debug("Adding lock %s to monitor", name)
monitor.RegisterLock(self)
def GetInfo(self, requested):
def GetLockInfo(self, requested):
"""Retrieves information for querying locks.
@type requested: set
......@@ -489,7 +490,7 @@ class SharedLock(object):
else:
pending = None
return (self.name, mode, owner_names, pending)
return [(self.name, mode, owner_names, pending)]
finally:
self.__lock.release()
......@@ -1638,15 +1639,17 @@ class GanetiLockManager:
return self.__keyring[level].remove(names)
def _MonitorSortKey((num, item)):
def _MonitorSortKey((item, idx, num)):
"""Sorting key function.
Sort by name, then by incoming order.
Sort by name, registration order and then order of information. This provides
a stable sort order over different providers, even if they return the same
name.
"""
(name, _, _, _) = item
return (utils.NiceSortKey(name), num)
return (utils.NiceSortKey(name), num, idx)
class LockMonitor(object):
......@@ -1666,12 +1669,19 @@ class LockMonitor(object):
self._locks = weakref.WeakKeyDictionary()
@ssynchronized(_LOCK_ATTR)
def RegisterLock(self, lock):
def RegisterLock(self, provider):
"""Registers a new lock.
@param provider: Object with a callable method named C{GetLockInfo}, taking
a single C{set} containing the requested information items
@note: It would be nicer to only receive the function generating the
requested information but, as it turns out, weak references to bound
methods (e.g. C{self.GetLockInfo}) are tricky; there are several
workarounds, but none of the ones I found works properly in combination
with a standard C{WeakKeyDictionary}
"""
logging.debug("Registering lock %s", lock.name)
assert lock not in self._locks, "Duplicate lock registration"
assert provider not in self._locks, "Duplicate registration"
# There used to be a check for duplicate names here. As it turned out, when
# a lock is re-created with the same name in a very short timeframe, the
......@@ -1679,14 +1689,22 @@ class LockMonitor(object):
# By keeping track of the order of incoming registrations, a stable sort
# ordering can still be guaranteed.
self._locks[lock] = self._counter.next()
self._locks[provider] = self._counter.next()
@ssynchronized(_LOCK_ATTR)
def _GetLockInfo(self, requested):
"""Get information from all locks while the monitor lock is held.
"""Get information from all locks.
"""
return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
# Must hold lock while getting consistent list of tracked items
self._lock.acquire(shared=1)
try:
items = self._locks.items()
finally:
self._lock.release()
return [(info, idx, num)
for (provider, num) in items
for (idx, info) in enumerate(provider.GetLockInfo(requested))]
def _Query(self, fields):
"""Queries information from all locks.
......@@ -1703,7 +1721,7 @@ class LockMonitor(object):
key=_MonitorSortKey)
# Extract lock information and build query data
return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
return (qobj, query.LockQueryData(map(operator.itemgetter(0), lockinfo)))
def QueryLocks(self, fields):
"""Queries information from all locks.
......
......@@ -689,9 +689,9 @@ class TestSharedLock(_ThreadedTestCase):
ev.wait()
# Check lock information
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
(self.sl.name, "exclusive", [th_excl1.getName()], None))
(_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING]))
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
[(self.sl.name, "exclusive", [th_excl1.getName()], None)])
[(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
self.assertEqual([(pendmode, sorted(waiting))
for (pendmode, waiting) in pending],
[("exclusive", [th_excl2.getName()]),
......@@ -705,10 +705,11 @@ class TestSharedLock(_ThreadedTestCase):
ev.wait()
# Check lock information again
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])),
(self.sl.name, "shared", None,
[("exclusive", [th_excl2.getName()])]))
(_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER]))
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
query.LQ_PENDING])),
[(self.sl.name, "shared", None,
[("exclusive", [th_excl2.getName()])])])
[(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
self.assertEqual(set(owner), set([th_excl1.getName()] +
[th.getName() for th in th_shared]))
......@@ -718,9 +719,9 @@ class TestSharedLock(_ThreadedTestCase):
self._waitThreads()
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER,
query.LQ_PENDING])),
(self.sl.name, None, None, []))
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
query.LQ_PENDING])),
[(self.sl.name, None, None, [])])
@_Repeat
def testMixedAcquireTimeout(self):
......@@ -887,12 +888,14 @@ class TestSharedLock(_ThreadedTestCase):
prev.wait()
# Check lock information
self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
(self.sl.name, "exclusive",
[threading.currentThread().getName()], None))
self.assertEqual(self.sl.GetLockInfo(set()),
[(self.sl.name, None, None, None)])
self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
[(self.sl.name, "exclusive",
[threading.currentThread().getName()], None)])
self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
perprio)
# Let threads acquire the lock
self.sl.release()
......@@ -913,7 +916,7 @@ class TestSharedLock(_ThreadedTestCase):
self.assertRaises(Queue.Empty, self.done.get_nowait)
def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
self.assertEqual(name, self.sl.name)
self.assert_(mode is None)
self.assert_(owner is None)
......@@ -2154,6 +2157,88 @@ class TestLockMonitor(_ThreadedTestCase):
result = self.lm.QueryLocks(["name", "mode", "owner"])
self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
class _FakeLock:
def __init__(self):
self._info = []
def AddResult(self, *args):
self._info.append(args)
def CountPending(self):
return len(self._info)
def GetLockInfo(self, requested):
(exp_requested, result) = self._info.pop(0)
if exp_requested != requested:
raise Exception("Requested information (%s) does not match"
" expectations (%s)" % (requested, exp_requested))
return result
def testMultipleResults(self):
fl1 = self._FakeLock()
fl2 = self._FakeLock()
self.lm.RegisterLock(fl1)
self.lm.RegisterLock(fl2)
# Empty information
for i in [fl1, fl2]:
i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
result = self.lm.QueryLocks(["name", "mode", "owner"])
self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
for i in [fl1, fl2]:
self.assertEqual(i.CountPending(), 0)
# Check ordering
for fn in [lambda x: x, reversed, sorted]:
fl1.AddResult(set(), list(fn([
("aaa", None, None, None),
("bbb", None, None, None),
])))
fl2.AddResult(set(), [])
result = self.lm.QueryLocks(["name"])
self.assertEqual(objects.QueryResponse.FromDict(result).data, [
[(constants.RS_NORMAL, "aaa")],
[(constants.RS_NORMAL, "bbb")],
])
for i in [fl1, fl2]:
self.assertEqual(i.CountPending(), 0)
for fn2 in [lambda x: x, reversed, sorted]:
fl1.AddResult(set([query.LQ_MODE]), list(fn([
# Same name, but different information
("aaa", "mode0", None, None),
("aaa", "mode1", None, None),
("aaa", "mode2", None, None),
("aaa", "mode3", None, None),
])))
fl2.AddResult(set([query.LQ_MODE]), [
("zzz", "end", None, None),
("000", "start", None, None),
] + list(fn2([
("aaa", "b200", None, None),
("aaa", "b300", None, None),
])))
result = self.lm.QueryLocks(["name", "mode"])
self.assertEqual(objects.QueryResponse.FromDict(result).data, [
[(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
] + list(fn([
# Name is the same, so order must be equal to incoming order
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
])) + list(fn2([
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
[(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
])) + [
[(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
])
for i in [fl1, fl2]:
self.assertEqual(i.CountPending(), 0)
if __name__ == '__main__':
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment