diff --git a/lib/locking.py b/lib/locking.py index e99da845bee76d81040a58324352dcd6163be75b..862e66d8681da7f3a7500f62d0fc7f280de687f3 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -536,7 +536,7 @@ class SharedLock(object): return False - def acquire(self, shared=0, timeout=None): + def acquire(self, shared=0, timeout=None, test_notify=None): """Acquire a shared lock. @type shared: int @@ -544,10 +544,16 @@ class SharedLock(object): exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting """ self.__lock.acquire() try: + # We already got the lock, notify now + if __debug__ and callable(test_notify): + test_notify() + return self.__acquire_unlocked(shared, timeout) finally: self.__lock.release() diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 259d46e102f9781a52bfa925f5037dc7eddf48a1..33b915bf77a46dff4266afeefca306ffdcbd6e48 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -450,28 +450,34 @@ class TestSharedLock(_ThreadedTestCase): @_Repeat def testExclusiveAcquireTimeout(self): - def _LockExclusive(wait): - self.sl.acquire(shared=0) - self.done.put("A: start sleep") - time.sleep(wait) - self.done.put("A: end sleep") - self.sl.release() - for shared in [0, 1]: - # Start thread to hold lock for 20 ms - self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, )) + on_queue = threading.Event() + release_exclusive = threading.Event() + + def _LockExclusive(): + self.sl.acquire(shared=0, test_notify=on_queue.set) + self.done.put("A: start wait") + release_exclusive.wait() + self.done.put("A: end wait") + self.sl.release() + + # Start thread to hold lock in exclusive mode + self._addThread(target=_LockExclusive) - # Wait for sleep to begin - self.assertEqual(self.done.get(), "A: start sleep") + # Wait for wait to begin + self.assertEqual(self.done.get(timeout=60), "A: start wait") + + # Wait up to 60s to get lock, but release exclusive lock as soon as we're + # on the queue + self.failUnless(self.sl.acquire(shared=shared, timeout=60, + test_notify=release_exclusive.set)) - # Wait up to 100 ms to get lock - self.failUnless(self.sl.acquire(shared=shared, timeout=0.1)) self.done.put("got 2nd") self.sl.release() self._waitThreads() - self.assertEqual(self.done.get_nowait(), "A: end sleep") + self.assertEqual(self.done.get_nowait(), "A: end wait") self.assertEqual(self.done.get_nowait(), "got 2nd") self.assertRaises(Queue.Empty, self.done.get_nowait) @@ -507,44 +513,52 @@ class TestSharedLock(_ThreadedTestCase): # Tests whether shared acquires jump in front of exclusive acquires in the # queue. - # Get exclusive lock while we fill the queue - self.sl.acquire() + def _Acquire(shared, name, notify_ev, wait_ev): + if notify_ev: + notify_fn = notify_ev.set + else: + notify_fn = None - def _Acquire(shared, name): - if not self.sl.acquire(shared=shared): + if wait_ev: + wait_ev.wait() + + if not self.sl.acquire(shared=shared, test_notify=notify_fn): return self.done.put(name) self.sl.release() - # Start shared acquires - for _ in xrange(5): - self._addThread(target=_Acquire, args=(1, "shared A")) + # Get exclusive lock while we fill the queue + self.sl.acquire() - # Start exclusive acquires - for _ in xrange(3): - self._addThread(target=_Acquire, args=(0, "exclusive B")) + shrcnt1 = 5 + shrcnt2 = 7 + shrcnt3 = 9 + shrcnt4 = 2 - # More shared acquires - for _ in xrange(5): - self._addThread(target=_Acquire, args=(1, "shared C")) + # Add acquires using threading.Event for synchronization. They'll be + # acquired exactly in the order defined in this list. + acquires = (shrcnt1 * [(1, "shared 1")] + + 3 * [(0, "exclusive 1")] + + shrcnt2 * [(1, "shared 2")] + + shrcnt3 * [(1, "shared 3")] + + shrcnt4 * [(1, "shared 4")] + + 3 * [(0, "exclusive 2")]) - # More exclusive acquires - for _ in xrange(3): - self._addThread(target=_Acquire, args=(0, "exclusive D")) + ev_cur = None + ev_prev = None + + for args in acquires: + ev_cur = threading.Event() + self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev)) + ev_prev = ev_cur + + # Wait for last acquire to start + ev_prev.wait() # Expect 6 pending exclusive acquires and 1 for all shared acquires - # together. There's no way to wait for SharedLock.acquire to start - # its work. Hence the timeout of 2 seconds. - pending = 0 - end_time = time.time() + 2.0 - while time.time() < end_time: - pending = self.sl._count_pending() - self.assert_(pending >= 0 and pending <= 7) - if pending == 7: - break - time.sleep(0.05) - self.assertEqual(pending, 7) + # together + self.assertEqual(self.sl._count_pending(), 7) # Release exclusive lock and wait self.sl.release() @@ -552,24 +566,28 @@ class TestSharedLock(_ThreadedTestCase): self._waitThreads() # Check sequence - shr_a = 0 - shr_c = 0 - for _ in xrange(10): + for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4): # Shared locks aren't guaranteed to be notified in order, but they'll be # first tmp = self.done.get_nowait() - if tmp == "shared A": - shr_a += 1 - elif tmp == "shared C": - shr_c += 1 - self.assertEqual(shr_a, 5) - self.assertEqual(shr_c, 5) + if tmp == "shared 1": + shrcnt1 -= 1 + elif tmp == "shared 2": + shrcnt2 -= 1 + elif tmp == "shared 3": + shrcnt3 -= 1 + elif tmp == "shared 4": + shrcnt4 -= 1 + self.assertEqual(shrcnt1, 0) + self.assertEqual(shrcnt2, 0) + self.assertEqual(shrcnt3, 0) + self.assertEqual(shrcnt3, 0) for _ in xrange(3): - self.assertEqual(self.done.get_nowait(), "exclusive B") + self.assertEqual(self.done.get_nowait(), "exclusive 1") for _ in xrange(3): - self.assertEqual(self.done.get_nowait(), "exclusive D") + self.assertEqual(self.done.get_nowait(), "exclusive 2") self.assertRaises(Queue.Empty, self.done.get_nowait)