Commit d76167a5 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Add _SingleActionPipeCondition class



This class will be used as a basic block for pipe(2)-based
conditions. Upon initialization it creates a pipe and can be
notified once (hence the “single action” in the name). A
callable helper class is used to wait for notifications.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 84e344d4
......@@ -20,7 +20,11 @@
"""Module implementing the Ganeti locking code."""
import os
import select
import threading
import time
import errno
from ganeti import errors
from ganeti import utils
......@@ -45,6 +49,180 @@ def ssynchronized(lock, shared=0):
return wrap
class _SingleActionPipeConditionWaiter(object):
"""Callable helper class for _SingleActionPipeCondition.
"""
__slots__ = [
"_cond",
"_fd",
"_poller",
]
def __init__(self, cond, poller, fd):
"""Initializes this class.
@type cond: L{_SingleActionPipeCondition}
@param cond: Parent condition
@type poller: select.poll
@param poller: Poller object
@type fd: int
@param fd: File descriptor to wait for
"""
object.__init__(self)
self._cond = cond
self._poller = poller
self._fd = fd
def __call__(self, timeout):
"""Wait for something to happen on the pipe.
@type timeout: float or None
@param timeout: Timeout for waiting (can be None)
"""
start_time = time.time()
remaining_time = timeout
while timeout is None or remaining_time > 0:
try:
result = self._poller.poll(remaining_time)
except EnvironmentError, err:
if err.errno != errno.EINTR:
raise
result = None
# Check whether we were notified
if result and result[0][0] == self._fd:
break
# Re-calculate timeout if necessary
if timeout is not None:
remaining_time = start_time + timeout - time.time()
class _SingleActionPipeCondition(object):
"""Wrapper around a pipe for usage inside conditions.
This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
always allocated when constructing the class. Extra care is taken to always
close the file descriptors.
An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
notifications.
Warning: This class is designed to be used as the underlying component of a
locking condition, but is not by itself thread safe, and needs to be
protected by an external lock.
"""
__slots__ = [
"_poller",
"_read_fd",
"_write_fd",
"_nwaiters",
]
_waiter_class = _SingleActionPipeConditionWaiter
def __init__(self):
"""Initializes this class.
"""
object.__init__(self)
self._nwaiters = 0
# Just assume the unpacking is successful, otherwise error handling gets
# very complicated.
(self._read_fd, self._write_fd) = os.pipe()
try:
# The poller looks for closure of the write side
poller = select.poll()
poller.register(self._read_fd, select.POLLHUP)
self._poller = poller
except:
if self._read_fd is not None:
os.close(self._read_fd)
if self._write_fd is not None:
os.close(self._write_fd)
raise
# There should be no code here anymore, otherwise the pipe file descriptors
# may be not be cleaned up properly in case of errors.
def StartWaiting(self):
"""Return function to wait for notification.
@rtype: L{_SingleActionPipeConditionWaiter}
@return: Function to wait for notification
"""
assert self._nwaiters >= 0
if self._poller is None:
raise RuntimeError("Already cleaned up")
# Create waiter function and increase number of waiters
wait_fn = self._waiter_class(self, self._poller, self._read_fd)
self._nwaiters += 1
return wait_fn
def DoneWaiting(self):
"""Decrement number of waiters and automatic cleanup.
Must be called after waiting for a notification.
@rtype: bool
@return: Whether this was the last waiter
"""
assert self._nwaiters > 0
self._nwaiters -= 1
if self._nwaiters == 0:
self._Cleanup()
return True
return False
def notifyAll(self):
"""Close the writing side of the pipe to notify all waiters.
"""
if self._write_fd is None:
raise RuntimeError("Can only notify once")
os.close(self._write_fd)
self._write_fd = None
def _Cleanup(self):
"""Close all file descriptors.
"""
if self._read_fd is not None:
os.close(self._read_fd)
self._read_fd = None
if self._write_fd is not None:
os.close(self._write_fd)
self._write_fd = None
self._poller = None
def __del__(self):
"""Called on object deletion.
Ensure no file descriptors are left open.
"""
self._Cleanup()
class _CountingCondition(object):
"""Wrapper for Python's built-in threading.Condition class.
......
......@@ -69,6 +69,86 @@ class _ThreadedTestCase(unittest.TestCase):
self.threads = []
class TestSingleActionPipeCondition(unittest.TestCase):
"""_SingleActionPipeCondition tests"""
def setUp(self):
self.cond = locking._SingleActionPipeCondition()
def testInitialization(self):
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assertEqual(self.cond._nwaiters, 0)
def testUsageCount(self):
self.cond.StartWaiting()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assertEqual(self.cond._nwaiters, 1)
# use again
self.cond.StartWaiting()
self.assertEqual(self.cond._nwaiters, 2)
# there is more than one user
self.assert_(not self.cond.DoneWaiting())
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assertEqual(self.cond._nwaiters, 1)
self.assert_(self.cond.DoneWaiting())
self.assertEqual(self.cond._nwaiters, 0)
self.assert_(self.cond._read_fd is None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is None)
def testNotify(self):
wait1 = self.cond.StartWaiting()
wait2 = self.cond.StartWaiting()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.cond.notifyAll()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is not None)
self.assert_(not self.cond.DoneWaiting())
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is not None)
self.assert_(self.cond.DoneWaiting())
self.assert_(self.cond._read_fd is None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is None)
def testReusage(self):
self.cond.StartWaiting()
self.assert_(self.cond._read_fd is not None)
self.assert_(self.cond._write_fd is not None)
self.assert_(self.cond._poller is not None)
self.assert_(self.cond.DoneWaiting())
self.assertRaises(RuntimeError, self.cond.StartWaiting)
self.assert_(self.cond._read_fd is None)
self.assert_(self.cond._write_fd is None)
self.assert_(self.cond._poller is None)
def testNotifyTwice(self):
self.cond.notifyAll()
self.assertRaises(RuntimeError, self.cond.notifyAll)
class TestSharedLock(_ThreadedTestCase):
"""SharedLock tests"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment