diff --git a/lib/daemon.py b/lib/daemon.py index 567de3433c41a6a97a6ca7b1f09d32f8871b4f9f..dc41fdc5f5582a4c0d38e204ccc1a1bfed2db5d3 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -312,6 +312,58 @@ class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): return False +class AsyncAwaker(GanetiBaseAsyncoreDispatcher): + """A way to notify the asyncore loop that something is going on. + + If an asyncore daemon is multithreaded when a thread tries to push some data + to a socket, the main loop handling asynchronous requests might be sleeping + waiting on a select(). To avoid this it can create an instance of the + AsyncAwaker, which other threads can use to wake it up. + + """ + def __init__(self, signal_fn=None): + """Constructor for AsyncAwaker + + @type signal_fn: function + @param signal_fn: function to call when awaken + + """ + GanetiBaseAsyncoreDispatcher.__init__(self) + assert signal_fn == None or callable(signal_fn) + (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX, + socket.SOCK_STREAM) + self.in_socket.setblocking(0) + self.set_socket(self.in_socket) + self.need_signal = True + self.signal_fn = signal_fn + self.connected = True + + # this method is overriding an asyncore.dispatcher method + def handle_read(self): + utils.IgnoreSignals(self.recv, 4096) + if self.signal_fn: + self.signal_fn() + self.need_signal = True + + # this method is overriding an asyncore.dispatcher method + def close(self): + asyncore.dispatcher.close(self) + self.out_socket.close() + + def signal(self): + """Signal the asyncore main loop. + + Any data we send here will be ignored, but it will cause the select() call + to return. + + """ + # Yes, there is a race condition here. No, we don't care, at worst we're + # sending more than one wakeup token, which doesn't harm at all. + if self.need_signal: + self.need_signal = False + self.out_socket.send("\0") + + class Mainloop(object): """Generic mainloop for daemons diff --git a/test/ganeti.daemon_unittest.py b/test/ganeti.daemon_unittest.py index 86d1a47208ceffb1c5288cf29c2f1394399b254a..968c9cbc9712926bd20af0d90f28512ce3ca20ba 100755 --- a/test/ganeti.daemon_unittest.py +++ b/test/ganeti.daemon_unittest.py @@ -484,5 +484,67 @@ class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP): TestAsyncStreamServerTCP.tearDown(self) +class TestAsyncAwaker(testutils.GanetiTestCase): + """Test daemon.AsyncAwaker""" + + family = socket.AF_INET + + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.mainloop = daemon.Mainloop() + self.awaker = daemon.AsyncAwaker(signal_fn=self.handle_signal) + self.signal_count = 0 + self.signal_terminate_count = 1 + + def tearDown(self): + self.awaker.close() + + def handle_signal(self): + self.signal_count += 1 + self.signal_terminate_count -= 1 + if self.signal_terminate_count <= 0: + os.kill(os.getpid(), signal.SIGTERM) + + def testBasicSignaling(self): + self.awaker.signal() + self.mainloop.Run() + self.assertEquals(self.signal_count, 1) + + def testDoubleSignaling(self): + self.awaker.signal() + self.awaker.signal() + self.mainloop.Run() + # The second signal is never delivered + self.assertEquals(self.signal_count, 1) + + def testReallyDoubleSignaling(self): + self.assert_(self.awaker.readable()) + self.awaker.signal() + # Let's suppose two threads overlap, and both find need_signal True + self.awaker.need_signal = True + self.awaker.signal() + self.mainloop.Run() + # We still get only one signaling + self.assertEquals(self.signal_count, 1) + + def testNoSignalFnArgument(self): + myawaker = daemon.AsyncAwaker() + self.assertRaises(socket.error, myawaker.handle_read) + myawaker.signal() + myawaker.handle_read() + self.assertRaises(socket.error, myawaker.handle_read) + myawaker.signal() + myawaker.signal() + myawaker.handle_read() + self.assertRaises(socket.error, myawaker.handle_read) + myawaker.close() + + def testWrongSignalFnArgument(self): + self.assertRaises(AssertionError, daemon.AsyncAwaker, 1) + self.assertRaises(AssertionError, daemon.AsyncAwaker, "string") + self.assertRaises(AssertionError, daemon.AsyncAwaker, signal_fn=1) + self.assertRaises(AssertionError, daemon.AsyncAwaker, signal_fn="string") + + if __name__ == "__main__": testutils.GanetiTestProgram()