Skip to content
Snippets Groups Projects
ganeti.utils.log_unittest.py 6.18 KiB
Newer Older
#!/usr/bin/python
#

# Copyright (C) 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for testing ganeti.utils.log"""

import os
import unittest
import logging
import tempfile

from ganeti import constants
from ganeti import errors
from ganeti import utils

import testutils


class TestLogHandler(unittest.TestCase):
  def test(self):
    tmpfile = tempfile.NamedTemporaryFile()

    handler = utils.log._ReopenableLogHandler(tmpfile.name)
    handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

    logger = logging.Logger("TestLogger")
    logger.addHandler(handler)
    self.assertEqual(len(logger.handlers), 1)

    logger.error("Test message ERROR")
    logger.info("Test message INFO")

    logger.removeHandler(handler)
    self.assertFalse(logger.handlers)
    handler.close()

    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 2)

  def testReopen(self):
    tmpfile = tempfile.NamedTemporaryFile()
    tmpfile2 = tempfile.NamedTemporaryFile()

    handler = utils.log._ReopenableLogHandler(tmpfile.name)

    self.assertFalse(utils.ReadFile(tmpfile.name))
    self.assertFalse(utils.ReadFile(tmpfile2.name))

    logger = logging.Logger("TestLoggerReopen")
    logger.addHandler(handler)

    for _ in range(3):
      logger.error("Test message ERROR")
    handler.flush()
    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 3)
    before_id = utils.GetFileID(tmpfile.name)

    handler.RequestReopen()
    self.assertTrue(handler._reopen)
    self.assertTrue(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                       before_id))

    # Rename only after requesting reopen
    os.rename(tmpfile.name, tmpfile2.name)
    assert not os.path.exists(tmpfile.name)

    # Write another message, should reopen
    for _ in range(4):
      logger.info("Test message INFO")
      self.assertFalse(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                          before_id))

    logger.removeHandler(handler)
    self.assertFalse(logger.handlers)
    handler.close()

    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 4)
    self.assertEqual(len(utils.ReadFile(tmpfile2.name).splitlines()), 3)

  def testConsole(self):
    for (console, check) in [(None, False),
                             (tempfile.NamedTemporaryFile(), True),
                             (self._FailingFile(os.devnull), False)]:
      # Create a handler which will fail when handling errors
      cls = utils.log._LogErrorsToConsole(self._FailingHandler)

      # Instantiate handler with file which will fail when writing,
      # provoking a write to the console
      handler = cls(console, self._FailingFile(os.devnull))

      logger = logging.Logger("TestLogger")
      logger.addHandler(handler)
      self.assertEqual(len(logger.handlers), 1)

      # Provoke write
      logger.error("Test message ERROR")

      # Take everything apart
      logger.removeHandler(handler)
      self.assertFalse(logger.handlers)
      handler.close()

      if console and check:
        console.flush()

        # Check console output
        consout = utils.ReadFile(console.name)
        self.assertTrue("Cannot log message" in consout)
        self.assertTrue("Test message ERROR" in consout)

  class _FailingFile(file):
    def write(self, _):
      raise Exception

  class _FailingHandler(logging.StreamHandler):
    def handleError(self, _):
      raise Exception


class TestSetupLogging(unittest.TestCase):
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testSimple(self):
    logfile = utils.PathJoin(self.tmpdir, "basic.log")
    logger = logging.Logger("TestLogger")
    self.assertTrue(callable(utils.SetupLogging(logfile, "test",
                                                console_logging=False,
                                                syslog=constants.SYSLOG_NO,
                                                stderr_logging=False,
                                                multithreaded=False,
                                                root_logger=logger)))
    self.assertEqual(utils.ReadFile(logfile), "")
    logger.error("This is a test")

    # Ensure SetupLogging used custom logger
    logging.error("This message should not show up in the test log file")

    self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))

  def testReopen(self):
    logfile = utils.PathJoin(self.tmpdir, "reopen.log")
    logfile2 = utils.PathJoin(self.tmpdir, "reopen.log.OLD")
    logger = logging.Logger("TestLogger")
    reopen_fn = utils.SetupLogging(logfile, "test",
                                   console_logging=False,
                                   syslog=constants.SYSLOG_NO,
                                   stderr_logging=False,
                                   multithreaded=False,
                                   root_logger=logger)
    self.assertTrue(callable(reopen_fn))

    self.assertEqual(utils.ReadFile(logfile), "")
    logger.error("This is a test")
    self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))

    os.rename(logfile, logfile2)
    assert not os.path.exists(logfile)

    # Notify logger to reopen on the next message
    reopen_fn()
    assert not os.path.exists(logfile)

    # Provoke actual reopen
    logger.error("First message")

    self.assertTrue(utils.ReadFile(logfile).endswith("First message\n"))
    self.assertTrue(utils.ReadFile(logfile2).endswith("This is a test\n"))


if __name__ == "__main__":
  testutils.GanetiTestProgram()