Commit 24d16f76 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Convert “gnt-debug locks” to query2



Locks can now be queried using “Query(what="lock", …)” over LUXI.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent f7c8f153
......@@ -479,39 +479,34 @@ def ListLocks(opts, args): # pylint: disable-msg=W0613
"""
selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
if not opts.no_headers:
headers = {
"name": "Name",
"mode": "Mode",
"owner": "Owner",
"pending": "Pending",
}
else:
headers = None
def _DashIfNone(fn):
def wrapper(value):
if not value:
return "-"
return fn(value)
return wrapper
def _FormatPending(value):
"""Format pending acquires.
"""
return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
for mode, threads in value)
# Format raw values
fmtoverride = {
"mode": (_DashIfNone(str), False),
"owner": (_DashIfNone(",".join), False),
"pending": (_DashIfNone(_FormatPending), False),
}
while True:
# Not reusing client as interval might be too long
output = GetClient().QueryLocks(selected_fields, False)
# change raw values to nicer strings
for row in output:
for idx, field in enumerate(selected_fields):
val = row[idx]
if field in ("mode", "owner", "pending") and not val:
val = "-"
elif field == "owner":
val = ",".join(val)
elif field == "pending":
val = utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
for mode, threads in val)
row[idx] = str(val)
data = GenerateTable(separator=opts.separator, headers=headers,
fields=selected_fields, data=output)
for line in data:
ToStdout(line)
ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
opts.separator, not opts.no_headers,
format_override=fmtoverride)
if ret != constants.EXIT_SUCCESS:
return ret
if not opts.interval:
break
......
......@@ -951,12 +951,14 @@ JQT_ALL = frozenset([
# Query resources
QR_INSTANCE = "instance"
QR_NODE = "node"
QR_LOCK = "lock"
#: List of resources which can be queried using L{opcodes.OpQuery}
QR_OP_QUERY = frozenset([QR_INSTANCE, QR_NODE])
#: List of resources which can be queried using LUXI
QR_OP_LUXI = QR_OP_QUERY.union([
QR_LOCK,
])
# Query field types
......
......@@ -32,14 +32,17 @@ import errno
import weakref
import logging
import heapq
import operator
from ganeti import errors
from ganeti import utils
from ganeti import compat
from ganeti import query
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"
_DEFAULT_PRIORITY = 0
......@@ -432,65 +435,60 @@ class SharedLock(object):
if monitor:
monitor.RegisterLock(self)
def GetInfo(self, fields):
def GetInfo(self, requested):
"""Retrieves information for querying locks.
@type fields: list of strings
@param fields: List of fields to return
@type requested: set
@param requested: Requested information, see C{query.LQ_*}
"""
self.__lock.acquire()
try:
info = []
# Note: to avoid unintentional race conditions, no references to
# modifiable objects should be returned unless they were created in this
# function.
for fname in fields:
if fname == "name":
info.append(self.name)
elif fname == "mode":
if self.__deleted:
info.append("deleted")
assert not (self.__exc or self.__shr)
elif self.__exc:
info.append(_EXCLUSIVE_TEXT)
elif self.__shr:
info.append(_SHARED_TEXT)
else:
info.append(None)
elif fname == "owner":
if self.__exc:
owner = [self.__exc]
else:
owner = self.__shr
if owner:
assert not self.__deleted
info.append([i.getName() for i in owner])
else:
info.append(None)
elif fname == "pending":
data = []
# Sorting instead of copying and using heaq functions for simplicity
for (_, prioqueue) in sorted(self.__pending):
for cond in prioqueue:
if cond.shared:
mode = _SHARED_TEXT
else:
mode = _EXCLUSIVE_TEXT
# This function should be fast as it runs with the lock held.
# Hence not using utils.NiceSort.
data.append((mode, sorted(i.getName()
for i in cond.get_waiting())))
info.append(data)
mode = None
owner_names = None
if query.LQ_MODE in requested:
if self.__deleted:
mode = _DELETED_TEXT
assert not (self.__exc or self.__shr)
elif self.__exc:
mode = _EXCLUSIVE_TEXT
elif self.__shr:
mode = _SHARED_TEXT
# Current owner(s) are wanted
if query.LQ_OWNER in requested:
if self.__exc:
owner = [self.__exc]
else:
raise errors.OpExecError("Invalid query field '%s'" % fname)
owner = self.__shr
if owner:
assert not self.__deleted
owner_names = [i.getName() for i in owner]
return info
# Pending acquires are wanted
if query.LQ_PENDING in requested:
pending = []
# Sorting instead of copying and using heaq functions for simplicity
for (_, prioqueue) in sorted(self.__pending):
for cond in prioqueue:
if cond.shared:
pendmode = _SHARED_TEXT
else:
pendmode = _EXCLUSIVE_TEXT
# List of names will be sorted in L{query._GetLockPending}
pending.append((pendmode, [i.getName()
for i in cond.get_waiting()]))
else:
pending = None
return (self.name, mode, owner_names, pending)
finally:
self.__lock.release()
......@@ -1344,13 +1342,21 @@ class GanetiLockManager:
monitor=self._monitor),
}
def QueryLocks(self, fields, sync):
def QueryLocks(self, fields):
"""Queries information from all locks.
See L{LockMonitor.QueryLocks}.
"""
return self._monitor.QueryLocks(fields, sync)
return self._monitor.QueryLocks(fields)
def OldStyleQueryLocks(self, fields):
"""Queries information from all locks, returning old-style data.
See L{LockMonitor.OldStyleQueryLocks}.
"""
return self._monitor.OldStyleQueryLocks(fields)
def _names(self, level):
"""List the lock names at the given level.
......@@ -1533,32 +1539,46 @@ class LockMonitor(object):
self._locks[lock] = None
@ssynchronized(_LOCK_ATTR)
def _GetLockInfo(self, fields):
def _GetLockInfo(self, requested):
"""Get information from all locks while the monitor lock is held.
"""
result = {}
return [lock.GetInfo(requested) for lock in self._locks.keys()]
for lock in self._locks.keys():
assert lock.name not in result, "Found duplicate lock name"
result[lock.name] = lock.GetInfo(fields)
def _Query(self, fields):
"""Queries information from all locks.
return result
@type fields: list of strings
@param fields: List of fields to return
"""
qobj = query.Query(query.LOCK_FIELDS, fields)
# Get all data and sort by name
lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()),
key=operator.itemgetter(0))
return (qobj, query.LockQueryData(lockinfo))
def QueryLocks(self, fields, sync):
def QueryLocks(self, fields):
"""Queries information from all locks.
@type fields: list of strings
@param fields: List of fields to return
@type sync: boolean
@param sync: Whether to operate in synchronous mode
"""
if sync:
raise NotImplementedError("Synchronous queries are not implemented")
(qobj, ctx) = self._Query(fields)
# Get all data without sorting
result = self._GetLockInfo(fields)
# Prepare query response
return query.GetQueryResponse(qobj, ctx)
def OldStyleQueryLocks(self, fields):
"""Queries information from all locks, returning old-style data.
@type fields: list of strings
@param fields: List of fields to return
"""
(qobj, ctx) = self._Query(fields)
# Sort by name
return [result[name] for name in utils.NiceSort(result.keys())]
return qobj.OldStyleQuery(ctx)
......@@ -34,6 +34,7 @@ import collections
import time
import errno
import logging
import warnings
from ganeti import serializer
from ganeti import constants
......@@ -543,4 +544,6 @@ class Client(object):
return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
def QueryLocks(self, fields, sync):
warnings.warn("This LUXI call is deprecated and will be removed, use"
" Query(\"%s\", ...) instead" % constants.QR_LOCK)
return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
......@@ -42,6 +42,9 @@ from ganeti import ht
IQ_LIVE,
IQ_DISKUSAGE) = range(100, 103)
(LQ_MODE,
LQ_OWNER,
LQ_PENDING) = range(10, 13)
FIELD_NAME_RE = re.compile(r"^[a-z0-9/._]+$")
TITLE_RE = re.compile(r"^[^\s]+$")
......@@ -1003,8 +1006,69 @@ def _BuildInstanceFields():
return _PrepareFieldList(fields)
class LockQueryData:
"""Data container for lock data queries.
"""
def __init__(self, lockdata):
"""Initializes this class.
"""
self.lockdata = lockdata
def __iter__(self):
"""Iterate over all locks.
"""
return iter(self.lockdata)
def _GetLockOwners(_, data):
"""Returns a sorted list of a lock's current owners.
"""
(_, _, owners, _) = data
if owners:
owners = utils.NiceSort(owners)
return (constants.QRFS_NORMAL, owners)
def _GetLockPending(_, data):
"""Returns a sorted list of a lock's pending acquires.
"""
(_, _, _, pending) = data
if pending:
pending = [(mode, utils.NiceSort(names))
for (mode, names) in pending]
return (constants.QRFS_NORMAL, pending)
def _BuildLockFields():
"""Builds list of fields for lock queries.
"""
return _PrepareFieldList([
(_MakeField("name", "Name", constants.QFT_TEXT), None,
lambda ctx, (name, mode, owners, pending): (constants.QRFS_NORMAL, name)),
(_MakeField("mode", "Mode", constants.QFT_OTHER), LQ_MODE,
lambda ctx, (name, mode, owners, pending): (constants.QRFS_NORMAL, mode)),
(_MakeField("owner", "Owner", constants.QFT_OTHER), LQ_OWNER,
_GetLockOwners),
(_MakeField("pending", "Pending", constants.QFT_OTHER), LQ_PENDING,
_GetLockPending),
])
#: Fields available for node queries
NODE_FIELDS = _BuildNodeFields()
#: Fields available for instance queries
INSTANCE_FIELDS = _BuildInstanceFields()
#: Fields available for lock queries
LOCK_FIELDS = _BuildLockFields()
......@@ -56,6 +56,7 @@ from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
CLIENT_REQUEST_WORKERS = 16
......@@ -234,6 +235,10 @@ class ClientOps:
if req.what in constants.QR_OP_QUERY:
result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
filter=req.filter))
elif req.what == constants.QR_LOCK:
if req.filter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered")
return self.server.context.glm.QueryLocks(req.fields)
elif req.what in constants.QR_OP_LUXI:
raise NotImplementedError
else:
......@@ -248,6 +253,8 @@ class ClientOps:
if req.what in constants.QR_OP_QUERY:
result = self._Query(opcodes.OpQueryFields(what=req.what,
fields=req.fields))
elif req.what == constants.QR_LOCK:
return query.QueryFields(query.LOCK_FIELDS, req.fields)
elif req.what in constants.QR_OP_LUXI:
raise NotImplementedError
else:
......@@ -323,7 +330,9 @@ class ClientOps:
elif method == luxi.REQ_QUERY_LOCKS:
(fields, sync) = args
logging.info("Received locks query request")
return self.server.context.glm.QueryLocks(fields, sync)
if sync:
raise NotImplementedError("Synchronous queries are not implemented")
return self.server.context.glm.OldStyleQueryLocks(fields)
elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
drain_flag = args
......
......@@ -30,10 +30,13 @@ import threading
import random
import itertools
from ganeti import constants
from ganeti import locking
from ganeti import errors
from ganeti import utils
from ganeti import compat
from ganeti import objects
from ganeti import query
import testutils
......@@ -773,16 +776,12 @@ class TestSharedLock(_ThreadedTestCase):
prev.wait()
# Check lock information
self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
["exclusive", [threading.currentThread().getName()]])
self.assertEqual(self.sl.GetInfo(["name", "pending"]),
[self.sl.name,
[(["exclusive", "shared"][int(bool(shared))],
sorted([t.getName() for t in threads]))
for acquires in [perprio[i]
for i in sorted(perprio.keys())]
for (shared, _, threads) in acquires]])
self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
(self.sl.name, "exclusive",
[threading.currentThread().getName()], None))
self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
# Let threads acquire the lock
self.sl.release()
......@@ -803,6 +802,19 @@ class TestSharedLock(_ThreadedTestCase):
self.assertRaises(Queue.Empty, self.done.get_nowait)
def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
self.assertEqual(name, self.sl.name)
self.assert_(mode is None)
self.assert_(owner is None)
self.assertEqual([(pendmode, sorted(waiting))
for (pendmode, waiting) in pending],
[(["exclusive", "shared"][int(bool(shared))],
sorted(t.getName() for t in threads))
for acquires in [perprio[i]
for i in sorted(perprio.keys())]
for (shared, _, threads) in acquires])
class TestSharedLockInCondition(_ThreadedTestCase):
"""SharedLock as a condition lock tests"""
......@@ -1638,9 +1650,9 @@ class TestLockMonitor(_ThreadedTestCase):
locks.append(locking.SharedLock(name, monitor=self.lm))
self.assertEqual(len(self.lm._locks), len(locks))
self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
100)
result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
self.assertEqual(len(result.fields), 1)
self.assertEqual(len(result.data), 100)
# Delete all locks
del locks[:]
......@@ -1684,14 +1696,19 @@ class TestLockMonitor(_ThreadedTestCase):
# Check order in which locks were added
self.assertEqual([i.name for i in locks], expnames)
# Sync queries are not supported
self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
# Check query result
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
False),
[[name, None, None, []]
result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
self.assert_(isinstance(result, dict))
response = objects.QueryResponse.FromDict(result)
self.assertEqual(response.data,
[[(constants.QRFS_NORMAL, name),
(constants.QRFS_NORMAL, None),
(constants.QRFS_NORMAL, None),
(constants.QRFS_NORMAL, [])]
for name in utils.NiceSort(expnames)])
self.assertEqual(len(response.fields), 4)
self.assertEqual(["name", "mode", "owner", "pending"],
[fdef.name for fdef in response.fields])
# Test exclusive acquire
for tlock in locks[::4]:
......@@ -1699,12 +1716,18 @@ class TestLockMonitor(_ThreadedTestCase):
try:
def _GetExpResult(name):
if tlock.name == name:
return [name, "exclusive", [threading.currentThread().getName()],
[]]
return [name, None, None, []]
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
"pending"], False),
return [(constants.QRFS_NORMAL, name),
(constants.QRFS_NORMAL, "exclusive"),
(constants.QRFS_NORMAL,
[threading.currentThread().getName()]),
(constants.QRFS_NORMAL, [])]
return [(constants.QRFS_NORMAL, name),
(constants.QRFS_NORMAL, None),
(constants.QRFS_NORMAL, None),
(constants.QRFS_NORMAL, [])]
result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
self.assertEqual(objects.QueryResponse.FromDict(result).data,
[_GetExpResult(name)
for name in utils.NiceSort(expnames)])
finally:
......@@ -1756,47 +1779,64 @@ class TestLockMonitor(_ThreadedTestCase):
i.wait()
# Check query result
for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
"owner"], False):
if name == tlock1.name:
self.assertEqual(mode, "shared")
self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
result = self.lm.QueryLocks(["name", "mode", "owner"])
response = objects.QueryResponse.FromDict(result)
for (name, mode, owner) in response.data:
(name_status, name_value) = name
(owner_status, owner_value) = owner
self.assertEqual(name_status, constants.QRFS_NORMAL)
self.assertEqual(owner_status, constants.QRFS_NORMAL)
if name_value == tlock1.name:
self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
self.assertEqual(set(owner_value),
set(i.getName() for i in tthreads1))
continue
if name == tlock2.name:
self.assertEqual(mode, "shared")
self.assertEqual(owner, [tthread2.getName()])
if name_value == tlock2.name:
self.assertEqual(mode, (constants.QRFS_NORMAL, "shared"))
self.assertEqual(owner_value, [tthread2.getName()])
continue
if name == tlock3.name:
self.assertEqual(mode, "exclusive")
self.assertEqual(owner, [tthread3.getName()])
if name_value == tlock3.name:
self.assertEqual(mode, (constants.QRFS_NORMAL, "exclusive"))
self.assertEqual(owner_value, [tthread3.getName()])
continue
self.assert_(name in expnames)
self.assert_(mode is None)
self.assert_(owner is None)
self.assert_(name_value in expnames)
self.assertEqual(mode, (constants.QRFS_NORMAL, None))
self.assert_(owner_value is None)
# Release locks again
releaseev.set()
self._waitThreads()
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
[[name, None, None]
result = self.lm.QueryLocks(["name", "mode", "owner"])
self.assertEqual(objects.QueryResponse.FromDict(result).data,
[[(constants.QRFS_NORMAL, name),
(constants.QRFS_NORMAL, None),
(constants.QRFS_NORMAL, None)]
for name in utils.NiceSort(expnames)])
def testDelete(self):
lock = locking.SharedLock("TestLock", monitor=self.lm)
self.assertEqual(len(self.lm._locks), 1)