From 495ba852c42b180d00f3492b7c5b474894a6fbfb Mon Sep 17 00:00:00 2001 From: Guido Trotter <ultrotter@google.com> Date: Tue, 18 May 2010 13:20:46 +0100 Subject: [PATCH] daemon.AsyncAwaker This new asyncore dispatcher can be used to force a thread running the asyncore loop to awake from the select, by signaling it on one of its selected sockets. Signed-off-by: Guido Trotter <ultrotter@google.com> Reviewed-by: Michael Hanselmann <hansmi@google.com> --- lib/daemon.py | 52 ++++++++++++++++++++++++++++ test/ganeti.daemon_unittest.py | 62 ++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/lib/daemon.py b/lib/daemon.py index 567de3433..dc41fdc5f 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -312,6 +312,58 @@ class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): return False +class AsyncAwaker(GanetiBaseAsyncoreDispatcher): + """A way to notify the asyncore loop that something is going on. + + If an asyncore daemon is multithreaded when a thread tries to push some data + to a socket, the main loop handling asynchronous requests might be sleeping + waiting on a select(). To avoid this it can create an instance of the + AsyncAwaker, which other threads can use to wake it up. + + """ + def __init__(self, signal_fn=None): + """Constructor for AsyncAwaker + + @type signal_fn: function + @param signal_fn: function to call when awaken + + """ + GanetiBaseAsyncoreDispatcher.__init__(self) + assert signal_fn == None or callable(signal_fn) + (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX, + socket.SOCK_STREAM) + self.in_socket.setblocking(0) + self.set_socket(self.in_socket) + self.need_signal = True + self.signal_fn = signal_fn + self.connected = True + + # this method is overriding an asyncore.dispatcher method + def handle_read(self): + utils.IgnoreSignals(self.recv, 4096) + if self.signal_fn: + self.signal_fn() + self.need_signal = True + + # this method is overriding an asyncore.dispatcher method + def close(self): + asyncore.dispatcher.close(self) + self.out_socket.close() + + def signal(self): + """Signal the asyncore main loop. + + Any data we send here will be ignored, but it will cause the select() call + to return. + + """ + # Yes, there is a race condition here. No, we don't care, at worst we're + # sending more than one wakeup token, which doesn't harm at all. + if self.need_signal: + self.need_signal = False + self.out_socket.send("\0") + + class Mainloop(object): """Generic mainloop for daemons diff --git a/test/ganeti.daemon_unittest.py b/test/ganeti.daemon_unittest.py index 86d1a4720..968c9cbc9 100755 --- a/test/ganeti.daemon_unittest.py +++ b/test/ganeti.daemon_unittest.py @@ -484,5 +484,67 @@ class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP): TestAsyncStreamServerTCP.tearDown(self) +class TestAsyncAwaker(testutils.GanetiTestCase): + """Test daemon.AsyncAwaker""" + + family = socket.AF_INET + + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.mainloop = daemon.Mainloop() + self.awaker = daemon.AsyncAwaker(signal_fn=self.handle_signal) + self.signal_count = 0 + self.signal_terminate_count = 1 + + def tearDown(self): + self.awaker.close() + + def handle_signal(self): + self.signal_count += 1 + self.signal_terminate_count -= 1 + if self.signal_terminate_count <= 0: + os.kill(os.getpid(), signal.SIGTERM) + + def testBasicSignaling(self): + self.awaker.signal() + self.mainloop.Run() + self.assertEquals(self.signal_count, 1) + + def testDoubleSignaling(self): + self.awaker.signal() + self.awaker.signal() + self.mainloop.Run() + # The second signal is never delivered + self.assertEquals(self.signal_count, 1) + + def testReallyDoubleSignaling(self): + self.assert_(self.awaker.readable()) + self.awaker.signal() + # Let's suppose two threads overlap, and both find need_signal True + self.awaker.need_signal = True + self.awaker.signal() + self.mainloop.Run() + # We still get only one signaling + self.assertEquals(self.signal_count, 1) + + def testNoSignalFnArgument(self): + myawaker = daemon.AsyncAwaker() + self.assertRaises(socket.error, myawaker.handle_read) + myawaker.signal() + myawaker.handle_read() + self.assertRaises(socket.error, myawaker.handle_read) + myawaker.signal() + myawaker.signal() + myawaker.handle_read() + self.assertRaises(socket.error, myawaker.handle_read) + myawaker.close() + + def testWrongSignalFnArgument(self): + self.assertRaises(AssertionError, daemon.AsyncAwaker, 1) + self.assertRaises(AssertionError, daemon.AsyncAwaker, "string") + self.assertRaises(AssertionError, daemon.AsyncAwaker, signal_fn=1) + self.assertRaises(AssertionError, daemon.AsyncAwaker, signal_fn="string") + + if __name__ == "__main__": testutils.GanetiTestProgram() -- GitLab