diff --git a/lib/rpc.py b/lib/rpc.py index 1ab4b7015cd84a12df0607a100577cae4ab265a2..20325030af702e072d3ec86d6cdd8a1540e9e8d8 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -39,69 +39,7 @@ import simplejson from ganeti import utils from ganeti import objects - - -class NodeController: - """Node-handling class. - - For each node that we speak with, we create an instance of this - class, so that we have a safe place to store the details of this - individual call. - - """ - def __init__(self, parent, node, address=None): - """Constructor for the node controller. - - @type parent: L{Client} - @param parent: the C{Client} instance which holds global parameters for - the call - @type node: str - @param node: the name of the node we connect to; it is used for error - messages and in cases we the address paramater is not passed - @type address: str - @keyword address: the node's address, in case we know it, so that we - don't need to resolve it; testing shows that httplib has high - overhead in resolving addresses (even when speficied in /etc/hosts) - - """ - self.parent = parent - self.node = node - if address is None: - address = node - self.failed = False - - self.http_conn = hc = httplib.HTTPConnection(address, parent.port) - try: - hc.connect() - hc.putrequest('PUT', "/%s" % parent.procedure, - skip_accept_encoding=True) - hc.putheader('Content-Length', parent.body_length) - hc.endheaders() - hc.send(parent.body) - except socket.error: - logging.exception("Error connecting to node %s", node) - self.failed = True - - def GetResponse(self): - """Try to process the response from the node. - - """ - if self.failed: - # we already failed in connect - return False - resp = self.http_conn.getresponse() - if resp.status != 200: - return False - try: - length = int(resp.getheader('Content-Length', '0')) - except ValueError: - return False - if not length: - logging.error("Zero-length reply from node %s", self.node) - return False - payload = resp.read(length) - unload = simplejson.loads(payload) - return unload +from ganeti import http class Client: @@ -115,25 +53,15 @@ class Client: 'False' result, which is not good. This overloading of values can cause bugs. - @var body_length: cached string value of the length of the body (so that - individual C{NodeController} instances don't have to recompute it) - """ - result_set = False - result = False - allresult = [] - def __init__(self, procedure, args): - self.port = utils.GetNodeDaemonPort() - self.nodepw = utils.GetNodeDaemonPassword() - self.nc = {} - self.results = {} self.procedure = procedure self.args = args self.body = simplejson.dumps(args) - self.body_length = str(len(self.body)) - #--- generic connector ------------- + self.port = utils.GetNodeDaemonPort() + self.nodepw = utils.GetNodeDaemonPassword() + self.nc = {} def ConnectList(self, node_list, address_list=None): """Add a list of nodes to the target nodes. @@ -162,23 +90,43 @@ class Client: @keyword address: the node address, if known """ - self.nc[name] = NodeController(self, name, address) + if address is None: + address = name + + self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT, + "/%s" % self.procedure, + post_data=self.body) def GetResults(self): - """Return the results of the call. + """Call nodes and return results. + + @rtype: list + @returns: List of RPC results """ - return self.results + # TODO: Shared and reused manager + mgr = http.HttpClientManager() + try: + mgr.ExecRequests(self.nc.values()) + finally: + mgr.Shutdown() - def Run(self): - """Gather results from the node controllers. + results = {} - This function simply calls GetResponse() for each of our node - controllers. + for name, req in self.nc.iteritems(): + if req.success and req.resp_status == http.HTTP_OK: + results[name] = simplejson.loads(req.resp_body) + continue - """ - for node, nc in self.nc.items(): - self.results[node] = nc.GetResponse() + if req.error: + msg = req.error + else: + msg = req.resp_body + + logging.error("RPC error from node %s: %s", name, msg) + results[name] = False + + return results class RpcRunner(object): @@ -256,7 +204,6 @@ class RpcRunner(object): """ c = Client("volume_list", [vg_name]) self._ConnectList(c, node_list) - c.Run() return c.GetResults() def call_vg_list(self, node_list): @@ -267,7 +214,6 @@ class RpcRunner(object): """ c = Client("vg_list", []) self._ConnectList(c, node_list) - c.Run() return c.GetResults() def call_bridges_exist(self, node, bridges_list): @@ -282,7 +228,6 @@ class RpcRunner(object): """ c = Client("bridges_exist", [bridges_list]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_instance_start(self, node, instance, extra_args): @@ -293,7 +238,6 @@ class RpcRunner(object): """ c = Client("instance_start", [self._InstDict(instance), extra_args]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_instance_shutdown(self, node, instance): @@ -304,7 +248,6 @@ class RpcRunner(object): """ c = Client("instance_shutdown", [self._InstDict(instance)]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_instance_migrate(self, node, instance, target, live): @@ -325,7 +268,6 @@ class RpcRunner(object): """ c = Client("instance_migrate", [self._InstDict(instance), target, live]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_instance_reboot(self, node, instance, reboot_type, extra_args): @@ -337,7 +279,6 @@ class RpcRunner(object): c = Client("instance_reboot", [self._InstDict(instance), reboot_type, extra_args]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_instance_os_add(self, node, inst): @@ -349,7 +290,6 @@ class RpcRunner(object): params = [self._InstDict(inst)] c = Client("instance_os_add", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_instance_run_rename(self, node, inst, old_name): @@ -361,7 +301,6 @@ class RpcRunner(object): params = [self._InstDict(inst), old_name] c = Client("instance_run_rename", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_instance_info(self, node, instance, hname): @@ -379,7 +318,6 @@ class RpcRunner(object): """ c = Client("instance_info", [instance, hname]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_all_instances_info(self, node_list, hypervisor_list): @@ -395,7 +333,6 @@ class RpcRunner(object): """ c = Client("all_instances_info", [hypervisor_list]) self._ConnectList(c, node_list) - c.Run() return c.GetResults() def call_instance_list(self, node_list, hypervisor_list): @@ -411,7 +348,6 @@ class RpcRunner(object): """ c = Client("instance_list", [hypervisor_list]) self._ConnectList(c, node_list) - c.Run() return c.GetResults() def call_node_tcp_ping(self, node, source, target, port, timeout, @@ -424,7 +360,6 @@ class RpcRunner(object): c = Client("node_tcp_ping", [source, target, port, timeout, live_port_needed]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_node_has_ip_address(self, node, address): @@ -435,7 +370,6 @@ class RpcRunner(object): """ c = Client("node_has_ip_address", [address]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_node_info(self, node_list, vg_name, hypervisor_type): @@ -458,7 +392,6 @@ class RpcRunner(object): """ c = Client("node_info", [vg_name, hypervisor_type]) self._ConnectList(c, node_list) - c.Run() retux = c.GetResults() for node_name in retux: @@ -486,7 +419,6 @@ class RpcRunner(object): params = [dsa, dsapub, rsa, rsapub, ssh, sshpub] c = Client("node_add", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_node_verify(self, node_list, checkdict, cluster_name): @@ -497,7 +429,6 @@ class RpcRunner(object): """ c = Client("node_verify", [checkdict, cluster_name]) self._ConnectList(c, node_list) - c.Run() return c.GetResults() @staticmethod @@ -509,7 +440,6 @@ class RpcRunner(object): """ c = Client("node_start_master", [start_daemons]) c.ConnectNode(node) - c.Run() return c.GetResults().get(node, False) @staticmethod @@ -521,7 +451,6 @@ class RpcRunner(object): """ c = Client("node_stop_master", [stop_daemons]) c.ConnectNode(node) - c.Run() return c.GetResults().get(node, False) @staticmethod @@ -534,7 +463,6 @@ class RpcRunner(object): # TODO: should this method query down nodes? c = Client("master_info", []) c.ConnectList(node_list) - c.Run() return c.GetResults() def call_version(self, node_list): @@ -545,7 +473,6 @@ class RpcRunner(object): """ c = Client("version", []) self._ConnectList(c, node_list) - c.Run() return c.GetResults() def call_blockdev_create(self, node, bdev, size, owner, on_primary, info): @@ -557,7 +484,6 @@ class RpcRunner(object): params = [bdev.ToDict(), size, owner, on_primary, info] c = Client("blockdev_create", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_remove(self, node, bdev): @@ -568,7 +494,6 @@ class RpcRunner(object): """ c = Client("blockdev_remove", [bdev.ToDict()]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_rename(self, node, devlist): @@ -580,7 +505,6 @@ class RpcRunner(object): params = [(d.ToDict(), uid) for d, uid in devlist] c = Client("blockdev_rename", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_assemble(self, node, disk, owner, on_primary): @@ -592,7 +516,6 @@ class RpcRunner(object): params = [disk.ToDict(), owner, on_primary] c = Client("blockdev_assemble", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_shutdown(self, node, disk): @@ -603,7 +526,6 @@ class RpcRunner(object): """ c = Client("blockdev_shutdown", [disk.ToDict()]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_addchildren(self, node, bdev, ndevs): @@ -615,7 +537,6 @@ class RpcRunner(object): params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] c = Client("blockdev_addchildren", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_removechildren(self, node, bdev, ndevs): @@ -627,7 +548,6 @@ class RpcRunner(object): params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] c = Client("blockdev_removechildren", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_getmirrorstatus(self, node, disks): @@ -639,7 +559,6 @@ class RpcRunner(object): params = [dsk.ToDict() for dsk in disks] c = Client("blockdev_getmirrorstatus", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_find(self, node, disk): @@ -650,7 +569,6 @@ class RpcRunner(object): """ c = Client("blockdev_find", [disk.ToDict()]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_close(self, node, disks): @@ -662,7 +580,6 @@ class RpcRunner(object): params = [cf.ToDict() for cf in disks] c = Client("blockdev_close", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) @staticmethod @@ -693,7 +610,6 @@ class RpcRunner(object): st.st_atime, st.st_mtime] c = Client("upload_file", params) c.ConnectList(node_list, address_list=address_list) - c.Run() return c.GetResults() def call_os_diagnose(self, node_list): @@ -704,7 +620,6 @@ class RpcRunner(object): """ c = Client("os_diagnose", []) self._ConnectList(c, node_list) - c.Run() result = c.GetResults() new_result = {} for node_name in result: @@ -723,7 +638,6 @@ class RpcRunner(object): """ c = Client("os_get", [name]) self._ConnectNode(c, node) - c.Run() result = c.GetResults().get(node, False) if isinstance(result, dict): return objects.OS.FromDict(result) @@ -743,7 +657,6 @@ class RpcRunner(object): params = [hpath, phase, env] c = Client("hooks_runner", params) self._ConnectList(c, node_list) - c.Run() result = c.GetResults() return result @@ -760,7 +673,6 @@ class RpcRunner(object): params = [name, idata] c = Client("iallocator_runner", params) self._ConnectNode(c, node) - c.Run() result = c.GetResults().get(node, False) return result @@ -772,7 +684,6 @@ class RpcRunner(object): """ c = Client("blockdev_grow", [cf_bdev.ToDict(), amount]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_blockdev_snapshot(self, node, cf_bdev): @@ -783,7 +694,6 @@ class RpcRunner(object): """ c = Client("blockdev_snapshot", [cf_bdev.ToDict()]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_snapshot_export(self, node, snap_bdev, dest_node, instance, @@ -797,7 +707,6 @@ class RpcRunner(object): self._InstDict(instance), cluster_name, idx] c = Client("snapshot_export", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_finalize_export(self, node, instance, snap_disks): @@ -814,7 +723,6 @@ class RpcRunner(object): params = [self._InstDict(instance), flat_disks] c = Client("finalize_export", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_export_info(self, node, path): @@ -825,7 +733,6 @@ class RpcRunner(object): """ c = Client("export_info", [path]) self._ConnectNode(c, node) - c.Run() result = c.GetResults().get(node, False) if not result: return result @@ -841,7 +748,6 @@ class RpcRunner(object): params = [self._InstDict(inst), src_node, src_images, cluster_name] c = Client("instance_os_import", params) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_export_list(self, node_list): @@ -852,7 +758,6 @@ class RpcRunner(object): """ c = Client("export_list", []) self._ConnectList(c, node_list) - c.Run() result = c.GetResults() return result @@ -864,7 +769,6 @@ class RpcRunner(object): """ c = Client("export_remove", [export]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) @staticmethod @@ -879,7 +783,6 @@ class RpcRunner(object): """ c = Client("node_leave_cluster", []) c.ConnectNode(node) - c.Run() return c.GetResults().get(node, False) def call_node_volumes(self, node_list): @@ -890,7 +793,6 @@ class RpcRunner(object): """ c = Client("node_volumes", []) self._ConnectList(c, node_list) - c.Run() return c.GetResults() def call_test_delay(self, node_list, duration): @@ -901,7 +803,6 @@ class RpcRunner(object): """ c = Client("test_delay", [duration]) self._ConnectList(c, node_list) - c.Run() return c.GetResults() def call_file_storage_dir_create(self, node, file_storage_dir): @@ -912,7 +813,6 @@ class RpcRunner(object): """ c = Client("file_storage_dir_create", [file_storage_dir]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_file_storage_dir_remove(self, node, file_storage_dir): @@ -923,7 +823,6 @@ class RpcRunner(object): """ c = Client("file_storage_dir_remove", [file_storage_dir]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) def call_file_storage_dir_rename(self, node, old_file_storage_dir, @@ -936,7 +835,6 @@ class RpcRunner(object): c = Client("file_storage_dir_rename", [old_file_storage_dir, new_file_storage_dir]) self._ConnectNode(c, node) - c.Run() return c.GetResults().get(node, False) @staticmethod @@ -948,7 +846,6 @@ class RpcRunner(object): """ c = Client("jobqueue_update", [file_name, content]) c.ConnectList(node_list, address_list=address_list) - c.Run() result = c.GetResults() return result @@ -961,7 +858,6 @@ class RpcRunner(object): """ c = Client("jobqueue_purge", []) c.ConnectNode(node) - c.Run() return c.GetResults().get(node, False) @staticmethod @@ -973,7 +869,6 @@ class RpcRunner(object): """ c = Client("jobqueue_rename", [old, new]) c.ConnectList(node_list, address_list=address_list) - c.Run() result = c.GetResults() return result @@ -992,7 +887,6 @@ class RpcRunner(object): """ c = Client("jobqueue_set_drain", [drain_flag]) c.ConnectList(node_list) - c.Run() result = c.GetResults() return result @@ -1014,6 +908,5 @@ class RpcRunner(object): hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams) c = Client("hypervisor_validate_params", [hvname, hv_full]) self._ConnectList(c, node_list) - c.Run() result = c.GetResults() return result