Commit 48dabc6a authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Add _PipeCondition class



_PipeCondition is a condition implemented using pipe(2) and poll(2).
It allows the implementation of timeouts without using a busy-wait loop
with time.sleep.

Unlike Python's built-in threading.Condition class and to save file
descriptors and an internal queue, it can only be used to notify
all waiters. Ganeti's use case for this condition class doesn't
require the ability to notify only one waiter.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent d76167a5
......@@ -223,6 +223,122 @@ class _SingleActionPipeCondition(object):
self._Cleanup()
class _PipeCondition(object):
"""Group-only non-polling condition with counters.
This condition class uses pipes and poll, internally, to be able to wait for
notification with a timeout, without resorting to polling. It is almost
compatible with Python's threading.Condition, but only supports notifyAll and
non-recursive locks. As an additional features it's able to report whether
there are any waiting threads.
"""
__slots__ = [
"_lock",
"_nwaiters",
"_pipe",
"acquire",
"release",
]
_pipe_class = _SingleActionPipeCondition
def __init__(self, lock):
"""Initializes this class.
"""
object.__init__(self)
# Recursive locks are not supported
assert not hasattr(lock, "_acquire_restore")
assert not hasattr(lock, "_release_save")
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
self._nwaiters = 0
self._pipe = None
def _is_owned(self):
"""Check whether lock is owned by current thread.
"""
if self._lock.acquire(0):
self._lock.release()
return False
return True
def _check_owned(self):
"""Raise an exception if the current thread doesn't own the lock.
"""
if not self._is_owned():
raise RuntimeError("cannot work with un-aquired lock")
def wait(self, timeout=None):
"""Wait for a notification.
@type timeout: float or None
@param timeout: Waiting timeout (can be None)
"""
self._check_owned()
if not self._pipe:
self._pipe = self._pipe_class()
# Keep local reference to the pipe. It could be replaced by another thread
# notifying while we're waiting.
pipe = self._pipe
assert self._nwaiters >= 0
self._nwaiters += 1
try:
# Get function to wait on the pipe
wait_fn = pipe.StartWaiting()
try:
# Release lock while waiting
self.release()
try:
# Wait for notification
wait_fn(timeout)
finally:
# Re-acquire lock
self.acquire()
finally:
# Destroy pipe if this was the last waiter and the current pipe is
# still the same. The same pipe cannot be reused after cleanup.
if pipe.DoneWaiting() and pipe == self._pipe:
self._pipe = None
finally:
assert self._nwaiters > 0
self._nwaiters -= 1
def notifyAll(self):
"""Notify all currently waiting threads.
"""
self._check_owned()
# Notify and forget pipe. A new one will be created on the next call to
# wait.
if self._pipe is not None:
self._pipe.notifyAll()
self._pipe = None
def has_waiting(self):
"""Returns whether there are active waiters.
"""
self._check_owned()
return bool(self._nwaiters)
class _CountingCondition(object):
"""Wrapper for Python's built-in threading.Condition class.
......
......@@ -69,6 +69,131 @@ class _ThreadedTestCase(unittest.TestCase):
self.threads = []
class TestPipeCondition(_ThreadedTestCase):
"""_PipeCondition tests"""
def setUp(self):
_ThreadedTestCase.setUp(self)
self.lock = threading.Lock()
self.cond = locking._PipeCondition(self.lock)
self.done = Queue.Queue(0)
def testAcquireRelease(self):
self.assert_(not self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.acquire()
self.assert_(self.cond._is_owned())
self.cond.notifyAll()
self.assert_(self.cond._is_owned())
self.cond.release()
self.assert_(not self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
def testNotification(self):
def _NotifyAll():
self.cond.acquire()
self.cond.notifyAll()
self.cond.release()
self.cond.acquire()
self._addThread(target=_NotifyAll)
self.cond.wait()
self.assert_(self.cond._is_owned())
self.cond.release()
self.assert_(not self.cond._is_owned())
def _TestWait(self, fn):
self._addThread(target=fn)
self._addThread(target=fn)
self._addThread(target=fn)
# Wait for threads to be waiting
self.assertEqual(self.done.get(True, 1), "A")
self.assertEqual(self.done.get(True, 1), "A")
self.assertEqual(self.done.get(True, 1), "A")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.acquire()
self.assertEqual(self.cond._nwaiters, 3)
# This new thread can"t acquire the lock, and thus call wait, before we
# release it
self._addThread(target=fn)
self.cond.notifyAll()
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.release()
# We should now get 3 W and 1 A (for the new thread) in whatever order
w = 0
a = 0
for i in range(4):
got = self.done.get(True, 1)
if got == "W":
w += 1
elif got == "A":
a += 1
else:
self.fail("Got %s on the done queue" % got)
self.assertEqual(w, 3)
self.assertEqual(a, 1)
self.cond.acquire()
self.cond.notifyAll()
self.cond.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "W")
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testBlockingWait(self):
def _BlockingWait():
self.cond.acquire()
self.done.put("A")
self.cond.wait()
self.cond.release()
self.done.put("W")
self._TestWait(_BlockingWait)
def testLongTimeoutWait(self):
def _Helper():
self.cond.acquire()
self.done.put("A")
self.cond.wait(15.0)
self.cond.release()
self.done.put("W")
self._TestWait(_Helper)
def _TimeoutWait(self, timeout, check):
self.cond.acquire()
self.cond.wait(timeout)
self.cond.release()
self.done.put(check)
def testShortTimeoutWait(self):
self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "T1")
self.assertEqual(self.done.get_nowait(), "T1")
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testZeroTimeoutWait(self):
self._addThread(target=self._TimeoutWait, args=(0, "T0"))
self._addThread(target=self._TimeoutWait, args=(0, "T0"))
self._addThread(target=self._TimeoutWait, args=(0, "T0"))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "T0")
self.assertEqual(self.done.get_nowait(), "T0")
self.assertEqual(self.done.get_nowait(), "T0")
self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestSingleActionPipeCondition(unittest.TestCase):
"""_SingleActionPipeCondition tests"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment