diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 13fa25d3ad625a8b037a01fdfcf0af2f2d5edff4..d50909d87b1d6a4969ee80096e6db8e26e7c47c3 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -317,7 +317,7 @@ class GanetiContext(object): self.cfg.AddNode(node) # If preseeding fails it'll not be added - self.jobqueue.AddNode(node.name) + self.jobqueue.AddNode(node) # Add the new node to the Ganeti Lock Manager self.glm.add(locking.LEVEL_NODE, node.name) @@ -327,7 +327,7 @@ class GanetiContext(object): """ # Synchronize the queue again - self.jobqueue.AddNode(node.name) + self.jobqueue.AddNode(node) def RemoveNode(self, name): """Removes a node from the configuration and lock manager. diff --git a/lib/jqueue.py b/lib/jqueue.py index 32af60a34fb0296f97277e3e4265d642e54f5d56..86c3d863340aeb01601afe8fbdeda7ff0a89aff2 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -364,11 +364,12 @@ class JobQueue(object): " check in jstore and here") # Get initial list of nodes - self._nodes = set(self.context.cfg.GetNodeList()) + self._nodes = dict((n.name, n.primary_ip) + for n in self.context.cfg.GetAllNodesInfo().values()) # Remove master node try: - self._nodes.remove(self._my_hostname) + del self._nodes[self._my_hostname] except ValueError: pass @@ -405,7 +406,14 @@ class JobQueue(object): @utils.LockedMethod @_RequireOpenQueue - def AddNode(self, node_name): + def AddNode(self, node): + """Register a new node with the queue. + + @type node: L{objects.Node} + @param node: the node object to be added + + """ + node_name = node.name assert node_name != self._my_hostname # Clean queue directory on added node @@ -425,18 +433,19 @@ class JobQueue(object): finally: fd.close() - result = RpcRunner.call_jobqueue_update([node_name], file_name, content) + result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip], + file_name, content) if not result[node_name]: logging.error("Failed to upload %s to %s", file_name, node_name) - self._nodes.add(node_name) + self._nodes[node_name] = node.primary_ip @utils.LockedMethod @_RequireOpenQueue def RemoveNode(self, node_name): try: # The queue is removed by the "leave node" RPC call. - self._nodes.remove(node_name) + del self._nodes[node_name] except KeyError: pass @@ -458,20 +467,30 @@ class JobQueue(object): # TODO: Handle failing nodes logging.error("More than half of the nodes failed") + def _GetNodeIp(self): + """Helper for returning the node name/ip list. + + """ + name_list = self._nodes.keys() + addr_list = [self._nodes[name] for name in name_list] + return name_list, addr_list + def _WriteAndReplicateFileUnlocked(self, file_name, data): """Writes a file locally and then replicates it to all nodes. """ utils.WriteFile(file_name, data=data) - result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data) + names, addrs = self._GetNodeIp() + result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data) self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFileUnlocked(self, old, new): os.rename(old, new) - result = RpcRunner.call_jobqueue_rename(self._nodes, old, new) + names, addrs = self._GetNodeIp() + result = RpcRunner.call_jobqueue_rename(names, addrs, old, new) self._CheckRpcResult(result, self._nodes, "Moving %s to %s" % (old, new)) diff --git a/lib/rpc.py b/lib/rpc.py index dab37f8ad17924b833da22297bc0890fcc892b56..0ac4324e7917a78b29e7dc95b68e1db3766d11e2 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -941,14 +941,14 @@ class RpcRunner(object): return c.GetResults().get(node, False) @staticmethod - def call_jobqueue_update(node_list, file_name, content): + def call_jobqueue_update(node_list, address_list, file_name, content): """Update job queue. This is a multi-node call. """ c = Client("jobqueue_update", [file_name, content]) - c.ConnectList(node_list) + c.ConnectList(node_list, address_list=address_list) c.Run() result = c.GetResults() return result @@ -966,14 +966,14 @@ class RpcRunner(object): return c.GetResults().get(node, False) @staticmethod - def call_jobqueue_rename(node_list, old, new): + def call_jobqueue_rename(node_list, address_list, old, new): """Rename a job queue file. This is a multi-node call. """ c = Client("jobqueue_rename", [old, new]) - c.ConnectList(node_list) + c.ConnectList(node_list, address_list=address_list) c.Run() result = c.GetResults() return result