diff --git a/Makefile.am b/Makefile.am index 57dd202bc5a2e1478d91945af479693f0b3a35a5..10d8d27a77179bb6ef4e67188ba2398923d7203a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1643,7 +1643,6 @@ python_test_support = \ test/py/cmdlib/testsupport/config_mock.py \ test/py/cmdlib/testsupport/iallocator_mock.py \ test/py/cmdlib/testsupport/livelock_mock.py \ - test/py/cmdlib/testsupport/lock_manager_mock.py \ test/py/cmdlib/testsupport/netutils_mock.py \ test/py/cmdlib/testsupport/processor_mock.py \ test/py/cmdlib/testsupport/rpc_runner_mock.py \ diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py index bd8f792cf017e0fe75d834023d5713b84dc18dcd..7cb39bbecb0428da10143281ec9effaab00dd681 100644 --- a/lib/jqueue/__init__.py +++ b/lib/jqueue/__init__.py @@ -1716,7 +1716,6 @@ class JobQueue(object): # Job dependencies self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, self._EnqueueJobs) - self.context.glm.AddToLockMonitor(self.depmgr) # Setup worker pool self._wpool = _JobQueueWorkerPool(self) diff --git a/lib/locking.py b/lib/locking.py index ee6b4062385b91adb21255b652f82efcea234b88..0d347a9676f88475ac5a074d6424be264433a5c6 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -29,10 +29,8 @@ import os import select import threading import errno -import weakref import logging import heapq -import itertools import time from ganeti import errors @@ -423,7 +421,6 @@ class SharedLock(object): """Construct a new SharedLock. @param name: the name of the lock - @type monitor: L{LockMonitor} @param monitor: Lock monitor with which to register """ @@ -990,7 +987,6 @@ class LockSet(object): @type members: list of strings @param members: initial members of the set - @type monitor: L{LockMonitor} @param monitor: Lock monitor with which to register member locks """ @@ -1618,353 +1614,3 @@ BGL = "BGL" #: Node allocation lock NAL = "NAL" - - -class GanetiLockManager(object): - """The Ganeti Locking Library - - The purpose of this small library is to manage locking for ganeti clusters - in a central place, while at the same time doing dynamic checks against - possible deadlocks. It will also make it easier to transition to a different - lock type should we migrate away from python threads. - - """ - _instance = None - - def __init__(self, node_uuids, nodegroups, instance_names, networks): - """Constructs a new GanetiLockManager object. - - There should be only a GanetiLockManager object at any time, so this - function raises an error if this is not the case. - - @param node_uuids: list of node UUIDs - @param nodegroups: list of nodegroup uuids - @param instance_names: list of instance names - - """ - assert self.__class__._instance is None, \ - "double GanetiLockManager instance" - - self.__class__._instance = self - - self._monitor = LockMonitor() - - # The keyring contains all the locks, at their level and in the correct - # locking order. 
- self.__keyring = { - LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor), - LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor), - LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor), - LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor), - LEVEL_INSTANCE: LockSet(instance_names, "instance", - monitor=self._monitor), - LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor), - LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor), - } - - assert compat.all(ls.name == LEVEL_NAMES[level] - for (level, ls) in self.__keyring.items()), \ - "Keyring name mismatch" - - def AddToLockMonitor(self, provider): - """Registers a new lock with the monitor. - - See L{LockMonitor.RegisterLock}. - - """ - return self._monitor.RegisterLock(provider) - - def QueryLocks(self, fields): - """Queries information from all locks. - - See L{LockMonitor.QueryLocks}. - - """ - return self._monitor.QueryLocks(fields) - - def _names(self, level): - """List the lock names at the given level. - - This can be used for debugging/testing purposes. - - @param level: the level whose list of locks to get - - """ - assert level in LEVELS, "Invalid locking level %s" % level - return self.__keyring[level]._names() - - def is_owned(self, level): - """Check whether we are owning locks at the given level - - """ - return self.__keyring[level].is_owned() - - def list_owned(self, level): - """Get the set of owned locks at the given level - - """ - return self.__keyring[level].list_owned() - - def check_owned(self, level, names, shared=-1): - """Check if locks at a certain level are owned in a specific mode. - - @see: L{LockSet.check_owned} - - """ - return self.__keyring[level].check_owned(names, shared=shared) - - def owning_all(self, level): - """Checks whether current thread owns all locks at a certain level. - - @see: L{LockSet.owning_all} - - """ - return self.__keyring[level].owning_all() - - def _upper_owned(self, level): - """Check that we don't own any lock at a level greater than the given one. - - """ - # This way of checking only works if LEVELS[i] = i, which we check for in - # the test cases. - return compat.any((self.is_owned(l) for l in LEVELS[level + 1:])) - - def _BGL_owned(self): # pylint: disable=C0103 - """Check if the current thread owns the BGL. - - Both an exclusive or a shared acquisition work. - - """ - return BGL in self.__keyring[LEVEL_CLUSTER].list_owned() - - @staticmethod - def _contains_BGL(level, names): # pylint: disable=C0103 - """Check if the level contains the BGL. - - Check if acting on the given level and set of names will change - the status of the Big Ganeti Lock. - - """ - return level == LEVEL_CLUSTER and (names is None or BGL in names) - - def acquire(self, level, names, timeout=None, shared=0, priority=None, - opportunistic=False): - """Acquire a set of resource locks, at the same level. 
- - @type level: member of locking.LEVELS - @param level: the level at which the locks shall be acquired - @type names: list of strings (or string) - @param names: the names of the locks which shall be acquired - (special lock names, or instance/node names) - @type shared: integer (0/1) used as a boolean - @param shared: whether to acquire in shared mode; by default - an exclusive lock will be acquired - @type timeout: float - @param timeout: Maximum time to acquire all locks - @type priority: integer - @param priority: Priority for acquiring lock - @type opportunistic: boolean - @param opportunistic: Acquire locks opportunistically; use the return value - to determine which locks were actually acquired - - """ - assert level in LEVELS, "Invalid locking level %s" % level - - # Check that we are either acquiring the Big Ganeti Lock or we already own - # it. Some "legacy" opcodes need to be sure they are run non-concurrently - # so even if we've migrated we need to at least share the BGL to be - # compatible with them. Of course if we own the BGL exclusively there's no - # point in acquiring any other lock, unless perhaps we are half way through - # the migration of the current opcode. - assert (self._contains_BGL(level, names) or self._BGL_owned()), ( - "You must own the Big Ganeti Lock before acquiring any other") - - # Check we don't own locks at the same or upper levels. - assert not self._upper_owned(level), ("Cannot acquire locks at a level" - " while owning some at a greater one") - - # Acquire the locks in the set. - return self.__keyring[level].acquire(names, shared=shared, timeout=timeout, - priority=priority, - opportunistic=opportunistic) - - def downgrade(self, level, names=None): - """Downgrade a set of resource locks from exclusive to shared mode. - - You must have acquired the locks in exclusive mode. - - @type level: member of locking.LEVELS - @param level: the level at which the locks shall be downgraded - @type names: list of strings, or None - @param names: the names of the locks which shall be downgraded - (defaults to all the locks acquired at the level) - - """ - assert level in LEVELS, "Invalid locking level %s" % level - - return self.__keyring[level].downgrade(names=names) - - def release(self, level, names=None): - """Release a set of resource locks, at the same level. - - You must have acquired the locks, either in shared or in exclusive - mode, before releasing them. - - @type level: member of locking.LEVELS - @param level: the level at which the locks shall be released - @type names: list of strings, or None - @param names: the names of the locks which shall be released - (defaults to all the locks acquired at that level) - - """ - assert level in LEVELS, "Invalid locking level %s" % level - assert (not self._contains_BGL(level, names) or - not self._upper_owned(LEVEL_CLUSTER)), ( - "Cannot release the Big Ganeti Lock while holding something" - " at upper levels (%r)" % - (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i)) - for i in self.__keyring.keys()]), )) - - # Release will complain if we don't own the locks already - return self.__keyring[level].release(names) - - def add(self, level, names, acquired=0, shared=0): - """Add locks at the specified level. 
- - @type level: member of locking.LEVELS_MOD - @param level: the level at which the locks shall be added - @type names: list of strings - @param names: names of the locks to acquire - @type acquired: integer (0/1) used as a boolean - @param acquired: whether to acquire the newly added locks - @type shared: integer (0/1) used as a boolean - @param shared: whether the acquisition will be shared - - """ - assert level in LEVELS_MOD, "Invalid or immutable level %s" % level - assert self._BGL_owned(), ("You must own the BGL before performing other" - " operations") - assert not self._upper_owned(level), ("Cannot add locks at a level" - " while owning some at a greater one") - return self.__keyring[level].add(names, acquired=acquired, shared=shared) - - def remove(self, level, names): - """Remove locks from the specified level. - - You must either already own the locks you are trying to remove - exclusively or not own any lock at an upper level. - - @type level: member of locking.LEVELS_MOD - @param level: the level at which the locks shall be removed - @type names: list of strings - @param names: the names of the locks which shall be removed - (special lock names, or instance/node names) - - """ - assert level in LEVELS_MOD, "Invalid or immutable level %s" % level - assert self._BGL_owned(), ("You must own the BGL before performing other" - " operations") - # Check we either own the level or don't own anything from here - # up. LockSet.remove() will check the case in which we don't own - # all the needed resources, or we have a shared ownership. - assert self.is_owned(level) or not self._upper_owned(level), ( - "Cannot remove locks at a level while not owning it or" - " owning some at a greater one") - return self.__keyring[level].remove(names) - - -def _MonitorSortKey((item, idx, num)): - """Sorting key function. - - Sort by name, registration order and then order of information. This provides - a stable sort order over different providers, even if they return the same - name. - - """ - (name, _, _, _) = item - - return (utils.NiceSortKey(name), num, idx) - - -class LockMonitor(object): - _LOCK_ATTR = "_lock" - - def __init__(self): - """Initializes this class. - - """ - self._lock = SharedLock("LockMonitor") - - # Counter for stable sorting - self._counter = itertools.count(0) - - # Tracked locks. Weak references are used to avoid issues with circular - # references and deletion. - self._locks = weakref.WeakKeyDictionary() - - @ssynchronized(_LOCK_ATTR) - def RegisterLock(self, provider): - """Registers a new lock. - - @param provider: Object with a callable method named C{GetLockInfo}, taking - a single C{set} containing the requested information items - @note: It would be nicer to only receive the function generating the - requested information but, as it turns out, weak references to bound - methods (e.g. C{self.GetLockInfo}) are tricky; there are several - workarounds, but none of the ones I found works properly in combination - with a standard C{WeakKeyDictionary} - - """ - assert provider not in self._locks, "Duplicate registration" - - # There used to be a check for duplicate names here. As it turned out, when - # a lock is re-created with the same name in a very short timeframe, the - # previous instance might not yet be removed from the weakref dictionary. - # By keeping track of the order of incoming registrations, a stable sort - # ordering can still be guaranteed. 
- - self._locks[provider] = self._counter.next() - - def _GetLockInfo(self, requested): - """Get information from all locks. - - """ - # Must hold lock while getting consistent list of tracked items - self._lock.acquire(shared=1) - try: - items = self._locks.items() - finally: - self._lock.release() - - return [(info, idx, num) - for (provider, num) in items - for (idx, info) in enumerate(provider.GetLockInfo(requested))] - - def _Query(self, fields): - """Queries information from all locks. - - @type fields: list of strings - @param fields: List of fields to return - - """ - qobj = query.Query(query.LOCK_FIELDS, fields) - - # Get all data with internal lock held and then sort by name and incoming - # order - lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()), - key=_MonitorSortKey) - - # Extract lock information and build query data - return (qobj, query.LockQueryData(map(compat.fst, lockinfo))) - - def QueryLocks(self, fields): - """Queries information from all locks. - - @type fields: list of strings - @param fields: List of fields to return - - """ - (qobj, ctx) = self._Query(fields) - - # Prepare query response - return query.GetQueryResponse(qobj, ctx) diff --git a/lib/rpc/node.py b/lib/rpc/node.py index a6a7e59f52c7b0d663a6c085e0b17fee0452dd87..1a0b72dc2347570843b3a332680858429d459fb9 100644 --- a/lib/rpc/node.py +++ b/lib/rpc/node.py @@ -971,7 +971,7 @@ class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue): """RPC wrappers for job queue. """ - def __init__(self, context, address_list): + def __init__(self, _context, address_list): """Initializes this class. """ @@ -982,7 +982,7 @@ class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue): resolver = _StaticResolver(address_list) _RpcClientBase.__init__(self, resolver, _ENCODERS.get, - lock_monitor_cb=context.glm.AddToLockMonitor) + lock_monitor_cb=lambda _: None) _generated_rpc.RpcClientJobQueue.__init__(self) @@ -1023,15 +1023,12 @@ class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig): """RPC wrappers for L{config}. """ - def __init__(self, context, address_list, _req_process_fn=None, + def __init__(self, _context, address_list, _req_process_fn=None, _getents=None): """Initializes this class. 
""" - if context: - lock_monitor_cb = context.glm.AddToLockMonitor - else: - lock_monitor_cb = None + lock_monitor_cb = None if address_list is None: resolver = compat.partial(_SsconfResolver, True) diff --git a/lib/server/masterd.py b/lib/server/masterd.py index f546524e0e39e7560aa0ff4619f726237ba7e66f..ee1ca77544dff2d155fa6f47a7622f733439564f 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -43,7 +43,6 @@ from ganeti import daemon from ganeti import mcpu from ganeti import opcodes from ganeti import jqueue -from ganeti import locking from ganeti import luxi import ganeti.rpc.errors as rpcerr from ganeti import utils @@ -343,10 +342,7 @@ class ClientOps(object): result = self._Query(opcodes.OpQuery(what=what, fields=fields, qfilter=qfilter)) elif what == constants.QR_LOCK: - if qfilter is not None: - raise errors.OpPrereqError("Lock queries can't be filtered", - errors.ECODE_INVAL) - result = self.server.context.glm.QueryLocks(fields) + raise errors.OpPrereqError("Lock queries cannot be asked to jobs") elif what == constants.QR_JOB: result = queue.QueryJobs(fields, qfilter) elif what in constants.QR_VIA_LUXI: @@ -504,15 +500,8 @@ class GanetiContext(object): else: self.livelock = livelock - # Locking manager - cfg = self.GetConfig(None) - self.glm = locking.GanetiLockManager( - cfg.GetNodeList(), - cfg.GetNodeGroupList(), - [inst.name for inst in cfg.GetAllInstancesInfo().values()], - cfg.GetNetworkList()) - # Job queue + cfg = self.GetConfig(None) logging.debug("Creating the job queue") self.jobqueue = jqueue.JobQueue(self, cfg) @@ -532,8 +521,10 @@ class GanetiContext(object): def GetConfig(self, ec_id): return config.GetConfig(ec_id, self.livelock) + # pylint: disable=R0201 + # method could be a function, but keep interface backwards compatible def GetRpc(self, cfg): - return rpc.RpcRunner(cfg, self.glm.AddToLockMonitor) + return rpc.RpcRunner(cfg, lambda _: None) def AddNode(self, cfg, node, ec_id): """Adds a node to the configuration. diff --git a/test/py/cmdlib/testsupport/__init__.py b/test/py/cmdlib/testsupport/__init__.py index dc172633d4801097c1d41d9c2d0e43af2924fc29..66ce8458e15481910550f457395876b32f98e5f0 100644 --- a/test/py/cmdlib/testsupport/__init__.py +++ b/test/py/cmdlib/testsupport/__init__.py @@ -29,7 +29,6 @@ from cmdlib.testsupport.config_mock import ConfigMock from cmdlib.testsupport.iallocator_mock import patchIAllocator from cmdlib.testsupport.livelock_mock import LiveLockMock from cmdlib.testsupport.utils_mock import patchUtils -from cmdlib.testsupport.lock_manager_mock import LockManagerMock from cmdlib.testsupport.netutils_mock import patchNetutils, HostnameMock from cmdlib.testsupport.processor_mock import ProcessorMock from cmdlib.testsupport.rpc_runner_mock import CreateRpcRunnerMock, \ @@ -46,7 +45,6 @@ __all__ = ["CmdlibTestCase", "patchUtils", "patchNetutils", "patchSsh", - "LockManagerMock", "ProcessorMock", "RpcResultsBuilder", "LiveLockMock", diff --git a/test/py/cmdlib/testsupport/lock_manager_mock.py b/test/py/cmdlib/testsupport/lock_manager_mock.py deleted file mode 100644 index 068086e63dc45af5e7eaba20ae0637e43d8e90ea..0000000000000000000000000000000000000000 --- a/test/py/cmdlib/testsupport/lock_manager_mock.py +++ /dev/null @@ -1,57 +0,0 @@ -# -# - -# Copyright (C) 2013 Google Inc. 
-# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA -# 02110-1301, USA. - - -"""Support for mocking the lock manager""" - - -from ganeti import locking - - -class LockManagerMock(locking.GanetiLockManager): - """Mocked lock manager for tests. - - """ - def __init__(self): - # reset singleton instance, there is a separate lock manager for every test - # pylint: disable=W0212 - self.__class__._instance = None - - super(LockManagerMock, self).__init__([], [], [], []) - - def AddLocksFromConfig(self, cfg): - """Create locks for all entities in the given configuration. - - @type cfg: ganeti.config.ConfigWriter - """ - try: - self.acquire(locking.LEVEL_CLUSTER, locking.BGL) - - for node_uuid in cfg.GetNodeList(): - self.add(locking.LEVEL_NODE, node_uuid) - self.add(locking.LEVEL_NODE_RES, node_uuid) - for group_uuid in cfg.GetNodeGroupList(): - self.add(locking.LEVEL_NODEGROUP, group_uuid) - for inst in cfg.GetAllInstancesInfo().values(): - self.add(locking.LEVEL_INSTANCE, inst.name) - for net_uuid in cfg.GetNetworkList(): - self.add(locking.LEVEL_NETWORK, net_uuid) - finally: - self.release(locking.LEVEL_CLUSTER, locking.BGL) diff --git a/test/py/ganeti.locking_unittest.py b/test/py/ganeti.locking_unittest.py index 705103f141e91bab2272d8fc05495847132a5f79..2d2ab3ed43326643c06f963d36936784625d86a7 100755 --- a/test/py/ganeti.locking_unittest.py +++ b/test/py/ganeti.locking_unittest.py @@ -2010,680 +2010,5 @@ class TestGetLsAcquireModeAndTimeouts(unittest.TestCase): self.assertTrue(ls_timeout_fn is None) -class TestGanetiLockManager(_ThreadedTestCase): - def setUp(self): - _ThreadedTestCase.setUp(self) - self.nodes = ["n1", "n2"] - self.nodegroups = ["g1", "g2"] - self.instances = ["i1", "i2", "i3"] - self.networks = ["net1", "net2", "net3"] - self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, - self.instances, self.networks) - - def tearDown(self): - # Don't try this at home... - locking.GanetiLockManager._instance = None - - def testLockingConstants(self): - # The locking library internally cheats by assuming its constants have some - # relationships with each other. Check those hold true. - # This relationship is also used in the Processor to recursively acquire - # the right locks. Again, please don't break it. 
- for i in range(len(locking.LEVELS)): - self.assertEqual(i, locking.LEVELS[i]) - - def testDoubleGLFails(self): - self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], []) - - def testLockNames(self): - self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) - self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), - set(self.nodegroups)) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), - set(self.instances)) - self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), - set(self.networks)) - - def testInitAndResources(self): - locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager([], [], [], []) - self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set()) - - locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], []) - self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) - self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), - set(self.nodegroups)) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set()) - - locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager([], [], self.instances, []) - self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), - set(self.instances)) - - locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager([], [], [], self.networks) - self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"])) - self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), - set(self.networks)) - - def testAcquireRelease(self): - self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1) - self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"])) - self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"]) - self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"]) - self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1) - self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"], - shared=1)) - self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"])) - self.GL.release(locking.LEVEL_NODE, ["n2"]) - self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"])) - self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"])) - self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), 
set(["i1"])) - self.GL.release(locking.LEVEL_NODE) - self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set()) - self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"])) - self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"])) - self.GL.release(locking.LEVEL_NODEGROUP) - self.GL.release(locking.LEVEL_INSTANCE) - self.assertRaises(errors.LockError, self.GL.acquire, - locking.LEVEL_INSTANCE, ["i5"]) - self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1) - self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"])) - - def testAcquireWholeSets(self): - self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1) - self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), - set(self.instances)) - self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), - set(self.instances)) - self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None), - set(self.nodegroups)) - self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), - set(self.nodegroups)) - self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1), - set(self.nodes)) - self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), - set(self.nodes)) - self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE)) - self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP)) - self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE)) - self.GL.release(locking.LEVEL_NODE) - self.GL.release(locking.LEVEL_NODEGROUP) - self.GL.release(locking.LEVEL_INSTANCE) - self.GL.release(locking.LEVEL_CLUSTER) - - def testAcquireWholeAndPartial(self): - self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE)) - self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1) - self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), - set(self.instances)) - self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), - set(self.instances)) - self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1), - set(["n2"])) - self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), - set(["n2"])) - self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE)) - self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE)) - self.GL.release(locking.LEVEL_NODE) - self.GL.release(locking.LEVEL_INSTANCE) - self.GL.release(locking.LEVEL_CLUSTER) - - def testBGLDependency(self): - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_NODE, ["n1", "n2"]) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_INSTANCE, ["i3"]) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_NODEGROUP, ["g1"]) - self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1) - self.GL.acquire(locking.LEVEL_NODE, ["n1"]) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER, ["BGL"]) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER) - self.GL.release(locking.LEVEL_NODE) - self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"]) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER, ["BGL"]) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER) - self.GL.release(locking.LEVEL_INSTANCE) - self.GL.acquire(locking.LEVEL_NODEGROUP, None) - self.GL.release(locking.LEVEL_NODEGROUP, ["g1"]) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER, ["BGL"]) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER) - self.GL.release(locking.LEVEL_NODEGROUP) - self.GL.release(locking.LEVEL_CLUSTER) - - def testWrongOrder(self): - 
self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1) - self.GL.acquire(locking.LEVEL_NODE, ["n2"]) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_NODE, ["n1"]) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_NODEGROUP, ["g1"]) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_INSTANCE, ["i2"]) - - def testModifiableLevels(self): - self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER, - ["BGL2"]) - self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_NODE_ALLOC, - ["NAL2"]) - self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"]) - self.GL.add(locking.LEVEL_INSTANCE, ["i4"]) - self.GL.remove(locking.LEVEL_INSTANCE, ["i3"]) - self.GL.remove(locking.LEVEL_INSTANCE, ["i1"]) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"])) - self.GL.add(locking.LEVEL_NODE, ["n3"]) - self.GL.remove(locking.LEVEL_NODE, ["n1"]) - self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"])) - self.GL.add(locking.LEVEL_NODEGROUP, ["g3"]) - self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"]) - self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"]) - self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"])) - self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER, - ["BGL2"]) - - # Helper function to run as a thread that shared the BGL and then acquires - # some locks at another level. - def _doLock(self, level, names, shared): - try: - self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1) - self.GL.acquire(level, names, shared=shared) - self.done.put("DONE") - self.GL.release(level) - self.GL.release(locking.LEVEL_CLUSTER) - except errors.LockError: - self.done.put("ERR") - - @_Repeat - def testConcurrency(self): - self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1) - self._addThread(target=self._doLock, - args=(locking.LEVEL_INSTANCE, "i1", 1)) - self._waitThreads() - self.assertEqual(self.done.get_nowait(), "DONE") - self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"]) - self._addThread(target=self._doLock, - args=(locking.LEVEL_INSTANCE, "i1", 1)) - self._waitThreads() - self.assertEqual(self.done.get_nowait(), "DONE") - self._addThread(target=self._doLock, - args=(locking.LEVEL_INSTANCE, "i3", 1)) - self.assertRaises(Queue.Empty, self.done.get_nowait) - self.GL.release(locking.LEVEL_INSTANCE) - self._waitThreads() - self.assertEqual(self.done.get_nowait(), "DONE") - self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1) - self._addThread(target=self._doLock, - args=(locking.LEVEL_INSTANCE, "i2", 1)) - self._waitThreads() - self.assertEqual(self.done.get_nowait(), "DONE") - self._addThread(target=self._doLock, - args=(locking.LEVEL_INSTANCE, "i2", 0)) - self.assertRaises(Queue.Empty, self.done.get_nowait) - self.GL.release(locking.LEVEL_INSTANCE) - self._waitThreads() - self.assertEqual(self.done.get(True, 1), "DONE") - self.GL.release(locking.LEVEL_CLUSTER, ["BGL"]) - - -class TestLockMonitor(_ThreadedTestCase): - def setUp(self): - _ThreadedTestCase.setUp(self) - self.lm = locking.LockMonitor() - - def testSingleThread(self): - locks = [] - - for i in range(100): - name = "TestLock%s" % i - locks.append(locking.SharedLock(name, monitor=self.lm)) - - self.assertEqual(len(self.lm._locks), len(locks)) - result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"])) - self.assertEqual(len(result.fields), 1) - self.assertEqual(len(result.data), 100) - - # Delete all locks - del locks[:] - - # The garbage collector might needs some time - def _CheckLocks(): - if 
self.lm._locks: - raise utils.RetryAgain() - - utils.Retry(_CheckLocks, 0.1, 30.0) - - self.assertFalse(self.lm._locks) - - def testMultiThread(self): - locks = [] - - def _CreateLock(prev, next, name): - prev.wait() - locks.append(locking.SharedLock(name, monitor=self.lm)) - if next: - next.set() - - expnames = [] - - first = threading.Event() - prev = first - - # Use a deterministic random generator - for i in random.Random(4263).sample(range(100), 33): - name = "MtTestLock%s" % i - expnames.append(name) - - ev = threading.Event() - self._addThread(target=_CreateLock, args=(prev, ev, name)) - prev = ev - - # Add locks - first.set() - self._waitThreads() - - # Check order in which locks were added - self.assertEqual([i.name for i in locks], expnames) - - # Check query result - result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) - self.assert_(isinstance(result, dict)) - response = objects.QueryResponse.FromDict(result) - self.assertEqual(response.data, - [[(constants.RS_NORMAL, name), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, [])] - for name in utils.NiceSort(expnames)]) - self.assertEqual(len(response.fields), 4) - self.assertEqual(["name", "mode", "owner", "pending"], - [fdef.name for fdef in response.fields]) - - # Test exclusive acquire - for tlock in locks[::4]: - tlock.acquire(shared=0) - try: - def _GetExpResult(name): - if tlock.name == name: - return [(constants.RS_NORMAL, name), - (constants.RS_NORMAL, "exclusive"), - (constants.RS_NORMAL, - [threading.currentThread().getName()]), - (constants.RS_NORMAL, [])] - return [(constants.RS_NORMAL, name), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, [])] - - result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [_GetExpResult(name) - for name in utils.NiceSort(expnames)]) - finally: - tlock.release() - - # Test shared acquire - def _Acquire(lock, shared, ev, notify): - lock.acquire(shared=shared) - try: - notify.set() - ev.wait() - finally: - lock.release() - - for tlock1 in locks[::11]: - for tlock2 in locks[::-15]: - if tlock2 == tlock1: - # Avoid deadlocks - continue - - for tlock3 in locks[::10]: - if tlock3 in (tlock2, tlock1): - # Avoid deadlocks - continue - - releaseev = threading.Event() - - # Acquire locks - acquireev = [] - tthreads1 = [] - for i in range(3): - ev = threading.Event() - tthreads1.append(self._addThread(target=_Acquire, - args=(tlock1, 1, releaseev, ev))) - acquireev.append(ev) - - ev = threading.Event() - tthread2 = self._addThread(target=_Acquire, - args=(tlock2, 1, releaseev, ev)) - acquireev.append(ev) - - ev = threading.Event() - tthread3 = self._addThread(target=_Acquire, - args=(tlock3, 0, releaseev, ev)) - acquireev.append(ev) - - # Wait for all locks to be acquired - for i in acquireev: - i.wait() - - # Check query result - result = self.lm.QueryLocks(["name", "mode", "owner"]) - response = objects.QueryResponse.FromDict(result) - for (name, mode, owner) in response.data: - (name_status, name_value) = name - (owner_status, owner_value) = owner - - self.assertEqual(name_status, constants.RS_NORMAL) - self.assertEqual(owner_status, constants.RS_NORMAL) - - if name_value == tlock1.name: - self.assertEqual(mode, (constants.RS_NORMAL, "shared")) - self.assertEqual(set(owner_value), - set(i.getName() for i in tthreads1)) - continue - - if name_value == tlock2.name: - self.assertEqual(mode, (constants.RS_NORMAL, "shared")) - 
self.assertEqual(owner_value, [tthread2.getName()]) - continue - - if name_value == tlock3.name: - self.assertEqual(mode, (constants.RS_NORMAL, "exclusive")) - self.assertEqual(owner_value, [tthread3.getName()]) - continue - - self.assert_(name_value in expnames) - self.assertEqual(mode, (constants.RS_NORMAL, None)) - self.assert_(owner_value is None) - - # Release locks again - releaseev.set() - - self._waitThreads() - - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, name), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)] - for name in utils.NiceSort(expnames)]) - - def testDelete(self): - lock = locking.SharedLock("TestLock", monitor=self.lm) - - self.assertEqual(len(self.lm._locks), 1) - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, lock.name), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)]]) - - lock.delete() - - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, lock.name), - (constants.RS_NORMAL, "deleted"), - (constants.RS_NORMAL, None)]]) - self.assertEqual(len(self.lm._locks), 1) - - def testPending(self): - def _Acquire(lock, shared, prev, next): - prev.wait() - - lock.acquire(shared=shared, test_notify=next.set) - try: - pass - finally: - lock.release() - - lock = locking.SharedLock("ExcLock", monitor=self.lm) - - for shared in [0, 1]: - lock.acquire() - try: - self.assertEqual(len(self.lm._locks), 1) - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, lock.name), - (constants.RS_NORMAL, "exclusive"), - (constants.RS_NORMAL, - [threading.currentThread().getName()])]]) - - threads = [] - - first = threading.Event() - prev = first - - for i in range(5): - ev = threading.Event() - threads.append(self._addThread(target=_Acquire, - args=(lock, shared, prev, ev))) - prev = ev - - # Start acquires - first.set() - - # Wait for last acquire to start waiting - prev.wait() - - # NOTE: This works only because QueryLocks will acquire the - # lock-internal lock again and won't be able to get the information - # until it has the lock. By then the acquire should be registered in - # SharedLock.__pending (otherwise it's a bug). 
- - # All acquires are waiting now - if shared: - pending = [("shared", utils.NiceSort(t.getName() for t in threads))] - else: - pending = [("exclusive", [t.getName()]) for t in threads] - - result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, lock.name), - (constants.RS_NORMAL, "exclusive"), - (constants.RS_NORMAL, - [threading.currentThread().getName()]), - (constants.RS_NORMAL, pending)]]) - - self.assertEqual(len(self.lm._locks), 1) - finally: - lock.release() - - self._waitThreads() - - # No pending acquires - result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, lock.name), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, [])]]) - - self.assertEqual(len(self.lm._locks), 1) - - def testDeleteAndRecreate(self): - lname = "TestLock101923193" - - # Create some locks with the same name and keep all references - locks = [locking.SharedLock(lname, monitor=self.lm) - for _ in range(5)] - - self.assertEqual(len(self.lm._locks), len(locks)) - - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)]] * 5) - - locks[2].delete() - - # Check information order - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - [[(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)]] * 2 + - [[(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, "deleted"), - (constants.RS_NORMAL, None)]] + - [[(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)]] * 2) - - locks[1].acquire(shared=0) - - last_status = [ - [(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)], - [(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, "exclusive"), - (constants.RS_NORMAL, [threading.currentThread().getName()])], - [(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, "deleted"), - (constants.RS_NORMAL, None)], - [(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)], - [(constants.RS_NORMAL, lname), - (constants.RS_NORMAL, None), - (constants.RS_NORMAL, None)], - ] - - # Check information order - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status) - - self.assertEqual(len(set(self.lm._locks.values())), len(locks)) - self.assertEqual(len(self.lm._locks), len(locks)) - - # Check lock deletion - for idx in range(len(locks)): - del locks[0] - assert gc.isenabled() - gc.collect() - self.assertEqual(len(self.lm._locks), len(locks)) - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, - last_status[idx + 1:]) - - # All locks should have been deleted - assert not locks - self.assertFalse(self.lm._locks) - - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, []) - - class _FakeLock: - def __init__(self): - self._info = [] - - def AddResult(self, *args): - self._info.append(args) - - def CountPending(self): - return len(self._info) - - def GetLockInfo(self, requested): - (exp_requested, result) = self._info.pop(0) - 
- if exp_requested != requested: - raise Exception("Requested information (%s) does not match" - " expectations (%s)" % (requested, exp_requested)) - - return result - - def testMultipleResults(self): - fl1 = self._FakeLock() - fl2 = self._FakeLock() - - self.lm.RegisterLock(fl1) - self.lm.RegisterLock(fl2) - - # Empty information - for i in [fl1, fl2]: - i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), []) - result = self.lm.QueryLocks(["name", "mode", "owner"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, []) - for i in [fl1, fl2]: - self.assertEqual(i.CountPending(), 0) - - # Check ordering - for fn in [lambda x: x, reversed, sorted]: - fl1.AddResult(set(), list(fn([ - ("aaa", None, None, None), - ("bbb", None, None, None), - ]))) - fl2.AddResult(set(), []) - result = self.lm.QueryLocks(["name"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, [ - [(constants.RS_NORMAL, "aaa")], - [(constants.RS_NORMAL, "bbb")], - ]) - for i in [fl1, fl2]: - self.assertEqual(i.CountPending(), 0) - - for fn2 in [lambda x: x, reversed, sorted]: - fl1.AddResult(set([query.LQ_MODE]), list(fn([ - # Same name, but different information - ("aaa", "mode0", None, None), - ("aaa", "mode1", None, None), - ("aaa", "mode2", None, None), - ("aaa", "mode3", None, None), - ]))) - fl2.AddResult(set([query.LQ_MODE]), [ - ("zzz", "end", None, None), - ("000", "start", None, None), - ] + list(fn2([ - ("aaa", "b200", None, None), - ("aaa", "b300", None, None), - ]))) - result = self.lm.QueryLocks(["name", "mode"]) - self.assertEqual(objects.QueryResponse.FromDict(result).data, [ - [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")], - ] + list(fn([ - # Name is the same, so order must be equal to incoming order - [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")], - [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")], - [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")], - [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")], - ])) + list(fn2([ - [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")], - [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")], - ])) + [ - [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")], - ]) - for i in [fl1, fl2]: - self.assertEqual(i.CountPending(), 0) - - if __name__ == "__main__": testutils.GanetiTestProgram()
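
Notes on the change follow; the sketches below are illustrative stand-ins
written for this patch, not the real Ganeti APIs.

The removed GanetiLockManager enforced a strict top-down acquisition order:
the Big Ganeti Lock (BGL) had to be held before anything else, and no lock
could be acquired at a level while locks were already owned at a deeper one.
A minimal sketch of that invariant, with hypothetical level constants in
place of the real ganeti.locking.LEVELS:

  # Illustrative sketch only: mirrors the two assertions the deleted
  # acquire() made, with made-up level constants.
  LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE = 0, 1, 2
  LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE]

  class OrderedAcquirer(object):
    def __init__(self):
      self._owned = dict((lvl, set()) for lvl in LEVELS)

    def _upper_owned(self, level):
      # True if any lock is held at a level deeper than "level".
      return any(self._owned[lvl] for lvl in LEVELS[level + 1:])

    def acquire(self, level, names):
      assert level == LEVEL_CLUSTER or self._owned[LEVEL_CLUSTER], \
          "You must own the Big Ganeti Lock before acquiring any other"
      assert not self._upper_owned(level), \
          "Cannot acquire locks at a level while owning some at a greater one"
      self._owned[level].update(names)

  acq = OrderedAcquirer()
  acq.acquire(LEVEL_CLUSTER, ["BGL"])
  acq.acquire(LEVEL_INSTANCE, ["i1"])    # fine: strictly top-down
  acq.acquire(LEVEL_NODE, ["n1"])        # fine: deeper still
  # acq.acquire(LEVEL_INSTANCE, ["i2"])  # would assert: node level is owned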
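The lock_monitor_cb changes in lib/rpc/node.py and lib/server/masterd.py
substitute a no-op callable for context.glm.AddToLockMonitor, so the
_RpcClientBase and RpcRunner signatures stay unchanged even though there is
no monitor left to register with. A standalone sketch of the pattern
(RpcClientSketch and its arguments are hypothetical):

  class RpcClientSketch(object):
    """A client that reports itself to an optional lock monitor."""

    def __init__(self, resolver, lock_monitor_cb):
      self._resolver = resolver
      # The callback is invoked unconditionally; a no-op such as
      # "lambda _: None" drops the monitor without touching call sites.
      lock_monitor_cb(self)

    def GetLockInfo(self, requested):
      # Providers registered with the old LockMonitor had to expose this.
      return []

  # Before this patch: RpcClientSketch(resolver, context.glm.AddToLockMonitor)
  client = RpcClientSketch(resolver=None, lock_monitor_cb=lambda _: None)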
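The deleted LockMonitor kept its providers in a weakref.WeakKeyDictionary
mapped to a monotonically increasing counter: locks dropped out of the
registry once garbage-collected, while the counter preserved a stable sort
order even for locks re-created under the same name in quick succession. A
simplified, runnable sketch of that registry behaviour (MiniMonitor and
FakeLock are made up for illustration):

  import itertools
  import weakref

  class MiniMonitor(object):
    def __init__(self):
      self._counter = itertools.count(0)
      # Weak keys: a collected lock vanishes from the registry automatically.
      self._locks = weakref.WeakKeyDictionary()

    def RegisterLock(self, provider):
      assert provider not in self._locks, "Duplicate registration"
      self._locks[provider] = next(self._counter)

    def Names(self):
      # Sort by registration order, mirroring the stable-sort guarantee.
      return [prov.name for (prov, _) in
              sorted(self._locks.items(), key=lambda kv: kv[1])]

  class FakeLock(object):
    def __init__(self, name):
      self.name = name

  mon = MiniMonitor()
  lock = FakeLock("lock-a")
  mon.RegisterLock(lock)
  assert mon.Names() == ["lock-a"]
  del lock  # once collected, the weak reference disappears from the registry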