From 6dfcc47b2b58242e7a50b90cf7e80e7a1fee2fcd Mon Sep 17 00:00:00 2001
From: Iustin Pop <iustin@google.com>
Date: Mon, 6 Apr 2009 08:21:04 +0000
Subject: [PATCH] Change the watcher to use jobs instead of queries

As per the mailing list discussion, this patch changes the watcher to
use a single job (two opcodes) for getting the cluster state (node
list and instance list); it will then compute the needed actions based
on this data.

The patch also archives this job and the verify-disks job.

Reviewed-by: imsnah
---
 daemons/ganeti-watcher | 92 +++++++++++++++++++++---------------------
 1 file changed, 47 insertions(+), 45 deletions(-)

diff --git a/daemons/ganeti-watcher b/daemons/ganeti-watcher
index 2c5948b00..37571a5b7 100755
--- a/daemons/ganeti-watcher
+++ b/daemons/ganeti-watcher
@@ -225,45 +225,42 @@ class Instance(object):
     cli.SubmitOpCode(op, cl=client)
 
 
-def GetInstanceList(with_secondaries=None):
+def GetClusterData():
   """Get a list of instances on this cluster.
 
   """
-  fields = ["name", "status", "admin_state"]
+  op1_fields = ["name", "status", "admin_state", "snodes"]
+  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+                                 use_locking=True)
+  op2_fields = ["name", "bootid", "offline"]
+  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+                             use_locking=True)
 
-  if with_secondaries is not None:
-    fields.append("snodes")
+  job_id = client.SubmitJob([op1, op2])
 
-  result = client.QueryInstances([], fields, True)
+  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
-  instances = []
-  for fields in result:
-    if with_secondaries is not None:
-      (name, status, autostart, snodes) = fields
-
-      if not snodes:
-        continue
+  result = all_results[0]
+  smap = {}
 
-      for node in with_secondaries:
-        if node in snodes:
-          break
-      else:
-        continue
-
-    else:
-      (name, status, autostart) = fields
+  instances = {}
+  for fields in result:
+    (name, status, autostart, snodes) = fields
 
-    instances.append(Instance(name, status, autostart))
+    # update the secondary node map
+    for node in snodes:
+      if node not in smap:
+        smap[node] = []
+      smap[node].append(name)
 
-  return instances
+    instances[name] = Instance(name, status, autostart)
 
+  nodes = dict([(name, (bootid, offline))
+                for name, bootid, offline in all_results[1]])
 
-def GetNodeBootIDs():
-  """Get a dict mapping nodes to boot IDs.
+  client.ArchiveJob(job_id)
 
-  """
-  result = client.QueryNodes([], ["name", "bootid", "offline"], True)
-  return dict([(name, (bootid, offline)) for name, bootid, offline in result])
+  return instances, nodes, smap
 
 
 class Watcher(object):
@@ -279,8 +276,7 @@ class Watcher(object):
     master = client.QueryConfigValues(["master_node"])[0]
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
-    self.instances = GetInstanceList()
-    self.bootids = GetNodeBootIDs()
+    self.instances, self.bootids, self.smap = GetClusterData()
     self.started_instances = set()
     self.opts = opts
 
@@ -321,21 +317,25 @@ class Watcher(object):
     if check_nodes:
       # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
-      for instance in GetInstanceList(with_secondaries=check_nodes):
-        if not instance.autostart:
-          logging.info(("Skipping disk activation for non-autostart"
-                        " instance %s"), instance.name)
-          continue
-        if instance.name in self.started_instances:
-          # we already tried to start the instance, which should have
-          # activated its drives (if they can be at all)
+      for node in check_nodes:
+        if node not in self.smap:
          continue
-        try:
-          logging.info("Activating disks for instance %s", instance.name)
-          instance.ActivateDisks()
-        except Exception:
-          logging.exception("Error while activating disks for instance %s",
-                            instance.name)
+        for instance_name in self.smap[node]:
+          instance = self.instances[instance_name]
+          if not instance.autostart:
+            logging.info(("Skipping disk activation for non-autostart"
+                          " instance %s"), instance.name)
+            continue
+          if instance.name in self.started_instances:
+            # we already tried to start the instance, which should have
+            # activated its drives (if they can be at all)
+            continue
+          try:
+            logging.info("Activating disks for instance %s", instance.name)
+            instance.ActivateDisks()
+          except Exception:
+            logging.exception("Error while activating disks for instance %s",
+                              instance.name)
 
     # Keep changed boot IDs
     for name in check_nodes:
@@ -345,7 +345,7 @@ class Watcher(object):
     """Make a pass over the list of instances, restarting downed ones.
 
     """
-    for instance in self.instances:
+    for instance in self.instances.values():
       if instance.state in BAD_STATES:
         n = notepad.NumberOfRestartAttempts(instance)
 
@@ -383,7 +383,9 @@ class Watcher(object):
 
     """
     op = opcodes.OpVerifyDisks()
-    result = cli.SubmitOpCode(op, cl=client)
+    job_id = client.SubmitJob([op])
+    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+    client.ArchiveJob(job_id)
     if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
-- 
GitLab
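
For readers skimming the patch, the job-based pattern described in the commit
message (submit a single job carrying both queries, poll it, then archive it)
boils down to roughly the sketch below. This is an illustration, not code from
the patch: the helper name _FetchClusterState and the standalone imports are
assumptions, while the opcode fields and the SubmitJob/PollJob/ArchiveJob
calls are taken directly from the diff above. The client argument is assumed
to be an already-open luxi client, like the module-level client the watcher
uses.

import logging

from ganeti import cli
from ganeti import opcodes


def _FetchClusterState(client):
  # One job, two opcodes: instance list and node list, queried under locks
  op_instances = opcodes.OpQueryInstances(
    output_fields=["name", "status", "admin_state", "snodes"],
    names=[], use_locking=True)
  op_nodes = opcodes.OpQueryNodes(
    output_fields=["name", "bootid", "offline"],
    names=[], use_locking=True)

  # Submit both opcodes as a single job instead of two separate queries
  job_id = client.SubmitJob([op_instances, op_nodes])

  # PollJob waits for completion and returns one result per opcode,
  # in submission order
  instance_rows, node_rows = cli.PollJob(job_id, cl=client,
                                         feedback_fn=logging.debug)

  # Archive the finished job so it does not accumulate in the job queue
  client.ArchiveJob(job_id)

  return instance_rows, node_rows

The watcher's GetClusterData then derives the instance dictionary, the
secondary-node map and the boot-ID dictionary from these two result sets, as
shown in the first hunk of the diff.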