Skip to content
Snippets Groups Projects
ganeti.utils.log_unittest.py 4.12 KiB
Newer Older
#!/usr/bin/python
#

# Copyright (C) 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for testing ganeti.utils.log"""

import os
import unittest
import logging
import tempfile

from ganeti import constants
from ganeti import errors
from ganeti import utils

import testutils


class TestLogHandler(unittest.TestCase):
  def test(self):
    tmpfile = tempfile.NamedTemporaryFile()

    handler = utils.log._ReopenableLogHandler(tmpfile.name)
    handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

    logger = logging.Logger("TestLogger")
    logger.addHandler(handler)
    self.assertEqual(len(logger.handlers), 1)

    logger.error("Test message ERROR")
    logger.info("Test message INFO")

    logger.removeHandler(handler)
    self.assertFalse(logger.handlers)
    handler.close()

    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 2)

  def testReopen(self):
    tmpfile = tempfile.NamedTemporaryFile()
    tmpfile2 = tempfile.NamedTemporaryFile()

    handler = utils.log._ReopenableLogHandler(tmpfile.name)

    self.assertFalse(utils.ReadFile(tmpfile.name))
    self.assertFalse(utils.ReadFile(tmpfile2.name))

    logger = logging.Logger("TestLoggerReopen")
    logger.addHandler(handler)

    for _ in range(3):
      logger.error("Test message ERROR")
    handler.flush()
    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 3)
    before_id = utils.GetFileID(tmpfile.name)

    handler.RequestReopen()
    self.assertTrue(handler._reopen)
    self.assertTrue(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                       before_id))

    # Rename only after requesting reopen
    os.rename(tmpfile.name, tmpfile2.name)
    assert not os.path.exists(tmpfile.name)

    # Write another message, should reopen
    for _ in range(4):
      logger.info("Test message INFO")
      self.assertFalse(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                          before_id))

    logger.removeHandler(handler)
    self.assertFalse(logger.handlers)
    handler.close()

    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 4)
    self.assertEqual(len(utils.ReadFile(tmpfile2.name).splitlines()), 3)

  def testConsole(self):
    for (console, check) in [(None, False),
                             (tempfile.NamedTemporaryFile(), True),
                             (self._FailingFile(os.devnull), False)]:
      # Create a handler which will fail when handling errors
      cls = utils.log._LogErrorsToConsole(self._FailingHandler)

      # Instantiate handler with file which will fail when writing,
      # provoking a write to the console
      handler = cls(console, self._FailingFile(os.devnull))

      logger = logging.Logger("TestLogger")
      logger.addHandler(handler)
      self.assertEqual(len(logger.handlers), 1)

      # Provoke write
      logger.error("Test message ERROR")

      # Take everything apart
      logger.removeHandler(handler)
      self.assertFalse(logger.handlers)
      handler.close()

      if console and check:
        console.flush()

        # Check console output
        consout = utils.ReadFile(console.name)
        self.assertTrue("Cannot log message" in consout)
        self.assertTrue("Test message ERROR" in consout)

  class _FailingFile(file):
    def write(self, _):
      raise Exception

  class _FailingHandler(logging.StreamHandler):
    def handleError(self, _):
      raise Exception


if __name__ == "__main__":
  testutils.GanetiTestProgram()