Skip to content
Snippets Groups Projects
Commit 34cb5617 authored by Guido Trotter's avatar Guido Trotter
Browse files

SingleActionPipeCondition =~ s/Action/Notify/


With this patch we simplify usage on the SingleActionCondition (which
wasn't a condition at all) by making it a real condition. This way we
can just wait() on it, or notifyAll() as we would on a normal one. The
only catch is that notifyAll can be called only once, and wait can only
be called before notifyAll has, but luckily our PipeCondition, now quite
simplified, takes care of this, by providing a new SingleActionCondition
each time the previous one has been notified.

No Start/StopUsing function are needed anymore, and thus the condition
is a lot more robust, and there's no way file descriptors can be left
open, as they are closed in a finally block, in the same function where
they were opened, by the last thread exiting the class.

Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent b8140229
No related branches found
No related tags found
No related merge requests found
......@@ -49,21 +49,18 @@ def ssynchronized(lock, shared=0):
return wrap
class _SingleActionPipeConditionWaiter(object):
"""Callable helper class for _SingleActionPipeCondition.
class _SingleNotifyPipeConditionWaiter(object):
"""Helper class for SingleNotifyPipeCondition
"""
__slots__ = [
"_cond",
"_fd",
"_poller",
]
def __init__(self, cond, poller, fd):
"""Initializes this class.
def __init__(self, poller, fd):
"""Constructor for _SingleNotifyPipeConditionWaiter
@type cond: L{_SingleActionPipeCondition}
@param cond: Parent condition
@type poller: select.poll
@param poller: Poller object
@type fd: int
......@@ -71,8 +68,6 @@ class _SingleActionPipeConditionWaiter(object):
"""
object.__init__(self)
self._cond = cond
self._poller = poller
self._fd = fd
......@@ -152,127 +147,98 @@ class _BaseCondition(object):
raise RuntimeError("cannot work with un-aquired lock")
class _SingleActionPipeCondition(object):
"""Wrapper around a pipe for usage inside conditions.
This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
always allocated when constructing the class. Extra care is taken to always
close the file descriptors.
An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
notifications.
class SingleNotifyPipeCondition(_BaseCondition):
"""Condition which can only be notified once.
Warning: This class is designed to be used as the underlying component of a
locking condition, but is not by itself thread safe, and needs to be
protected by an external lock.
This condition class uses pipes and poll, internally, to be able to wait for
notification with a timeout, without resorting to polling. It is almost
compatible with Python's threading.Condition, with the following differences:
- notifyAll can only be called once, and no wait can happen after that
- notify is not supported, only notifyAll
"""
__slots__ = [
__slots__ = _BaseCondition.__slots__ + [
"_poller",
"_read_fd",
"_write_fd",
"_nwaiters",
"_notified",
]
_waiter_class = _SingleActionPipeConditionWaiter
_waiter_class = _SingleNotifyPipeConditionWaiter
def __init__(self):
"""Initializes this class.
def __init__(self, lock):
"""Constructor for SingleNotifyPipeCondition
"""
object.__init__(self)
_BaseCondition.__init__(self, lock)
self._nwaiters = 0
self._notified = False
self._read_fd = None
self._write_fd = None
self._poller = None
# Just assume the unpacking is successful, otherwise error handling gets
# very complicated.
(self._read_fd, self._write_fd) = os.pipe()
try:
# The poller looks for closure of the write side
poller = select.poll()
poller.register(self._read_fd, select.POLLHUP)
self._poller = poller
except:
if self._read_fd is not None:
os.close(self._read_fd)
if self._write_fd is not None:
os.close(self._write_fd)
raise
# There should be no code here anymore, otherwise the pipe file descriptors
# may be not be cleaned up properly in case of errors.
def StartWaiting(self):
"""Return function to wait for notification.
def _check_unnotified(self):
if self._notified:
raise RuntimeError("cannot use already notified condition")
@rtype: L{_SingleActionPipeConditionWaiter}
@return: Function to wait for notification
def _Cleanup(self):
"""Cleanup open file descriptors, if any.
"""
assert self._nwaiters >= 0
if self._poller is None:
raise RuntimeError("Already cleaned up")
# Create waiter function and increase number of waiters
wait_fn = self._waiter_class(self, self._poller, self._read_fd)
self._nwaiters += 1
return wait_fn
if self._read_fd is not None:
os.close(self._read_fd)
self._read_fd = None
def DoneWaiting(self):
"""Decrement number of waiters and automatic cleanup.
if self._write_fd is not None:
os.close(self._write_fd)
self._write_fd = None
self._poller = None
Must be called after waiting for a notification.
def wait(self, timeout=None):
"""Wait for a notification.
@rtype: bool
@return: Whether this was the last waiter
@type timeout: float or None
@param timeout: Waiting timeout (can be None)
"""
assert self._nwaiters > 0
self._nwaiters -= 1
self._check_owned()
self._check_unnotified()
if self._nwaiters == 0:
self._Cleanup()
return True
self._nwaiters += 1
try:
if self._poller is None:
(self._read_fd, self._write_fd) = os.pipe()
self._poller = select.poll()
self._poller.register(self._read_fd, select.POLLHUP)
return False
wait_fn = self._waiter_class(self._poller, self._read_fd)
self.release()
try:
# Wait for notification
wait_fn(timeout)
finally:
# Re-acquire lock
self.acquire()
finally:
self._nwaiters -= 1
if self._nwaiters == 0:
self._Cleanup()
def notifyAll(self):
"""Close the writing side of the pipe to notify all waiters.
"""
if self._write_fd is None:
raise RuntimeError("Can only notify once")
os.close(self._write_fd)
self._write_fd = None
def _Cleanup(self):
"""Close all file descriptors.
"""
if self._read_fd is not None:
os.close(self._read_fd)
self._read_fd = None
self._check_owned()
self._check_unnotified()
self._notified = True
if self._write_fd is not None:
os.close(self._write_fd)
self._write_fd = None
self._poller = None
def __del__(self):
"""Called on object deletion.
Ensure no file descriptors are left open.
"""
self._Cleanup()
class _PipeCondition(_BaseCondition):
class PipeCondition(_BaseCondition):
"""Group-only non-polling condition with counters.
This condition class uses pipes and poll, internally, to be able to wait for
......@@ -284,10 +250,10 @@ class _PipeCondition(_BaseCondition):
"""
__slots__ = _BaseCondition.__slots__ + [
"_nwaiters",
"_pipe",
"_single_condition",
]
_pipe_class = _SingleActionPipeCondition
_single_condition_class = SingleNotifyPipeCondition
def __init__(self, lock):
"""Initializes this class.
......@@ -295,7 +261,7 @@ class _PipeCondition(_BaseCondition):
"""
_BaseCondition.__init__(self, lock)
self._nwaiters = 0
self._pipe = None
self._single_condition = self._single_condition_class(self._lock)
def wait(self, timeout=None):
"""Wait for a notification.
......@@ -306,32 +272,14 @@ class _PipeCondition(_BaseCondition):
"""
self._check_owned()
if not self._pipe:
self._pipe = self._pipe_class()
# Keep local reference to the pipe. It could be replaced by another thread
# notifying while we're waiting.
pipe = self._pipe
my_condition = self._single_condition
assert self._nwaiters >= 0
self._nwaiters += 1
try:
# Get function to wait on the pipe
wait_fn = pipe.StartWaiting()
try:
# Release lock while waiting
self.release()
try:
# Wait for notification
wait_fn(timeout)
finally:
# Re-acquire lock
self.acquire()
finally:
# Destroy pipe if this was the last waiter and the current pipe is
# still the same. The same pipe cannot be reused after cleanup.
if pipe.DoneWaiting() and pipe == self._pipe:
self._pipe = None
my_condition.wait(timeout)
finally:
assert self._nwaiters > 0
self._nwaiters -= 1
......@@ -341,12 +289,8 @@ class _PipeCondition(_BaseCondition):
"""
self._check_owned()
# Notify and forget pipe. A new one will be created on the next call to
# wait.
if self._pipe is not None:
self._pipe.notifyAll()
self._pipe = None
self._single_condition.notifyAll()
self._single_condition = self._single_condition_class(self._lock)
def has_waiting(self):
"""Returns whether there are active waiters.
......@@ -387,7 +331,7 @@ class _CountingCondition(object):
"""Waits for the condition to be notified.
@type timeout: float or None
@param timeout: Timeout in seconds
@param timeout: Waiting timeout (can be None)
"""
assert self._nwaiters >= 0
......@@ -427,7 +371,7 @@ class SharedLock(object):
"__shr",
]
__condition_class = _PipeCondition
__condition_class = PipeCondition
def __init__(self):
"""Construct a new SharedLock.
......
......@@ -114,11 +114,37 @@ class _ConditionTestCase(_ThreadedTestCase):
self.assert_(not self.cond._is_owned())
class TestSingleNotifyPipeCondition(_ConditionTestCase):
"""SingleNotifyPipeCondition tests"""
def setUp(self):
_ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
def testAcquireRelease(self):
self._testAcquireRelease()
def testNotification(self):
self._testNotification()
def testWaitReuse(self):
self.cond.acquire()
self.cond.wait(0)
self.cond.wait(0.1)
self.cond.release()
def testNoNotifyReuse(self):
self.cond.acquire()
self.cond.notifyAll()
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.release()
class TestPipeCondition(_ConditionTestCase):
"""_PipeCondition tests"""
"""PipeCondition tests"""
def setUp(self):
_ConditionTestCase.setUp(self, locking._PipeCondition)
_ConditionTestCase.setUp(self, locking.PipeCondition)
def testAcquireRelease(self):
self._testAcquireRelease()
......@@ -214,86 +240,6 @@ class TestPipeCondition(_ConditionTestCase):
self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestSingleActionPipeCondition(unittest.TestCase):
"""_SingleActionPipeCondition tests"""
def setUp(self):
self.cond = locking._SingleActionPipeCondition()
def testInitialization(self):
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assertEqual(self.cond._nwaiters, 0)
def testUsageCount(self):
self.cond.StartWaiting()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assertEqual(self.cond._nwaiters, 1)
# use again
self.cond.StartWaiting()
self.assertEqual(self.cond._nwaiters, 2)
# there is more than one user
self.assert_(not self.cond.DoneWaiting())
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assertEqual(self.cond._nwaiters, 1)
self.assert_(self.cond.DoneWaiting())
self.assertEqual(self.cond._nwaiters, 0)
self.assert_(self.cond._read_fd is None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is None)
def testNotify(self):
wait1 = self.cond.StartWaiting()
wait2 = self.cond.StartWaiting()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.cond.notifyAll()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is not None)
self.assert_(not self.cond.DoneWaiting())
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is not None)
self.assert_(self.cond.DoneWaiting())
self.assert_(self.cond._read_fd is None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is None)
def testReusage(self):
self.cond.StartWaiting()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assert_(self.cond.DoneWaiting())
self.assertRaises(RuntimeError, self.cond.StartWaiting)
self.assert_(self.cond._read_fd is None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is None)
def testNotifyTwice(self):
self.cond.notifyAll()
self.assertRaises(RuntimeError, self.cond.notifyAll)
class TestSharedLock(_ThreadedTestCase):
"""SharedLock tests"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment