# -----------------------------------------------------------------------------
# Section 1: lib/locking.py -- added right after class SharedLock.
#
# Names used here but defined outside this section: SharedLock (same module),
# threading and errors (module-level imports of lib/locking.py).
# -----------------------------------------------------------------------------

class LockSet:
  """Implements a set of locks.

  This abstraction implements a set of shared locks for the same resource type,
  distinguished by name. The user can lock a subset of the resources and the
  LockSet will take care of acquiring the locks always in the same order, thus
  preventing deadlock.

  All the locks needed in the same set must be acquired together, though.

  """
  def __init__(self, members=None):
    """Constructs a new LockSet.

    Args:
      members: initial members of the set (iterable of names); None means
               the set starts empty.

    """
    # Used internally to guarantee coherency.
    self.__lock = SharedLock()

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = {}

    if members is not None:
      for name in members:
        self.__lockdict[name] = SharedLock()

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}

  def _is_owned(self):
    """Is the current thread a current level owner?"""
    return threading.currentThread() in self.__owners

  def _add_owned(self, name):
    """Note the current thread owns the given lock"""
    # Only the current thread's own key is ever touched (see __init__).
    me = threading.currentThread()
    self.__owners.setdefault(me, set()).add(name)

  def _del_owned(self, name):
    """Note the current thread no longer owns the given lock"""
    me = threading.currentThread()
    self.__owners[me].remove(name)

    # Drop the per-thread entry once it is empty, so that _is_owned() goes
    # back to False for this thread.
    if not self.__owners[me]:
      del self.__owners[me]

  def _list_owned(self):
    """Get the set of resource names owned by the current thread"""
    if self._is_owned():
      # Hand out a copy: callers iterate on it while calling _del_owned().
      return self.__owners[threading.currentThread()].copy()
    else:
      return set()

  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    """
    return set(self.__lockdict.keys())

  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.
    """
    self.__lock.acquire(shared=1)
    try:
      result = self.__names()
    finally:
      self.__lock.release()
    return result

  def acquire(self, names, blocking=1, shared=0):
    """Acquire a set of resource locks.

    Args:
      names: the names of the locks which shall be acquired.
             (special lock names, or instance/node names)
             Either a single name or an iterable of names; the argument
             itself is never modified.
      shared: whether to acquire in shared mode. By default an exclusive lock
              will be acquired.
      blocking: whether to block while trying to acquire or to operate in
                try-lock mode. this locking mode is not supported yet.

    Returns:
      True: when all the locks are successfully acquired

    Raises:
      errors.LockError: when any lock we try to acquire has been deleted
      before we succeed. In this case none of the locks requested will be
      acquired.
      NotImplementedError: when called with a false 'blocking' value.

    """
    if not blocking:
      # We don't have non-blocking mode for now
      raise NotImplementedError

    # Check we don't already own locks at this level
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"

    # Support passing in a single resource to acquire rather than many
    if isinstance(names, basestring):
      names = [names]
    else:
      # sorted() builds a new private list: the caller's sequence is left
      # untouched and any iterable (tuple, set, ...) is accepted, whereas an
      # in-place names.sort() would reorder the caller's list and only work
      # on lists.
      names = sorted(names)

    # Now names contains a sorted list of resources whose lock we want to
    # acquire. In order to get them we loop on this (private) list and look
    # them up in __lockdict. Since we have no lock held on lockdict we have no
    # guarantees on their presence, and they may even disappear after we looked
    # them up. This is fine though as .acquire() itself is safe and will alert
    # us if the lock gets deleted.

    try:
      for lname in names:
        lock = self.__lockdict[lname] # raises KeyError if the lock is not there
        lock.acquire(shared=shared) # raises LockError if the lock is deleted
        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(lname)
        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong, after this.
          lock.release()
          raise

    except (KeyError, errors.LockError):
      # lname is the name we failed on. Roll back whatever we got so far, so
      # that the caller ends up owning nothing.
      for owned in self._list_owned():
        self.__lockdict[owned].release()
        self._del_owned(owned)
      raise errors.LockError('non-existing lock in set (%s)' % lname)

    return True

  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    Args:
      names: the names of the locks which shall be released.
             (defaults to all the locks acquired at that level).

    """

    assert self._is_owned(), "release() on lock set while not owner"

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      names = self._list_owned()
    else:
      names = set(names)
      assert self._list_owned().issuperset(names), (
        "release() on unheld resources %s" %
        names.difference(self._list_owned()))

    for lockname in names:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(lockname)

  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    Args:
      names: names of the new elements to add
      acquired: pre-acquire the new resource?
      shared: is the pre-acquisition shared?

    Returns:
      True: when all the names have been added

    Raises:
      errors.LockError: when any of the names is already in the set. In this
      case nothing is added.

    """
    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # Acquire the internal lock in an exclusive way, so there cannot be a
    # conflicting add()
    self.__lock.acquire()
    try:
      invalid_names = self.__names().intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add() (%s)" % invalid_names)

      for lockname in names:
        lock = SharedLock()

        if acquired:
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(lockname)
          except:
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception. Of course something is going to be really wrong,
            # after this. On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        self.__lockdict[lockname] = lock

    finally:
      self.__lock.release()

    return True

  def remove(self, names, blocking=1):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    Args:
      names: names of the resource to remove.
      blocking: whether to block while trying to acquire or to operate in
                try-lock mode.  this locking mode is not supported yet unless
                you are already holding exclusively the locks.

    Returns:
      A list of lock which we failed to delete. The list is always empty if we
      were holding all the locks exclusively.

    Raises:
      NotImplementedError: when called with a false 'blocking' value while
      not owning any lock of the set.

    """
    if not blocking and not self._is_owned():
      # We don't have non-blocking mode for now
      raise NotImplementedError

    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset while not owning all elements")

    delete_failed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that if we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        delete_failed.append(lname)
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), "remove failed while holding lockset"
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(lname)

    return delete_failed


# -----------------------------------------------------------------------------
# Section 2: test/ganeti.locking_unittest.py -- added right after class
# TestSharedLock.
#
# Names used here but defined outside this section: unittest, Queue, Thread,
# locking and errors (module-level imports of the test file).
# -----------------------------------------------------------------------------

class TestLockSet(unittest.TestCase):
  """LockSet tests"""

  def setUp(self):
    self.resources = ['one', 'two', 'three']
    self.ls = locking.LockSet(self.resources)
    # helper threads use the 'done' queue to tell the master they finished.
    self.done = Queue.Queue(0)

  def testResources(self):
    self.assertEqual(self.ls._names(), set(self.resources))
    newls = locking.LockSet()
    self.assertEqual(newls._names(), set())

  def testAcquireRelease(self):
    self.ls.acquire('one')
    self.assertEqual(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())
    self.ls.acquire(['one'])
    self.assertEqual(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())
    self.ls.acquire(['one', 'two', 'three'])
    self.assertEqual(self.ls._list_owned(), set(['one', 'two', 'three']))
    self.ls.release('one')
    self.assertEqual(self.ls._list_owned(), set(['two', 'three']))
    self.ls.release(['three'])
    self.assertEqual(self.ls._list_owned(), set(['two']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())
    self.ls.acquire(['one', 'three'])
    self.assertEqual(self.ls._list_owned(), set(['one', 'three']))
    self.ls.release()
    self.assertEqual(self.ls._list_owned(), set())

  def testNoDoubleAcquire(self):
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
    self.ls.release()
    self.ls.acquire(['one', 'three'])
    self.ls.release('one')
    # Still owning 'three': a partial release doesn't allow a new acquire.
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.ls.release('three')

  def testNoWrongRelease(self):
    self.assertRaises(AssertionError, self.ls.release)
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.release, 'two')

  def testAddRemove(self):
    self.ls.add('four')
    self.assertEqual(self.ls._list_owned(), set())
    self.assert_('four' in self.ls._names())
    self.ls.add(['five', 'six', 'seven'], acquired=1)
    self.assert_('five' in self.ls._names())
    self.assert_('six' in self.ls._names())
    self.assert_('seven' in self.ls._names())
    self.assertEqual(self.ls._list_owned(), set(['five', 'six', 'seven']))
    self.ls.remove(['five', 'six'])
    self.assert_('five' not in self.ls._names())
    self.assert_('six' not in self.ls._names())
    self.assertEqual(self.ls._list_owned(), set(['seven']))
    self.ls.add('eight', acquired=1, shared=1)
    self.assert_('eight' in self.ls._names())
    self.assertEqual(self.ls._list_owned(), set(['seven', 'eight']))
    self.ls.remove('seven')
    self.assert_('seven' not in self.ls._names())
    self.assertEqual(self.ls._list_owned(), set(['eight']))
    self.ls.release()
    self.ls.remove(['two'])
    self.assert_('two' not in self.ls._names())
    self.ls.acquire('three')
    self.ls.remove(['three'])
    self.assert_('three' not in self.ls._names())
    # Names which are not in the set (anymore) are reported back as failed.
    self.assertEqual(self.ls.remove('three'), ['three'])
    self.assertEqual(self.ls.remove(['one', 'three', 'six']), ['three', 'six'])
    self.assert_('one' not in self.ls._names())

  def testRemoveNonBlocking(self):
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
    self.ls.acquire('one')
    self.assertEqual(self.ls.remove('one', blocking=0), [])
    self.ls.acquire(['two', 'three'])
    self.assertEqual(self.ls.remove(['two', 'three'], blocking=0), [])

  def testNoDoubleAdd(self):
    self.assertRaises(errors.LockError, self.ls.add, 'two')
    self.ls.add('four')
    self.assertRaises(errors.LockError, self.ls.add, 'four')

  def testNoWrongRemoves(self):
    self.ls.acquire(['one', 'three'], shared=1)
    # Cannot remove 'two' while holding something which is not a superset
    self.assertRaises(AssertionError, self.ls.remove, 'two')
    # Cannot remove 'three' as we are sharing it
    self.assertRaises(AssertionError, self.ls.remove, 'three')

  def _doLockSet(self, set, shared):
    """Thread body: acquire and release 'set', reporting on the done queue."""
    try:
      self.ls.acquire(set, shared=shared)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doRemoveSet(self, set):
    """Thread body: remove 'set', putting remove()'s result on the queue."""
    self.done.put(self.ls.remove(set))

  def testConcurrentSharedAcquire(self):
    self.ls.acquire(['one', 'two'], shared=1)
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=('three', 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')

  def testConcurrentExclusiveAcquire(self):
    self.ls.acquire(['one', 'two'])
    Thread(target=self._doLockSet, args=('three', 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=('three', 0)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
    Thread(target=self._doLockSet, args=('one', 0)).start()
    Thread(target=self._doLockSet, args=('one', 1)).start()
    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
    Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')

  def testConcurrentRemove(self):
    self.ls.add('four')
    self.ls.acquire(['one', 'two', 'four'])
    Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
    Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    self.ls.remove('one')
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.ls.add(['five', 'six'], acquired=1)
    Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
    Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
    Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
    Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
    self.ls.remove('five')
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.ls.acquire(['three', 'four'])
    Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    self.ls.remove('four')
    self.assertEqual(self.done.get(True, 1), ['four'])
    # args must be a 1-tuple holding the list: a bare (['two']) is just the
    # list itself and would be unpacked as the positional arguments.
    Thread(target=self._doRemoveSet, args=(['two'], )).start()
    self.assertEqual(self.done.get(True, 1), [])
    self.ls.release()


if __name__ == '__main__':
  unittest.main()
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)