Commit 008b92fa authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

More locking tests race conditions fixes



There were more race conditions. By adding a notify function to
SharedLock.acquire we can prevent them.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 9648f1b4
......@@ -536,7 +536,7 @@ class SharedLock(object):
return False
def acquire(self, shared=0, timeout=None):
def acquire(self, shared=0, timeout=None, test_notify=None):
"""Acquire a shared lock.
@type shared: int
......@@ -544,10 +544,16 @@ class SharedLock(object):
exclusive lock will be acquired
@type timeout: float
@param timeout: maximum waiting time before giving up
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
"""
self.__lock.acquire()
try:
# We already got the lock, notify now
if __debug__ and callable(test_notify):
test_notify()
return self.__acquire_unlocked(shared, timeout)
finally:
self.__lock.release()
......
......@@ -450,28 +450,34 @@ class TestSharedLock(_ThreadedTestCase):
@_Repeat
def testExclusiveAcquireTimeout(self):
def _LockExclusive(wait):
self.sl.acquire(shared=0)
self.done.put("A: start sleep")
time.sleep(wait)
self.done.put("A: end sleep")
self.sl.release()
for shared in [0, 1]:
# Start thread to hold lock for 20 ms
self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
on_queue = threading.Event()
release_exclusive = threading.Event()
def _LockExclusive():
self.sl.acquire(shared=0, test_notify=on_queue.set)
self.done.put("A: start wait")
release_exclusive.wait()
self.done.put("A: end wait")
self.sl.release()
# Start thread to hold lock in exclusive mode
self._addThread(target=_LockExclusive)
# Wait for sleep to begin
self.assertEqual(self.done.get(), "A: start sleep")
# Wait for wait to begin
self.assertEqual(self.done.get(timeout=60), "A: start wait")
# Wait up to 60s to get lock, but release exclusive lock as soon as we're
# on the queue
self.failUnless(self.sl.acquire(shared=shared, timeout=60,
test_notify=release_exclusive.set))
# Wait up to 100 ms to get lock
self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
self.done.put("got 2nd")
self.sl.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "A: end sleep")
self.assertEqual(self.done.get_nowait(), "A: end wait")
self.assertEqual(self.done.get_nowait(), "got 2nd")
self.assertRaises(Queue.Empty, self.done.get_nowait)
......@@ -507,44 +513,52 @@ class TestSharedLock(_ThreadedTestCase):
# Tests whether shared acquires jump in front of exclusive acquires in the
# queue.
# Get exclusive lock while we fill the queue
self.sl.acquire()
def _Acquire(shared, name, notify_ev, wait_ev):
if notify_ev:
notify_fn = notify_ev.set
else:
notify_fn = None
def _Acquire(shared, name):
if not self.sl.acquire(shared=shared):
if wait_ev:
wait_ev.wait()
if not self.sl.acquire(shared=shared, test_notify=notify_fn):
return
self.done.put(name)
self.sl.release()
# Start shared acquires
for _ in xrange(5):
self._addThread(target=_Acquire, args=(1, "shared A"))
# Get exclusive lock while we fill the queue
self.sl.acquire()
# Start exclusive acquires
for _ in xrange(3):
self._addThread(target=_Acquire, args=(0, "exclusive B"))
shrcnt1 = 5
shrcnt2 = 7
shrcnt3 = 9
shrcnt4 = 2
# More shared acquires
for _ in xrange(5):
self._addThread(target=_Acquire, args=(1, "shared C"))
# Add acquires using threading.Event for synchronization. They'll be
# acquired exactly in the order defined in this list.
acquires = (shrcnt1 * [(1, "shared 1")] +
3 * [(0, "exclusive 1")] +
shrcnt2 * [(1, "shared 2")] +
shrcnt3 * [(1, "shared 3")] +
shrcnt4 * [(1, "shared 4")] +
3 * [(0, "exclusive 2")])
# More exclusive acquires
for _ in xrange(3):
self._addThread(target=_Acquire, args=(0, "exclusive D"))
ev_cur = None
ev_prev = None
for args in acquires:
ev_cur = threading.Event()
self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
ev_prev = ev_cur
# Wait for last acquire to start
ev_prev.wait()
# Expect 6 pending exclusive acquires and 1 for all shared acquires
# together. There's no way to wait for SharedLock.acquire to start
# its work. Hence the timeout of 2 seconds.
pending = 0
end_time = time.time() + 2.0
while time.time() < end_time:
pending = self.sl._count_pending()
self.assert_(pending >= 0 and pending <= 7)
if pending == 7:
break
time.sleep(0.05)
self.assertEqual(pending, 7)
# together
self.assertEqual(self.sl._count_pending(), 7)
# Release exclusive lock and wait
self.sl.release()
......@@ -552,24 +566,28 @@ class TestSharedLock(_ThreadedTestCase):
self._waitThreads()
# Check sequence
shr_a = 0
shr_c = 0
for _ in xrange(10):
for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
# Shared locks aren't guaranteed to be notified in order, but they'll be
# first
tmp = self.done.get_nowait()
if tmp == "shared A":
shr_a += 1
elif tmp == "shared C":
shr_c += 1
self.assertEqual(shr_a, 5)
self.assertEqual(shr_c, 5)
if tmp == "shared 1":
shrcnt1 -= 1
elif tmp == "shared 2":
shrcnt2 -= 1
elif tmp == "shared 3":
shrcnt3 -= 1
elif tmp == "shared 4":
shrcnt4 -= 1
self.assertEqual(shrcnt1, 0)
self.assertEqual(shrcnt2, 0)
self.assertEqual(shrcnt3, 0)
self.assertEqual(shrcnt3, 0)
for _ in xrange(3):
self.assertEqual(self.done.get_nowait(), "exclusive B")
self.assertEqual(self.done.get_nowait(), "exclusive 1")
for _ in xrange(3):
self.assertEqual(self.done.get_nowait(), "exclusive D")
self.assertEqual(self.done.get_nowait(), "exclusive 2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment