#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher
# W0142: Used * or ** magic

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


# Maximum number of restart attempts per instance before the watcher gives
# up on it (referenced by Watcher.CheckInstances).
# NOTE(review): this definition was missing from this copy of the file; the
# value 5 matches upstream Ganeti -- confirm.
MAXTRIES = 5
# Instance states that trigger a restart attempt
BAD_STATES = ['ERROR_down']
# Instance states for which no restart is attempted; any existing restart
# record for such an instance is dropped
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
# Keys used inside the per-instance / per-node records of the state file
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
# Global client object, assigned in main()
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master.

  Raised by the Watcher constructor when the configured master node name
  differs from this host's name.

  """


def Indent(s, prefix='| '):
  """Prefix every line of a text block.

  @param s: the text to indent
  @param prefix: the string placed in front of each line
  @return: the indented text, always terminated by a single newline

  """
  separator = "\n" + prefix
  body = separator.join(s.splitlines())
  return prefix + body + "\n"


def ShouldPause():
  """Tell whether the watcher has been paused.

  @rtype: bool
  @return: the truth value of what utils.ReadWatcherPauseFile returns
      for the watcher pause file

  """
  pause_info = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
  return bool(pause_info)


def EnsureDaemon(name):
  """Check for and start daemon if not alive.

  Runs C{constants.DAEMON_UTIL check-and-start <name>}.

  @param name: the daemon name passed to the check-and-start command
  @rtype: bool
  @return: True if the command succeeded, False otherwise (the failure
      is logged)

  """
  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
  if result.failed:
    logging.error("Can't start daemon '%s', failure %s, output: %s",
                  name, result.fail_reason, result.output)
    return False

  # Callers test the result (e.g. "if not EnsureDaemon(...)"), so success
  # must be reported explicitly rather than falling through to None
  return True


class WatcherState(object):
Iustin Pop's avatar
Iustin Pop committed
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.
Iustin Pop's avatar
Iustin Pop committed
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')
Iustin Pop's avatar
Iustin Pop committed

    utils.LockFile(self.statefile.fileno())
Iustin Pop's avatar
Iustin Pop committed

      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
Iustin Pop's avatar
Iustin Pop committed
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      logging.warning(("Invalid state file. Using defaults."
    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}
    self._orig_data = serializer.Dump(self._data)
  def Save(self):
    """Save state to file, then unlock and close it.
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')
  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.
Iustin Pop's avatar
Iustin Pop committed

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid
Iustin Pop's avatar
Iustin Pop committed

Iustin Pop's avatar
Iustin Pop committed

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
Iustin Pop's avatar
Iustin Pop committed
    """Returns number of previous restart attempts.

Iustin Pop's avatar
Iustin Pop committed
    @type instance: L{Instance}
    @param instance: the instance to look up
Iustin Pop's avatar
Iustin Pop committed
    """
Iustin Pop's avatar
Iustin Pop committed

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]
Iustin Pop's avatar
Iustin Pop committed

    return 0

  def RecordRestartAttempt(self, instance):
Iustin Pop's avatar
Iustin Pop committed
    """Record a restart attempt.

Iustin Pop's avatar
Iustin Pop committed
    @type instance: L{Instance}
    @param instance: the instance being restarted
Iustin Pop's avatar
Iustin Pop committed
    """
Iustin Pop's avatar
Iustin Pop committed

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]
Iustin Pop's avatar
Iustin Pop committed

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
Iustin Pop's avatar
Iustin Pop committed

  def RemoveInstance(self, instance):
Iustin Pop's avatar
Iustin Pop committed
    """Update state to reflect that a machine is running.
Iustin Pop's avatar
Iustin Pop committed

Iustin Pop's avatar
Iustin Pop committed
    This method removes the record for a named instance (as we only
    track down instances).
Iustin Pop's avatar
Iustin Pop committed

Iustin Pop's avatar
Iustin Pop committed
    @type instance: L{Instance}
    @param instance: the instance to remove from books
Iustin Pop's avatar
Iustin Pop committed
    """
Iustin Pop's avatar
Iustin Pop committed

    if instance.name in idata:
      del idata[instance.name]
Iustin Pop's avatar
Iustin Pop committed


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    """Record the instance data reported by the cluster.

    @param name: the instance name
    @param state: the instance status string (compared by the watcher
        against BAD_STATES and HELPLESS_STATES)
    @param autostart: the value of the "admin_state" query field; the
        watcher skips disk activation when it is false

    """
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    Submits an OpStartupInstance opcode (force=False) through the
    module-level client.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    Submits an OpActivateInstanceDisks opcode through the module-level
    client.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance, node and secondary-node data from the cluster.

  Submits one job containing an instance query and a node query, writes
  the instance status file (one "name status" line per instance) and
  archives the job.

  @rtype: tuple
  @return: (instances, nodes, smap), where instances maps an instance name
      to an L{Instance}, nodes maps a node name to a (bootid, offline)
      tuple, and smap maps a node name to the list of instances having it
      as a secondary node

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}
  # previously never initialised, causing a NameError below
  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      smap.setdefault(node, []).append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    """Check that we run on the master and gather the cluster state.

    @param opts: the parsed command line options (C{job_age} is read here)
    @type notepad: L{WatcherState}
    @param notepad: the state file object
    @raise NotMasterError: if this host is not the cluster master

    """
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    Restarts down instances, re-activates disks of instances whose
    secondary node changed its boot ID, then runs a disk verification.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    @param age: jobs older than this age are auto-archived by the master

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    A node whose boot ID differs from the one stored in the state file is
    considered restarted.  For every autostart instance using such a node
    as secondary (and not already started by L{CheckInstances}) the disks
    are re-activated, then the new boot IDs are stored.

    @type notepad: L{WatcherState}
    @param notepad: the state file object

    """
    # previously never initialised, causing a NameError below
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          # no instance uses this node as a secondary
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    Instances in BAD_STATES are restarted while fewer than MAXTRIES
    attempts are recorded; at exactly MAXTRIES one final attempt is
    recorded and an error is logged, after which the instance is skipped
    silently.  Instances in any other state get their restart record
    dropped.

    @type notepad: L{WatcherState}
    @param notepad: the state file object

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          # bump the counter past MAXTRIES so later runs stay quiet
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    Instances listed by the verification as having offline disks get their
    disks activated in a single job.  Failures of that job are logged, not
    raised, so that the watcher state still gets saved.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      # best-effort, like the per-instance activations in CheckDisks; letting
      # this escape would abort Run() and discard the recorded restarts
      logging.exception("Error while activating disks")


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args(), with
      C{options.job_age} already converted by cli.ParseTimespec

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
Iustin Pop's avatar
Iustin Pop committed
  global client # pylint: disable-msg=W0603
  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
Iustin Pop's avatar
Iustin Pop committed

Iustin Pop's avatar
Iustin Pop committed
  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)
Iustin Pop's avatar
Iustin Pop committed

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  update_file = False
Iustin Pop's avatar
Iustin Pop committed
  try:
    # on master or not, try to start the node dameon
    EnsureDaemon(constants.NODED)
    notepad = WatcherState()
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()
      # we are on master now
      EnsureDaemon(constants.RAPI)
      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      update_file = True

      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)
Iustin Pop's avatar
Iustin Pop committed

Iustin Pop's avatar
Iustin Pop committed
if __name__ == '__main__':
  main()