#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for unittesting the daemon module"""

import unittest
import signal
import os
import socket
import time

from ganeti import daemon
from ganeti import errors

import testutils


class TestMainloop(testutils.GanetiTestCase):
  """Test daemon.Mainloop

  Every test schedules signals to be sent to the current process through
  the mainloop's scheduler and then checks which of them were actually
  sent before C{Run()} returned.

  """

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    self.mainloop = daemon.Mainloop()
    # Signals sent by _SendSig, in sending order
    self.sendsig_events = []
    # Signal numbers passed to OnSignal, in arrival order
    self.onsignal_events = []

  def _CancelEvent(self, handle):
    """Cancel a scheduled event; itself used as a scheduler callback.

    """
    self.mainloop.scheduler.cancel(handle)

  def _SendSig(self, sig):
    """Record C{sig} and then send it to the current process.

    """
    self.sendsig_events.append(sig)
    os.kill(os.getpid(), sig)

  def OnSignal(self, signum):
    """Record a signal number.

    Tests that call C{RegisterSignal(self)} expect this to be invoked once
    for every signal sent.

    """
    self.onsignal_events.append(signum)

  def testRunAndTermBySched(self):
    """A scheduled SIGTERM makes Run() return."""
    self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGTERM])
    self.mainloop.Run() # terminates by _SendSig being scheduled
    self.assertEqual(self.sendsig_events, [signal.SIGTERM])

  def testTerminatingSignals(self):
    """SIGCHLD does not stop the loop; SIGINT and SIGTERM both do."""
    self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGINT])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events, [signal.SIGCHLD, signal.SIGINT])
    self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGTERM])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events, [signal.SIGCHLD, signal.SIGINT,
                                           signal.SIGTERM])

  def testSchedulerCancel(self):
    """An event cancelled before Run() is never executed."""
    handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig,
                                           [signal.SIGTERM])
    self.mainloop.scheduler.cancel(handle)
    self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events, [signal.SIGCHLD, signal.SIGTERM])

  def testRegisterSignal(self):
    """A registered receiver sees exactly the signals that were sent."""
    self.mainloop.RegisterSignal(self)
    self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
    handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig,
                                           [signal.SIGTERM])
    self.mainloop.scheduler.cancel(handle)
    self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
    # ...not delivered because they are scheduled after TERM
    self.mainloop.scheduler.enter(0.4, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events,
                     [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM])
    self.assertEqual(self.onsignal_events, self.sendsig_events)

  def testDeferredCancel(self):
    """Events cancelled from within a running scheduler callback don't fire."""
    self.mainloop.RegisterSignal(self)
    now = time.time()
    self.mainloop.scheduler.enterabs(now + 0.1, 1, self._SendSig,
                                     [signal.SIGCHLD])
    handle1 = self.mainloop.scheduler.enterabs(now + 0.3, 2, self._SendSig,
                                               [signal.SIGCHLD])
    handle2 = self.mainloop.scheduler.enterabs(now + 0.4, 2, self._SendSig,
                                               [signal.SIGCHLD])
    # Both cancellations run at now + 0.2, before either target is due
    self.mainloop.scheduler.enterabs(now + 0.2, 1, self._CancelEvent,
                                     [handle1])
    self.mainloop.scheduler.enterabs(now + 0.2, 1, self._CancelEvent,
                                     [handle2])
    self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGTERM])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events, [signal.SIGCHLD, signal.SIGTERM])
    self.assertEqual(self.onsignal_events, self.sendsig_events)

  def testReRun(self):
    """Events left pending by one Run() are executed by the next one."""
    self.mainloop.RegisterSignal(self)
    self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
    self.mainloop.scheduler.enter(0.4, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGCHLD])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events,
                     [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM])
    self.assertEqual(self.onsignal_events, self.sendsig_events)
    # The two SIGCHLD events scheduled after the first TERM are still queued
    self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events,
                     [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM,
                      signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM])
    self.assertEqual(self.onsignal_events, self.sendsig_events)

  def testPriority(self):
    """Events due at the same time run in priority order."""
    # for events at the same time, the highest priority one executes first
    # (the TERM event, entered with priority 1, wins over priority 2)
    now = time.time()
    self.mainloop.scheduler.enterabs(now + 0.1, 2, self._SendSig,
                                     [signal.SIGCHLD])
    self.mainloop.scheduler.enterabs(now + 0.1, 1, self._SendSig,
                                     [signal.SIGTERM])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events, [signal.SIGTERM])
    # The SIGCHLD event is still pending and is sent by the second run
    self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGTERM])
    self.mainloop.Run()
    self.assertEqual(self.sendsig_events,
                     [signal.SIGTERM, signal.SIGCHLD, signal.SIGTERM])

# Recording subclass of daemon.AsyncUDPSocket used by TestAsyncUDPSocket
class _MyAsyncUDPSocket(daemon.AsyncUDPSocket):
  """AsyncUDPSocket that records received payloads and error callbacks.

  Two payloads are special: C{"terminate"} sends SIGTERM to the current
  process and C{"error"} raises L{errors.GenericError}.

  """

  def __init__(self):
    daemon.AsyncUDPSocket.__init__(self)
    # Payloads passed to handle_datagram, in arrival order
    self.received = []
    # Number of times handle_error was invoked
    self.error_count = 0

  def handle_datagram(self, payload, ip, port):
    """Record the payload, then act on the two special payloads.

    The sender's C{ip} and C{port} are ignored.

    """
    self.received.append(payload)
    if payload == "terminate":
      os.kill(os.getpid(), signal.SIGTERM)
    elif payload == "error":
      raise errors.GenericError("error")

  def handle_error(self):
    """Count the error instead of handling it in any other way.

    """
    self.error_count += 1

class TestAsyncUDPSocket(testutils.GanetiTestCase):
  """Test daemon.AsyncUDPSocket"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    self.mainloop = daemon.Mainloop()
    self.server = _MyAsyncUDPSocket()
    self.client = _MyAsyncUDPSocket()
    # Bind to port 0 and read back the port that was actually assigned
    self.server.bind(("127.0.0.1", 0))
    self.port = self.server.getsockname()[1]

  def tearDown(self):
    self.server.close()
    self.client.close()
    testutils.GanetiTestCase.tearDown(self)

  def testNoDoubleBind(self):
    """Binding a second socket to the server's address must fail."""
    self.assertRaises(socket.error, self.client.bind, ("127.0.0.1", self.port))

  def _ThreadedClient(self, payload):
    """Send C{payload} to the server, flushing the client's send queue.

    """
    self.client.enqueue_send("127.0.0.1", self.port, payload)
    # Parenthesised form prints the same text on Python 2 and parses on 3
    print("sending %s" % payload)
    while self.client.writable():
      self.client.handle_write()

  def testAsyncClientServer(self):
    """Datagrams queued before Run() are delivered in order by the loop."""
    self.client.enqueue_send("127.0.0.1", self.port, "p1")
    self.client.enqueue_send("127.0.0.1", self.port, "p2")
    # "terminate" makes the server send SIGTERM to this process
    self.client.enqueue_send("127.0.0.1", self.port, "terminate")
    self.mainloop.Run()
    self.assertEqual(self.server.received, ["p1", "p2", "terminate"])

  def testSyncClientServer(self):
    """Without the mainloop, packets are processed one per explicit call."""
    self.client.enqueue_send("127.0.0.1", self.port, "p1")
    self.client.enqueue_send("127.0.0.1", self.port, "p2")
    while self.client.writable():
      self.client.handle_write()
    self.server.process_next_packet()
    self.assertEqual(self.server.received, ["p1"])
    self.server.process_next_packet()
    self.assertEqual(self.server.received, ["p1", "p2"])
    self.client.enqueue_send("127.0.0.1", self.port, "p3")
    while self.client.writable():
      self.client.handle_write()
    self.server.process_next_packet()
    self.assertEqual(self.server.received, ["p1", "p2", "p3"])

  def testErrorHandling(self):
    """A failing datagram handler is counted and does not stop the server."""
    self.client.enqueue_send("127.0.0.1", self.port, "p1")
    self.client.enqueue_send("127.0.0.1", self.port, "p2")
    self.client.enqueue_send("127.0.0.1", self.port, "error")
    self.client.enqueue_send("127.0.0.1", self.port, "p3")
    self.client.enqueue_send("127.0.0.1", self.port, "error")
    self.client.enqueue_send("127.0.0.1", self.port, "terminate")
    self.mainloop.Run()
    self.assertEqual(self.server.received,
                     ["p1", "p2", "error", "p3", "error", "terminate"])
    # One handle_error call is expected per "error" payload
    self.assertEqual(self.server.error_count, 2)


if __name__ == "__main__":
  # Only when executed as a script: hand control to the project's test
  # program wrapper (presumably a unittest runner -- see testutils).
  testutils.GanetiTestProgram()