Commit aaae9bc0 authored by Guido Trotter's avatar Guido Trotter
Browse files

LockSet implementation and unit tests

A LockSet represents locking for a set of resources of the same type. A thread
can acquire multiple resources at the same time, and release some or all of
them, but cannot acquire more resources incrementally at different times
without releasing all of them in between.

Internally a LockSet uses a SharedLock for each resource to be able to grant
both exclusive and shared acquisition. It also supports safe addition and
removal of resources at runtime. Acquisitions are ordered alphabetically in
order to guarantee that they are deadlock-free. A lot of assumptions about how
the code interacts are made in order to guarantee both safety and speed; in
order to document all of them the code features pretty lengthy comments.

The test suite tries to catch the most common interactions but cannot really
test tight race conditions, for which we still need to rely on human checking.

This is the second basic building block for the Ganeti Lock Manager. Instance
and Node locks will be put in LockSets to manage their acquisition and release.

Reviewed-by: imsnah
parent f3b100e1
......@@ -259,3 +259,292 @@ class SharedLock:
finally:
self.__lock.release()
class LockSet:
  """Implements a set of locks.

  This abstraction implements a set of shared locks for the same resource type,
  distinguished by name. The user can lock a subset of the resources and the
  LockSet will take care of acquiring the locks always in the same order, thus
  preventing deadlock.

  All the locks needed in the same set must be acquired together, though.

  """
  def __init__(self, members=None):
    """Constructs a new LockSet.

    Args:
      members: initial members of the set (an iterable of names); when None
               the set starts out empty

    """
    # Used internally to guarantee coherency.
    self.__lock = SharedLock()

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = {}

    if members is not None:
      for name in members:
        self.__lockdict[name] = SharedLock()

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}

  def _is_owned(self):
    """Is the current thread a current level owner?"""
    return threading.currentThread() in self.__owners

  def _add_owned(self, name):
    """Note the current thread owns the given lock"""
    if self._is_owned():
      self.__owners[threading.currentThread()].add(name)
    else:
      self.__owners[threading.currentThread()] = set([name])

  def _del_owned(self, name):
    """Note the current thread no longer owns the given lock

    When the last owned name is dropped, the thread's entry is removed from the
    owner dict altogether, so that _is_owned() becomes false again.

    """
    self.__owners[threading.currentThread()].remove(name)

    if not self.__owners[threading.currentThread()]:
      del self.__owners[threading.currentThread()]

  def _list_owned(self):
    """Get the set of resource names owned by the current thread

    A copy is returned, so callers may freely modify the result.

    """
    if self._is_owned():
      return self.__owners[threading.currentThread()].copy()
    else:
      return set()

  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    """
    return set(self.__lockdict.keys())

  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.

    """
    self.__lock.acquire(shared=1)
    try:
      result = self.__names()
    finally:
      self.__lock.release()
    return result

  def acquire(self, names, blocking=1, shared=0):
    """Acquire a set of resource locks.

    Args:
      names: the names of the locks which shall be acquired.
             (special lock names, or instance/node names)
             Either a single name or an iterable of names; the caller's
             object is never modified.
      shared: whether to acquire in shared mode. By default an exclusive lock
              will be acquired.
      blocking: whether to block while trying to acquire or to operate in
                try-lock mode. The try-lock mode is not supported yet.

    Returns:
      True: when all the locks are successfully acquired

    Raises:
      NotImplementedError: when a non-blocking acquire is requested
      errors.LockError: when any lock we try to acquire has been deleted
      before we succeed. In this case none of the locks requested will be
      acquired.

    """
    if not blocking:
      # We don't have non-blocking mode for now
      raise NotImplementedError

    # Check we don't already own locks at this level
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"

    # Support passing in a single resource to acquire rather than many
    if isinstance(names, basestring):
      names = [names]
    else:
      # Sort a private copy rather than sorting in place: the caller's list
      # must not be reordered behind its back, and this way any iterable
      # (tuple, set, ...) is accepted, not only lists.
      names = sorted(names)

    # Now names contains a sorted list of resources whose lock we want to
    # acquire. In order to get them we loop on this (private) list and look
    # them up in __lockdict. Since we have no lock held on lockdict we have no
    # guarantees on their presence, and they may even disappear after we looked
    # them up. This is fine though as .acquire() itself is safe and will alert
    # us if the lock gets deleted.
    try:
      for lname in names:
        lock = self.__lockdict[lname] # raises KeyError if the lock is not there
        lock.acquire(shared=shared) # raises LockError if the lock is deleted
        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(lname)
        except:
          # Deliberately a bare except: whatever went wrong, the lock must be
          # given back before the exception is propagated unchanged.
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong, after this.
          lock.release()
          raise

    except (KeyError, errors.LockError):
      # Remember which name failed, then roll back everything acquired so far
      # so that the caller ends up owning nothing.
      name_fail = lname
      for lname in self._list_owned():
        self.__lockdict[lname].release()
        self._del_owned(lname)
      raise errors.LockError('non-existing lock in set (%s)' % name_fail)

    return True

  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    Args:
      names: the names of the locks which shall be released.
             (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), "release() on lock set while not owner"

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      names = self._list_owned()
    else:
      names = set(names)
      assert self._list_owned().issuperset(names), (
               "release() on unheld resources %s" %
               names.difference(self._list_owned()))

    for lockname in names:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(lockname)

  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    Args:
      names: names of the new elements to add
      acquired: pre-acquire the new resource?
      shared: is the pre-acquisition shared?

    Returns:
      True: when all the names have been added

    Raises:
      errors.LockError: when any of the names is already in the set. In this
      case nothing is added.

    """
    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # Acquire the internal lock in an exclusive way, so there cannot be a
    # conflicting add()
    self.__lock.acquire()
    try:
      invalid_names = self.__names().intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add() (%s)" % invalid_names)

      for lockname in names:
        lock = SharedLock()

        if acquired:
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(lockname)
          except:
            # Deliberately a bare except: release, then propagate unchanged.
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception. Of course something is going to be really wrong,
            # after this. On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        # Publish the lock only once it is fully set up (and possibly owned).
        self.__lockdict[lockname] = lock

    finally:
      self.__lock.release()

    return True

  def remove(self, names, blocking=1):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    Args:
      names: names of the resource to remove.
      blocking: whether to block while trying to acquire or to operate in
                try-lock mode. This locking mode is not supported yet unless
                you are already holding exclusively the locks.

    Returns:
      A list of locks which we failed to delete. The list is always empty if we
      were holding all the locks exclusively.

    Raises:
      NotImplementedError: when a non-blocking remove is requested while not
      owning any lock in the set

    """
    if not blocking and not self._is_owned():
      # We don't have non-blocking mode for now
      raise NotImplementedError

    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset while not owning all elements")

    delete_failed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
      except (KeyError, errors.LockError):
        delete_failed.append(lname)
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), "remove failed while holding lockset"
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(lname)

    return delete_failed
......@@ -230,6 +230,188 @@ class TestSharedLock(unittest.TestCase):
self.assertEqual(self.done.get(True, 1), 'ERR')
class TestLockSet(unittest.TestCase):
  """LockSet tests"""

  def setUp(self):
    """Create a three-element LockSet and the queue helper threads report to."""
    self.resources = ['one', 'two', 'three']
    self.ls = locking.LockSet(self.resources)
    # helper threads use the 'done' queue to tell the master they finished.
    self.done = Queue.Queue(0)

  def testResources(self):
    """The set reports its initial members; an empty set reports none."""
    self.assertEquals(self.ls._names(), set(self.resources))
    newls = locking.LockSet()
    self.assertEquals(newls._names(), set())

  def testAcquireRelease(self):
    """Acquire by name or list, then release everything or only a part."""
    self.ls.acquire('one')
    self.assertEquals(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())
    self.ls.acquire(['one'])
    self.assertEquals(self.ls._list_owned(), set(['one']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())
    self.ls.acquire(['one', 'two', 'three'])
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
    self.ls.release('one')
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
    self.ls.release(['three'])
    self.assertEquals(self.ls._list_owned(), set(['two']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())
    self.ls.acquire(['one', 'three'])
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
    self.ls.release()
    self.assertEquals(self.ls._list_owned(), set())

  def testNoDoubleAcquire(self):
    """A thread owning anything in the set cannot acquire again."""
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
    self.ls.release()
    self.ls.acquire(['one', 'three'])
    self.ls.release('one')
    # Still owning 'three', so a further acquire must be refused.
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
    self.ls.release('three')

  def testNoWrongRelease(self):
    """Releasing without owning, or releasing an unheld name, asserts."""
    self.assertRaises(AssertionError, self.ls.release)
    self.ls.acquire('one')
    self.assertRaises(AssertionError, self.ls.release, 'two')

  def testAddRemove(self):
    """Names can be added (optionally pre-acquired) and removed."""
    self.ls.add('four')
    self.assertEquals(self.ls._list_owned(), set())
    self.assert_('four' in self.ls._names())
    self.ls.add(['five', 'six', 'seven'], acquired=1)
    self.assert_('five' in self.ls._names())
    self.assert_('six' in self.ls._names())
    self.assert_('seven' in self.ls._names())
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
    self.ls.remove(['five', 'six'])
    self.assert_('five' not in self.ls._names())
    self.assert_('six' not in self.ls._names())
    self.assertEquals(self.ls._list_owned(), set(['seven']))
    self.ls.add('eight', acquired=1, shared=1)
    self.assert_('eight' in self.ls._names())
    self.assertEquals(self.ls._list_owned(), set(['seven', 'eight']))
    self.ls.remove('seven')
    self.assert_('seven' not in self.ls._names())
    self.assertEquals(self.ls._list_owned(), set(['eight']))
    self.ls.release()
    self.ls.remove(['two'])
    self.assert_('two' not in self.ls._names())
    self.ls.acquire('three')
    self.ls.remove(['three'])
    self.assert_('three' not in self.ls._names())
    # Removing names which are no longer there reports them as failed.
    self.assertEquals(self.ls.remove('three'), ['three'])
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['three', 'six'])
    self.assert_('one' not in self.ls._names())

  def testRemoveNonBlocking(self):
    """Non-blocking remove only works on locks which are already held."""
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
    self.ls.acquire('one')
    self.assertEquals(self.ls.remove('one', blocking=0), [])
    self.ls.acquire(['two', 'three'])
    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), [])

  def testNoDoubleAdd(self):
    """Adding a name which is already in the set raises LockError."""
    self.assertRaises(errors.LockError, self.ls.add, 'two')
    self.ls.add('four')
    self.assertRaises(errors.LockError, self.ls.add, 'four')

  def testNoWrongRemoves(self):
    """remove() asserts on unowned names and on names held in shared mode."""
    self.ls.acquire(['one', 'three'], shared=1)
    # Cannot remove 'two' while holding something which is not a superset
    self.assertRaises(AssertionError, self.ls.remove, 'two')
    # Cannot remove 'three' as we are sharing it
    self.assertRaises(AssertionError, self.ls.remove, 'three')

  def _doLockSet(self, set, shared):
    """Thread body: acquire and release the given names, reporting the result.

    Puts 'DONE' on the done queue once the locks have been acquired, or 'ERR'
    if the acquisition raised errors.LockError.

    """
    try:
      self.ls.acquire(set, shared=shared)
      self.done.put('DONE')
      self.ls.release()
    except errors.LockError:
      self.done.put('ERR')

  def _doRemoveSet(self, set):
    """Thread body: remove the given names, reporting remove()'s return value."""
    self.done.put(self.ls.remove(set))

  def testConcurrentSharedAcquire(self):
    """Shared holders coexist; exclusive requests wait for the release."""
    self.ls.acquire(['one', 'two'], shared=1)
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=('three', 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
    # Both exclusive requests must still be blocked on our shared locks.
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')

  def testConcurrentExclusiveAcquire(self):
    """An exclusive holder blocks every request touching its names."""
    self.ls.acquire(['one', 'two'])
    # 'three' is not held, so it can be taken in either mode right away.
    Thread(target=self._doLockSet, args=('three', 1)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=('three', 0)).start()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
    Thread(target=self._doLockSet, args=('one', 0)).start()
    Thread(target=self._doLockSet, args=('one', 1)).start()
    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
    Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
    # All six requests involve 'one' or 'two', hence none can have finished.
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')

  def testConcurrentRemove(self):
    """Removing a lock makes pending acquires and removes on it fail."""
    self.ls.add('four')
    self.ls.acquire(['one', 'two', 'four'])
    Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
    Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    # Every waiter needs 'one': deleting it must fail all four of them.
    self.ls.remove('one')
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.assertEqual(self.done.get(True, 1), 'ERR')
    self.ls.add(['five', 'six'], acquired=1)
    Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
    Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
    Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
    Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
    # None of the waiters wants 'five', so removing it must not affect them.
    self.ls.remove('five')
    self.ls.release()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.ls.acquire(['three', 'four'])
    Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
    # The remover is stuck waiting for 'four', which we hold.
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
    self.ls.remove('four')
    self.assertEqual(self.done.get(True, 1), ['four'])
    # args must be a one-element tuple holding the list of names; a bare
    # (['two']) is just the list itself and would be unpacked by Thread.
    Thread(target=self._doRemoveSet, args=(['two'], )).start()
    self.assertEqual(self.done.get(True, 1), [])
    self.ls.release()
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment