From 8d7d8b570a42e7d90b290ca4bb40684adf72d4ef Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Wed, 21 Mar 2012 14:55:22 +0100 Subject: [PATCH] locking: Handle spurious notifications on lock acquire This was already a TODO since the implementation of lock priorities in September 2010. Under certain conditions a waiting acquire can be notified at a time when it can't actually get the lock. In this case it would try and fail to acquire the lock and then return to the caller before the timeout ends. While this is not bad (nothing breaks), it isn't nice either. A separate patch will prevent unnecessary notifications when shared locks are released. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Bernardo Dal Seno <bdalseno@google.com> --- lib/locking.py | 67 ++++++++++++++++++---------- test/ganeti.locking_unittest.py | 78 +++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 22 deletions(-) diff --git a/lib/locking.py b/lib/locking.py index a0e418e99..5d5a01d9c 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -33,6 +33,7 @@ import weakref import logging import heapq import itertools +import time from ganeti import errors from ganeti import utils @@ -404,12 +405,13 @@ class SharedLock(object): "__pending_by_prio", "__pending_shared", "__shr", + "__time_fn", "name", ] __condition_class = _PipeConditionWithMode - def __init__(self, name, monitor=None): + def __init__(self, name, monitor=None, _time_fn=time.time): """Construct a new SharedLock. @param name: the name of the lock @@ -421,6 +423,9 @@ class SharedLock(object): self.name = name + # Used for unittesting + self.__time_fn = _time_fn + # Internal lock self.__lock = threading.Lock() @@ -682,24 +687,27 @@ class SharedLock(object): assert priority not in self.__pending_shared self.__pending_shared[priority] = wait_condition + wait_start = self.__time_fn() + acquired = False + try: # Wait until we become the topmost acquire in the queue or the timeout # expires. - # TODO: Decrease timeout with spurious notifications - while not (self.__is_on_top(wait_condition) and - self.__can_acquire(shared)): - # Wait for notification - wait_condition.wait(timeout) - self.__check_deleted() + while True: + if self.__is_on_top(wait_condition) and self.__can_acquire(shared): + self.__do_acquire(shared) + acquired = True + break - # A lot of code assumes blocking acquires always succeed. Loop - # internally for that case. - if timeout is not None: + # A lot of code assumes blocking acquires always succeed, therefore we + # can never return False for a blocking acquire + if (timeout is not None and + utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)): break - if self.__is_on_top(wait_condition) and self.__can_acquire(shared): - self.__do_acquire(shared) - return True + # Wait for notification + wait_condition.wait(timeout) + self.__check_deleted() finally: # Remove condition from queue if there are no more waiters if not wait_condition.has_waiting(): @@ -709,7 +717,7 @@ class SharedLock(object): # (e.g. on lock deletion) self.__pending_shared.pop(priority, None) - return False + return acquired def acquire(self, shared=0, timeout=None, priority=None, test_notify=None): @@ -800,15 +808,30 @@ class SharedLock(object): self.__shr.remove(threading.currentThread()) # Notify topmost condition in queue - (priority, prioqueue) = self.__find_first_pending_queue() - if prioqueue: - cond = prioqueue[0] - cond.notifyAll() - if cond.shared: - # Prevent further shared acquires from sneaking in while waiters are - # notified - self.__pending_shared.pop(priority, None) + self.__notify_topmost() + finally: + self.__lock.release() + + def __notify_topmost(self): + """Notifies topmost condition in queue of pending acquires. + + """ + (priority, prioqueue) = self.__find_first_pending_queue() + if prioqueue: + cond = prioqueue[0] + cond.notifyAll() + if cond.shared: + # Prevent further shared acquires from sneaking in while waiters are + # notified + self.__pending_shared.pop(priority, None) + + def _notify_topmost(self): + """Exported version of L{__notify_topmost}. + """ + self.__lock.acquire() + try: + return self.__notify_topmost() finally: self.__lock.release() diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index c8a6807fc..e48e74faf 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -953,6 +953,84 @@ class TestSharedLock(_ThreadedTestCase): for i in sorted(perprio.keys())] for (shared, _, threads) in acquires]) + class _FakeTimeForSpuriousNotifications: + def __init__(self, now, check_end): + self.now = now + self.check_end = check_end + + # Deterministic random number generator + self.rnd = random.Random(15086) + + def time(self): + # Advance time if the random number generator thinks so (this is to test + # multiple notifications without advancing the time) + if self.rnd.random() < 0.3: + self.now += self.rnd.random() + + self.check_end(self.now) + + return self.now + + @_Repeat + def testAcquireTimeoutWithSpuriousNotifications(self): + ready = threading.Event() + locked = threading.Event() + req = Queue.Queue(0) + + epoch = 4000.0 + timeout = 60.0 + + def check_end(now): + self.assertFalse(locked.isSet()) + + # If we waited long enough (in virtual time), tell main thread to release + # lock, otherwise tell it to notify once more + req.put(now < (epoch + (timeout * 0.8))) + + time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time + + sl = locking.SharedLock("test", _time_fn=time_fn) + + # Acquire in exclusive mode + sl.acquire(shared=0) + + def fn(): + self.assertTrue(sl.acquire(shared=0, timeout=timeout, + test_notify=ready.set)) + locked.set() + sl.release() + self.done.put("success") + + # Start acquire with timeout and wait for it to be ready + self._addThread(target=fn) + ready.wait() + + # The separate thread is now waiting to acquire the lock, so start sending + # spurious notifications. + + # Wait for separate thread to ask for another notification + count = 0 + while req.get(): + # After sending the notification, the lock will take a short amount of + # time to notice and to retrieve the current time + sl._notify_topmost() + count += 1 + + self.assertTrue(count > 100, "Not enough notifications were sent") + + self.assertFalse(locked.isSet()) + + # Some notifications have been sent, now actually release the lock + sl.release() + + # Wait for lock to be acquired + locked.wait() + + self._waitThreads() + + self.assertEqual(self.done.get_nowait(), "success") + self.assertRaises(Queue.Empty, self.done.get_nowait) + class TestSharedLockInCondition(_ThreadedTestCase): """SharedLock as a condition lock tests""" -- GitLab