Commit 19b9ba9a authored by Michael Hanselmann's avatar Michael Hanselmann

Add simple lock monitor

This patch adds an initial implementation of a lock monitor, accessible
for the user through “gnt-debug locks”. It currently shows all resource
locks: BGL, nodes and instances. Config and job queue locks could be
shown too, but wouldn't be of much help.  The current owner(s) and mode
are also shown.

Showing pending acquires will require further changes on the SharedLock
internals and is not yet implemented.

Example output:
$ gnt-debug locks -o name,mode,owner
Name            Mode      Owner
BGL/BGL         shared    JobQueue19/Job147
instances/inst1 exclusive JobQueue19/Job147
instances/inst2 -         -
instances/inst3 -         -
instances/inst4 -         -
nodes/node1     exclusive JobQueue19/Job147
nodes/node2     exclusive JobQueue19/Job147
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent daba67c7
......@@ -277,6 +277,11 @@ class ClientOps:
op = opcodes.OpGetTags(kind=kind, name=name)
return self._Query(op)
elif method == luxi.REQ_QUERY_LOCKS:
(fields, sync) = args
logging.info("Received locks query request")
return self.server.context.glm.QueryLocks(fields, sync)
elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
drain_flag = args
logging.info("Received queue drain flag change request to %s",
......
......@@ -84,6 +84,7 @@ __all__ = [
"IGNORE_REMOVE_FAILURES_OPT",
"IGNORE_SECONDARIES_OPT",
"IGNORE_SIZE_OPT",
"INTERVAL_OPT",
"MAC_PREFIX_OPT",
"MAINTAIN_NODE_HEALTH_OPT",
"MASTER_NETDEV_OPT",
......@@ -929,6 +930,11 @@ SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
help="Maximum time to wait for instance shutdown")
INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
default=None,
help=("Number of seconds between repetions of the"
" command"))
EARLY_RELEASE_OPT = cli_option("--early-release",
dest="early_release", default=False,
action="store_true",
......
......@@ -30,6 +30,8 @@ import select
import threading
import time
import errno
import weakref
import logging
from ganeti import errors
from ganeti import utils
......@@ -409,6 +411,7 @@ class SharedLock(object):
"""
__slots__ = [
"__weakref__",
"__active_shr_c",
"__inactive_shr_c",
"__deleted",
......@@ -421,10 +424,12 @@ class SharedLock(object):
__condition_class = PipeCondition
def __init__(self, name):
def __init__(self, name, monitor=None):
"""Construct a new SharedLock.
@param name: the name of the lock
@type monitor: L{LockMonitor}
@param monitor: Lock monitor with which to register
"""
object.__init__(self)
......@@ -448,6 +453,55 @@ class SharedLock(object):
# is this lock in the deleted state?
self.__deleted = False
# Register with lock monitor
if monitor:
monitor.RegisterLock(self)
def GetInfo(self, fields):
"""Retrieves information for querying locks.
@type fields: list of strings
@param fields: List of fields to return
"""
self.__lock.acquire()
try:
info = []
# Note: to avoid unintentional race conditions, no references to
# modifiable objects should be returned unless they were created in this
# function.
for fname in fields:
if fname == "name":
info.append(self.name)
elif fname == "mode":
if self.__deleted:
info.append("deleted")
assert not (self.__exc or self.__shr)
elif self.__exc:
info.append("exclusive")
elif self.__shr:
info.append("shared")
else:
info.append(None)
elif fname == "owner":
if self.__exc:
owner = [self.__exc]
else:
owner = self.__shr
if owner:
assert not self.__deleted
info.append([i.getName() for i in owner])
else:
info.append(None)
else:
raise errors.OpExecError("Invalid query field '%s'" % fname)
return info
finally:
self.__lock.release()
def __check_deleted(self):
"""Raises an exception if the lock has been deleted.
......@@ -671,6 +725,8 @@ class SharedLock(object):
self.__deleted = True
self.__exc = None
assert not (self.__exc or self.__shr), "Found owner during deletion"
# Notify all acquires. They'll throw an error.
while self.__pending:
self.__pending.pop().notifyAll()
......@@ -713,16 +769,21 @@ class LockSet:
@ivar name: the name of the lockset
"""
def __init__(self, members, name):
def __init__(self, members, name, monitor=None):
"""Constructs a new LockSet.
@type members: list of strings
@param members: initial members of the set
@type monitor: L{LockMonitor}
@param monitor: Lock monitor with which to register member locks
"""
assert members is not None, "members parameter is not a list"
self.name = name
# Lock monitor
self.__monitor = monitor
# Used internally to guarantee coherency.
self.__lock = SharedLock(name)
......@@ -731,7 +792,8 @@ class LockSet:
self.__lockdict = {}
for mname in members:
self.__lockdict[mname] = SharedLock(self._GetLockName(mname))
self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
monitor=monitor)
# The owner dict contains the set of locks each thread owns. For
# performance each thread can access its own key without a global lock on
......@@ -1055,7 +1117,7 @@ class LockSet:
(invalid_names, self.name))
for lockname in names:
lock = SharedLock(self._GetLockName(lockname))
lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
if acquired:
lock.acquire(shared=shared)
......@@ -1193,14 +1255,25 @@ class GanetiLockManager:
self.__class__._instance = self
self._monitor = LockMonitor()
# The keyring contains all the locks, at their level and in the correct
# locking order.
self.__keyring = {
LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"),
LEVEL_NODE: LockSet(nodes, "nodes lockset"),
LEVEL_INSTANCE: LockSet(instances, "instances lockset"),
LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
LEVEL_INSTANCE: LockSet(instances, "instances",
monitor=self._monitor),
}
def QueryLocks(self, fields, sync):
"""Queries information from all locks.
See L{LockMonitor.QueryLocks}.
"""
return self._monitor.QueryLocks(fields, sync)
def _names(self, level):
"""List the lock names at the given level.
......@@ -1352,3 +1425,59 @@ class GanetiLockManager:
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
return self.__keyring[level].remove(names)
class LockMonitor(object):
_LOCK_ATTR = "_lock"
def __init__(self):
"""Initializes this class.
"""
self._lock = SharedLock("LockMonitor")
# Tracked locks. Weak references are used to avoid issues with circular
# references and deletion.
self._locks = weakref.WeakKeyDictionary()
@ssynchronized(_LOCK_ATTR)
def RegisterLock(self, lock):
"""Registers a new lock.
"""
logging.debug("Registering lock %s", lock.name)
assert lock not in self._locks, "Duplicate lock registration"
assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
"Found duplicate lock name"
self._locks[lock] = None
@ssynchronized(_LOCK_ATTR)
def _GetLockInfo(self, fields):
"""Get information from all locks while the monitor lock is held.
"""
result = {}
for lock in self._locks.keys():
assert lock.name not in result, "Found duplicate lock name"
result[lock.name] = lock.GetInfo(fields)
return result
def QueryLocks(self, fields, sync):
"""Queries information from all locks.
@type fields: list of strings
@param fields: List of fields to return
@type sync: boolean
@param sync: Whether to operate in synchronous mode
"""
if sync:
raise NotImplementedError("Synchronous queries are not implemented")
# Get all data without sorting
result = self._GetLockInfo(fields)
# Sort by name
return [result[name] for name in utils.NiceSort(result.keys())]
......@@ -59,6 +59,7 @@ REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
......@@ -490,3 +491,6 @@ class Client(object):
def QueryTags(self, kind, name):
return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
def QueryLocks(self, fields, sync):
return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
......@@ -192,6 +192,70 @@
</para>
</refsect2>
<refsect2>
<title>LOCKS</title>
<cmdsynopsis>
<command>locks</command>
<arg>--no-headers</arg>
<arg>--separator=<replaceable>SEPARATOR</replaceable></arg>
<sbr>
<arg>-o <replaceable>[+]FIELD,...</replaceable></arg>
<arg>--interval=<replaceable>SECONDS</replaceable></arg>
<sbr>
</cmdsynopsis>
<para>
Shows a list of locks in the master daemon.
</para>
<para>
The <option>--no-headers</option> option will skip the initial
header line. The <option>--separator</option> option takes an
argument which denotes what will be used between the output
fields. Both these options are to help scripting.
</para>
<para>
The <option>-o</option> option takes a comma-separated list of
output fields. The available fields and their meaning are:
<variablelist>
<varlistentry>
<term>name</term>
<listitem>
<simpara>Lock name</simpara>
</listitem>
</varlistentry>
<varlistentry>
<term>mode</term>
<listitem>
<simpara>
Mode in which the lock is currently acquired (exclusive or
shared)
</simpara>
</listitem>
</varlistentry>
<varlistentry>
<term>owner</term>
<listitem>
<simpara>Current lock owner(s)</simpara>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
If the value of the option starts with the character
<constant>+</constant>, the new fields will be added to the default
list. This allows to quickly see the default list plus a few other
fields, instead of retyping the entire list of fields.
</para>
<para>
Use <option>--interval</option> to repeat the listing. A delay
specified by the option value in seconds is inserted.
</para>
</refsect2>
</refsect1>
&footer;
......
......@@ -39,6 +39,14 @@ from ganeti import utils
from ganeti import errors
#: Default fields for L{ListLocks}
_LIST_LOCKS_DEF_FIELDS = [
"name",
"mode",
"owner",
]
def Delay(opts, args):
"""Sleeps for a while
......@@ -398,6 +406,57 @@ def TestJobqueue(opts, _):
return 0
def ListLocks(opts, args): # pylint: disable-msg=W0613
"""List all locks.
@param opts: the command line options selected by the user
@type args: list
@param args: should be an empty list
@rtype: int
@return: the desired exit code
"""
selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
if not opts.no_headers:
headers = {
"name": "Name",
"mode": "Mode",
"owner": "Owner",
}
else:
headers = None
while True:
# Not reusing client as interval might be too long
output = GetClient().QueryLocks(selected_fields, False)
# change raw values to nicer strings
for row in output:
for idx, field in enumerate(selected_fields):
val = row[idx]
if field in ("mode", "owner") and val is None:
val = "-"
elif field == "owner":
val = utils.CommaJoin(val)
row[idx] = str(val)
data = GenerateTable(separator=opts.separator, headers=headers,
fields=selected_fields, data=output)
for line in data:
ToStdout(line)
if not opts.interval:
break
ToStdout("")
time.sleep(opts.interval)
return 0
commands = {
'delay': (
Delay, [ArgUnknown(min=1, max=1)],
......@@ -454,7 +513,10 @@ commands = {
"{opts...} <instance>", "Executes a TestAllocator OpCode"),
"test-jobqueue": (
TestJobqueue, ARGS_NONE, [],
"", "Test a few aspects of the job queue")
"", "Test a few aspects of the job queue"),
"locks": (
ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT],
"[--interval N]", "Show a list of locks in the master daemon"),
}
......
......@@ -27,9 +27,11 @@ import unittest
import time
import Queue
import threading
import random
from ganeti import locking
from ganeti import errors
from ganeti import utils
import testutils
......@@ -1422,5 +1424,155 @@ class TestGanetiLockManager(_ThreadedTestCase):
self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
class TestLockMonitor(_ThreadedTestCase):
def setUp(self):
_ThreadedTestCase.setUp(self)
self.lm = locking.LockMonitor()
def testSingleThread(self):
locks = []
for i in range(100):
name = "TestLock%s" % i
locks.append(locking.SharedLock(name, monitor=self.lm))
self.assertEqual(len(self.lm._locks), len(locks))
# Delete all locks
del locks[:]
# The garbage collector might needs some time
def _CheckLocks():
if self.lm._locks:
raise utils.RetryAgain()
utils.Retry(_CheckLocks, 0.1, 30.0)
self.assertFalse(self.lm._locks)
def testMultiThread(self):
locks = []
def _CreateLock(prev, next, name):
prev.wait()
locks.append(locking.SharedLock(name, monitor=self.lm))
if next:
next.set()
expnames = []
first = threading.Event()
prev = first
# Use a deterministic random generator
for i in random.Random(4263).sample(range(100), 33):
name = "MtTestLock%s" % i
expnames.append(name)
ev = threading.Event()
self._addThread(target=_CreateLock, args=(prev, ev, name))
prev = ev
# Add locks
first.set()
self._waitThreads()
# Check order in which locks were added
self.assertEqual([i.name for i in locks], expnames)
# Sync queries are not supported
self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
# Check query result
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
[[name, None, None] for name in utils.NiceSort(expnames)])
# Test exclusive acquire
for tlock in locks[::4]:
tlock.acquire(shared=0)
try:
def _GetExpResult(name):
if tlock.name == name:
return [name, "exclusive", [threading.currentThread().getName()]]
return [name, None, None]
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
[_GetExpResult(name)
for name in utils.NiceSort(expnames)])
finally:
tlock.release()
# Test shared acquire
def _Acquire(lock, shared, ev):
lock.acquire(shared=shared)
try:
ev.wait()
finally:
lock.release()
for tlock1 in locks[::11]:
for tlock2 in locks[::-15]:
if tlock2 == tlock1:
continue
for tlock3 in locks[::10]:
if tlock3 == tlock2:
continue
ev = threading.Event()
# Acquire locks
tthreads1 = []
for i in range(3):
tthreads1.append(self._addThread(target=_Acquire,
args=(tlock1, 1, ev)))
tthread2 = self._addThread(target=_Acquire, args=(tlock2, 1, ev))
tthread3 = self._addThread(target=_Acquire, args=(tlock3, 0, ev))
# Check query result
for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
"owner"], False):
if name == tlock1.name:
self.assertEqual(mode, "shared")
self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
continue
if name == tlock2.name:
self.assertEqual(mode, "shared")
self.assertEqual(owner, [tthread2.getName()])
continue
if name == tlock3.name:
self.assertEqual(mode, "exclusive")
self.assertEqual(owner, [tthread3.getName()])
continue
self.assert_(name in expnames)
self.assert_(mode is None)
self.assert_(owner is None)
# Release locks again
ev.set()
self._waitThreads()
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
[[name, None, None]
for name in utils.NiceSort(expnames)])
def testDelete(self):
lock = locking.SharedLock("TestLock", monitor=self.lm)
self.assertEqual(len(self.lm._locks), 1)
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
[[lock.name, None, None]])
lock.delete()
self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
[[lock.name, "deleted", None]])
self.assertEqual(len(self.lm._locks), 1)
if __name__ == '__main__':
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment