diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index a5e25316b0667e9efba2c7831c6b672a83b364f7..0dbdba1377d9bfcc6308941a3676c99cc74a0cf5 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -2396,7 +2396,6 @@ class LUQueryNodes(NoHooksLU):
       # if we don't request only static fields, we need to lock the nodes
       self.needed_locks[locking.LEVEL_NODE] = self.wanted
 
-
   def CheckPrereq(self):
     """Check prerequisites.
 
diff --git a/lib/locking.py b/lib/locking.py
index 862e66d8681da7f3a7500f62d0fc7f280de687f3..7980e50ca9bb9b71c5d9f7b74d2668678c2b98dc 100644
--- a/lib/locking.py
+++ b/lib/locking.py
@@ -632,6 +632,12 @@ class SharedLock(object):
 ALL_SET = None
 
 
+class _AcquireTimeout(Exception):
+  """Internal exception to abort an acquire on a timeout.
+
+  """
+
+
 class LockSet:
   """Implements a set of locks.
 
@@ -702,6 +708,12 @@ class LockSet:
     else:
       return set()
 
+  def _release_and_delete_owned(self):
+    """Release and delete all resources owned by the current thread"""
+    for lname in self._list_owned():
+      self.__lockdict[lname].release()
+      self._del_owned(name=lname)
+
   def __names(self):
     """Return the current set of names.
 
@@ -730,30 +742,43 @@ class LockSet:
         self.__lock.release()
     return set(result)
 
-  def acquire(self, names, timeout=None, shared=0):
+  def acquire(self, names, timeout=None, shared=0, test_notify=None):
     """Acquire a set of resource locks.
 
     @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
-    @type timeout: float
+    @type timeout: float or None
     @param timeout: Maximum time to acquire all locks
+    @type test_notify: callable or None
+    @param test_notify: Special callback function for unittesting
 
-    @return: True when all the locks are successfully acquired
+    @return: Set of all locks successfully acquired or None in case of timeout
 
     @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.
 
     """
-    if timeout is not None:
-      raise NotImplementedError
+    assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
-    if names is None:
+    # We need to keep track of how long we spent waiting for a lock. The
+    # timeout passed to this function is over all lock acquires.
+    remaining_timeout = timeout
+    if timeout is None:
+      start = None
+      calc_remaining_timeout = lambda: None
+    else:
+      start = time.time()
+      calc_remaining_timeout = lambda: (start + timeout) - time.time()
+
+    want_all = names is None
+
+    if want_all:
       # If no names are given acquire the whole set by not letting new names
       # being added before we release, and getting the current list of names.
       # Some of them may then be deleted later, but we'll cope with this.
@@ -763,7 +788,7 @@ class LockSet:
       # them exclusively though they won't be able to do this anyway, though,
       # so we'll get the list lock exclusively as well in order to be able to
      # do add() on the set while owning it.
-      self.__lock.acquire(shared=shared)
+      self.__lock.acquire(shared=shared, timeout=remaining_timeout)
       try:
         # note we own the set-lock
         self._add_owned()
@@ -775,65 +800,103 @@ class LockSet:
         self.__lock.release()
         raise
 
+      # Re-calculate timeout
+      remaining_timeout = calc_remaining_timeout()
+
     try:
-      # Support passing in a single resource to acquire rather than many
-      if isinstance(names, basestring):
-        names = [names]
-      else:
-        names = sorted(names)
-
-      acquire_list = []
-      # First we look the locks up on __lockdict. We have no way of being sure
-      # they will still be there after, but this makes it a lot faster should
-      # just one of them be the already wrong
-      for lname in utils.UniqueSequence(names):
-        try:
-          lock = self.__lockdict[lname] # raises KeyError if lock is not there
-          acquire_list.append((lname, lock))
-        except (KeyError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
-          else:
-            raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-      # This will hold the locknames we effectively acquired.
-      acquired = set()
-      # Now acquire_list contains a sorted list of resources and locks we want.
-      # In order to get them we loop on this (private) list and acquire() them.
-      # We gave no real guarantee they will still exist till this is done but
-      # .acquire() itself is safe and will alert us if the lock gets deleted.
-      for (lname, lock) in acquire_list:
-        try:
-          lock.acquire(shared=shared) # raises LockError if the lock is deleted
-          # now the lock cannot be deleted, we have it!
-          self._add_owned(name=lname)
-          acquired.add(lname)
-        except (errors.LockError):
-          if self.__lock._is_owned():
-            # We are acquiring all the set, it doesn't matter if this
-            # particular element is not there anymore.
-            continue
+      try:
+        # Support passing in a single resource to acquire rather than many
+        if isinstance(names, basestring):
+          names = [names]
+        else:
+          names = sorted(names)
+
+        acquire_list = []
+        # First we look the locks up on __lockdict. We have no way of being sure
+        # they will still be there after, but this makes it a lot faster should
+        # just one of them be the already wrong
+        for lname in utils.UniqueSequence(names):
+          try:
+            lock = self.__lockdict[lname] # raises KeyError if lock is not there
+            acquire_list.append((lname, lock))
+          except KeyError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+            else:
+              raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+        # This will hold the locknames we effectively acquired.
+        acquired = set()
+
+        # Now acquire_list contains a sorted list of resources and locks we
+        # want. In order to get them we loop on this (private) list and
+        # acquire() them. We gave no real guarantee they will still exist till
+        # this is done but .acquire() itself is safe and will alert us if the
+        # lock gets deleted.
+        for (lname, lock) in acquire_list:
+          if __debug__ and callable(test_notify):
+            test_notify_fn = lambda: test_notify(lname)
           else:
-            name_fail = lname
-            for lname in self._list_owned():
-              self.__lockdict[lname].release()
-              self._del_owned(name=lname)
-            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
-        except:
-          # We shouldn't have problems adding the lock to the owners list, but
-          # if we did we'll try to release this lock and re-raise exception.
-          # Of course something is going to be really wrong, after this.
-          if lock._is_owned():
-            lock.release()
-          raise
+            test_notify_fn = None
 
-    except:
-      # If something went wrong and we had the set-lock let's release it...
-      if self.__lock._is_owned():
-        self.__lock.release()
-      raise
+          try:
+            if timeout is not None and remaining_timeout < 0:
+              raise _AcquireTimeout()
+
+            # raises LockError if the lock was deleted
+            if not lock.acquire(shared=shared, timeout=remaining_timeout,
+                                test_notify=test_notify_fn):
+              # Couldn't get lock or timeout occurred
+              if timeout is None:
+                # This shouldn't happen as SharedLock.acquire(timeout=None) is
+                # blocking.
+                raise errors.LockError("Failed to get lock %s" % lname)
+
+              raise _AcquireTimeout()
+
+            # Re-calculate timeout
+            remaining_timeout = calc_remaining_timeout()
+
+            # now the lock cannot be deleted, we have it!
+            self._add_owned(name=lname)
+            acquired.add(lname)
+
+          except _AcquireTimeout:
+            # Release all acquired locks
+            self._release_and_delete_owned()
+            raise
+
+          except errors.LockError:
+            if want_all:
+              # We are acquiring all the set, it doesn't matter if this
+              # particular element is not there anymore.
+              continue
+
+            self._release_and_delete_owned()
+
+            raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+          except:
+            # We shouldn't have problems adding the lock to the owners list, but
+            # if we did we'll try to release this lock and re-raise exception.
+            # Of course something is going to be really wrong, after this.
+            if lock._is_owned():
+              lock.release()
+            raise
+
+      except:
+        # If something went wrong and we had the set-lock let's release it...
+        if want_all:
+          self.__lock.release()
+        raise
+
+    except _AcquireTimeout:
+      if want_all:
+        self._del_owned()
+
+      return None
 
     return acquired
 
@@ -939,7 +1002,7 @@ class LockSet:
 
     @param names: names of the resource to remove.
 
-    @return:: a list of locks which we removed; the list is always
+    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively
 
diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py
index 9483625e4476dc0ff413480715c77ca05061f1e2..f3630b791826ae96749044c82228b95ed5c57587 100755
--- a/test/ganeti.locking_unittest.py
+++ b/test/ganeti.locking_unittest.py
@@ -48,6 +48,15 @@ def _Repeat(fn):
   return wrapper
 
 
+def SafeSleep(duration):
+  start = time.time()
+  while True:
+    delay = start + duration - time.time()
+    if delay <= 0.0:
+      break
+    time.sleep(delay)
+
+
 class _ThreadedTestCase(unittest.TestCase):
   """Test class that supports adding/waiting on threads"""
   def setUp(self):
@@ -965,6 +974,125 @@ class TestLockSet(_ThreadedTestCase):
     for _ in range(6):
       self.failUnlessEqual(self.done.get_nowait(), 'DONE')
 
+  @_Repeat
+  def testSimpleAcquireTimeoutExpiring(self):
+    names = sorted(self.ls._names())
+    self.assert_(len(names) >= 3)
+
+    # Get name of first lock
+    first = names[0]
+
+    # Get name of last lock
+    last = names.pop()
+
+    checks = [
+      # Block first and try to lock it again
+      (first, first),
+
+      # Block last and try to lock all locks
+      (None, first),
+
+      # Block last and try to lock it again
+      (last, last),
+      ]
+
+    for (wanted, block) in checks:
+      # Lock in exclusive mode
+      self.assert_(self.ls.acquire(block, shared=0))
+
+      def _AcquireOne():
+        # Try to get the same lock again with a timeout (should never succeed)
+        if self.ls.acquire(wanted, timeout=0.1, shared=0):
+          self.done.put("acquired")
+          self.ls.release()
+        else:
+          self.assert_(not self.ls._list_owned())
+          self.assert_(not self.ls._is_owned())
+          self.done.put("not acquired")
+
+      self._addThread(target=_AcquireOne)
+
+      # Wait for timeout in thread to expire
+      self._waitThreads()
+
+      # Release exclusive lock again
+      self.ls.release()
+
+      self.assertEqual(self.done.get_nowait(), "not acquired")
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
+  def testDelayedAndExpiringLockAcquire(self):
+    self._setUpLS()
+    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
+
+    for expire in (False, True):
+      names = sorted(self.ls._names())
+      self.assertEqual(len(names), 8)
+
+      lock_ev = dict([(i, threading.Event()) for i in names])
+
+      # Lock all in exclusive mode
+      self.assert_(self.ls.acquire(names, shared=0))
+
+      if expire:
+        # We'll wait at least 300ms per lock
+        lockwait = len(names) * [0.3]
+
+        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
+        # this gives us up to 2.4s to fail.
+        lockall_timeout = 0.4
+      else:
+        # This should finish rather quickly
+        lockwait = None
+        lockall_timeout = len(names) * 5.0
+
+      def _LockAll():
+        def acquire_notification(name):
+          if not expire:
+            self.done.put("getting %s" % name)
+
+          # Kick next lock
+          lock_ev[name].set()
+
+        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
+                           test_notify=acquire_notification):
+          self.done.put("got all")
+          self.ls.release()
+        else:
+          self.done.put("timeout on all")
+
+        # Notify all locks
+        for ev in lock_ev.values():
+          ev.set()
+
+      t = self._addThread(target=_LockAll)
+
+      for idx, name in enumerate(names):
+        # Wait for actual acquire on this lock to start
+        lock_ev[name].wait(10.0)
+
+        if expire and t.isAlive():
+          # Wait some time after getting the notification to make sure the lock
+          # acquire will expire
+          SafeSleep(lockwait[idx])
+
+        self.ls.release(names=name)
+
+      self.assert_(not self.ls._list_owned())
+
+      self._waitThreads()
+
+      if expire:
+        # Not checking which locks were actually acquired. Doing so would be
+        # too timing-dependant.
+        self.assertEqual(self.done.get_nowait(), "timeout on all")
+      else:
+        for i in names:
+          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
+        self.assertEqual(self.done.get_nowait(), "got all")
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
   @_Repeat
   def testConcurrentRemove(self):
     self.ls.add('four')
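
As a usage sketch (not part of the patch itself): with the change above, LockSet.acquire() returns the set of acquired lock names, or None if the overall timeout expired, in which case any partially acquired locks have already been released. The lockset ls, the lock names and the helpers _DoWork/_RetryLater below are hypothetical.

  # Illustrative only: try to take two locks exclusively within one second.
  acquired = ls.acquire(["node1.example.com", "node2.example.com"],
                        shared=0, timeout=1.0)
  if acquired is None:
    # Timed out; acquire() already dropped whatever it managed to grab.
    _RetryLater()
  else:
    try:
      _DoWork()
    finally:
      ls.release()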