diff --git a/lib/client/gnt_debug.py b/lib/client/gnt_debug.py index 2cd537c040a713ef49b6b4d6b8936089c1766c73..37eeed5268686e353b8433254412351c093c6c4d 100644 --- a/lib/client/gnt_debug.py +++ b/lib/client/gnt_debug.py @@ -479,39 +479,34 @@ def ListLocks(opts, args): # pylint: disable-msg=W0613 """ selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS) - if not opts.no_headers: - headers = { - "name": "Name", - "mode": "Mode", - "owner": "Owner", - "pending": "Pending", - } - else: - headers = None + def _DashIfNone(fn): + def wrapper(value): + if not value: + return "-" + return fn(value) + return wrapper + + def _FormatPending(value): + """Format pending acquires. + + """ + return utils.CommaJoin("%s:%s" % (mode, ",".join(threads)) + for mode, threads in value) + + # Format raw values + fmtoverride = { + "mode": (_DashIfNone(str), False), + "owner": (_DashIfNone(",".join), False), + "pending": (_DashIfNone(_FormatPending), False), + } while True: - # Not reusing client as interval might be too long - output = GetClient().QueryLocks(selected_fields, False) - - # change raw values to nicer strings - for row in output: - for idx, field in enumerate(selected_fields): - val = row[idx] - - if field in ("mode", "owner", "pending") and not val: - val = "-" - elif field == "owner": - val = ",".join(val) - elif field == "pending": - val = utils.CommaJoin("%s:%s" % (mode, ",".join(threads)) - for mode, threads in val) - - row[idx] = str(val) - - data = GenerateTable(separator=opts.separator, headers=headers, - fields=selected_fields, data=output) - for line in data: - ToStdout(line) + ret = GenericList(constants.QR_LOCK, selected_fields, None, None, + opts.separator, not opts.no_headers, + format_override=fmtoverride) + + if ret != constants.EXIT_SUCCESS: + return ret if not opts.interval: break diff --git a/lib/constants.py b/lib/constants.py index deae46c2a6a5d11ba7fcd1cb08bfe37c31c38fb9..1a3ccd61ffbe4ec03bd2ecd963bbef647146c027 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -951,12 +951,14 @@ JQT_ALL = frozenset([ # Query resources QR_INSTANCE = "instance" QR_NODE = "node" +QR_LOCK = "lock" #: List of resources which can be queried using L{opcodes.OpQuery} QR_OP_QUERY = frozenset([QR_INSTANCE, QR_NODE]) #: List of resources which can be queried using LUXI QR_OP_LUXI = QR_OP_QUERY.union([ + QR_LOCK, ]) # Query field types diff --git a/lib/locking.py b/lib/locking.py index e44546aab464a738a1b2117e0c89b6fe322f2197..35750851e169ac51594598ae75359688b78c8519 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -32,14 +32,17 @@ import errno import weakref import logging import heapq +import operator from ganeti import errors from ganeti import utils from ganeti import compat +from ganeti import query _EXCLUSIVE_TEXT = "exclusive" _SHARED_TEXT = "shared" +_DELETED_TEXT = "deleted" _DEFAULT_PRIORITY = 0 @@ -432,65 +435,60 @@ class SharedLock(object): if monitor: monitor.RegisterLock(self) - def GetInfo(self, fields): + def GetInfo(self, requested): """Retrieves information for querying locks. - @type fields: list of strings - @param fields: List of fields to return + @type requested: set + @param requested: Requested information, see C{query.LQ_*} """ self.__lock.acquire() try: - info = [] - # Note: to avoid unintentional race conditions, no references to # modifiable objects should be returned unless they were created in this # function. - for fname in fields: - if fname == "name": - info.append(self.name) - elif fname == "mode": - if self.__deleted: - info.append("deleted") - assert not (self.__exc or self.__shr) - elif self.__exc: - info.append(_EXCLUSIVE_TEXT) - elif self.__shr: - info.append(_SHARED_TEXT) - else: - info.append(None) - elif fname == "owner": - if self.__exc: - owner = [self.__exc] - else: - owner = self.__shr - - if owner: - assert not self.__deleted - info.append([i.getName() for i in owner]) - else: - info.append(None) - elif fname == "pending": - data = [] - - # Sorting instead of copying and using heaq functions for simplicity - for (_, prioqueue) in sorted(self.__pending): - for cond in prioqueue: - if cond.shared: - mode = _SHARED_TEXT - else: - mode = _EXCLUSIVE_TEXT - - # This function should be fast as it runs with the lock held. - # Hence not using utils.NiceSort. - data.append((mode, sorted(i.getName() - for i in cond.get_waiting()))) - - info.append(data) + mode = None + owner_names = None + + if query.LQ_MODE in requested: + if self.__deleted: + mode = _DELETED_TEXT + assert not (self.__exc or self.__shr) + elif self.__exc: + mode = _EXCLUSIVE_TEXT + elif self.__shr: + mode = _SHARED_TEXT + + # Current owner(s) are wanted + if query.LQ_OWNER in requested: + if self.__exc: + owner = [self.__exc] else: - raise errors.OpExecError("Invalid query field '%s'" % fname) + owner = self.__shr + + if owner: + assert not self.__deleted + owner_names = [i.getName() for i in owner] - return info + # Pending acquires are wanted + if query.LQ_PENDING in requested: + pending = [] + + # Sorting instead of copying and using heaq functions for simplicity + for (_, prioqueue) in sorted(self.__pending): + for cond in prioqueue: + if cond.shared: + pendmode = _SHARED_TEXT + else: + pendmode = _EXCLUSIVE_TEXT + + # List of names will be sorted in L{query._GetLockPending} + pending.append((pendmode, [i.getName() + for i in cond.get_waiting()])) + else: + pending = None + + return (self.name, mode, owner_names, pending) finally: self.__lock.release() @@ -1344,13 +1342,21 @@ class GanetiLockManager: monitor=self._monitor), } - def QueryLocks(self, fields, sync): + def QueryLocks(self, fields): """Queries information from all locks. See L{LockMonitor.QueryLocks}. """ - return self._monitor.QueryLocks(fields, sync) + return self._monitor.QueryLocks(fields) + + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + See L{LockMonitor.OldStyleQueryLocks}. + + """ + return self._monitor.OldStyleQueryLocks(fields) def _names(self, level): """List the lock names at the given level. @@ -1533,32 +1539,46 @@ class LockMonitor(object): self._locks[lock] = None @ssynchronized(_LOCK_ATTR) - def _GetLockInfo(self, fields): + def _GetLockInfo(self, requested): """Get information from all locks while the monitor lock is held. """ - result = {} + return [lock.GetInfo(requested) for lock in self._locks.keys()] - for lock in self._locks.keys(): - assert lock.name not in result, "Found duplicate lock name" - result[lock.name] = lock.GetInfo(fields) + def _Query(self, fields): + """Queries information from all locks. - return result + @type fields: list of strings + @param fields: List of fields to return + + """ + qobj = query.Query(query.LOCK_FIELDS, fields) + + # Get all data and sort by name + lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()), + key=operator.itemgetter(0)) + + return (qobj, query.LockQueryData(lockinfo)) - def QueryLocks(self, fields, sync): + def QueryLocks(self, fields): """Queries information from all locks. @type fields: list of strings @param fields: List of fields to return - @type sync: boolean - @param sync: Whether to operate in synchronous mode """ - if sync: - raise NotImplementedError("Synchronous queries are not implemented") + (qobj, ctx) = self._Query(fields) - # Get all data without sorting - result = self._GetLockInfo(fields) + # Prepare query response + return query.GetQueryResponse(qobj, ctx) + + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + @type fields: list of strings + @param fields: List of fields to return + + """ + (qobj, ctx) = self._Query(fields) - # Sort by name - return [result[name] for name in utils.NiceSort(result.keys())] + return qobj.OldStyleQuery(ctx) diff --git a/lib/luxi.py b/lib/luxi.py index e4c0ff753ce3a6f6d138326a31ee3061321cb64c..023fad7a7556a9591ffbbf810b2dad9e5b1fb71f 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -34,6 +34,7 @@ import collections import time import errno import logging +import warnings from ganeti import serializer from ganeti import constants @@ -543,4 +544,6 @@ class Client(object): return self.CallMethod(REQ_QUERY_TAGS, (kind, name)) def QueryLocks(self, fields, sync): + warnings.warn("This LUXI call is deprecated and will be removed, use" + " Query(\"%s\", ...) instead" % constants.QR_LOCK) return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync)) diff --git a/lib/query.py b/lib/query.py index 9b599a90b2b51a797537ad3760d1904a0a47a4bd..14c0d4fd5da9866619c54e3649c46f399baca378 100644 --- a/lib/query.py +++ b/lib/query.py @@ -42,6 +42,9 @@ from ganeti import ht IQ_LIVE, IQ_DISKUSAGE) = range(100, 103) +(LQ_MODE, + LQ_OWNER, + LQ_PENDING) = range(10, 13) FIELD_NAME_RE = re.compile(r"^[a-z0-9/._]+$") TITLE_RE = re.compile(r"^[^\s]+$") @@ -1003,8 +1006,69 @@ def _BuildInstanceFields(): return _PrepareFieldList(fields) +class LockQueryData: + """Data container for lock data queries. + + """ + def __init__(self, lockdata): + """Initializes this class. + + """ + self.lockdata = lockdata + + def __iter__(self): + """Iterate over all locks. + + """ + return iter(self.lockdata) + + +def _GetLockOwners(_, data): + """Returns a sorted list of a lock's current owners. + + """ + (_, _, owners, _) = data + + if owners: + owners = utils.NiceSort(owners) + + return (constants.QRFS_NORMAL, owners) + + +def _GetLockPending(_, data): + """Returns a sorted list of a lock's pending acquires. + + """ + (_, _, _, pending) = data + + if pending: + pending = [(mode, utils.NiceSort(names)) + for (mode, names) in pending] + + return (constants.QRFS_NORMAL, pending) + + +def _BuildLockFields(): + """Builds list of fields for lock queries. + + """ + return _PrepareFieldList([ + (_MakeField("name", "Name", constants.QFT_TEXT), None, + lambda ctx, (name, mode, owners, pending): (constants.QRFS_NORMAL, name)), + (_MakeField("mode", "Mode", constants.QFT_OTHER), LQ_MODE, + lambda ctx, (name, mode, owners, pending): (constants.QRFS_NORMAL, mode)), + (_MakeField("owner", "Owner", constants.QFT_OTHER), LQ_OWNER, + _GetLockOwners), + (_MakeField("pending", "Pending", constants.QFT_OTHER), LQ_PENDING, + _GetLockPending), + ]) + + #: Fields available for node queries NODE_FIELDS = _BuildNodeFields() #: Fields available for instance queries INSTANCE_FIELDS = _BuildInstanceFields() + +#: Fields available for lock queries +LOCK_FIELDS = _BuildLockFields() diff --git a/lib/server/masterd.py b/lib/server/masterd.py index 906923de5d9c27f3e1480f3059f4164fbfa6c940..8b276c3f68561d73d92fb32ce5df3799cc06945d 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -56,6 +56,7 @@ from ganeti import rpc from ganeti import bootstrap from ganeti import netutils from ganeti import objects +from ganeti import query CLIENT_REQUEST_WORKERS = 16 @@ -234,6 +235,10 @@ class ClientOps: if req.what in constants.QR_OP_QUERY: result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields, filter=req.filter)) + elif req.what == constants.QR_LOCK: + if req.filter is not None: + raise errors.OpPrereqError("Lock queries can't be filtered") + return self.server.context.glm.QueryLocks(req.fields) elif req.what in constants.QR_OP_LUXI: raise NotImplementedError else: @@ -248,6 +253,8 @@ class ClientOps: if req.what in constants.QR_OP_QUERY: result = self._Query(opcodes.OpQueryFields(what=req.what, fields=req.fields)) + elif req.what == constants.QR_LOCK: + return query.QueryFields(query.LOCK_FIELDS, req.fields) elif req.what in constants.QR_OP_LUXI: raise NotImplementedError else: @@ -323,7 +330,9 @@ class ClientOps: elif method == luxi.REQ_QUERY_LOCKS: (fields, sync) = args logging.info("Received locks query request") - return self.server.context.glm.QueryLocks(fields, sync) + if sync: + raise NotImplementedError("Synchronous queries are not implemented") + return self.server.context.glm.OldStyleQueryLocks(fields) elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG: drain_flag = args diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 3284131f5afde469c1e4ddff3e12b421e2c54a3b..3364c88b6d7825fa6c7c40d6476d941ee39884a1 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -30,10 +30,13 @@ import threading import random import itertools +from ganeti import constants from ganeti import locking from ganeti import errors from ganeti import utils from ganeti import compat +from ganeti import objects +from ganeti import query import testutils @@ -773,16 +776,12 @@ class TestSharedLock(_ThreadedTestCase): prev.wait() # Check lock information - self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name]) - self.assertEqual(self.sl.GetInfo(["mode", "owner"]), - ["exclusive", [threading.currentThread().getName()]]) - self.assertEqual(self.sl.GetInfo(["name", "pending"]), - [self.sl.name, - [(["exclusive", "shared"][int(bool(shared))], - sorted([t.getName() for t in threads])) - for acquires in [perprio[i] - for i in sorted(perprio.keys())] - for (shared, _, threads) in acquires]]) + self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None)) + self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])), + (self.sl.name, "exclusive", + [threading.currentThread().getName()], None)) + + self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio) # Let threads acquire the lock self.sl.release() @@ -803,6 +802,19 @@ class TestSharedLock(_ThreadedTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) + def _VerifyPrioPending(self, (name, mode, owner, pending), perprio): + self.assertEqual(name, self.sl.name) + self.assert_(mode is None) + self.assert_(owner is None) + + self.assertEqual([(pendmode, sorted(waiting)) + for (pendmode, waiting) in pending], + [(["exclusive", "shared"][int(bool(shared))], + sorted(t.getName() for t in threads)) + for acquires in [perprio[i] + for i in sorted(perprio.keys())] + for (shared, _, threads) in acquires]) + class TestSharedLockInCondition(_ThreadedTestCase): """SharedLock as a condition lock tests""" @@ -1638,9 +1650,9 @@ class TestLockMonitor(_ThreadedTestCase): locks.append(locking.SharedLock(name, monitor=self.lm)) self.assertEqual(len(self.lm._locks), len(locks)) - - self.assertEqual(len(self.lm.QueryLocks(["name"], False)), - 100) + result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"])) + self.assertEqual(len(result.fields), 1) + self.assertEqual(len(result.data), 100) # Delete all locks del locks[:] @@ -1684,14 +1696,19 @@ class TestLockMonitor(_ThreadedTestCase): # Check order in which locks were added self.assertEqual([i.name for i in locks], expnames) - # Sync queries are not supported - self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True) - # Check query result - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"], - False), - [[name, None, None, []] + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assert_(isinstance(result, dict)) + response = objects.QueryResponse.FromDict(result) + self.assertEqual(response.data, + [[(constants.QRFS_NORMAL, name), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, [])] for name in utils.NiceSort(expnames)]) + self.assertEqual(len(response.fields), 4) + self.assertEqual(["name", "mode", "owner", "pending"], + [fdef.name for fdef in response.fields]) # Test exclusive acquire for tlock in locks[::4]: @@ -1699,12 +1716,18 @@ class TestLockMonitor(_ThreadedTestCase): try: def _GetExpResult(name): if tlock.name == name: - return [name, "exclusive", [threading.currentThread().getName()], - []] - return [name, None, None, []] - - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", - "pending"], False), + return [(constants.QRFS_NORMAL, name), + (constants.QRFS_NORMAL, "exclusive"), + (constants.QRFS_NORMAL, + [threading.currentThread().getName()]), + (constants.QRFS_NORMAL, [])] + return [(constants.QRFS_NORMAL, name), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, [])] + + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, [_GetExpResult(name) for name in utils.NiceSort(expnames)]) finally: @@ -1756,47 +1779,64 @@ class TestLockMonitor(_ThreadedTestCase): i.wait() # Check query result - for (name, mode, owner) in self.lm.QueryLocks(["name", "mode", - "owner"], False): - if name == tlock1.name: - self.assertEqual(mode, "shared") - self.assertEqual(set(owner), set(i.getName() for i in tthreads1)) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + response = objects.QueryResponse.FromDict(result) + for (name, mode, owner) in response.data: + (name_status, name_value) = name + (owner_status, owner_value) = owner + + self.assertEqual(name_status, constants.QRFS_NORMAL) + self.assertEqual(owner_status, constants.QRFS_NORMAL) + + if name_value == tlock1.name: + self.assertEqual(mode, (constants.QRFS_NORMAL, "shared")) + self.assertEqual(set(owner_value), + set(i.getName() for i in tthreads1)) continue - if name == tlock2.name: - self.assertEqual(mode, "shared") - self.assertEqual(owner, [tthread2.getName()]) + if name_value == tlock2.name: + self.assertEqual(mode, (constants.QRFS_NORMAL, "shared")) + self.assertEqual(owner_value, [tthread2.getName()]) continue - if name == tlock3.name: - self.assertEqual(mode, "exclusive") - self.assertEqual(owner, [tthread3.getName()]) + if name_value == tlock3.name: + self.assertEqual(mode, (constants.QRFS_NORMAL, "exclusive")) + self.assertEqual(owner_value, [tthread3.getName()]) continue - self.assert_(name in expnames) - self.assert_(mode is None) - self.assert_(owner is None) + self.assert_(name_value in expnames) + self.assertEqual(mode, (constants.QRFS_NORMAL, None)) + self.assert_(owner_value is None) # Release locks again releaseev.set() self._waitThreads() - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), - [[name, None, None] + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.QRFS_NORMAL, name), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, None)] for name in utils.NiceSort(expnames)]) def testDelete(self): lock = locking.SharedLock("TestLock", monitor=self.lm) self.assertEqual(len(self.lm._locks), 1) - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), - [[lock.name, None, None]]) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.QRFS_NORMAL, lock.name), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, None)]]) lock.delete() - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), - [[lock.name, "deleted", None]]) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.QRFS_NORMAL, lock.name), + (constants.QRFS_NORMAL, "deleted"), + (constants.QRFS_NORMAL, None)]]) self.assertEqual(len(self.lm._locks), 1) def testPending(self): @@ -1815,9 +1855,12 @@ class TestLockMonitor(_ThreadedTestCase): lock.acquire() try: self.assertEqual(len(self.lm._locks), 1) - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), - [[lock.name, "exclusive", - [threading.currentThread().getName()]]]) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.QRFS_NORMAL, lock.name), + (constants.QRFS_NORMAL, "exclusive"), + (constants.QRFS_NORMAL, + [threading.currentThread().getName()])]]) threads = [] @@ -1843,15 +1886,17 @@ class TestLockMonitor(_ThreadedTestCase): # All acquires are waiting now if shared: - pending = [("shared", sorted([t.getName() for t in threads]))] + pending = [("shared", utils.NiceSort(t.getName() for t in threads))] else: pending = [("exclusive", [t.getName()]) for t in threads] - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", - "pending"], False), - [[lock.name, "exclusive", - [threading.currentThread().getName()], - pending]]) + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.QRFS_NORMAL, lock.name), + (constants.QRFS_NORMAL, "exclusive"), + (constants.QRFS_NORMAL, + [threading.currentThread().getName()]), + (constants.QRFS_NORMAL, pending)]]) self.assertEqual(len(self.lm._locks), 1) finally: @@ -1860,9 +1905,12 @@ class TestLockMonitor(_ThreadedTestCase): self._waitThreads() # No pending acquires - self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"], - False), - [[lock.name, None, None, []]]) + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.QRFS_NORMAL, lock.name), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, None), + (constants.QRFS_NORMAL, [])]]) self.assertEqual(len(self.lm._locks), 1)