diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 784a871786d0b9e19130c9423db85e545d0449d7..d47aeb730dbdfaac4856c3c3545b05f04a584712 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -277,6 +277,11 @@ class ClientOps: op = opcodes.OpGetTags(kind=kind, name=name) return self._Query(op) + elif method == luxi.REQ_QUERY_LOCKS: + (fields, sync) = args + logging.info("Received locks query request") + return self.server.context.glm.QueryLocks(fields, sync) + elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG: drain_flag = args logging.info("Received queue drain flag change request to %s", diff --git a/lib/cli.py b/lib/cli.py index b33c98c8028f88d0f688d95ddb2518a5e372b2a3..83c288fe0aa30084dcb697fdca4f764a20411812 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -84,6 +84,7 @@ __all__ = [ "IGNORE_REMOVE_FAILURES_OPT", "IGNORE_SECONDARIES_OPT", "IGNORE_SIZE_OPT", + "INTERVAL_OPT", "MAC_PREFIX_OPT", "MAINTAIN_NODE_HEALTH_OPT", "MASTER_NETDEV_OPT", @@ -929,6 +930,11 @@ SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout", default=constants.DEFAULT_SHUTDOWN_TIMEOUT, help="Maximum time to wait for instance shutdown") +INTERVAL_OPT = cli_option("--interval", dest="interval", type="int", + default=None, + help=("Number of seconds between repetitions of the" + " command")) + EARLY_RELEASE_OPT = cli_option("--early-release", dest="early_release", default=False, action="store_true", diff --git a/lib/locking.py b/lib/locking.py index 8793fae130e624c2c29511a41254cf23c02d761a..2aeb7d980bdd8081d8c911ff7c7ea93437181e1f 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -30,6 +30,8 @@ import select import threading import time import errno +import weakref +import logging from ganeti import errors from ganeti import utils @@ -409,6 +411,7 @@ class SharedLock(object): """ __slots__ = [ + "__weakref__", "__active_shr_c", "__inactive_shr_c", "__deleted", @@ -421,10 +424,12 @@ class SharedLock(object): __condition_class = PipeCondition - def __init__(self, name): + def 
__init__(self, name, monitor=None): """Construct a new SharedLock. @param name: the name of the lock + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register """ object.__init__(self) @@ -448,6 +453,55 @@ class SharedLock(object): # is this lock in the deleted state? self.__deleted = False + # Register with lock monitor + if monitor: + monitor.RegisterLock(self) + + def GetInfo(self, fields): + """Retrieves information for querying locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + self.__lock.acquire() + try: + info = [] + + # Note: to avoid unintentional race conditions, no references to + # modifiable objects should be returned unless they were created in this + # function. + for fname in fields: + if fname == "name": + info.append(self.name) + elif fname == "mode": + if self.__deleted: + info.append("deleted") + assert not (self.__exc or self.__shr) + elif self.__exc: + info.append("exclusive") + elif self.__shr: + info.append("shared") + else: + info.append(None) + elif fname == "owner": + if self.__exc: + owner = [self.__exc] + else: + owner = self.__shr + + if owner: + assert not self.__deleted + info.append([i.getName() for i in owner]) + else: + info.append(None) + else: + raise errors.OpExecError("Invalid query field '%s'" % fname) + + return info + finally: + self.__lock.release() + def __check_deleted(self): """Raises an exception if the lock has been deleted. @@ -671,6 +725,8 @@ class SharedLock(object): self.__deleted = True self.__exc = None + assert not (self.__exc or self.__shr), "Found owner during deletion" + # Notify all acquires. They'll throw an error. while self.__pending: self.__pending.pop().notifyAll() @@ -713,16 +769,21 @@ class LockSet: @ivar name: the name of the lockset """ - def __init__(self, members, name): + def __init__(self, members, name, monitor=None): """Constructs a new LockSet. 
@type members: list of strings @param members: initial members of the set + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register member locks """ assert members is not None, "members parameter is not a list" self.name = name + # Lock monitor + self.__monitor = monitor + # Used internally to guarantee coherency. self.__lock = SharedLock(name) @@ -731,7 +792,8 @@ class LockSet: self.__lockdict = {} for mname in members: - self.__lockdict[mname] = SharedLock(self._GetLockName(mname)) + self.__lockdict[mname] = SharedLock(self._GetLockName(mname), + monitor=monitor) # The owner dict contains the set of locks each thread owns. For # performance each thread can access its own key without a global lock on @@ -1055,7 +1117,7 @@ class LockSet: (invalid_names, self.name)) for lockname in names: - lock = SharedLock(self._GetLockName(lockname)) + lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) if acquired: lock.acquire(shared=shared) @@ -1193,13 +1255,24 @@ class GanetiLockManager: self.__class__._instance = self + self._monitor = LockMonitor() + # The keyring contains all the locks, at their level and in the correct # locking order. self.__keyring = { - LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"), - LEVEL_NODE: LockSet(nodes, "nodes lockset"), - LEVEL_INSTANCE: LockSet(instances, "instances lockset"), - } + LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor), + LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor), + LEVEL_INSTANCE: LockSet(instances, "instances", + monitor=self._monitor), + } + + def QueryLocks(self, fields, sync): + """Queries information from all locks. + + See L{LockMonitor.QueryLocks}. + + """ + return self._monitor.QueryLocks(fields, sync) def _names(self, level): """List the lock names at the given level. 
@@ -1352,3 +1425,59 @@ class GanetiLockManager: "Cannot remove locks at a level while not owning it or" " owning some at a greater one") return self.__keyring[level].remove(names) + + +class LockMonitor(object): + _LOCK_ATTR = "_lock" + + def __init__(self): + """Initializes this class. + + """ + self._lock = SharedLock("LockMonitor") + + # Tracked locks. Weak references are used to avoid issues with circular + # references and deletion. + self._locks = weakref.WeakKeyDictionary() + + @ssynchronized(_LOCK_ATTR) + def RegisterLock(self, lock): + """Registers a new lock. + + """ + logging.debug("Registering lock %s", lock.name) + assert lock not in self._locks, "Duplicate lock registration" + assert not compat.any(lock.name == i.name for i in self._locks.keys()), \ + "Found duplicate lock name" + self._locks[lock] = None + + @ssynchronized(_LOCK_ATTR) + def _GetLockInfo(self, fields): + """Get information from all locks while the monitor lock is held. + + """ + result = {} + + for lock in self._locks.keys(): + assert lock.name not in result, "Found duplicate lock name" + result[lock.name] = lock.GetInfo(fields) + + return result + + def QueryLocks(self, fields, sync): + """Queries information from all locks. 
+ + @type fields: list of strings + @param fields: List of fields to return + @type sync: boolean + @param sync: Whether to operate in synchronous mode + + """ + if sync: + raise NotImplementedError("Synchronous queries are not implemented") + + # Get all data without sorting + result = self._GetLockInfo(fields) + + # Sort by name + return [result[name] for name in utils.NiceSort(result.keys())] diff --git a/lib/luxi.py b/lib/luxi.py index cdd5518ff290d675cd3d1e8c9acce93fb744186a..669c3dd58de805b46f32d5b3d14141e09047e9d7 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -59,6 +59,7 @@ REQ_QUERY_EXPORTS = "QueryExports" REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" REQ_QUERY_TAGS = "QueryTags" +REQ_QUERY_LOCKS = "QueryLocks" REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag" REQ_SET_WATCHER_PAUSE = "SetWatcherPause" @@ -490,3 +491,6 @@ class Client(object): def QueryTags(self, kind, name): return self.CallMethod(REQ_QUERY_TAGS, (kind, name)) + + def QueryLocks(self, fields, sync): + return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync)) diff --git a/man/gnt-debug.sgml b/man/gnt-debug.sgml index a4929398f22c5e220375ac2ae7384d56a0199ce8..2d2e99349f01b5ac2ea68b4e7daefff6a6d32412 100644 --- a/man/gnt-debug.sgml +++ b/man/gnt-debug.sgml @@ -192,6 +192,70 @@ + + LOCKS + + locks + --no-headers + --separator=SEPARATOR + + -o [+]FIELD,... + --interval=SECONDS + + + + + Shows a list of locks in the master daemon. + + + + The --no-headers option will skip the initial + header line. The --separator option takes an + argument which denotes what will be used between the output + fields. Both these options are to help scripting. + + + + The -o option takes a comma-separated list of + output fields. 
The available fields and their meaning are: + + + name + + Lock name + + + + mode + + + Mode in which the lock is currently acquired (exclusive or + shared) + + + + + owner + + Current lock owner(s) + + + + + + + If the value of the -o option starts with the character + +, the new fields will be added to the default + list. This allows one to quickly see the default list plus a few other + fields, instead of retyping the entire list of fields. + + + + Use --interval to repeat the listing. A delay + specified by the option value in seconds is inserted. + + + &footer; diff --git a/scripts/gnt-debug b/scripts/gnt-debug index a860b3da019adb2cfe64163e7458f3a07adfd9d1..8fea3a9112005295e44764cd5e3f7bfabe005419 100755 --- a/scripts/gnt-debug +++ b/scripts/gnt-debug @@ -39,6 +39,14 @@ from ganeti import utils from ganeti import errors +#: Default fields for L{ListLocks} +_LIST_LOCKS_DEF_FIELDS = [ + "name", + "mode", + "owner", + ] + + def Delay(opts, args): """Sleeps for a while @@ -398,6 +406,57 @@ def TestJobqueue(opts, _): return 0 +def ListLocks(opts, args): # pylint: disable-msg=W0613 + """List all locks. 
+ + @param opts: the command line options selected by the user + @type args: list + @param args: should be an empty list + @rtype: int + @return: the desired exit code + + """ + selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS) + + if not opts.no_headers: + headers = { + "name": "Name", + "mode": "Mode", + "owner": "Owner", + } + else: + headers = None + + while True: + # Not reusing client as interval might be too long + output = GetClient().QueryLocks(selected_fields, False) + + # change raw values to nicer strings + for row in output: + for idx, field in enumerate(selected_fields): + val = row[idx] + + if field in ("mode", "owner") and val is None: + val = "-" + elif field == "owner": + val = utils.CommaJoin(val) + + row[idx] = str(val) + + data = GenerateTable(separator=opts.separator, headers=headers, + fields=selected_fields, data=output) + for line in data: + ToStdout(line) + + if not opts.interval: + break + + ToStdout("") + time.sleep(opts.interval) + + return 0 + + commands = { 'delay': ( Delay, [ArgUnknown(min=1, max=1)], @@ -454,7 +513,10 @@ commands = { "{opts...} ", "Executes a TestAllocator OpCode"), "test-jobqueue": ( TestJobqueue, ARGS_NONE, [], - "", "Test a few aspects of the job queue") + "", "Test a few aspects of the job queue"), + "locks": ( + ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT], + "[--interval N]", "Show a list of locks in the master daemon"), } diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 3eb23754a82c4dcc2ebb728e8de5ff2bec7e563c..bbfe693f20625e467c7b7be500b3d012e09e4fa9 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -27,9 +27,11 @@ import unittest import time import Queue import threading +import random from ganeti import locking from ganeti import errors +from ganeti import utils import testutils @@ -1422,5 +1424,155 @@ class TestGanetiLockManager(_ThreadedTestCase): self.GL.release(locking.LEVEL_CLUSTER, 
['BGL']) +class TestLockMonitor(_ThreadedTestCase): + def setUp(self): + _ThreadedTestCase.setUp(self) + self.lm = locking.LockMonitor() + + def testSingleThread(self): + locks = [] + + for i in range(100): + name = "TestLock%s" % i + locks.append(locking.SharedLock(name, monitor=self.lm)) + + self.assertEqual(len(self.lm._locks), len(locks)) + + # Delete all locks + del locks[:] + + # The garbage collector might need some time + def _CheckLocks(): + if self.lm._locks: + raise utils.RetryAgain() + + utils.Retry(_CheckLocks, 0.1, 30.0) + + self.assertFalse(self.lm._locks) + + def testMultiThread(self): + locks = [] + + def _CreateLock(prev, next, name): + prev.wait() + locks.append(locking.SharedLock(name, monitor=self.lm)) + if next: + next.set() + + expnames = [] + + first = threading.Event() + prev = first + + # Use a deterministic random generator + for i in random.Random(4263).sample(range(100), 33): + name = "MtTestLock%s" % i + expnames.append(name) + + ev = threading.Event() + self._addThread(target=_CreateLock, args=(prev, ev, name)) + prev = ev + + # Add locks + first.set() + self._waitThreads() + + # Check order in which locks were added + self.assertEqual([i.name for i in locks], expnames) + + # Sync queries are not supported + self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True) + + # Check query result + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[name, None, None] for name in utils.NiceSort(expnames)]) + + # Test exclusive acquire + for tlock in locks[::4]: + tlock.acquire(shared=0) + try: + def _GetExpResult(name): + if tlock.name == name: + return [name, "exclusive", [threading.currentThread().getName()]] + return [name, None, None] + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [_GetExpResult(name) + for name in utils.NiceSort(expnames)]) + finally: + tlock.release() + + # Test shared acquire + def _Acquire(lock, shared, ev): + lock.acquire(shared=shared) + try: + 
ev.wait() + finally: + lock.release() + + for tlock1 in locks[::11]: + for tlock2 in locks[::-15]: + if tlock2 == tlock1: + continue + + for tlock3 in locks[::10]: + if tlock3 == tlock2: + continue + + ev = threading.Event() + + # Acquire locks + tthreads1 = [] + for i in range(3): + tthreads1.append(self._addThread(target=_Acquire, + args=(tlock1, 1, ev))) + tthread2 = self._addThread(target=_Acquire, args=(tlock2, 1, ev)) + tthread3 = self._addThread(target=_Acquire, args=(tlock3, 0, ev)) + + # Check query result + for (name, mode, owner) in self.lm.QueryLocks(["name", "mode", + "owner"], False): + if name == tlock1.name: + self.assertEqual(mode, "shared") + self.assertEqual(set(owner), set(i.getName() for i in tthreads1)) + continue + + if name == tlock2.name: + self.assertEqual(mode, "shared") + self.assertEqual(owner, [tthread2.getName()]) + continue + + if name == tlock3.name: + self.assertEqual(mode, "exclusive") + self.assertEqual(owner, [tthread3.getName()]) + continue + + self.assert_(name in expnames) + self.assert_(mode is None) + self.assert_(owner is None) + + # Release locks again + ev.set() + + self._waitThreads() + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[name, None, None] + for name in utils.NiceSort(expnames)]) + + def testDelete(self): + lock = locking.SharedLock("TestLock", monitor=self.lm) + + self.assertEqual(len(self.lm._locks), 1) + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, None, None]]) + + lock.delete() + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, "deleted", None]]) + self.assertEqual(len(self.lm._locks), 1) + + if __name__ == '__main__': testutils.GanetiTestProgram()