diff --git a/lib/locking.py b/lib/locking.py index 2f41d471fbc413c4fd6c85c44fd41cdf5ba6ba34..11f4927ab77810444dd082df9cbd2f42e998b2a7 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -20,11 +20,8 @@ """Module implementing the Ganeti locking code.""" -# pylint: disable-msg=W0613,W0201 - import threading -# Wouldn't it be better to define LockingError in the locking module? -# Well, for now that's how the rest of the code does it... + from ganeti import errors from ganeti import utils @@ -48,7 +45,55 @@ def ssynchronized(lock, shared=0): return wrap -class SharedLock: +class _CountingCondition(object): + """Wrapper for Python's built-in threading.Condition class. + + This wrapper keeps a count of active waiters. We can't access the internal + "__waiters" attribute of threading.Condition because it's not thread-safe. + + """ + __slots__ = [ + "_cond", + "_nwaiters", + ] + + def __init__(self, lock): + """Initializes this class. + + """ + object.__init__(self) + self._cond = threading.Condition(lock=lock) + self._nwaiters = 0 + + def notifyAll(self): + """Notifies the condition. + + """ + return self._cond.notifyAll() + + def wait(self, timeout=None): + """Waits for the condition to be notified. + + @type timeout: float or None + @param timeout: Timeout in seconds + + """ + assert self._nwaiters >= 0 + + self._nwaiters += 1 + try: + return self._cond.wait(timeout=timeout) + finally: + self._nwaiters -= 1 + + def has_waiting(self): + """Returns whether there are active waiters. + + """ + return bool(self._nwaiters) + + +class SharedLock(object): """Implements a shared lock. Multiple threads can acquire the lock in a shared way, calling @@ -60,31 +105,58 @@ class SharedLock: eventually do so. 
""" + __slots__ = [ + "__active_shr_c", + "__inactive_shr_c", + "__deleted", + "__exc", + "__lock", + "__pending", + "__shr", + ] + + __condition_class = _CountingCondition + def __init__(self): - """Construct a new SharedLock""" - # we have two conditions, c_shr and c_exc, sharing the same lock. + """Construct a new SharedLock. + + """ + object.__init__(self) + + # Internal lock self.__lock = threading.Lock() - self.__turn_shr = threading.Condition(self.__lock) - self.__turn_exc = threading.Condition(self.__lock) - # current lock holders + # Queue containing waiting acquires + self.__pending = [] + + # Active and inactive conditions for shared locks + self.__active_shr_c = self.__condition_class(self.__lock) + self.__inactive_shr_c = self.__condition_class(self.__lock) + + # Current lock holders self.__shr = set() self.__exc = None - # lock waiters - self.__nwait_exc = 0 - self.__nwait_shr = 0 - self.__npass_shr = 0 - # is this lock in the deleted state? self.__deleted = False + def __check_deleted(self): + """Raises an exception if the lock has been deleted. + + """ + if self.__deleted: + raise errors.LockError("Deleted lock") + def __is_sharer(self): - """Is the current thread sharing the lock at this time?""" + """Is the current thread sharing the lock at this time? + + """ return threading.currentThread() in self.__shr def __is_exclusive(self): - """Is the current thread holding the lock exclusively at this time?""" + """Is the current thread holding the lock exclusively at this time? + + """ return threading.currentThread() == self.__exc def __is_owned(self, shared=-1): @@ -112,116 +184,118 @@ class SharedLock: """ self.__lock.acquire() try: - result = self.__is_owned(shared=shared) + return self.__is_owned(shared=shared) finally: self.__lock.release() - return result - - def __wait(self, c): - """Wait on the given condition, and raise an exception if the current lock - is declared deleted in the meantime. 
+ def _count_pending(self): + """Returns the number of pending acquires. - @param c: the condition to wait on + @rtype: int """ - c.wait() - if self.__deleted: - raise errors.LockError('deleted lock') + self.__lock.acquire() + try: + return len(self.__pending) + finally: + self.__lock.release() - def __exclusive_acquire(self): - """Acquire the lock exclusively. + def __do_acquire(self, shared): + """Actually acquire the lock. - This is a private function that presumes you are already holding the - internal lock. It's defined separately to avoid code duplication between - acquire() and delete() + """ + if shared: + self.__shr.add(threading.currentThread()) + else: + self.__exc = threading.currentThread() + + def __can_acquire(self, shared): + """Determine whether lock can be acquired. """ - self.__nwait_exc += 1 - try: - # This is to save ourselves from a nasty race condition that could - # theoretically make the sharers starve. - if self.__nwait_shr > 0 or self.__nwait_exc > 1: - self.__wait(self.__turn_exc) + if shared: + return self.__exc is None + else: + return len(self.__shr) == 0 and self.__exc is None - while len(self.__shr) > 0 or self.__exc is not None: - self.__wait(self.__turn_exc) + def __is_on_top(self, cond): + """Checks whether the passed condition is on top of the queue. - self.__exc = threading.currentThread() - finally: - self.__nwait_exc -= 1 + The caller must make sure the queue isn't empty. - assert self.__npass_shr == 0, "SharedLock: internal fairness violation" + """ + return self.__pending[0] == cond - def __shared_acquire(self): - """Acquire the lock in shared mode + def __acquire_unlocked(self, shared=0, timeout=None): + """Acquire a shared lock. - This is a private function that presumes you are already holding the - internal lock. 
+ @param shared: whether to acquire in shared mode; by default an + exclusive lock will be acquired + @param timeout: maximum waiting time before giving up """ - self.__nwait_shr += 1 - try: - wait = False - # If there is an exclusive holder waiting we have to wait. - # We'll only do this once, though, when we start waiting for - # the lock. Then we'll just wait while there are no - # exclusive holders. - if self.__nwait_exc > 0: - # TODO: if !blocking... - wait = True - self.__wait(self.__turn_shr) - - while self.__exc is not None: - wait = True - # TODO: if !blocking... - self.__wait(self.__turn_shr) + self.__check_deleted() - self.__shr.add(threading.currentThread()) + # We cannot acquire the lock if we already have it + assert not self.__is_owned(), "double acquire() on a non-recursive lock" + + # Check whether someone else holds the lock or there are pending acquires. + if not self.__pending and self.__can_acquire(shared): + # Apparently not, can acquire lock directly. + self.__do_acquire(shared) + return True - # If we were waiting note that we passed - if wait: - self.__npass_shr -= 1 + if shared: + wait_condition = self.__active_shr_c + # Check if we're not yet in the queue + if wait_condition not in self.__pending: + self.__pending.append(wait_condition) + else: + wait_condition = self.__condition_class(self.__lock) + # Always add to queue + self.__pending.append(wait_condition) + + try: + # Wait until we become the topmost acquire in the queue or the timeout + # expires. + while not (self.__is_on_top(wait_condition) and + self.__can_acquire(shared)): + # Wait for notification + wait_condition.wait(timeout) + self.__check_deleted() + + # A lot of code assumes blocking acquires always succeed. Loop + # internally for that case. 
+ if timeout is not None: + break + + if self.__is_on_top(wait_condition) and self.__can_acquire(shared): + self.__do_acquire(shared) + return True finally: - self.__nwait_shr -= 1 + # Remove condition from queue if there are no more waiters + if not wait_condition.has_waiting() and not self.__deleted: + self.__pending.remove(wait_condition) - assert self.__npass_shr >= 0, "Internal fairness condition weirdness" + return False - def acquire(self, blocking=1, shared=0): + def acquire(self, shared=0, timeout=None): """Acquire a shared lock. + @type shared: int @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired - @param blocking: whether to block while trying to acquire or to - operate in try-lock mode (this locking mode is not supported yet) + @type timeout: float + @param timeout: maximum waiting time before giving up """ - if not blocking: - # We don't have non-blocking mode for now - raise NotImplementedError - self.__lock.acquire() try: - if self.__deleted: - raise errors.LockError('deleted lock') - - # We cannot acquire the lock if we already have it - assert not self.__is_owned(), "double acquire() on a non-recursive lock" - assert self.__npass_shr >= 0, "Internal fairness condition weirdness" - - if shared: - self.__shared_acquire() - else: - # TODO: if !blocking... - # (or modify __exclusive_acquire for non-blocking mode) - self.__exclusive_acquire() - + return self.__acquire_unlocked(shared, timeout) finally: self.__lock.release() - return True - def release(self): """Release a Shared Lock. @@ -231,76 +305,59 @@ class SharedLock: """ self.__lock.acquire() try: - assert self.__npass_shr >= 0, "Internal fairness condition weirdness" + assert self.__is_exclusive() or self.__is_sharer(), \ + "Cannot release non-owned lock" + # Autodetect release type if self.__is_exclusive(): self.__exc = None - - # An exclusive holder has just had the lock, time to put it in shared - # mode if there are shared holders waiting. 
Otherwise wake up the next - # exclusive holder. - if self.__nwait_shr > 0: - # Make sure at least the ones which were blocked pass. - self.__npass_shr = self.__nwait_shr - self.__turn_shr.notifyAll() - elif self.__nwait_exc > 0: - self.__turn_exc.notify() - - elif self.__is_sharer(): + else: self.__shr.remove(threading.currentThread()) - # If there are shared holders waiting (and not just scheduled to pass) - # there *must* be an exclusive holder waiting as well; otherwise what - # were they waiting for? - assert (self.__nwait_exc > 0 or - self.__npass_shr == self.__nwait_shr), \ - "Lock sharers waiting while no exclusive is queueing" - - # If there are no more shared holders either in or scheduled to pass, - # and some exclusive holders are waiting let's wake one up. - if (len(self.__shr) == 0 and - self.__nwait_exc > 0 and - not self.__npass_shr > 0): - self.__turn_exc.notify() + # Notify topmost condition in queue + if self.__pending: + first_condition = self.__pending[0] + first_condition.notifyAll() - else: - assert False, "Cannot release non-owned lock" + if first_condition == self.__active_shr_c: + self.__active_shr_c = self.__inactive_shr_c + self.__inactive_shr_c = first_condition finally: self.__lock.release() - def delete(self, blocking=1): + def delete(self, timeout=None): """Delete a Shared Lock. This operation will declare the lock for removal. First the lock will be acquired in exclusive mode if you don't already own it, then the lock will be put in a state where any future and pending acquire() fail. - @param blocking: whether to block while trying to acquire or to - operate in try-lock mode. this locking mode is not supported - yet unless you are already holding exclusively the lock. 
+ @type timeout: float + @param timeout: maximum waiting time before giving up """ self.__lock.acquire() try: - assert not self.__is_sharer(), "cannot delete() a lock while sharing it" + assert not self.__is_sharer(), "Cannot delete() a lock while sharing it" + + self.__check_deleted() - if self.__deleted: - raise errors.LockError('deleted lock') + # The caller is allowed to hold the lock exclusively already. + acquired = self.__is_exclusive() - if not self.__is_exclusive(): - if not blocking: - # We don't have non-blocking mode for now - raise NotImplementedError - self.__exclusive_acquire() + if not acquired: + acquired = self.__acquire_unlocked(timeout=timeout) + + if acquired: + self.__deleted = True + self.__exc = None - self.__deleted = True - self.__exc = None - # Wake up everybody, they will fail acquiring the lock and - # raise an exception instead. - self.__turn_exc.notifyAll() - self.__turn_shr.notifyAll() + # Notify all acquires. They'll throw an error. + while self.__pending: + self.__pending.pop().notifyAll() + return acquired finally: self.__lock.release() diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index aee6860f289a6e04252905e84337a36d889507ea..6be0c570ceb012653eb2ed313c39cb4e48777a0b 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -26,10 +26,10 @@ import os import unittest import time import Queue +import threading from ganeti import locking from ganeti import errors -from threading import Thread # This is used to test the ssynchronize decorator.
@@ -39,6 +39,7 @@ _decoratorlock = locking.SharedLock() #: List for looping tests ITERATIONS = range(8) + def _Repeat(fn): """Decorator for executing a function many times""" def wrapper(*args, **kwargs): @@ -46,6 +47,7 @@ def _Repeat(fn): fn(*args, **kwargs) return wrapper + class _ThreadedTestCase(unittest.TestCase): """Test class that supports adding/waiting on threads""" def setUp(self): @@ -54,7 +56,7 @@ class _ThreadedTestCase(unittest.TestCase): def _addThread(self, *args, **kwargs): """Create and remember a new thread""" - t = Thread(*args, **kwargs) + t = threading.Thread(*args, **kwargs) self.threads.append(t) t.start() return t @@ -147,7 +149,7 @@ class TestSharedLock(_ThreadedTestCase): def testSharersCanCoexist(self): self.sl.acquire(shared=1) - Thread(target=self._doItSharer).start() + threading.Thread(target=self._doItSharer).start() self.assert_(self.done.get(True, 1)) self.sl.release() @@ -234,12 +236,6 @@ class TestSharedLock(_ThreadedTestCase): self.assertEqual(self.done.get_nowait(), 'SHR') self.assertEqual(self.done.get_nowait(), 'EXC') - def testNoNonBlocking(self): - self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) - self.assertRaises(NotImplementedError, self.sl.delete, blocking=0) - self.sl.acquire() - self.sl.delete(blocking=0) # Fine, because the lock is already acquired - def testDelete(self): self.sl.delete() self.assertRaises(errors.LockError, self.sl.acquire) @@ -280,6 +276,222 @@ class TestSharedLock(_ThreadedTestCase): self.assertEqual(self.done.get_nowait(), 'ERR') self.sl = locking.SharedLock() + @_Repeat + def testExclusiveAcquireTimeout(self): + def _LockExclusive(wait): + self.sl.acquire(shared=0) + self.done.put("A: start sleep") + time.sleep(wait) + self.done.put("A: end sleep") + self.sl.release() + + for shared in [0, 1]: + # Start thread to hold lock for 20 ms + self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, )) + + # Wait up to 100 ms to get lock + 
self.failUnless(self.sl.acquire(shared=shared, timeout=0.1)) + self.done.put("got 2nd") + self.sl.release() + + self._waitThreads() + + self.assertEqual(self.done.get_nowait(), "A: start sleep") + self.assertEqual(self.done.get_nowait(), "A: end sleep") + self.assertEqual(self.done.get_nowait(), "got 2nd") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat + def testAcquireExpiringTimeout(self): + def _AcquireWithTimeout(shared, timeout): + if not self.sl.acquire(shared=shared, timeout=timeout): + self.done.put("timeout") + + for shared in [0, 1]: + # Lock exclusively + self.sl.acquire() + + # Start shared acquires with timeout between 0 and 20 ms + for i in xrange(11): + self._addThread(target=_AcquireWithTimeout, + args=(shared, i * 2.0 / 1000.0)) + + # Wait for threads to finish (makes sure the acquire timeout expires + # before releasing the lock) + self._waitThreads() + + # Release lock + self.sl.release() + + for _ in xrange(11): + self.assertEqual(self.done.get_nowait(), "timeout") + + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat + def testSharedSkipExclusiveAcquires(self): + # Tests whether shared acquires jump in front of exclusive acquires in the + # queue. 
+ + # Get exclusive lock while we fill the queue + self.sl.acquire() + + def _Acquire(shared, name): + if not self.sl.acquire(shared=shared): + return + + self.done.put(name) + self.sl.release() + + # Start shared acquires + for _ in xrange(5): + self._addThread(target=_Acquire, args=(1, "shared A")) + + # Start exclusive acquires + for _ in xrange(3): + self._addThread(target=_Acquire, args=(0, "exclusive B")) + + # More shared acquires + for _ in xrange(5): + self._addThread(target=_Acquire, args=(1, "shared C")) + + # More exclusive acquires + for _ in xrange(3): + self._addThread(target=_Acquire, args=(0, "exclusive D")) + + # Expect 6 pending exclusive acquires and 1 for all shared acquires + # together + self.assertEqual(self.sl._count_pending(), 7) + + # Release exclusive lock and wait + self.sl.release() + + self._waitThreads() + + # Check sequence + for _ in xrange(10): + # Shared locks aren't guaranteed to be notified in order, but they'll be + # first + self.assert_(self.done.get_nowait() in ("shared A", "shared C")) + + for _ in xrange(3): + self.assertEqual(self.done.get_nowait(), "exclusive B") + + for _ in xrange(3): + self.assertEqual(self.done.get_nowait(), "exclusive D") + + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat + def testMixedAcquireTimeout(self): + sync = threading.Condition() + + def _AcquireShared(ev): + if not self.sl.acquire(shared=1, timeout=None): + return + + self.done.put("shared") + + # Notify main thread + ev.set() + + # Wait for notification + sync.acquire() + try: + sync.wait() + finally: + sync.release() + + # Release lock + self.sl.release() + + acquires = [] + for _ in xrange(3): + ev = threading.Event() + self._addThread(target=_AcquireShared, args=(ev, )) + acquires.append(ev) + + # Wait for all acquires to finish + for i in acquires: + i.wait() + + self.assertEqual(self.sl._count_pending(), 0) + + # Try to get exclusive lock + self.failIf(self.sl.acquire(shared=0, timeout=0.02)) + + # Acquire 
exclusive without timeout + exclsync = threading.Condition() + exclev = threading.Event() + + def _AcquireExclusive(): + if not self.sl.acquire(shared=0): + return + + self.done.put("exclusive") + + # Notify main thread + exclev.set() + + exclsync.acquire() + try: + exclsync.wait() + finally: + exclsync.release() + + self.sl.release() + + self._addThread(target=_AcquireExclusive) + + # Try to get exclusive lock + self.failIf(self.sl.acquire(shared=0, timeout=0.02)) + + # Make all shared holders release their locks + sync.acquire() + try: + sync.notifyAll() + finally: + sync.release() + + # Wait for exclusive acquire to succeed + exclev.wait() + + self.assertEqual(self.sl._count_pending(), 0) + + # Try to get exclusive lock + self.failIf(self.sl.acquire(shared=0, timeout=0.02)) + + def _AcquireSharedSimple(): + if self.sl.acquire(shared=1, timeout=None): + self.done.put("shared2") + self.sl.release() + + for _ in xrange(10): + self._addThread(target=_AcquireSharedSimple) + + # Tell exclusive lock to release + exclsync.acquire() + try: + exclsync.notifyAll() + finally: + exclsync.release() + + # Wait for everything to finish + self._waitThreads() + + self.assertEqual(self.sl._count_pending(), 0) + + # Check sequence + for _ in xrange(3): + self.assertEqual(self.done.get_nowait(), "shared") + + self.assertEqual(self.done.get_nowait(), "exclusive") + + for _ in xrange(10): + self.assertEqual(self.done.get_nowait(), "shared2") + + self.assertRaises(Queue.Empty, self.done.get_nowait) + class TestSSynchronizedDecorator(_ThreadedTestCase): """Shared Lock Synchronized decorator test""" @@ -307,7 +519,7 @@ class TestSSynchronizedDecorator(_ThreadedTestCase): def testSharersCanCoexist(self): _decoratorlock.acquire(shared=1) - Thread(target=self._doItSharer).start() + threading.Thread(target=self._doItSharer).start() self.assert_(self.done.get(True, 1)) _decoratorlock.release() @@ -354,7 +566,6 @@ class TestLockSet(_ThreadedTestCase): self.resources = ['one', 'two', 'three'] 
self.ls = locking.LockSet(members=self.resources) - def testResources(self): self.assertEquals(self.ls._names(), set(self.resources)) newls = locking.LockSet() @@ -489,24 +700,24 @@ class TestLockSet(_ThreadedTestCase): # We haven't really acquired anything, so we cannot release self.assertRaises(AssertionError, self.ls.release) - def _doLockSet(self, set, shared): + def _doLockSet(self, names, shared): try: - self.ls.acquire(set, shared=shared) + self.ls.acquire(names, shared=shared) self.done.put('DONE') self.ls.release() except errors.LockError: self.done.put('ERR') - def _doAddSet(self, set): + def _doAddSet(self, names): try: - self.ls.add(set, acquired=1) + self.ls.add(names, acquired=1) self.done.put('DONE') self.ls.release() except errors.LockError: self.done.put('ERR') - def _doRemoveSet(self, set): - self.done.put(self.ls.remove(set)) + def _doRemoveSet(self, names): + self.done.put(self.ls.remove(names)) @_Repeat def testConcurrentSharedAcquire(self): @@ -537,6 +748,7 @@ class TestLockSet(_ThreadedTestCase): self._addThread(target=self._doLockSet, args=('three', 0)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertRaises(Queue.Empty, self.done.get_nowait) self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) self._addThread(target=self._doLockSet, args=('one', 0))