From 34cb561709059e6fee9c9f20e0c2a4c1049ad09b Mon Sep 17 00:00:00 2001 From: Guido Trotter <ultrotter@google.com> Date: Wed, 30 Sep 2009 19:37:17 +0100 Subject: [PATCH] SingleActionPipeCondition =~ s/Action/Notify/ With this patch we simplify usage on the SingleActionCondition (which wasn't a condition at all) by making it a real condition. This way we can just wait() on it, or notifyAll() as we would on a normal one. The only catch is that notifyAll can be called only once, and wait can only be called before notifyAll has, but luckily our PipeCondition, now quite simplified, takes care of this, by providing a new SingleActionCondition each time the previous one has been notified. No Start/StopUsing function are needed anymore, and thus the condition is a lot more robust, and there's no way file descriptors can be left open, as they are closed in a finally block, in the same function where they were opened, by the last thread exiting the class. Signed-off-by: Guido Trotter <ultrotter@google.com> Reviewed-by: Michael Hanselmann <hansmi@google.com> --- lib/locking.py | 198 ++++++++++++-------------------- test/ganeti.locking_unittest.py | 110 +++++------------- 2 files changed, 99 insertions(+), 209 deletions(-) diff --git a/lib/locking.py b/lib/locking.py index 1e14f6588..e99da845b 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -49,21 +49,18 @@ def ssynchronized(lock, shared=0): return wrap -class _SingleActionPipeConditionWaiter(object): - """Callable helper class for _SingleActionPipeCondition. +class _SingleNotifyPipeConditionWaiter(object): + """Helper class for SingleNotifyPipeCondition """ __slots__ = [ - "_cond", "_fd", "_poller", ] - def __init__(self, cond, poller, fd): - """Initializes this class. + def __init__(self, poller, fd): + """Constructor for _SingleNotifyPipeConditionWaiter - @type cond: L{_SingleActionPipeCondition} - @param cond: Parent condition @type poller: select.poll @param poller: Poller object @type fd: int @@ -71,8 +68,6 @@ class _SingleActionPipeConditionWaiter(object): """ object.__init__(self) - - self._cond = cond self._poller = poller self._fd = fd @@ -152,127 +147,98 @@ class _BaseCondition(object): raise RuntimeError("cannot work with un-aquired lock") -class _SingleActionPipeCondition(object): - """Wrapper around a pipe for usage inside conditions. - - This class contains a POSIX pipe(2) and a poller to poll it. The pipe is - always allocated when constructing the class. Extra care is taken to always - close the file descriptors. - - An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for - notifications. +class SingleNotifyPipeCondition(_BaseCondition): + """Condition which can only be notified once. - Warning: This class is designed to be used as the underlying component of a - locking condition, but is not by itself thread safe, and needs to be - protected by an external lock. + This condition class uses pipes and poll, internally, to be able to wait for + notification with a timeout, without resorting to polling. It is almost + compatible with Python's threading.Condition, with the following differences: + - notifyAll can only be called once, and no wait can happen after that + - notify is not supported, only notifyAll """ - __slots__ = [ + + __slots__ = _BaseCondition.__slots__ + [ "_poller", "_read_fd", "_write_fd", "_nwaiters", + "_notified", ] - _waiter_class = _SingleActionPipeConditionWaiter + _waiter_class = _SingleNotifyPipeConditionWaiter - def __init__(self): - """Initializes this class. + def __init__(self, lock): + """Constructor for SingleNotifyPipeCondition """ - object.__init__(self) - + _BaseCondition.__init__(self, lock) self._nwaiters = 0 + self._notified = False + self._read_fd = None + self._write_fd = None + self._poller = None - # Just assume the unpacking is successful, otherwise error handling gets - # very complicated. - (self._read_fd, self._write_fd) = os.pipe() - try: - # The poller looks for closure of the write side - poller = select.poll() - poller.register(self._read_fd, select.POLLHUP) - - self._poller = poller - except: - if self._read_fd is not None: - os.close(self._read_fd) - if self._write_fd is not None: - os.close(self._write_fd) - raise - - # There should be no code here anymore, otherwise the pipe file descriptors - # may be not be cleaned up properly in case of errors. - - def StartWaiting(self): - """Return function to wait for notification. + def _check_unnotified(self): + if self._notified: + raise RuntimeError("cannot use already notified condition") - @rtype: L{_SingleActionPipeConditionWaiter} - @return: Function to wait for notification + def _Cleanup(self): + """Cleanup open file descriptors, if any. """ - assert self._nwaiters >= 0 - - if self._poller is None: - raise RuntimeError("Already cleaned up") - - # Create waiter function and increase number of waiters - wait_fn = self._waiter_class(self, self._poller, self._read_fd) - self._nwaiters += 1 - return wait_fn + if self._read_fd is not None: + os.close(self._read_fd) + self._read_fd = None - def DoneWaiting(self): - """Decrement number of waiters and automatic cleanup. + if self._write_fd is not None: + os.close(self._write_fd) + self._write_fd = None + self._poller = None - Must be called after waiting for a notification. + def wait(self, timeout=None): + """Wait for a notification. - @rtype: bool - @return: Whether this was the last waiter + @type timeout: float or None + @param timeout: Waiting timeout (can be None) """ - assert self._nwaiters > 0 - - self._nwaiters -= 1 + self._check_owned() + self._check_unnotified() - if self._nwaiters == 0: - self._Cleanup() - return True + self._nwaiters += 1 + try: + if self._poller is None: + (self._read_fd, self._write_fd) = os.pipe() + self._poller = select.poll() + self._poller.register(self._read_fd, select.POLLHUP) - return False + wait_fn = self._waiter_class(self._poller, self._read_fd) + self.release() + try: + # Wait for notification + wait_fn(timeout) + finally: + # Re-acquire lock + self.acquire() + finally: + self._nwaiters -= 1 + if self._nwaiters == 0: + self._Cleanup() def notifyAll(self): """Close the writing side of the pipe to notify all waiters. """ - if self._write_fd is None: - raise RuntimeError("Can only notify once") - - os.close(self._write_fd) - self._write_fd = None - - def _Cleanup(self): - """Close all file descriptors. - - """ - if self._read_fd is not None: - os.close(self._read_fd) - self._read_fd = None - + self._check_owned() + self._check_unnotified() + self._notified = True if self._write_fd is not None: os.close(self._write_fd) self._write_fd = None - self._poller = None - - def __del__(self): - """Called on object deletion. - - Ensure no file descriptors are left open. - """ - self._Cleanup() - - -class _PipeCondition(_BaseCondition): +class PipeCondition(_BaseCondition): """Group-only non-polling condition with counters. This condition class uses pipes and poll, internally, to be able to wait for @@ -284,10 +250,10 @@ class _PipeCondition(_BaseCondition): """ __slots__ = _BaseCondition.__slots__ + [ "_nwaiters", - "_pipe", + "_single_condition", ] - _pipe_class = _SingleActionPipeCondition + _single_condition_class = SingleNotifyPipeCondition def __init__(self, lock): """Initializes this class. @@ -295,7 +261,7 @@ class _PipeCondition(_BaseCondition): """ _BaseCondition.__init__(self, lock) self._nwaiters = 0 - self._pipe = None + self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): """Wait for a notification. @@ -306,32 +272,14 @@ class _PipeCondition(_BaseCondition): """ self._check_owned() - if not self._pipe: - self._pipe = self._pipe_class() - # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - pipe = self._pipe + my_condition = self._single_condition assert self._nwaiters >= 0 self._nwaiters += 1 try: - # Get function to wait on the pipe - wait_fn = pipe.StartWaiting() - try: - # Release lock while waiting - self.release() - try: - # Wait for notification - wait_fn(timeout) - finally: - # Re-acquire lock - self.acquire() - finally: - # Destroy pipe if this was the last waiter and the current pipe is - # still the same. The same pipe cannot be reused after cleanup. - if pipe.DoneWaiting() and pipe == self._pipe: - self._pipe = None + my_condition.wait(timeout) finally: assert self._nwaiters > 0 self._nwaiters -= 1 @@ -341,12 +289,8 @@ class _PipeCondition(_BaseCondition): """ self._check_owned() - - # Notify and forget pipe. A new one will be created on the next call to - # wait. - if self._pipe is not None: - self._pipe.notifyAll() - self._pipe = None + self._single_condition.notifyAll() + self._single_condition = self._single_condition_class(self._lock) def has_waiting(self): """Returns whether there are active waiters. @@ -387,7 +331,7 @@ class _CountingCondition(object): """Waits for the condition to be notified. @type timeout: float or None - @param timeout: Timeout in seconds + @param timeout: Waiting timeout (can be None) """ assert self._nwaiters >= 0 @@ -427,7 +371,7 @@ class SharedLock(object): "__shr", ] - __condition_class = _PipeCondition + __condition_class = PipeCondition def __init__(self): """Construct a new SharedLock. diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 2300517db..259d46e10 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -114,11 +114,37 @@ class _ConditionTestCase(_ThreadedTestCase): self.assert_(not self.cond._is_owned()) +class TestSingleNotifyPipeCondition(_ConditionTestCase): + """SingleNotifyPipeCondition tests""" + + def setUp(self): + _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition) + + def testAcquireRelease(self): + self._testAcquireRelease() + + def testNotification(self): + self._testNotification() + + def testWaitReuse(self): + self.cond.acquire() + self.cond.wait(0) + self.cond.wait(0.1) + self.cond.release() + + def testNoNotifyReuse(self): + self.cond.acquire() + self.cond.notifyAll() + self.assertRaises(RuntimeError, self.cond.wait) + self.assertRaises(RuntimeError, self.cond.notifyAll) + self.cond.release() + + class TestPipeCondition(_ConditionTestCase): - """_PipeCondition tests""" + """PipeCondition tests""" def setUp(self): - _ConditionTestCase.setUp(self, locking._PipeCondition) + _ConditionTestCase.setUp(self, locking.PipeCondition) def testAcquireRelease(self): self._testAcquireRelease() @@ -214,86 +240,6 @@ class TestPipeCondition(_ConditionTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) -class TestSingleActionPipeCondition(unittest.TestCase): - """_SingleActionPipeCondition tests""" - - def setUp(self): - self.cond = locking._SingleActionPipeCondition() - - def testInitialization(self): - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - self.assertEqual(self.cond._nwaiters, 0) - - def testUsageCount(self): - self.cond.StartWaiting() - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - self.assertEqual(self.cond._nwaiters, 1) - - # use again - self.cond.StartWaiting() - self.assertEqual(self.cond._nwaiters, 2) - - # there is more than one user - self.assert_(not self.cond.DoneWaiting()) - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - self.assertEqual(self.cond._nwaiters, 1) - - self.assert_(self.cond.DoneWaiting()) - self.assertEqual(self.cond._nwaiters, 0) - self.assert_(self.cond._read_fd is None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is None) - - def testNotify(self): - wait1 = self.cond.StartWaiting() - wait2 = self.cond.StartWaiting() - - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - - self.cond.notifyAll() - - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is not None) - - self.assert_(not self.cond.DoneWaiting()) - - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is not None) - - self.assert_(self.cond.DoneWaiting()) - - self.assert_(self.cond._read_fd is None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is None) - - def testReusage(self): - self.cond.StartWaiting() - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - - self.assert_(self.cond.DoneWaiting()) - - self.assertRaises(RuntimeError, self.cond.StartWaiting) - self.assert_(self.cond._read_fd is None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is None) - - def testNotifyTwice(self): - self.cond.notifyAll() - self.assertRaises(RuntimeError, self.cond.notifyAll) - - class TestSharedLock(_ThreadedTestCase): """SharedLock tests""" -- GitLab