From 99aabbed5d61268db84cc339ccae129b0d7f867e Mon Sep 17 00:00:00 2001
From: Iustin Pop <iustin@google.com>
Date: Mon, 20 Oct 2008 14:47:17 +0000
Subject: [PATCH] Convert the job queue rpcs to address-based

The two main multi-node job queue RPC calls (jobqueue_update,
jobqueue_rename) are converted to address-based calls, in order to speed
up queue changes. For this, we need to change the _nodes attribute on
the jobqueue to be a dict {name: ip}, instead of a set.

Reviewed-by: imsnah
---
 daemons/ganeti-masterd |  4 ++--
 lib/jqueue.py          | 35 +++++++++++++++++++++++++++--------
 lib/rpc.py             |  8 ++++----
 3 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index 13fa25d3a..d50909d87 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -317,7 +317,7 @@ class GanetiContext(object):
     self.cfg.AddNode(node)
 
     # If preseeding fails it'll not be added
-    self.jobqueue.AddNode(node.name)
+    self.jobqueue.AddNode(node)
 
     # Add the new node to the Ganeti Lock Manager
     self.glm.add(locking.LEVEL_NODE, node.name)
@@ -327,7 +327,7 @@ class GanetiContext(object):
 
     """
     # Synchronize the queue again
-    self.jobqueue.AddNode(node.name)
+    self.jobqueue.AddNode(node)
 
   def RemoveNode(self, name):
     """Removes a node from the configuration and lock manager.
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 32af60a34..86c3d8633 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -364,11 +364,12 @@ class JobQueue(object):
                                            " check in jstore and here")
 
     # Get initial list of nodes
-    self._nodes = set(self.context.cfg.GetNodeList())
+    self._nodes = dict((n.name, n.primary_ip)
+                       for n in self.context.cfg.GetAllNodesInfo().values())
 
     # Remove master node
     try:
-      self._nodes.remove(self._my_hostname)
+      del self._nodes[self._my_hostname]
     except ValueError:
       pass
 
@@ -405,7 +406,14 @@ class JobQueue(object):
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def AddNode(self, node_name):
+  def AddNode(self, node):
+    """Register a new node with the queue.
+
+    @type node: L{objects.Node}
+    @param node: the node object to be added
+
+    """
+    node_name = node.name
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
@@ -425,18 +433,19 @@ class JobQueue(object):
       finally:
         fd.close()
 
-      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
+      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
+                                              file_name, content)
       if not result[node_name]:
         logging.error("Failed to upload %s to %s", file_name, node_name)
 
-    self._nodes.add(node_name)
+    self._nodes[node_name] = node.primary_ip
 
   @utils.LockedMethod
   @_RequireOpenQueue
   def RemoveNode(self, node_name):
     try:
       # The queue is removed by the "leave node" RPC call.
-      self._nodes.remove(node_name)
+      del self._nodes[node_name]
     except KeyError:
       pass
 
@@ -458,20 +467,30 @@ class JobQueue(object):
       # TODO: Handle failing nodes
       logging.error("More than half of the nodes failed")
 
+  def _GetNodeIp(self):
+    """Helper for returning the node name/ip list.
+
+    """
+    name_list = self._nodes.keys()
+    addr_list = [self._nodes[name] for name in name_list]
+    return name_list, addr_list
+
   def _WriteAndReplicateFileUnlocked(self, file_name, data):
     """Writes a file locally and then replicates it to all nodes.
 
     """
     utils.WriteFile(file_name, data=data)
 
-    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
+    names, addrs = self._GetNodeIp()
+    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
     self._CheckRpcResult(result, self._nodes,
                          "Updating %s" % file_name)
 
   def _RenameFileUnlocked(self, old, new):
     os.rename(old, new)
 
-    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
+    names, addrs = self._GetNodeIp()
+    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
     self._CheckRpcResult(result, self._nodes,
                          "Moving %s to %s" % (old, new))
 
diff --git a/lib/rpc.py b/lib/rpc.py
index dab37f8ad..0ac4324e7 100644
--- a/lib/rpc.py
+++ b/lib/rpc.py
@@ -941,14 +941,14 @@ class RpcRunner(object):
     return c.GetResults().get(node, False)
 
   @staticmethod
-  def call_jobqueue_update(node_list, file_name, content):
+  def call_jobqueue_update(node_list, address_list, file_name, content):
     """Update job queue.
 
     This is a multi-node call.
 
     """
     c = Client("jobqueue_update", [file_name, content])
-    c.ConnectList(node_list)
+    c.ConnectList(node_list, address_list=address_list)
     c.Run()
     result = c.GetResults()
     return result
@@ -966,14 +966,14 @@ class RpcRunner(object):
     return c.GetResults().get(node, False)
 
   @staticmethod
-  def call_jobqueue_rename(node_list, old, new):
+  def call_jobqueue_rename(node_list, address_list, old, new):
     """Rename a job queue file.
 
     This is a multi-node call.
 
     """
     c = Client("jobqueue_rename", [old, new])
-    c.ConnectList(node_list)
+    c.ConnectList(node_list, address_list=address_list)
     c.Run()
     result = c.GetResults()
     return result
-- 
GitLab