diff --git a/Makefile.am b/Makefile.am index e1325321b7c610a8abf2d5b08be167b5677a017f..dcdf17e1c964a2e9537366794f3a9cb520f19ac6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -337,9 +337,11 @@ TEST_FILES = \ test/import-export_unittest-helper python_tests = \ + test/ganeti.asyncnotifier_unittest.py \ test/ganeti.backend_unittest.py \ test/ganeti.bdev_unittest.py \ test/ganeti.cli_unittest.py \ + test/ganeti.daemon_unittest.py \ test/ganeti.cmdlib_unittest.py \ test/ganeti.compat_unittest.py \ test/ganeti.confd.client_unittest.py \ diff --git a/daemons/ganeti-confd b/daemons/ganeti-confd index 290f6188710436feef2d8c02876bb8e9dfc35be1..958e1a133e50948250b38f899628b0d668e72846 100755 --- a/daemons/ganeti-confd +++ b/daemons/ganeti-confd @@ -88,96 +88,6 @@ class ConfdAsyncUDPServer(daemon.AsyncUDPSocket): logging.error("Reply too big to fit in an udp packet.") -class ConfdInotifyEventHandler(pyinotify.ProcessEvent): - - def __init__(self, watch_manager, callback, - filename=constants.CLUSTER_CONF_FILE): - """Constructor for ConfdInotifyEventHandler - - @type watch_manager: pyinotify.WatchManager - @param watch_manager: ganeti-confd inotify watch manager - @type callback: function accepting a boolean - @param callback: function to call when an inotify event happens - @type filename: string - @param filename: config file to watch - - """ - # pylint: disable-msg=W0231 - # no need to call the parent's constructor - self.watch_manager = watch_manager - self.callback = callback - self.mask = pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"] | \ - pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] - self.file = filename - self.watch_handle = None - - def enable(self): - """Watch the given file - - """ - if self.watch_handle is None: - result = self.watch_manager.add_watch(self.file, self.mask) - if not self.file in result or result[self.file] <= 0: - raise errors.InotifyError("Could not add inotify watcher") - else: - self.watch_handle = result[self.file] - - def disable(self): - """Stop watching the given file - - """ - if self.watch_handle is not None: - result = self.watch_manager.rm_watch(self.watch_handle) - if result[self.watch_handle]: - self.watch_handle = None - - def process_IN_IGNORED(self, event): - # Due to the fact that we monitor just for the cluster config file (rather - # than for the whole data dir) when the file is replaced with another one - # (which is what happens normally in ganeti) we're going to receive an - # IN_IGNORED event from inotify, because of the file removal (which is - # contextual with the replacement). In such a case we need to create - # another watcher for the "new" file. - logging.debug("Received 'ignored' inotify event for %s", event.path) - self.watch_handle = None - - try: - # Since the kernel believes the file we were interested in is gone, it's - # not going to notify us of any other events, until we set up, here, the - # new watch. This is not a race condition, though, since we're anyway - # going to realod the file after setting up the new watch. - self.callback(False) - except errors.ConfdFatalError, err: - logging.critical("Critical error, shutting down: %s", err) - sys.exit(constants.EXIT_FAILURE) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - def process_IN_MODIFY(self, event): - # This gets called when the config file is modified. Note that this doesn't - # usually happen in Ganeti, as the config file is normally replaced by a - # new one, at filesystem level, rather than actually modified (see - # utils.WriteFile) - logging.debug("Received 'modify' inotify event for %s", event.path) - - try: - self.callback(True) - except errors.ConfdFatalError, err: - logging.critical("Critical error, shutting down: %s", err) - sys.exit(constants.EXIT_FAILURE) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - def process_default(self, event): - logging.error("Received unhandled inotify event: %s", event) - - class ConfdConfigurationReloader(object): """Logic to control when to reload the ganeti configuration @@ -202,8 +112,11 @@ class ConfdConfigurationReloader(object): self.last_notification = 0 # Asyncronous inotify handler for config changes + cfg_file = constants.CLUSTER_CONF_FILE self.wm = pyinotify.WatchManager() - self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify) + self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, + self.OnInotify, + cfg_file) self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler) self.timer_handle = None diff --git a/devel/review b/devel/review index 70cdf81447c1e1dd19ce2733b9b1514d66d53631..97433e4e469eb9c83d970d9033cba1bd81c081fa 100755 --- a/devel/review +++ b/devel/review @@ -196,14 +196,31 @@ copy_commit() { GIT_EDITOR="$me --commit-editor \"\$@\"" git commit -c "$rev" -s } -main() { - local range="$1" target_branch="$2" +usage() { + echo "Usage: $me_plain [from..to] <target-branch>" >&2 + echo " If not passed from..to defaults to target-branch..HEAD" >&2 + exit 1 +} - if [[ -z "$target_branch" || "$range" != *..* ]] - then - echo "Usage: $me_plain <from..to> <target-branch>" >&2 - exit 1 - fi +main() { + local range target_branch + + case "$#" in + 1) + target_branch="$1" + range="$target_branch..$(git rev-parse HEAD)" + ;; + 2) + range="$1" + target_branch="$2" + if [[ "$range" != *..* ]]; then + usage + fi + ;; + *) + usage + ;; + esac git checkout "$target_branch" local old_head=$(git rev-parse HEAD) diff --git a/lib/asyncnotifier.py b/lib/asyncnotifier.py index 63c020fa541b9dd4d922e7f18faa34ff2151d0ef..b576a5d6f566f6624707ce30d1bcb454b3e61580 100644 --- a/lib/asyncnotifier.py +++ b/lib/asyncnotifier.py @@ -23,6 +23,7 @@ import asyncore +import logging try: # pylint: disable-msg=E0611 @@ -30,7 +31,11 @@ try: except ImportError: import pyinotify +from ganeti import errors +# We contributed the AsyncNotifier class back to python-pyinotify, and it's +# part of their codebase since version 0.8.7. This code can be removed once +# we'll be ready to depend on python-pyinotify >= 0.8.7 class AsyncNotifier(asyncore.file_dispatcher): """An asyncore dispatcher for inotify events. @@ -58,3 +63,93 @@ class AsyncNotifier(asyncore.file_dispatcher): def handle_read(self): self.notifier.read_events() self.notifier.process_events() + + +class SingleFileEventHandler(pyinotify.ProcessEvent): + """Handle modify events for a single file. + + """ + + def __init__(self, watch_manager, callback, filename): + """Constructor for SingleFileEventHandler + + @type watch_manager: pyinotify.WatchManager + @param watch_manager: inotify watch manager + @type callback: function accepting a boolean + @param callback: function to call when an inotify event happens + @type filename: string + @param filename: config file to watch + + """ + # pylint: disable-msg=W0231 + # no need to call the parent's constructor + self.watch_manager = watch_manager + self.callback = callback + self.mask = pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"] | \ + pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] + self.file = filename + self.watch_handle = None + + def enable(self): + """Watch the given file + + """ + if self.watch_handle is None: + result = self.watch_manager.add_watch(self.file, self.mask) + if not self.file in result or result[self.file] <= 0: + raise errors.InotifyError("Could not add inotify watcher") + else: + self.watch_handle = result[self.file] + + def disable(self): + """Stop watching the given file + + """ + if self.watch_handle is not None: + result = self.watch_manager.rm_watch(self.watch_handle) + if result[self.watch_handle]: + self.watch_handle = None + + # pylint: disable-msg=C0103 + # this overrides a method in pyinotify.ProcessEvent + def process_IN_IGNORED(self, event): + # Due to the fact that we monitor just for the cluster config file (rather + # than for the whole data dir) when the file is replaced with another one + # (which is what happens normally in ganeti) we're going to receive an + # IN_IGNORED event from inotify, because of the file removal (which is + # contextual with the replacement). In such a case we need to create + # another watcher for the "new" file. + logging.debug("Received 'ignored' inotify event for %s", event.path) + self.watch_handle = None + + try: + # Since the kernel believes the file we were interested in is gone, it's + # not going to notify us of any other events, until we set up, here, the + # new watch. This is not a race condition, though, since we're anyway + # going to realod the file after setting up the new watch. + self.callback(False) + except: # pylint: disable-msg=W0702 + # we need to catch any exception here, log it, but proceed, because even + # if we failed handling a single request, we still want our daemon to + # proceed. + logging.error("Unexpected exception", exc_info=True) + + # pylint: disable-msg=C0103 + # this overrides a method in pyinotify.ProcessEvent + def process_IN_MODIFY(self, event): + # This gets called when the config file is modified. Note that this doesn't + # usually happen in Ganeti, as the config file is normally replaced by a + # new one, at filesystem level, rather than actually modified (see + # utils.WriteFile) + logging.debug("Received 'modify' inotify event for %s", event.path) + + try: + self.callback(True) + except: # pylint: disable-msg=W0702 + # we need to catch any exception here, log it, but proceed, because even + # if we failed handling a single request, we still want our daemon to + # proceed. + logging.error("Unexpected exception", exc_info=True) + + def process_default(self, event): + logging.error("Received unhandled inotify event: %s", event) diff --git a/lib/errors.py b/lib/errors.py index a50b0376000c6b00b747b432de4dd057313b3f6e..17a02371db97acff54783036ef7696de6f6a93d1 100644 --- a/lib/errors.py +++ b/lib/errors.py @@ -305,15 +305,6 @@ class JobQueueFull(JobQueueError): """ -class ConfdFatalError(GenericError): - """A fatal failure in Ganeti confd. - - Events that compromise the ability of confd to proceed further. - (for example: inability to load the config file) - - """ - - class ConfdRequestError(GenericError): """A request error in Ganeti confd. diff --git a/test/ganeti.asyncnotifier_unittest.py b/test/ganeti.asyncnotifier_unittest.py new file mode 100755 index 0000000000000000000000000000000000000000..f487af77259d9bf9a8cf033babe9e00a4ce8b60e --- /dev/null +++ b/test/ganeti.asyncnotifier_unittest.py @@ -0,0 +1,120 @@ +#!/usr/bin/python +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Script for unittesting the asyncnotifier module""" + +import unittest +import signal +import os + +try: + # pylint: disable-msg=E0611 + from pyinotify import pyinotify +except ImportError: + import pyinotify + +from ganeti import asyncnotifier +from ganeti import daemon +from ganeti import utils + +import testutils + + +class TestSingleFileEventHandler(testutils.GanetiTestCase): + """Test daemon.Mainloop""" + + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.mainloop = daemon.Mainloop() + notifier_count = 2 + self.chk_files = [self._CreateTempFile() for i in range(notifier_count)] + self.notified = [False for i in range(notifier_count)] + # We need one watch manager per notifier, as those contain the file + # descriptor which is monitored by asyncore + self.wms = [pyinotify.WatchManager() for i in range(notifier_count)] + self.cbk = [self.OnInotifyCallback(self.notified, i) + for i in range(notifier_count)] + self.ihandler = [asyncnotifier.SingleFileEventHandler(self.wms[i], + self.cbk[i], + self.chk_files[i]) + for i in range(notifier_count)] + self.notifiers = [asyncnotifier.AsyncNotifier(self.wms[i], + self.ihandler[i]) + for i in range(notifier_count)] + # notifier 0 is enabled by default, as we use it to get out of the loop + self.ihandler[0].enable() + + class OnInotifyCallback: + def __init__(self, notified, i): + self.notified = notified + self.i = i + + def __call__(self, enabled): + self.notified[self.i] = True + # notifier 0 is special as we use it to terminate the mainloop + if self.i == 0: + os.kill(os.getpid(), signal.SIGTERM) + + def testReplace(self): + utils.WriteFile(self.chk_files[0], data="dummy") + self.mainloop.Run() + self.assert_(self.notified[0]) + self.assert_(not self.notified[1]) + + def testEnableDisable(self): + self.ihandler[0].enable() + self.ihandler[0].disable() + self.ihandler[0].disable() + self.ihandler[0].enable() + self.ihandler[0].disable() + self.ihandler[0].enable() + utils.WriteFile(self.chk_files[0], data="dummy") + self.mainloop.Run() + self.assert_(self.notified[0]) + self.assert_(not self.notified[1]) + + def testDoubleEnable(self): + self.ihandler[0].enable() + self.ihandler[0].enable() + utils.WriteFile(self.chk_files[0], data="dummy") + self.mainloop.Run() + self.assert_(self.notified[0]) + self.assert_(not self.notified[1]) + + def testDefaultDisabled(self): + utils.WriteFile(self.chk_files[1], data="dummy") + utils.WriteFile(self.chk_files[0], data="dummy") + self.mainloop.Run() + self.assert_(self.notified[0]) + # notifier 1 is disabled by default + self.assert_(not self.notified[1]) + + def testBothEnabled(self): + self.ihandler[1].enable() + utils.WriteFile(self.chk_files[1], data="dummy") + utils.WriteFile(self.chk_files[0], data="dummy") + self.mainloop.Run() + self.assert_(self.notified[0]) + self.assert_(self.notified[1]) + + +if __name__ == "__main__": + testutils.GanetiTestProgram() diff --git a/test/ganeti.daemon_unittest.py b/test/ganeti.daemon_unittest.py new file mode 100755 index 0000000000000000000000000000000000000000..ad5a12852904f2c68f7ae1f635afd833c4753a67 --- /dev/null +++ b/test/ganeti.daemon_unittest.py @@ -0,0 +1,214 @@ +#!/usr/bin/python +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Script for unittesting the daemon module""" + +import unittest +import signal +import os +import socket +import time + +from ganeti import daemon + +import testutils + + +class TestMainloop(testutils.GanetiTestCase): + """Test daemon.Mainloop""" + + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.mainloop = daemon.Mainloop() + self.sendsig_events = [] + self.onsignal_events = [] + + def _CancelEvent(self, handle): + self.mainloop.scheduler.cancel(handle) + + def _SendSig(self, sig): + self.sendsig_events.append(sig) + os.kill(os.getpid(), sig) + + def OnSignal(self, signum): + self.onsignal_events.append(signum) + + def testRunAndTermBySched(self): + self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGTERM]) + self.mainloop.Run() # terminates by _SendSig being scheduled + self.assertEquals(self.sendsig_events, [signal.SIGTERM]) + + def testSchedulerCancel(self): + handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig, + [signal.SIGTERM]) + self.mainloop.scheduler.cancel(handle) + self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM]) + self.mainloop.Run() + self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGTERM]) + + def testRegisterSignal(self): + self.mainloop.RegisterSignal(self) + self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD]) + handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig, + [signal.SIGTERM]) + self.mainloop.scheduler.cancel(handle) + self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM]) + # ...not delievered because they are scheduled after TERM + self.mainloop.scheduler.enter(0.4, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.Run() + self.assertEquals(self.sendsig_events, + [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM]) + self.assertEquals(self.onsignal_events, self.sendsig_events) + + def testDeferredCancel(self): + self.mainloop.RegisterSignal(self) + now = time.time() + self.mainloop.scheduler.enterabs(now + 0.1, 1, self._SendSig, + [signal.SIGCHLD]) + handle1 = self.mainloop.scheduler.enterabs(now + 0.3, 2, self._SendSig, + [signal.SIGCHLD]) + handle2 = self.mainloop.scheduler.enterabs(now + 0.4, 2, self._SendSig, + [signal.SIGCHLD]) + self.mainloop.scheduler.enterabs(now + 0.2, 1, self._CancelEvent, + [handle1]) + self.mainloop.scheduler.enterabs(now + 0.2, 1, self._CancelEvent, + [handle2]) + self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGTERM]) + self.mainloop.Run() + self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGTERM]) + self.assertEquals(self.onsignal_events, self.sendsig_events) + + def testReRun(self): + self.mainloop.RegisterSignal(self) + self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM]) + self.mainloop.scheduler.enter(0.4, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGCHLD]) + self.mainloop.Run() + self.assertEquals(self.sendsig_events, + [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM]) + self.assertEquals(self.onsignal_events, self.sendsig_events) + self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM]) + self.mainloop.Run() + self.assertEquals(self.sendsig_events, + [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM, + signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM]) + self.assertEquals(self.onsignal_events, self.sendsig_events) + + def testPriority(self): + # for events at the same time, the highest priority one executes first + now = time.time() + self.mainloop.scheduler.enterabs(now + 0.1, 2, self._SendSig, + [signal.SIGCHLD]) + self.mainloop.scheduler.enterabs(now + 0.1, 1, self._SendSig, + [signal.SIGTERM]) + self.mainloop.Run() + self.assertEquals(self.sendsig_events, [signal.SIGTERM]) + self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGTERM]) + self.mainloop.Run() + self.assertEquals(self.sendsig_events, + [signal.SIGTERM, signal.SIGCHLD, signal.SIGTERM]) + + +class _MyAsyncUDPSocket(daemon.AsyncUDPSocket): + + def __init__(self): + daemon.AsyncUDPSocket.__init__(self) + self.received = [] + self.error_count = 0 + + def handle_datagram(self, payload, ip, port): + self.received.append((payload)) + if payload == "terminate": + os.kill(os.getpid(), signal.SIGTERM) + elif payload == "error": + raise errors.GenericError("error") + + def handle_error(self): + self.error_count += 1 + + +class TestAsyncUDPSocket(testutils.GanetiTestCase): + """Test daemon.AsyncUDPSocket""" + + def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.mainloop = daemon.Mainloop() + self.server = _MyAsyncUDPSocket() + self.client = _MyAsyncUDPSocket() + self.server.bind(("127.0.0.1", 0)) + self.port = self.server.getsockname()[1] + + def tearDown(self): + self.server.close() + self.client.close() + testutils.GanetiTestCase.tearDown(self) + + def testNoDoubleBind(self): + self.assertRaises(socket.error, self.client.bind, ("127.0.0.1", self.port)) + + def _ThreadedClient(self, payload): + self.client.enqueue_send("127.0.0.1", self.port, payload) + print "sending %s" % payload + while self.client.writable(): + self.client.handle_write() + + def testAsyncClientServer(self): + self.client.enqueue_send("127.0.0.1", self.port, "p1") + self.client.enqueue_send("127.0.0.1", self.port, "p2") + self.client.enqueue_send("127.0.0.1", self.port, "terminate") + self.mainloop.Run() + self.assertEquals(self.server.received, ["p1", "p2", "terminate"]) + + def testSyncClientServer(self): + self.client.enqueue_send("127.0.0.1", self.port, "p1") + self.client.enqueue_send("127.0.0.1", self.port, "p2") + while self.client.writable(): + self.client.handle_write() + self.server.process_next_packet() + self.assertEquals(self.server.received, ["p1"]) + self.server.process_next_packet() + self.assertEquals(self.server.received, ["p1", "p2"]) + self.client.enqueue_send("127.0.0.1", self.port, "p3") + while self.client.writable(): + self.client.handle_write() + self.server.process_next_packet() + self.assertEquals(self.server.received, ["p1", "p2", "p3"]) + + def testErrorHandling(self): + self.client.enqueue_send("127.0.0.1", self.port, "p1") + self.client.enqueue_send("127.0.0.1", self.port, "p2") + self.client.enqueue_send("127.0.0.1", self.port, "error") + self.client.enqueue_send("127.0.0.1", self.port, "p3") + self.client.enqueue_send("127.0.0.1", self.port, "error") + self.client.enqueue_send("127.0.0.1", self.port, "terminate") + self.mainloop.Run() + self.assertEquals(self.server.received, + ["p1", "p2", "error", "p3", "error", "terminate"]) + self.assertEquals(self.server.error_count, 2) + + +if __name__ == "__main__": + testutils.GanetiTestProgram()