diff --git a/lib/locking.py b/lib/locking.py index 11f4927ab77810444dd082df9cbd2f42e998b2a7..16d2c46ecf01981e6cfd5b9a049f54b23fd9beb3 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -20,7 +20,11 @@ """Module implementing the Ganeti locking code.""" +import os +import select import threading +import time +import errno from ganeti import errors from ganeti import utils @@ -45,6 +49,180 @@ def ssynchronized(lock, shared=0): return wrap +class _SingleActionPipeConditionWaiter(object): + """Callable helper class for _SingleActionPipeCondition. + + """ + __slots__ = [ + "_cond", + "_fd", + "_poller", + ] + + def __init__(self, cond, poller, fd): + """Initializes this class. + + @type cond: L{_SingleActionPipeCondition} + @param cond: Parent condition + @type poller: select.poll + @param poller: Poller object + @type fd: int + @param fd: File descriptor to wait for + + """ + object.__init__(self) + + self._cond = cond + self._poller = poller + self._fd = fd + + def __call__(self, timeout): + """Wait for something to happen on the pipe. + + @type timeout: float or None + @param timeout: Timeout for waiting (can be None) + + """ + start_time = time.time() + remaining_time = timeout + + while timeout is None or remaining_time > 0: + try: + result = self._poller.poll(remaining_time) + except EnvironmentError, err: + if err.errno != errno.EINTR: + raise + result = None + + # Check whether we were notified + if result and result[0][0] == self._fd: + break + + # Re-calculate timeout if necessary + if timeout is not None: + remaining_time = start_time + timeout - time.time() + + +class _SingleActionPipeCondition(object): + """Wrapper around a pipe for usage inside conditions. + + This class contains a POSIX pipe(2) and a poller to poll it. The pipe is + always allocated when constructing the class. Extra care is taken to always + close the file descriptors. + + An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for + notifications. + + Warning: This class is designed to be used as the underlying component of a + locking condition, but is not by itself thread safe, and needs to be + protected by an external lock. + + """ + __slots__ = [ + "_poller", + "_read_fd", + "_write_fd", + "_nwaiters", + ] + + _waiter_class = _SingleActionPipeConditionWaiter + + def __init__(self): + """Initializes this class. + + """ + object.__init__(self) + + self._nwaiters = 0 + + # Just assume the unpacking is successful, otherwise error handling gets + # very complicated. + (self._read_fd, self._write_fd) = os.pipe() + try: + # The poller looks for closure of the write side + poller = select.poll() + poller.register(self._read_fd, select.POLLHUP) + + self._poller = poller + except: + if self._read_fd is not None: + os.close(self._read_fd) + if self._write_fd is not None: + os.close(self._write_fd) + raise + + # There should be no code here anymore, otherwise the pipe file descriptors + # may be not be cleaned up properly in case of errors. + + def StartWaiting(self): + """Return function to wait for notification. + + @rtype: L{_SingleActionPipeConditionWaiter} + @return: Function to wait for notification + + """ + assert self._nwaiters >= 0 + + if self._poller is None: + raise RuntimeError("Already cleaned up") + + # Create waiter function and increase number of waiters + wait_fn = self._waiter_class(self, self._poller, self._read_fd) + self._nwaiters += 1 + return wait_fn + + def DoneWaiting(self): + """Decrement number of waiters and automatic cleanup. + + Must be called after waiting for a notification. + + @rtype: bool + @return: Whether this was the last waiter + + """ + assert self._nwaiters > 0 + + self._nwaiters -= 1 + + if self._nwaiters == 0: + self._Cleanup() + return True + + return False + + def notifyAll(self): + """Close the writing side of the pipe to notify all waiters. + + """ + if self._write_fd is None: + raise RuntimeError("Can only notify once") + + os.close(self._write_fd) + self._write_fd = None + + def _Cleanup(self): + """Close all file descriptors. + + """ + if self._read_fd is not None: + os.close(self._read_fd) + self._read_fd = None + + if self._write_fd is not None: + os.close(self._write_fd) + self._write_fd = None + + self._poller = None + + def __del__(self): + """Called on object deletion. + + Ensure no file descriptors are left open. + + """ + self._Cleanup() + + class _CountingCondition(object): """Wrapper for Python's built-in threading.Condition class. diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 6be0c570ceb012653eb2ed313c39cb4e48777a0b..5b9b2c93e20b671041bcf438bb62f9f6c9b2b911 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -69,6 +69,86 @@ class _ThreadedTestCase(unittest.TestCase): self.threads = [] +class TestSingleActionPipeCondition(unittest.TestCase): + """_SingleActionPipeCondition tests""" + + def setUp(self): + self.cond = locking._SingleActionPipeCondition() + + def testInitialization(self): + self.assert_(self.cond._read_fd is not None) + self.assert_(self.cond._write_fd is not None) + self.assert_(self.cond._poller is not None) + self.assertEqual(self.cond._nwaiters, 0) + + def testUsageCount(self): + self.cond.StartWaiting() + self.assert_(self.cond._read_fd is not None) + self.assert_(self.cond._write_fd is not None) + self.assert_(self.cond._poller is not None) + self.assertEqual(self.cond._nwaiters, 1) + + # use again + self.cond.StartWaiting() + self.assertEqual(self.cond._nwaiters, 2) + + # there is more than one user + self.assert_(not self.cond.DoneWaiting()) + self.assert_(self.cond._read_fd is not None) + self.assert_(self.cond._write_fd is not None) + self.assert_(self.cond._poller is not None) + self.assertEqual(self.cond._nwaiters, 1) + + self.assert_(self.cond.DoneWaiting()) + self.assertEqual(self.cond._nwaiters, 0) + self.assert_(self.cond._read_fd is None) + self.assert_(self.cond._write_fd is None) + self.assert_(self.cond._poller is None) + + def testNotify(self): + wait1 = self.cond.StartWaiting() + wait2 = self.cond.StartWaiting() + + self.assert_(self.cond._read_fd is not None) + self.assert_(self.cond._write_fd is not None) + self.assert_(self.cond._poller is not None) + + self.cond.notifyAll() + + self.assert_(self.cond._read_fd is not None) + self.assert_(self.cond._write_fd is None) + self.assert_(self.cond._poller is not None) + + self.assert_(not self.cond.DoneWaiting()) + + self.assert_(self.cond._read_fd is not None) + self.assert_(self.cond._write_fd is None) + self.assert_(self.cond._poller is not None) + + self.assert_(self.cond.DoneWaiting()) + + self.assert_(self.cond._read_fd is None) + self.assert_(self.cond._write_fd is None) + self.assert_(self.cond._poller is None) + + def testReusage(self): + self.cond.StartWaiting() + self.assert_(self.cond._read_fd is not None) + self.assert_(self.cond._write_fd is not None) + self.assert_(self.cond._poller is not None) + + self.assert_(self.cond.DoneWaiting()) + + self.assertRaises(RuntimeError, self.cond.StartWaiting) + self.assert_(self.cond._read_fd is None) + self.assert_(self.cond._write_fd is None) + self.assert_(self.cond._poller is None) + + def testNotifyTwice(self): + self.cond.notifyAll() + self.assertRaises(RuntimeError, self.cond.notifyAll) + + class TestSharedLock(_ThreadedTestCase): """SharedLock tests"""