diff --git a/lib/locking.py b/lib/locking.py index a0e418e990be7b951a8e1a1071283b52ae504c39..5d5a01d9c725522677e5bfc4eafb1542498b986f 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -33,6 +33,7 @@ import weakref import logging import heapq import itertools +import time from ganeti import errors from ganeti import utils @@ -404,12 +405,13 @@ class SharedLock(object): "__pending_by_prio", "__pending_shared", "__shr", + "__time_fn", "name", ] __condition_class = _PipeConditionWithMode - def __init__(self, name, monitor=None): + def __init__(self, name, monitor=None, _time_fn=time.time): """Construct a new SharedLock. @param name: the name of the lock @@ -421,6 +423,9 @@ class SharedLock(object): self.name = name + # Used for unittesting + self.__time_fn = _time_fn + # Internal lock self.__lock = threading.Lock() @@ -682,24 +687,27 @@ class SharedLock(object): assert priority not in self.__pending_shared self.__pending_shared[priority] = wait_condition + wait_start = self.__time_fn() + acquired = False + try: # Wait until we become the topmost acquire in the queue or the timeout # expires. - # TODO: Decrease timeout with spurious notifications - while not (self.__is_on_top(wait_condition) and - self.__can_acquire(shared)): - # Wait for notification - wait_condition.wait(timeout) - self.__check_deleted() + while True: + if self.__is_on_top(wait_condition) and self.__can_acquire(shared): + self.__do_acquire(shared) + acquired = True + break - # A lot of code assumes blocking acquires always succeed. Loop - # internally for that case. - if timeout is not None: + # A lot of code assumes blocking acquires always succeed, therefore we + # can never return False for a blocking acquire + if (timeout is not None and + utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)): break - if self.__is_on_top(wait_condition) and self.__can_acquire(shared): - self.__do_acquire(shared) - return True + # Wait for notification + wait_condition.wait(timeout) + self.__check_deleted() finally: # Remove condition from queue if there are no more waiters if not wait_condition.has_waiting(): @@ -709,7 +717,7 @@ class SharedLock(object): # (e.g. on lock deletion) self.__pending_shared.pop(priority, None) - return False + return acquired def acquire(self, shared=0, timeout=None, priority=None, test_notify=None): @@ -800,15 +808,30 @@ class SharedLock(object): self.__shr.remove(threading.currentThread()) # Notify topmost condition in queue - (priority, prioqueue) = self.__find_first_pending_queue() - if prioqueue: - cond = prioqueue[0] - cond.notifyAll() - if cond.shared: - # Prevent further shared acquires from sneaking in while waiters are - # notified - self.__pending_shared.pop(priority, None) + self.__notify_topmost() + finally: + self.__lock.release() + + def __notify_topmost(self): + """Notifies topmost condition in queue of pending acquires. + + """ + (priority, prioqueue) = self.__find_first_pending_queue() + if prioqueue: + cond = prioqueue[0] + cond.notifyAll() + if cond.shared: + # Prevent further shared acquires from sneaking in while waiters are + # notified + self.__pending_shared.pop(priority, None) + + def _notify_topmost(self): + """Exported version of L{__notify_topmost}. + """ + self.__lock.acquire() + try: + return self.__notify_topmost() finally: self.__lock.release() diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index c8a6807fc866a0689c4e940b17b167627f1524fa..e48e74faf08edad25ca8e726d1c80af88ce39f97 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -953,6 +953,84 @@ class TestSharedLock(_ThreadedTestCase): for i in sorted(perprio.keys())] for (shared, _, threads) in acquires]) + class _FakeTimeForSpuriousNotifications: + def __init__(self, now, check_end): + self.now = now + self.check_end = check_end + + # Deterministic random number generator + self.rnd = random.Random(15086) + + def time(self): + # Advance time if the random number generator thinks so (this is to test + # multiple notifications without advancing the time) + if self.rnd.random() < 0.3: + self.now += self.rnd.random() + + self.check_end(self.now) + + return self.now + + @_Repeat + def testAcquireTimeoutWithSpuriousNotifications(self): + ready = threading.Event() + locked = threading.Event() + req = Queue.Queue(0) + + epoch = 4000.0 + timeout = 60.0 + + def check_end(now): + self.assertFalse(locked.isSet()) + + # If we waited long enough (in virtual time), tell main thread to release + # lock, otherwise tell it to notify once more + req.put(now < (epoch + (timeout * 0.8))) + + time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time + + sl = locking.SharedLock("test", _time_fn=time_fn) + + # Acquire in exclusive mode + sl.acquire(shared=0) + + def fn(): + self.assertTrue(sl.acquire(shared=0, timeout=timeout, + test_notify=ready.set)) + locked.set() + sl.release() + self.done.put("success") + + # Start acquire with timeout and wait for it to be ready + self._addThread(target=fn) + ready.wait() + + # The separate thread is now waiting to acquire the lock, so start sending + # spurious notifications. + + # Wait for separate thread to ask for another notification + count = 0 + while req.get(): + # After sending the notification, the lock will take a short amount of + # time to notice and to retrieve the current time + sl._notify_topmost() + count += 1 + + self.assertTrue(count > 100, "Not enough notifications were sent") + + self.assertFalse(locked.isSet()) + + # Some notifications have been sent, now actually release the lock + sl.release() + + # Wait for lock to be acquired + locked.wait() + + self._waitThreads() + + self.assertEqual(self.done.get_nowait(), "success") + self.assertRaises(Queue.Empty, self.done.get_nowait) + class TestSharedLockInCondition(_ThreadedTestCase): """SharedLock as a condition lock tests"""