Skip to content
Snippets Groups Projects
ganeti-confd 12.15 KiB
#!/usr/bin/python
#

# Copyright (C) 2009, Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti configuration daemon

Ganeti-confd is a daemon to query master candidates for configuration values.
It uses UDP+HMAC for authentication with a global cluster key.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-confd

import os
import sys
import logging
import time

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from optparse import OptionParser

from ganeti import asyncnotifier
from ganeti import confd
from ganeti.confd import server as confd_server
from ganeti import constants
from ganeti import errors
from ganeti import daemon
from ganeti import ssconf


class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
  """The confd udp server, suitable for use with asyncore.

  """
  def __init__(self, bind_address, port, processor):
    """Constructor for ConfdAsyncUDPServer

    @type bind_address: string
    @param bind_address: socket bind address ('' for all)
    @type port: int
    @param port: udp port
    @type processor: L{confd.server.ConfdProcessor}
    @param processor: ConfdProcessor to use to handle queries

    """
    daemon.AsyncUDPSocket.__init__(self)
    self.bind_address = bind_address
    self.port = port
    self.processor = processor
    self.bind((bind_address, port))
    logging.debug("listening on ('%s':%d)", bind_address, port)

  # this method is overriding a daemon.AsyncUDPSocket method
  def handle_datagram(self, payload_in, ip, port):
    try:
      query = confd.UnpackMagic(payload_in)
    except errors.ConfdMagicError, err:
      logging.debug(err)
      return

    answer =  self.processor.ExecQuery(query, ip, port)
    if answer is not None:
      try:
        self.enqueue_send(ip, port, confd.PackMagic(answer))
      except errors.UdpDataSizeError:
        logging.error("Reply too big to fit in an udp packet.")


class ConfdInotifyEventHandler(pyinotify.ProcessEvent):

  def __init__(self, watch_manager, callback,
               filename=constants.CLUSTER_CONF_FILE):
    """Constructor for ConfdInotifyEventHandler

    @type watch_manager: L{pyinotify.WatchManager}
    @param watch_manager: ganeti-confd inotify watch manager
    @type callback: function accepting a boolean
    @param callback: function to call when an inotify event happens
    @type filename: string
    @param filename: config file to watch

    """
    # no need to call the parent's constructor
    self.watch_manager = watch_manager
    self.callback = callback
    # pylint: disable-msg=E1103
    # pylint for some reason doesn't see the below constants
    self.mask = pyinotify.EventsCodes.IN_IGNORED | \
                pyinotify.EventsCodes.IN_MODIFY
    self.file = filename
    self.watch_handle = None

  def enable(self):
    """Watch the given file

    """
    if self.watch_handle is None:
      result = self.watch_manager.add_watch(self.file, self.mask)
      if not self.file in result or result[self.file] <= 0:
        raise errors.InotifyError("Could not add inotify watcher")
      else:
        self.watch_handle = result[self.file]

  def disable(self):
    """Stop watching the given file

    """
    if self.watch_handle is not None:
      result = self.watch_manager.rm_watch(self.watch_handle)
      if result[self.watch_handle]:
        self.watch_handle = None

  def process_IN_IGNORED(self, event):
    # Due to the fact that we monitor just for the cluster config file (rather
    # than for the whole data dir) when the file is replaced with another one
    # (which is what happens normally in ganeti) we're going to receive an
    # IN_IGNORED event from inotify, because of the file removal (which is
    # contextual with the replacement). In such a case we need to create
    # another watcher for the "new" file.
    logging.debug("Received 'ignored' inotify event for %s", event.path)
    self.watch_handle = None

    try:
      # Since the kernel believes the file we were interested in is gone, it's
      # not going to notify us of any other events, until we set up, here, the
      # new watch. This is not a race condition, though, since we're anyway
      # going to realod the file after setting up the new watch.
      self.callback(False)
    except errors.ConfdFatalError, err:
      logging.critical("Critical error, shutting down: %s", err)
      sys.exit(constants.EXIT_FAILURE)
    except:
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want the confd to
      # continue working.
      logging.error("Unexpected exception", exc_info=True)

  def process_IN_MODIFY(self, event):
    # This gets called when the config file is modified. Note that this doesn't
    # usually happen in Ganeti, as the config file is normally replaced by a
    # new one, at filesystem level, rather than actually modified (see
    # utils.WriteFile)
    logging.debug("Received 'modify' inotify event for %s", event.path)

    try:
      self.callback(True)
    except errors.ConfdFatalError, err:
      logging.critical("Critical error, shutting down: %s", err)
      sys.exit(constants.EXIT_FAILURE)
    except:
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want the confd to
      # continue working.
      logging.error("Unexpected exception", exc_info=True)

  def process_default(self, event):
    logging.error("Received unhandled inotify event: %s", event)


class ConfdConfigurationReloader(object):
  """Logic to control when to reload the ganeti configuration

  This class alternates between inotify and polling, to rate-limit the number
  of reloads: polling is entered when notifications arrive less than
  constants.CONFD_CONFIG_RELOAD_RATELIMIT apart. While in inotify mode a
  timer (constants.CONFD_CONFIG_RELOAD_TIMEOUT) still runs a fallback check,
  to verify that the reload hasn't failed.

  """
  def __init__(self, processor, mainloop):
    """Constructor for ConfdConfigurationReloader

    The reloader starts in polling mode with its timer armed; the inotify
    watch is created but not enabled here.

    @type processor: L{confd.server.ConfdProcessor}
    @param processor: ganeti-confd ConfdProcessor
    @type mainloop: L{daemon.Mainloop}
    @param mainloop: ganeti-confd mainloop

    """
    self.processor = processor
    self.mainloop = mainloop

    self.polling = True
    self.last_notification = 0

    # Asynchronous inotify handler for config changes
    self.wm = pyinotify.WatchManager()
    self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
    self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler)

    self.timer_handle = None
    self._EnableTimer()

  def OnInotify(self, notifier_enabled):
    """Receive an inotify notification.

    @type notifier_enabled: boolean
    @param notifier_enabled: whether the notifier is still enabled

    """
    now = time.time()
    elapsed = now - self.last_notification
    self.last_notification = now

    if elapsed < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
      # Notifications are coming in too fast: stop listening and poll instead
      logging.debug("Moving from inotify mode to polling mode")
      self.polling = True
      if notifier_enabled:
        self.inotify_handler.disable()

    if not (self.polling or notifier_enabled):
      # Staying in inotify mode, but the watch is gone: install it again, and
      # fall back to polling if that isn't possible
      try:
        self.inotify_handler.enable()
      except errors.InotifyError:
        self.polling = True

    try:
      reloaded = self.processor.reader.Reload()
    except errors.ConfigurationError:
      self.DisableConfd()
      self.inotify_handler.disable()
      return

    if reloaded:
      logging.info("Reloaded ganeti config")
    else:
      logging.debug("Skipped double config reload")

    # Reset the timer. If we're polling it will go to the polling rate, if
    # we're not it will delay it again to its base safe timeout.
    self._ResetTimer()

  def _DisableTimer(self):
    """Cancel the pending timer, if there is one.

    """
    if self.timer_handle is None:
      return
    self.mainloop.scheduler.cancel(self.timer_handle)
    self.timer_handle = None

  def _EnableTimer(self):
    """Schedule the timer, unless one is already pending.

    The delay is the rate limit interval while polling, and the longer
    fallback timeout otherwise.

    """
    if self.timer_handle is not None:
      return

    if self.polling:
      delay = constants.CONFD_CONFIG_RELOAD_RATELIMIT
    else:
      delay = constants.CONFD_CONFIG_RELOAD_TIMEOUT

    self.timer_handle = self.mainloop.scheduler.enter(delay, 1,
                                                      self.OnTimer, [])

  def _ResetTimer(self):
    """Restart the timer using the delay matching the current mode.

    """
    self._DisableTimer()
    self._EnableTimer()

  def OnTimer(self):
    """Function called when the timer fires

    """
    self.timer_handle = None
    was_disabled = False
    try:
      if self.processor.reader is not None:
        reloaded = self.processor.reader.Reload()
      else:
        # confd is currently disabled: try to bring it back
        was_disabled = True
        self.EnableConfd()
        reloaded = True
    except errors.ConfigurationError:
      self.DisableConfd(silent=was_disabled)
      return

    if reloaded:
      if self.polling:
        logging.info("Reloaded ganeti config")
      elif not (self.notifier.check_events() or was_disabled):
        # We have reloaded the config files, but received no inotify event.
        # If an event is pending though, we just happen to have timed out
        # before receiving it, so this is not a problem, and we shouldn't
        # alert
        logging.warning("Config file reload at timeout (inotify failure)")
    elif self.polling:
      # We're polling, but we haven't reloaded the config:
      # Going back to inotify mode
      logging.debug("Moving from polling mode to inotify mode")
      self.polling = False
      try:
        self.inotify_handler.enable()
      except errors.InotifyError:
        self.polling = True
    else:
      logging.debug("Performed configuration check")

    self._EnableTimer()

  def DisableConfd(self, silent=False):
    """Puts confd in non-serving mode

    The processor is disabled, polling mode is left and the timer is
    restarted.

    @type silent: boolean
    @param silent: if True, don't log the warning about confd being disabled

    """
    if not silent:
      logging.warning("Confd is being disabled")
    self.processor.Disable()
    self.polling = False
    self._ResetTimer()

  def EnableConfd(self):
    """Enables the processor again and switches to polling mode

    The timer is restarted as well.

    """
    self.processor.Enable()
    logging.warning("Confd is being enabled")
    self.polling = True
    self._ResetTimer()


def CheckConfd(_, args):
  """Run the startup checks, exiting with a failure if one of them fails.

  @param args: positional command line arguments; none are accepted

  """
  # confd doesn't take any arguments
  if args:
    usage = "Usage: %s [-f] [-d] [-b ADDRESS]" % sys.argv[0]
    print >> sys.stderr, usage
    sys.exit(constants.EXIT_FAILURE)

  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
  # have more than one.
  hmac_key_present = os.path.isfile(constants.HMAC_CLUSTER_KEY)
  if not hmac_key_present:
    print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
    sys.exit(constants.EXIT_FAILURE)


def ExecConfd(options, _):
  """Main confd function, executed with PID file held

  Creates the mainloop, the query processor, the UDP server and the
  configuration reloader, then runs the mainloop.

  @param options: parsed command line options; bind_address and port are
      read from it

  """
  # TODO: clarify how the server and reloader variables work (they are
  # not used)
  # pylint: disable-msg=W0612
  mainloop = daemon.Mainloop()

  # Asynchronous confd UDP server
  query_processor = confd_server.ConfdProcessor()
  try:
    query_processor.Enable()
  except errors.ConfigurationError:
    # If enabling the processor has failed, we can still go on, but confd will
    # be disabled
    logging.warning("Confd is starting in disabled mode")

  udp_server = ConfdAsyncUDPServer(options.bind_address, options.port,
                                   query_processor)

  # Configuration reloader
  config_reloader = ConfdConfigurationReloader(query_processor, mainloop)

  mainloop.Run()


def main():
  """Main function for the confd daemon.

  Builds the option parser and the list of (directory, mode) pairs, then
  hands both to daemon.GenericMain together with the CheckConfd and ExecConfd
  callbacks.

  """
  # Imported locally: only needed to spell out the lock directory mode
  import stat

  parser = OptionParser(description="Ganeti configuration daemon",
                        usage="%prog [-f] [-d] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
  # The lock directory mode is meant to be octal 1777 (rwx for everybody plus
  # the sticky bit). The decimal literal 1777 used previously is octal 3361,
  # which is a different set of permission bits. Building the value from the
  # stat constants avoids octal literal syntax altogether.
  lock_dir_mode = (stat.S_ISVTX | stat.S_IRWXU | stat.S_IRWXG |
                   stat.S_IRWXO)
  dirs.append((constants.LOCK_DIR, lock_dir_mode))
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)


# Start the daemon only when this file is run as a script, not when imported
if __name__ == "__main__":
  main()