diff --git a/lib/locking.py b/lib/locking.py
index 216c205323ace149bf541aaa7fe49b00ef6a795f..41519eb3f7fccc81ce599479bc66abd3105dc441 100644
--- a/lib/locking.py
+++ b/lib/locking.py
@@ -26,6 +26,7 @@ import threading
 # Wouldn't it be better to define LockingError in the locking module?
 # Well, for now that's how the rest of the code does it...
 from ganeti import errors
+from ganeti import utils
 
 
 class SharedLock:
@@ -555,3 +556,212 @@ class LockSet:
 
     return delete_failed
 
+
+# Locking levels, must be acquired in increasing order.
+# Current rules are:
+# - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
+#   acquired before performing any operation, either in shared or in exclusive
+#   mode. acquiring the BGL in exclusive mode is discouraged and should be
+#   avoided.
+# - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
+#   If you need more than one node, or more than one instance, acquire them at
+#   the same time.
+# - level LEVEL_CONFIG contains the configuration lock, which you must acquire
+#   before reading or changing the config file.
+LEVEL_CLUSTER = 0
+LEVEL_NODE = 1
+LEVEL_INSTANCE = 2
+LEVEL_CONFIG = 3
+
+LEVELS = [LEVEL_CLUSTER,
+          LEVEL_NODE,
+          LEVEL_INSTANCE,
+          LEVEL_CONFIG]
+
+# Lock levels which are modifiable
+LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+
+# Constant for the big ganeti lock and config lock
+BGL = 'BGL'
+CONFIG = 'config'
+
+
+class GanetiLockManager:
+  """The Ganeti Locking Library
+
+  The purpose of this small library is to manage locking for ganeti clusters
+  in a central place, while at the same time doing dynamic checks against
+  possible deadlocks. It will also make it easier to transition to a different
+  lock type should we migrate away from python threads.
+
+  """
+  _instance = None
+
+  def __init__(self, nodes=None, instances=None):
+    """Constructs a new GanetiLockManager object.
+ + There should be only a + GanetiLockManager object at any time, so this function raises an error if this + is not the case. + + Args: + nodes: list of node names + instances: list of instance names + + """ + assert self.__class__._instance is None, "double GanetiLockManager instance" + self.__class__._instance = self + + # The keyring contains all the locks, at their level and in the correct + # locking order. + self.__keyring = { + LEVEL_CLUSTER: LockSet([BGL]), + LEVEL_NODE: LockSet(nodes), + LEVEL_INSTANCE: LockSet(instances), + LEVEL_CONFIG: LockSet([CONFIG]), + } + + def _names(self, level): + """List the lock names at the given level. + Used for debugging/testing purposes. + + Args: + level: the level whose list of locks to get + + """ + assert level in LEVELS, "Invalid locking level %s" % level + return self.__keyring[level]._names() + + def _is_owned(self, level): + """Check whether we are owning locks at the given level + + """ + return self.__keyring[level]._is_owned() + + def _list_owned(self, level): + """Get the set of owned locks at the given level + + """ + return self.__keyring[level]._list_owned() + + def _upper_owned(self, level): + """Check that we don't own any lock at a level greater than the given one. + + """ + # This way of checking only works if LEVELS[i] = i, which we check for in + # the test cases. + return utils.any((self._is_owned(l) for l in LEVELS[level + 1:])) + + def _BGL_owned(self): + """Check if the current thread owns the BGL. + + Both an exclusive or a shared acquisition work. + + """ + return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned() + + def _contains_BGL(self, level, names): + """Check if acting on the given level and set of names will change the + status of the Big Ganeti Lock. + + """ + return level == LEVEL_CLUSTER and (names is None or BGL in names) + + def acquire(self, level, names, blocking=1, shared=0): + """Acquire a set of resource locks, at the same level. 
+
+    Args:
+      level: the level at which the locks shall be acquired.
+             It must be a member of LEVELS.
+      names: the names of the locks which shall be acquired.
+             (special lock names, or instance/node names)
+      shared: whether to acquire in shared mode. By default an exclusive lock
+              will be acquired.
+      blocking: whether to block while trying to acquire or to operate in
+                try-lock mode. this locking mode is not supported yet.
+
+    """
+    assert level in LEVELS, "Invalid locking level %s" % level
+
+    # Check that we are either acquiring the Big Ganeti Lock or we already own
+    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
+    # so even if we've migrated we need to at least share the BGL to be
+    # compatible with them. Of course if we own the BGL exclusively there's no
+    # point in acquiring any other lock, unless perhaps we are half way through
+    # the migration of the current opcode.
+    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
+            "You must own the Big Ganeti Lock before acquiring any other")
+
+    # Check we don't own locks at the same or upper levels.
+    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
+           " while owning some at a greater one")
+
+    # Acquire the locks in the set.
+    return self.__keyring[level].acquire(names, shared=shared,
+                                         blocking=blocking)
+
+  def release(self, level, names=None):
+    """Release a set of resource locks, at the same level.
+
+    You must have acquired the locks, either in shared or in exclusive mode,
+    before releasing them.
+
+    Args:
+      level: the level at which the locks shall be released.
+             It must be a member of LEVELS.
+      names: the names of the locks which shall be released.
+             (defaults to all the locks acquired at that level).
+
+    """
+    assert level in LEVELS, "Invalid locking level %s" % level
+    assert (not self._contains_BGL(level, names) or
+            not self._upper_owned(LEVEL_CLUSTER)), (
+            "Cannot release the Big Ganeti Lock while holding something"
+            " at upper levels")
+
+    # Release will complain if we don't own the locks already
+    return self.__keyring[level].release(names)
+
+  def add(self, level, names, acquired=0, shared=0):
+    """Add locks at the specified level.
+
+    Args:
+      level: the level at which the locks shall be added.
+             It must be a member of LEVELS_MOD.
+      names: names of the locks to acquire
+      acquired: whether to acquire the newly added locks
+      shared: whether the acquisition will be shared
+    """
+    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+    assert self._BGL_owned(), ("You must own the BGL before performing other"
+           " operations")
+    assert not self._upper_owned(level), ("Cannot add locks at a level"
+           " while owning some at a greater one")
+    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
+
+  def remove(self, level, names, blocking=1):
+    """Remove locks from the specified level.
+
+    You must either already own the locks you are trying to remove exclusively
+    or not own any lock at an upper level.
+
+    Args:
+      level: the level at which the locks shall be removed.
+             It must be a member of LEVELS_MOD.
+      names: the names of the locks which shall be removed.
+             (special lock names, or instance/node names)
+      blocking: whether to block while trying to acquire or to operate in
+                try-lock mode. this locking mode is not supported yet.
+
+    """
+    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+    assert self._BGL_owned(), ("You must own the BGL before performing other"
+           " operations")
+    # Check we either own the level or don't own anything from here up.
+    # LockSet.remove() will check the case in which we don't own all the needed
+    # resources, or we have a shared ownership.
+ assert self._is_owned(level) or not self._upper_owned(level), ( + "Cannot remove locks at a level while not owning it or" + " owning some at a greater one") + return self.__keyring[level].remove(names, blocking) + diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 4aaf5a55bcbe21ce331a34965ce12dda665d928c..331c31d81dd889ecea68d155c55932a6d86d8302 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -412,6 +412,151 @@ class TestLockSet(unittest.TestCase): self.ls.release() +class TestGanetiLockManager(unittest.TestCase): + + def setUp(self): + self.nodes=['n1', 'n2'] + self.instances=['i1', 'i2', 'i3'] + self.GL = locking.GanetiLockManager(nodes=self.nodes, + instances=self.instances) + self.done = Queue.Queue(0) + + def tearDown(self): + # Don't try this at home... + locking.GanetiLockManager._instance = None + + def testLockingConstants(self): + # The locking library internally cheats by assuming its constants have some + # relationships with each other. Check those hold true. 
+ for i in range(len(locking.LEVELS)): + self.assertEqual(i, locking.LEVELS[i]) + + def testDoubleGLFails(self): + # We are not passing test=True, so instantiating a new one should fail + self.assertRaises(AssertionError, locking.GanetiLockManager) + + def testLockNames(self): + self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) + self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) + self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + + def testInitAndResources(self): + locking.GanetiLockManager._instance = None + self.GL = locking.GanetiLockManager() + self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) + self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + + locking.GanetiLockManager._instance = None + self.GL = locking.GanetiLockManager(nodes=self.nodes) + self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) + self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + + locking.GanetiLockManager._instance = None + self.GL = locking.GanetiLockManager(instances=self.instances) + self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) + self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) + self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + + def testAcquireRelease(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) + self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1) + 
self.GL.release(locking.LEVEL_NODE) + self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1'])) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2']) + self.GL.acquire(locking.LEVEL_CONFIG, ['config']) + self.GL.release(locking.LEVEL_INSTANCE, ['i2']) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CONFIG) + self.assertRaises(errors.LockError, self.GL.acquire, + locking.LEVEL_INSTANCE, ['i5']) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3'])) + + def testBGLDependency(self): + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_NODE, ['n1', 'n2']) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_INSTANCE, ['i3']) + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self.assertRaises(AssertionError, self.GL.release, + locking.LEVEL_CLUSTER, ['BGL']) + self.assertRaises(AssertionError, self.GL.release, + locking.LEVEL_CLUSTER) + self.GL.release(locking.LEVEL_NODE) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2']) + self.assertRaises(AssertionError, self.GL.release, + locking.LEVEL_CLUSTER, ['BGL']) + self.assertRaises(AssertionError, self.GL.release, + locking.LEVEL_CLUSTER) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.acquire(locking.LEVEL_CONFIG, ['config']) + self.assertRaises(AssertionError, self.GL.release, + locking.LEVEL_CLUSTER) + + def testWrongOrder(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_NODE, ['n1']) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_INSTANCE, ['i2']) + self.GL.acquire(locking.LEVEL_CONFIG, 
['config']) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_CONFIG, ['config']) + self.GL.release(locking.LEVEL_INSTANCE) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_NODE, ['n1']) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_INSTANCE, ['i2']) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_CONFIG, ['config']) + + # Helper function to run as a thread that shared the BGL and then acquires + # some locks at another level. + def _doLock(self, level, names, shared): + try: + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.GL.acquire(level, names, shared=shared) + self.done.put('DONE') + self.GL.release(level) + self.GL.release(locking.LEVEL_CLUSTER) + except errors.LockError: + self.done.put('ERR') + + def testConcurrency(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() + self.assertEqual(self.done.get(True, 1), 'DONE') + self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) + self.GL.acquire(locking.LEVEL_CONFIG, ['config']) + Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() + self.assertEqual(self.done.get(True, 1), 'DONE') + Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)).start() + self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.GL.release(locking.LEVEL_CONFIG) + self.GL.release(locking.LEVEL_INSTANCE) + self.assertEqual(self.done.get(True, 1), 'DONE') + self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1) + Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)).start() + self.assertEqual(self.done.get(True, 1), 'DONE') + Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)).start() + self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.GL.release(locking.LEVEL_INSTANCE) + self.assertEqual(self.done.get(True, 1), 'DONE') + 
+ if __name__ == '__main__': unittest.main() #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)