diff --git a/lib/locking.py b/lib/locking.py index ec7180a21f4dc426f91e53859532d7015f014748..b0b4758b65cc28de5b9b6c19b9138fb6a3022f2d 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -53,7 +53,14 @@ _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000) # Internal lock acquisition modes for L{LockSet} (_LS_ACQUIRE_EXACT, - _LS_ACQUIRE_ALL) = range(1, 3) + _LS_ACQUIRE_ALL, + _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4) + +_LS_ACQUIRE_MODES = frozenset([ + _LS_ACQUIRE_EXACT, + _LS_ACQUIRE_ALL, + _LS_ACQUIRE_OPPORTUNISTIC, + ]) def ssynchronized(mylock, shared=0): @@ -917,6 +924,52 @@ class SharedLock(object): ALL_SET = None +def _TimeoutZero(): + """Returns the number zero. + + """ + return 0 + + +def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic): + """Determines modes and timeouts for L{LockSet.acquire}. + + @type want_all: boolean + @param want_all: Whether all locks in set should be acquired + @param timeout: Timeout in seconds or C{None} + @param opportunistic: Whther locks should be acquired opportunistically + @rtype: tuple + @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner} + (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for + acquiring the lockset-internal lock (might be C{None}) and a function to + calculate the timeout for acquiring individual locks + + """ + # Short circuit when no running timeout is needed + if opportunistic and not want_all: + assert timeout is None, "Got timeout for an opportunistic acquisition" + return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero) + + # We need to keep track of how long we spent waiting for a lock. The + # timeout passed to this function is over all lock acquisitions. + running_timeout = utils.RunningTimeout(timeout, False) + + if want_all: + mode = _LS_ACQUIRE_ALL + ls_timeout_fn = running_timeout.Remaining + else: + mode = _LS_ACQUIRE_EXACT + ls_timeout_fn = None + + if opportunistic: + mode = _LS_ACQUIRE_OPPORTUNISTIC + timeout_fn = _TimeoutZero + else: + timeout_fn = running_timeout.Remaining + + return (mode, ls_timeout_fn, timeout_fn) + + class _AcquireTimeout(Exception): """Internal exception to abort an acquire on a timeout. @@ -1114,9 +1167,12 @@ class LockSet: return set(result) def acquire(self, names, timeout=None, shared=0, priority=None, - test_notify=None): + opportunistic=False, test_notify=None): """Acquire a set of resource locks. + @note: When acquiring locks opportunistically, any number of locks might + actually be acquired, even zero. + @type names: list of strings (or string) @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) @@ -1124,9 +1180,16 @@ class LockSet: @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @type timeout: float or None - @param timeout: Maximum time to acquire all locks + @param timeout: Maximum time to acquire all locks; for opportunistic + acquisitions, a timeout can only be given when C{names} is C{None}, in + which case it is exclusively used for acquiring the L{LockSet}-internal + lock; opportunistic acquisitions don't use a timeout for acquiring + individual locks @type priority: integer @param priority: Priority for acquiring locks + @type opportunistic: boolean + @param opportunistic: Acquire locks opportunistically; use the return value + to determine which locks were actually acquired @type test_notify: callable or None @param test_notify: Special callback function for unittesting @@ -1146,20 +1209,26 @@ class LockSet: if priority is None: priority = _DEFAULT_PRIORITY - # We need to keep track of how long we spent waiting for a lock. The - # timeout passed to this function is over all lock acquires. - running_timeout = utils.RunningTimeout(timeout, False) - try: if names is not None: + assert timeout is None or not opportunistic, \ + ("Opportunistic acquisitions can only use a timeout if no" + " names are given; see docstring for details") + # Support passing in a single resource to acquire rather than many if isinstance(names, basestring): names = [names] - return self.__acquire_inner(names, _LS_ACQUIRE_EXACT, shared, priority, - running_timeout.Remaining, test_notify) + (mode, _, timeout_fn) = \ + _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic) + + return self.__acquire_inner(names, mode, shared, priority, + timeout_fn, test_notify) else: + (mode, ls_timeout_fn, timeout_fn) = \ + _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic) + # If no names are given acquire the whole set by not letting new names # being added before we release, and getting the current list of names. # Some of them may then be deleted later, but we'll cope with this. @@ -1170,15 +1239,15 @@ class LockSet: # anyway, though, so we'll get the list lock exclusively as well in # order to be able to do add() on the set while owning it. if not self.__lock.acquire(shared=shared, priority=priority, - timeout=running_timeout.Remaining()): + timeout=ls_timeout_fn()): raise _AcquireTimeout() + try: # note we own the set-lock self._add_owned() - return self.__acquire_inner(self.__names(), _LS_ACQUIRE_ALL, shared, - priority, running_timeout.Remaining, - test_notify) + return self.__acquire_inner(self.__names(), mode, shared, + priority, timeout_fn, test_notify) except: # We shouldn't have problems adding the lock to the owners list, but # if we did we'll try to release this lock and re-raise exception. @@ -1194,15 +1263,27 @@ class LockSet: timeout_fn, test_notify): """Inner logic for acquiring a number of locks. + Acquisition modes: + + - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but + deleted locks can be ignored as the whole set is being acquired with + its internal lock held + - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired; + timeouts and deleted locks are fatal + - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially + all within the set) which should be acquired opportunistically, that is + failures are ignored + @param names: Names of the locks to be acquired - @param mode: Lock acquisition mode + @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES}) @param shared: Whether to acquire in shared mode - @param timeout_fn: Function returning remaining timeout + @param timeout_fn: Function returning remaining timeout (C{None} for + opportunistic acquisitions) @param priority: Priority for acquiring locks @param test_notify: Special callback function for unittesting """ - assert mode in (_LS_ACQUIRE_EXACT, _LS_ACQUIRE_ALL) + assert mode in _LS_ACQUIRE_MODES acquire_list = [] @@ -1246,7 +1327,7 @@ class LockSet: priority=priority, test_notify=test_notify_fn) except errors.LockError: - if mode == _LS_ACQUIRE_ALL: + if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC): # We are acquiring the whole set, it doesn't matter if this # particular element is not there anymore. continue @@ -1256,6 +1337,10 @@ class LockSet: if not acq_success: # Couldn't get lock or timeout occurred + if mode == _LS_ACQUIRE_OPPORTUNISTIC: + # Ignore timeouts on opportunistic acquisitions + continue + if timeout is None: # This shouldn't happen as SharedLock.acquire(timeout=None) is # blocking. diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 6863d6d90ce77000a132d770ca300ab27c6ef7f6..705103f141e91bab2272d8fc05495847132a5f79 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -1780,6 +1780,235 @@ class TestLockSet(_ThreadedTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) self.assertRaises(Queue.Empty, done_two.get_nowait) + def testNamesWithOpportunisticAndTimeout(self): + self.assertRaises(AssertionError, self.ls.acquire, + ["one", "two"], timeout=1.0, opportunistic=True) + + def testOpportunisticWithUnknownName(self): + name = "unknown" + self.assertFalse(name in self.ls._names()) + result = self.ls.acquire(name, opportunistic=True) + self.assertFalse(result) + self.assertFalse(self.ls.list_owned()) + + result = self.ls.acquire(["two", name], opportunistic=True) + self.assertEqual(result, set(["two"])) + self.assertEqual(self.ls.list_owned(), set(["two"])) + + self.ls.release() + + def testSimpleOpportunisticAcquisition(self): + self.assertEquals(self.ls._names(), set(["one", "two", "three"])) + + # Hold a lock in main thread + self.assertEqual(self.ls.acquire("two", shared=0), set(["two"])) + + def fn(): + # The lock "two" is held by the main thread + result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True) + self.assertEqual(result, set(["one"])) + self.assertEqual(self.ls.list_owned(), set(["one"])) + self.assertFalse(self.ls._get_lock().is_owned()) + + self.ls.release() + self.assertFalse(self.ls.list_owned()) + + # Try to acquire the lock held by the main thread + result = self.ls.acquire(["two"], shared=0, opportunistic=True) + self.assertFalse(self.ls._get_lock().is_owned()) + self.assertFalse(result) + self.assertFalse(self.ls.list_owned()) + + # Try to acquire all locks + result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True) + self.assertTrue(self.ls._get_lock().is_owned(), + msg="Internal lock is not owned") + self.assertEqual(result, set(["one", "three"])) + self.assertEqual(self.ls.list_owned(), set(["one", "three"])) + + self.ls.release() + + self.assertFalse(self.ls.list_owned()) + + self.done.put(True) + + self._addThread(target=fn) + + # Wait for threads to finish + self._waitThreads() + + self.assertEqual(self.ls.list_owned(), set(["two"])) + + self.ls.release() + self.assertFalse(self.ls.list_owned()) + self.assertFalse(self.ls._get_lock().is_owned()) + + self.assertTrue(self.done.get_nowait()) + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def testOpportunisticAcquisitionWithoutNamesExpires(self): + self.assertEquals(self.ls._names(), set(["one", "two", "three"])) + + # Hold all locks in main thread + self.ls.acquire(locking.ALL_SET, shared=0) + self.assertTrue(self.ls._get_lock().is_owned()) + + def fn(): + # Try to acquire all locks in separate thread + result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True, + timeout=0.1) + self.assertFalse(result) + self.assertFalse(self.ls._get_lock().is_owned()) + self.assertFalse(self.ls.list_owned()) + + # Try once more without a timeout + self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True)) + + self.done.put(True) + + self._addThread(target=fn) + + # Wait for threads to finish + self._waitThreads() + + self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"])) + + self.ls.release() + self.assertFalse(self.ls.list_owned()) + self.assertFalse(self.ls._get_lock().is_owned(shared=0)) + + self.assertTrue(self.done.get_nowait()) + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def testSharedOpportunisticAcquisitionWithoutNames(self): + self.assertEquals(self.ls._names(), set(["one", "two", "three"])) + + # Hold all locks in main thread + self.ls.acquire(locking.ALL_SET, shared=1) + self.assertTrue(self.ls._get_lock().is_owned(shared=1)) + + def fn(): + # Try to acquire all locks in separate thread in shared mode + result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True, + timeout=0.1) + self.assertEqual(result, set(["one", "two", "three"])) + self.assertTrue(self.ls._get_lock().is_owned(shared=1)) + self.ls.release() + self.assertFalse(self.ls._get_lock().is_owned()) + + # Try one in exclusive mode + self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True)) + + self.done.put(True) + + self._addThread(target=fn) + + # Wait for threads to finish + self._waitThreads() + + self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"])) + + self.ls.release() + self.assertFalse(self.ls.list_owned()) + self.assertFalse(self.ls._get_lock().is_owned()) + + self.assertTrue(self.done.get_nowait()) + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def testLockDeleteWithOpportunisticAcquisition(self): + # This test exercises some code handling LockError on acquisition, that is + # after all lock names have been gathered. This shouldn't happen in reality + # as removing locks from the set requires the lockset-internal lock, but + # the code should handle the situation anyway. + ready = threading.Event() + finished = threading.Event() + + self.assertEquals(self.ls._names(), set(["one", "two", "three"])) + + # Thread function to delete lock + def fn(): + # Wait for notification + ready.wait() + + # Delete lock named "two" by accessing lockset-internal data + ld = self.ls._get_lockdict() + self.assertTrue(ld["two"].delete()) + + self.done.put("deleted.two") + + # Notify helper + finished.set() + + self._addThread(target=fn) + + # Notification helper, called when lock already holds internal lock. + # Therefore only one of the locks not yet locked can be deleted. + def notify(name): + self.done.put("notify.%s" % name) + + if name == "one": + # Tell helper thread to delete lock "two" + ready.set() + finished.wait() + + # Hold all locks in main thread + self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify) + self.assertEqual(self.ls.list_owned(), set(["one", "three"])) + + # Wait for threads to finish + self._waitThreads() + + # Release all locks + self.ls.release() + self.assertFalse(self.ls.list_owned()) + self.assertFalse(self.ls._get_lock().is_owned()) + + self.assertEqual(self.done.get_nowait(), "notify.one") + self.assertEqual(self.done.get_nowait(), "deleted.two") + self.assertEqual(self.done.get_nowait(), "notify.three") + self.assertEqual(self.done.get_nowait(), "notify.two") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + +class TestGetLsAcquireModeAndTimeouts(unittest.TestCase): + def setUp(self): + self.fn = locking._GetLsAcquireModeAndTimeouts + + def testOpportunisticWithoutNames(self): + (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True) + self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC) + self.assertTrue(ls_timeout_fn is None) + self.assertEqual(timeout_fn(), 0) + + def testAllInputCombinations(self): + for want_all in [False, True]: + for timeout in [None, 0, 100]: + for opportunistic in [False, True]: + if (opportunistic and + not want_all and + timeout is not None): + # Can't accept a timeout when acquiring opportunistically + self.assertRaises(AssertionError, self.fn, + want_all, timeout, opportunistic) + else: + (mode, ls_timeout_fn, timeout_fn) = \ + self.fn(want_all, timeout, opportunistic) + + if opportunistic: + self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC) + self.assertEqual(timeout_fn(), 0) + else: + self.assertTrue(callable(timeout_fn)) + if want_all: + self.assertEqual(mode, locking._LS_ACQUIRE_ALL) + else: + self.assertEqual(mode, locking._LS_ACQUIRE_EXACT) + + if want_all: + self.assertTrue(callable(ls_timeout_fn)) + else: + self.assertTrue(ls_timeout_fn is None) + class TestGanetiLockManager(_ThreadedTestCase): def setUp(self):