diff --git a/lib/locking.py b/lib/locking.py index 1e14f658893d813f3206f97ab77ec2bc98965ffb..e99da845bee76d81040a58324352dcd6163be75b 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -49,21 +49,18 @@ def ssynchronized(lock, shared=0): return wrap -class _SingleActionPipeConditionWaiter(object): - """Callable helper class for _SingleActionPipeCondition. +class _SingleNotifyPipeConditionWaiter(object): + """Helper class for SingleNotifyPipeCondition """ __slots__ = [ - "_cond", "_fd", "_poller", ] - def __init__(self, cond, poller, fd): - """Initializes this class. + def __init__(self, poller, fd): + """Constructor for _SingleNotifyPipeConditionWaiter - @type cond: L{_SingleActionPipeCondition} - @param cond: Parent condition @type poller: select.poll @param poller: Poller object @type fd: int @@ -71,8 +68,6 @@ class _SingleActionPipeConditionWaiter(object): """ object.__init__(self) - - self._cond = cond self._poller = poller self._fd = fd @@ -152,127 +147,98 @@ class _BaseCondition(object): raise RuntimeError("cannot work with un-aquired lock") -class _SingleActionPipeCondition(object): - """Wrapper around a pipe for usage inside conditions. - - This class contains a POSIX pipe(2) and a poller to poll it. The pipe is - always allocated when constructing the class. Extra care is taken to always - close the file descriptors. - - An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for - notifications. +class SingleNotifyPipeCondition(_BaseCondition): + """Condition which can only be notified once. - Warning: This class is designed to be used as the underlying component of a - locking condition, but is not by itself thread safe, and needs to be - protected by an external lock. + This condition class uses pipes and poll, internally, to be able to wait for + notification with a timeout, without resorting to polling. It is almost + compatible with Python's threading.Condition, with the following differences: + - notifyAll can only be called once, and no wait can happen after that + - notify is not supported, only notifyAll """ - __slots__ = [ + + __slots__ = _BaseCondition.__slots__ + [ "_poller", "_read_fd", "_write_fd", "_nwaiters", + "_notified", ] - _waiter_class = _SingleActionPipeConditionWaiter + _waiter_class = _SingleNotifyPipeConditionWaiter - def __init__(self): - """Initializes this class. + def __init__(self, lock): + """Constructor for SingleNotifyPipeCondition """ - object.__init__(self) - + _BaseCondition.__init__(self, lock) self._nwaiters = 0 + self._notified = False + self._read_fd = None + self._write_fd = None + self._poller = None - # Just assume the unpacking is successful, otherwise error handling gets - # very complicated. - (self._read_fd, self._write_fd) = os.pipe() - try: - # The poller looks for closure of the write side - poller = select.poll() - poller.register(self._read_fd, select.POLLHUP) - - self._poller = poller - except: - if self._read_fd is not None: - os.close(self._read_fd) - if self._write_fd is not None: - os.close(self._write_fd) - raise - - # There should be no code here anymore, otherwise the pipe file descriptors - # may be not be cleaned up properly in case of errors. - - def StartWaiting(self): - """Return function to wait for notification. + def _check_unnotified(self): + if self._notified: + raise RuntimeError("cannot use already notified condition") - @rtype: L{_SingleActionPipeConditionWaiter} - @return: Function to wait for notification + def _Cleanup(self): + """Cleanup open file descriptors, if any. """ - assert self._nwaiters >= 0 - - if self._poller is None: - raise RuntimeError("Already cleaned up") - - # Create waiter function and increase number of waiters - wait_fn = self._waiter_class(self, self._poller, self._read_fd) - self._nwaiters += 1 - return wait_fn + if self._read_fd is not None: + os.close(self._read_fd) + self._read_fd = None - def DoneWaiting(self): - """Decrement number of waiters and automatic cleanup. + if self._write_fd is not None: + os.close(self._write_fd) + self._write_fd = None + self._poller = None - Must be called after waiting for a notification. + def wait(self, timeout=None): + """Wait for a notification. - @rtype: bool - @return: Whether this was the last waiter + @type timeout: float or None + @param timeout: Waiting timeout (can be None) """ - assert self._nwaiters > 0 - - self._nwaiters -= 1 + self._check_owned() + self._check_unnotified() - if self._nwaiters == 0: - self._Cleanup() - return True + self._nwaiters += 1 + try: + if self._poller is None: + (self._read_fd, self._write_fd) = os.pipe() + self._poller = select.poll() + self._poller.register(self._read_fd, select.POLLHUP) - return False + wait_fn = self._waiter_class(self._poller, self._read_fd) + self.release() + try: + # Wait for notification + wait_fn(timeout) + finally: + # Re-acquire lock + self.acquire() + finally: + self._nwaiters -= 1 + if self._nwaiters == 0: + self._Cleanup() def notifyAll(self): """Close the writing side of the pipe to notify all waiters. """ - if self._write_fd is None: - raise RuntimeError("Can only notify once") - - os.close(self._write_fd) - self._write_fd = None - - def _Cleanup(self): - """Close all file descriptors. - - """ - if self._read_fd is not None: - os.close(self._read_fd) - self._read_fd = None - + self._check_owned() + self._check_unnotified() + self._notified = True if self._write_fd is not None: os.close(self._write_fd) self._write_fd = None - self._poller = None - - def __del__(self): - """Called on object deletion. - - Ensure no file descriptors are left open. - """ - self._Cleanup() - - -class _PipeCondition(_BaseCondition): +class PipeCondition(_BaseCondition): """Group-only non-polling condition with counters. This condition class uses pipes and poll, internally, to be able to wait for @@ -284,10 +250,10 @@ class _PipeCondition(_BaseCondition): """ __slots__ = _BaseCondition.__slots__ + [ "_nwaiters", - "_pipe", + "_single_condition", ] - _pipe_class = _SingleActionPipeCondition + _single_condition_class = SingleNotifyPipeCondition def __init__(self, lock): """Initializes this class. @@ -295,7 +261,7 @@ class _PipeCondition(_BaseCondition): """ _BaseCondition.__init__(self, lock) self._nwaiters = 0 - self._pipe = None + self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): """Wait for a notification. @@ -306,32 +272,14 @@ class _PipeCondition(_BaseCondition): """ self._check_owned() - if not self._pipe: - self._pipe = self._pipe_class() - # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - pipe = self._pipe + my_condition = self._single_condition assert self._nwaiters >= 0 self._nwaiters += 1 try: - # Get function to wait on the pipe - wait_fn = pipe.StartWaiting() - try: - # Release lock while waiting - self.release() - try: - # Wait for notification - wait_fn(timeout) - finally: - # Re-acquire lock - self.acquire() - finally: - # Destroy pipe if this was the last waiter and the current pipe is - # still the same. The same pipe cannot be reused after cleanup. - if pipe.DoneWaiting() and pipe == self._pipe: - self._pipe = None + my_condition.wait(timeout) finally: assert self._nwaiters > 0 self._nwaiters -= 1 @@ -341,12 +289,8 @@ class _PipeCondition(_BaseCondition): """ self._check_owned() - - # Notify and forget pipe. A new one will be created on the next call to - # wait. - if self._pipe is not None: - self._pipe.notifyAll() - self._pipe = None + self._single_condition.notifyAll() + self._single_condition = self._single_condition_class(self._lock) def has_waiting(self): """Returns whether there are active waiters. @@ -387,7 +331,7 @@ class _CountingCondition(object): """Waits for the condition to be notified. @type timeout: float or None - @param timeout: Timeout in seconds + @param timeout: Waiting timeout (can be None) """ assert self._nwaiters >= 0 @@ -427,7 +371,7 @@ class SharedLock(object): "__shr", ] - __condition_class = _PipeCondition + __condition_class = PipeCondition def __init__(self): """Construct a new SharedLock. diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 2300517dba3a984c12a908e329d320b2cda4b413..259d46e102f9781a52bfa925f5037dc7eddf48a1 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -114,11 +114,37 @@ class _ConditionTestCase(_ThreadedTestCase): self.assert_(not self.cond._is_owned()) +class TestSingleNotifyPipeCondition(_ConditionTestCase): + """SingleNotifyPipeCondition tests""" + + def setUp(self): + _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition) + + def testAcquireRelease(self): + self._testAcquireRelease() + + def testNotification(self): + self._testNotification() + + def testWaitReuse(self): + self.cond.acquire() + self.cond.wait(0) + self.cond.wait(0.1) + self.cond.release() + + def testNoNotifyReuse(self): + self.cond.acquire() + self.cond.notifyAll() + self.assertRaises(RuntimeError, self.cond.wait) + self.assertRaises(RuntimeError, self.cond.notifyAll) + self.cond.release() + + class TestPipeCondition(_ConditionTestCase): - """_PipeCondition tests""" + """PipeCondition tests""" def setUp(self): - _ConditionTestCase.setUp(self, locking._PipeCondition) + _ConditionTestCase.setUp(self, locking.PipeCondition) def testAcquireRelease(self): self._testAcquireRelease() @@ -214,86 +240,6 @@ class TestPipeCondition(_ConditionTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) -class TestSingleActionPipeCondition(unittest.TestCase): - """_SingleActionPipeCondition tests""" - - def setUp(self): - self.cond = locking._SingleActionPipeCondition() - - def testInitialization(self): - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - self.assertEqual(self.cond._nwaiters, 0) - - def testUsageCount(self): - self.cond.StartWaiting() - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - self.assertEqual(self.cond._nwaiters, 1) - - # use again - self.cond.StartWaiting() - self.assertEqual(self.cond._nwaiters, 2) - - # there is more than one user - self.assert_(not self.cond.DoneWaiting()) - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - self.assertEqual(self.cond._nwaiters, 1) - - self.assert_(self.cond.DoneWaiting()) - self.assertEqual(self.cond._nwaiters, 0) - self.assert_(self.cond._read_fd is None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is None) - - def testNotify(self): - wait1 = self.cond.StartWaiting() - wait2 = self.cond.StartWaiting() - - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - - self.cond.notifyAll() - - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is not None) - - self.assert_(not self.cond.DoneWaiting()) - - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is not None) - - self.assert_(self.cond.DoneWaiting()) - - self.assert_(self.cond._read_fd is None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is None) - - def testReusage(self): - self.cond.StartWaiting() - self.assert_(self.cond._read_fd is not None) - self.assert_(self.cond._write_fd is not None) - self.assert_(self.cond._poller is not None) - - self.assert_(self.cond.DoneWaiting()) - - self.assertRaises(RuntimeError, self.cond.StartWaiting) - self.assert_(self.cond._read_fd is None) - self.assert_(self.cond._write_fd is None) - self.assert_(self.cond._poller is None) - - def testNotifyTwice(self): - self.cond.notifyAll() - self.assertRaises(RuntimeError, self.cond.notifyAll) - - class TestSharedLock(_ThreadedTestCase): """SharedLock tests"""