From 5a3103e9ef4564d8e8cd9f9d60aac799ccf3c08f Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Wed, 10 Oct 2007 08:12:21 +0000
Subject: [PATCH] Detect node restarts and reactivate disks.

- Change format of watcher state file to JSON.
- Move log path for watcher script to constants.py.

Reviewed-by: iustinp
---
 daemons/ganeti-watcher | 280 ++++++++++++++++++++++++++++-------------
 lib/constants.py       |   3 +-
 2 files changed, 196 insertions(+), 87 deletions(-)

diff --git a/daemons/ganeti-watcher b/daemons/ganeti-watcher
index ea2bf6249..c912592e9 100755
--- a/daemons/ganeti-watcher
+++ b/daemons/ganeti-watcher
@@ -24,30 +24,31 @@
 This program and set of classes implement a watchdog to restart
 virtual machines in a Ganeti cluster that have crashed or been killed
 by a node reboot.  Run from cron or similar.
-"""
-
 
-LOGFILE = '/var/log/ganeti/watcher.log'
-MAXTRIES = 5
-BAD_STATES = ['stopped']
-HELPLESS_STATES = ['(node down)']
-NOTICE = 'NOTICE'
-ERROR = 'ERROR'
+"""
 
 import os
 import sys
+import re
 import time
 import fcntl
 import errno
+import simplejson
 from optparse import OptionParser
 
-
 from ganeti import utils
 from ganeti import constants
 from ganeti import ssconf
 from ganeti import errors
 
 
+MAXTRIES = 5
+BAD_STATES = ['stopped']
+HELPLESS_STATES = ['(node down)']
+NOTICE = 'NOTICE'
+ERROR = 'ERROR'
+
+
 class Error(Exception):
   """Generic custom error class."""
 
@@ -88,25 +89,16 @@ def DoCmd(cmd):
   return res
 
 
-class RestarterState(object):
+class WatcherState(object):
   """Interface to a state file recording restart attempts.
 
-  Methods:
-    Open(): open, lock, read and parse the file.
-            Raises StandardError on lock contention.
-
-    NumberOfAttempts(name): returns the number of times in succession
-                            a restart has been attempted of the named instance.
-
-    RecordAttempt(name, when): records one restart attempt of name at
-                               time in when.
-
-    Remove(name): remove record given by name, if exists.
-
-    Save(name): saves all records to file, releases lock and closes file.
-
   """
   def __init__(self):
+    """Open, lock, read and parse the file.
+
+    Raises StandardError on lock contention.
+
+    """
     # The two-step dance below is necessary to allow both opening existing
     # file read/write and creating if not existing.  Vanilla open will truncate
     # an existing file -or- allow creating if not existing.
@@ -121,44 +113,93 @@ class RestarterState(object):
       raise
 
     self.statefile = f
-    self.inst_map = {}
 
-    for line in f:
-      name, when, count = line.rstrip().split(':')
+    try:
+      self.data = simplejson.load(self.statefile)
+    except Exception, msg:
+      # Ignore errors while loading the file and treat it as empty
+      self.data = {}
+      sys.stderr.write("Empty or invalid state file. "
+          "Using defaults. Error message: %s\n" % msg)
+
+    if "instance" not in self.data:
+      self.data["instance"] = {}
+    if "node" not in self.data:
+      self.data["node"] = {}
+
+  def __del__(self):
+    """Unlock and close the state file if it is still open.
+
+    """
+    if getattr(self, "statefile", None):  # unset if __init__ raised early
+      self._Close()
+
+  def _Close(self):
+    """Unlock configuration file and close it.
+
+    """
+    assert self.statefile
+
+    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
+
+    self.statefile.close()
+    self.statefile = None
+
+  def GetNodeBootID(self, name):
+    """Returns the last boot ID of a node or None.
 
-      when = int(when)
-      count = int(count)
+    """
+    ndata = self.data["node"]
+
+    if name in ndata and "bootid" in ndata[name]:
+      return ndata[name]["bootid"]
+    return None
+
+  def SetNodeBootID(self, name, bootid):
+    """Sets the boot ID of a node.
+
+    """
+    assert bootid
 
-      self.inst_map[name] = (when, count)
+    ndata = self.data["node"]
 
-  def NumberOfAttempts(self, instance):
+    if name not in ndata:
+      ndata[name] = {}
+
+    ndata[name]["bootid"] = bootid
+
+  def NumberOfRestartAttempts(self, instance):
     """Returns number of previous restart attempts.
 
     Args:
       instance - the instance to look up.
 
     """
-    assert self.statefile
+    idata = self.data["instance"]
 
-    if instance.name in self.inst_map:
-      return self.inst_map[instance.name][1]
+    if instance.name in idata:
+      return idata[instance.name]["restart_count"]
 
     return 0
 
-  def RecordAttempt(self, instance):
+  def RecordRestartAttempt(self, instance):
     """Record a restart attempt.
 
     Args:
       instance - the instance being restarted
 
     """
-    assert self.statefile
+    idata = self.data["instance"]
 
-    when = time.time()
+    if instance.name not in idata:
+      inst = idata[instance.name] = {}
+    else:
+      inst = idata[instance.name]
 
-    self.inst_map[instance.name] = (when, 1 + self.NumberOfAttempts(instance))
+    inst["restart_when"] = time.time()
+    inst["restart_count"] = inst.get("restart_count", 0) + 1
 
-  def Remove(self, instance):
+  def RemoveInstance(self, instance):
     """Update state to reflect that a machine is running, i.e. remove record.
 
     Args:
@@ -167,13 +208,13 @@ class RestarterState(object):
     This method removes the record for a named instance.
 
     """
-    assert self.statefile
+    idata = self.data["instance"]
 
-    if instance.name in self.inst_map:
-      del self.inst_map[instance.name]
+    if instance.name in idata:
+      del idata[instance.name]
 
   def Save(self):
-    """Save records to file, then unlock and close file.
+    """Save state to file, then unlock and close it.
 
     """
     assert self.statefile
@@ -181,13 +222,9 @@ class RestarterState(object):
     self.statefile.seek(0)
     self.statefile.truncate()
 
-    for name in self.inst_map:
-      print >> self.statefile, "%s:%d:%d" % ((name,) + self.inst_map[name])
+    simplejson.dump(self.data, self.statefile)
 
-    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
-
-    self.statefile.close()
-    self.statefile = None
+    self._Close()
 
 
 class Instance(object):
@@ -197,46 +234,82 @@ class Instance(object):
     Restart(): issue a command to restart the represented machine.
 
   """
-  def __init__(self, name, state):
+  def __init__(self, name, state, autostart):
     self.name = name
     self.state = state
+    self.autostart = autostart
 
   def Restart(self):
     """Encapsulates the start of an instance.
 
-    This is currently done using the command line interface and not
-    the Ganeti modules.
-
     """
     DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
 
+  def ActivateDisks(self):
+    """Encapsulates the activation of all disks of an instance.
+
+    """
+    DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
+
 
-class InstanceList(object):
-  """The set of Virtual Machine instances on a cluster.
+def _RunListCmd(cmd):
+  """Runs a command and parses its output into lists.
 
   """
-  cmd = ['gnt-instance', 'list', '--lock-retries=15',
-         '-o', 'name,admin_state,oper_state', '--no-headers', '--separator=:']
+  for line in DoCmd(cmd).stdout.splitlines():
+    yield line.split(':')
 
-  def __init__(self):
-    res = DoCmd(self.cmd)
 
-    lines = res.stdout.splitlines()
+def GetInstanceList(with_secondaries=None):
+  """Get a list of instances on this cluster.
+
+  """
+  cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
+         '--separator=:']
+
+  fields = 'name,oper_state,admin_state'
 
-    self.instances = []
-    for line in lines:
-      fields = [fld.strip() for fld in line.split(':')]
+  if with_secondaries is not None:
+    fields += ',snodes'
 
-      if len(fields) != 3:
+  cmd.append('-o')
+  cmd.append(fields)
+
+  instances = []
+  for fields in _RunListCmd(cmd):
+    if with_secondaries is not None:
+      (name, status, autostart, snodes) = fields
+
+      if snodes == "-":
         continue
-      if fields[1] == "no": #no autostart, we don't care about this instance
+
+      for node in with_secondaries:
+        if node in snodes.split(','):
+          break
+      else:
         continue
-      name, status = fields[0], fields[2]
 
-      self.instances.append(Instance(name, status))
+    else:
+      (name, status, autostart) = fields
+
+    instances.append(Instance(name, status, autostart != "no"))
 
-  def __iter__(self):
-    return self.instances.__iter__()
+  return instances
+
+
+def GetNodeBootIDs():
+  """Get a dict mapping nodes to boot IDs.
+
+  """
+  cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
+         '--separator=:', '-o', 'name,bootid']
+
+  ids = {}
+  for fields in _RunListCmd(cmd):
+    (name, bootid) = fields
+    ids[name] = bootid
+
+  return ids
 
 
 class Message(object):
@@ -252,7 +325,7 @@ class Message(object):
     return self.level + ' ' + time.ctime(self.when) + '\n' + Indent(self.msg)
 
 
-class Restarter(object):
+class Watcher(object):
   """Encapsulate the logic for restarting erronously halted virtual machines.
 
   The calling program should periodically instantiate me and call Run().
@@ -265,18 +338,54 @@ class Restarter(object):
     master = sstore.GetMasterNode()
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
-    self.instances = InstanceList()
+    self.instances = GetInstanceList()
+    self.bootids = GetNodeBootIDs()
     self.messages = []
 
   def Run(self):
-    """Make a pass over the list of instances, restarting downed ones.
+    notepad = WatcherState()
+    self.CheckInstances(notepad)
+    self.CheckDisks(notepad)
+    notepad.Save()
+
+  def CheckDisks(self, notepad):
+    """Check all nodes for restarted ones.
 
     """
-    notepad = RestarterState()
+    check_nodes = []
+    for name, id in self.bootids.iteritems():
+      old = notepad.GetNodeBootID(name)
+      if old != id:
+        # Node's boot ID has changed, probably through a reboot.
+        check_nodes.append(name)
+
+    if check_nodes:
+      # Activate disks for all instances with any of the checked nodes as a
+      # secondary node.
+      for instance in GetInstanceList(with_secondaries=check_nodes):
+        try:
+          self.messages.append(Message(NOTICE,
+                                       "Activating disks for %s." %
+                                       instance.name))
+          instance.ActivateDisks()
+        except Error, x:
+          self.messages.append(Message(ERROR, str(x)))
+
+      # Keep changed boot IDs
+      for name in check_nodes:
+        notepad.SetNodeBootID(name, self.bootids[name])
 
+  def CheckInstances(self, notepad):
+    """Make a pass over the list of instances, restarting downed ones.
+
+    """
     for instance in self.instances:
+      # Don't care about manually stopped instances
+      if not instance.autostart:
+        continue
+
       if instance.state in BAD_STATES:
-        n = notepad.NumberOfAttempts(instance)
+        n = notepad.NumberOfRestartAttempts(instance)
 
         if n > MAXTRIES:
           # stay quiet.
@@ -284,7 +393,7 @@ class Restarter(object):
         elif n < MAXTRIES:
           last = " (Attempt #%d)" % (n + 1)
         else:
-          notepad.RecordAttempt(instance)
+          notepad.RecordRestartAttempt(instance)
           self.messages.append(Message(ERROR, "Could not restart %s for %d"
                                        " times, giving up..." %
                                        (instance.name, MAXTRIES)))
@@ -297,19 +406,17 @@ class Restarter(object):
         except Error, x:
           self.messages.append(Message(ERROR, str(x)))
 
-        notepad.RecordAttempt(instance)
+        notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
-        if notepad.NumberOfAttempts(instance):
-          notepad.Remove(instance)
+        if notepad.NumberOfRestartAttempts(instance):
+          notepad.RemoveInstance(instance)
       else:
-        if notepad.NumberOfAttempts(instance):
-          notepad.Remove(instance)
+        if notepad.NumberOfRestartAttempts(instance):
+          notepad.RemoveInstance(instance)
           msg = Message(NOTICE,
                         "Restart of %s succeeded." % instance.name)
           self.messages.append(msg)
 
-    notepad.Save()
-
   def WriteReport(self, logfile):
     """Log all messages to file.
 
@@ -347,12 +454,12 @@ def main():
   options, args = ParseOptions()
 
   if not options.debug:
-    sys.stderr = sys.stdout = open(LOGFILE, 'a')
+    sys.stderr = sys.stdout = open(constants.LOG_WATCHER, 'a')
 
   try:
-    restarter = Restarter()
-    restarter.Run()
-    restarter.WriteReport(sys.stdout)
+    watcher = Watcher()
+    watcher.Run()
+    watcher.WriteReport(sys.stdout)
   except NotMasterError:
     if options.debug:
       sys.stderr.write("Not master, exiting.\n")
@@ -363,5 +470,6 @@ def main():
   except Error, err:
     print err
 
+
 if __name__ == '__main__':
   main()
diff --git a/lib/constants.py b/lib/constants.py
index 66c7e8378..6e63cf05a 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -35,7 +35,7 @@ EXPORT_VERSION = 0
 DATA_DIR = _autoconf.LOCALSTATEDIR + "/lib/ganeti"
 CLUSTER_CONF_FILE = DATA_DIR + "/config.data"
 SSL_CERT_FILE = DATA_DIR + "/server.pem"
-WATCHER_STATEFILE = DATA_DIR + "/restart_state"
+WATCHER_STATEFILE = DATA_DIR + "/watcher.data"
 SSH_KNOWN_HOSTS_FILE = DATA_DIR + "/known_hosts"
 
 NODE_INITD_SCRIPT = _autoconf.SYSCONFDIR + "/init.d/ganeti"
@@ -48,6 +48,7 @@ MASTER_SCRIPT = "ganeti-master"
 LOG_DIR = _autoconf.LOCALSTATEDIR + "/log/ganeti"
 LOG_OS_DIR = LOG_DIR + "/os"
 LOG_NODESERVER = LOG_DIR + "/node-daemon.log"
+LOG_WATCHER = LOG_DIR + "/watcher.log"
 
 OS_SEARCH_PATH = _autoconf.OS_SEARCH_PATH
 EXPORT_DIR = _autoconf.EXPORT_DIR
-- 
GitLab