Skip to content
Snippets Groups Projects
ganeti.asyncnotifier_unittest.py 6.4 KiB
Newer Older
#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for unittesting the asyncnotifier module"""

import unittest
import signal
import os
import tempfile
import shutil

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import daemon
from ganeti import utils
from ganeti import errors

import testutils


class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
  def __init__(self, *args, **kwargs):
    asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
    self.error_count = 0

  def handle_error(self):
    self.error_count += 1
class TestSingleFileEventHandler(testutils.GanetiTestCase):
  """Test daemon.Mainloop"""

  NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    self.mainloop = daemon.Mainloop()
    self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
    self.notified = [False for i in self.NOTIFIERS]
    # We need one watch manager per notifier, as those contain the file
    # descriptor which is monitored by asyncore
    self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
    self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
    self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cb, cf)
                     for (wm, cb, cf) in
                     zip(self.wms, self.cbk, self.chk_files)]
    self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, ih)
                      for (wm, ih) in zip(self.wms, self.ihandler)]
    # TERM notifier is enabled by default, as we use it to get out of the loop
    self.ihandler[self.NOTIFIER_TERM].enable()

  class OnInotifyCallback:
    def __init__(self, testobj, i):
      self.testobj = testobj
      self.notified = testobj.notified
      self.i = i

    def __call__(self, enabled):
      self.notified[self.i] = True
      if self.i == self.testobj.NOTIFIER_TERM:
        os.kill(os.getpid(), signal.SIGTERM)
      elif self.i == self.testobj.NOTIFIER_ERR:
        raise errors.GenericError("an error")

  def testReplace(self):
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
    self.mainloop.Run()
    self.assert_(self.notified[self.NOTIFIER_TERM])
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

  def testEnableDisable(self):
    self.ihandler[self.NOTIFIER_TERM].enable()
    self.ihandler[self.NOTIFIER_TERM].disable()
    self.ihandler[self.NOTIFIER_TERM].disable()
    self.ihandler[self.NOTIFIER_TERM].enable()
    self.ihandler[self.NOTIFIER_TERM].disable()
    self.ihandler[self.NOTIFIER_TERM].enable()
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
    self.mainloop.Run()
    self.assert_(self.notified[self.NOTIFIER_TERM])
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

  def testDoubleEnable(self):
    self.ihandler[self.NOTIFIER_TERM].enable()
    self.ihandler[self.NOTIFIER_TERM].enable()
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
    self.mainloop.Run()
    self.assert_(self.notified[self.NOTIFIER_TERM])
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

  def testDefaultDisabled(self):
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
    self.mainloop.Run()
    self.assert_(self.notified[self.NOTIFIER_TERM])
    # NORM notifier is disabled by default
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

  def testBothEnabled(self):
    self.ihandler[self.NOTIFIER_NORM].enable()
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
    self.mainloop.Run()
    self.assert_(self.notified[self.NOTIFIER_TERM])
    self.assert_(self.notified[self.NOTIFIER_NORM])
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

  def testError(self):
    self.ihandler[self.NOTIFIER_ERR].enable()
    utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
    self.assertRaises(errors.GenericError, self.mainloop.Run)
    self.assert_(self.notified[self.NOTIFIER_ERR])
    self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
class TestSingleFileEventHandlerError(unittest.TestCase):
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def test(self):
    wm = pyinotify.WatchManager()
    handler = asyncnotifier.SingleFileEventHandler(wm, None,
                                                   utils.PathJoin(self.tmpdir,
                                                                  "nonexist"))
    self.assertRaises(errors.InotifyError, handler.enable)
    self.assertRaises(errors.InotifyError, handler.enable)
    handler.disable()
    self.assertRaises(errors.InotifyError, handler.enable)


if __name__ == "__main__":
  testutils.GanetiTestProgram()