Commit ecfe9491 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Convert RPC module to new HTTP client

Currently, HttpClientManager is instantiated for every RPC call. This
will be changed with another patch, as will the use of SSL. The “Run”
method is no longer needed.

Reviewed-by: iustinp
parent 4607c978
......@@ -39,69 +39,7 @@ import simplejson
from ganeti import utils
from ganeti import objects
class NodeController:
"""Node-handling class.
For each node that we speak with, we create an instance of this
class, so that we have a safe place to store the details of this
individual call.
"""
def __init__(self, parent, node, address=None):
"""Constructor for the node controller.
@type parent: L{Client}
@param parent: the C{Client} instance which holds global parameters for
the call
@type node: str
@param node: the name of the node we connect to; it is used for error
messages and in cases we the address paramater is not passed
@type address: str
@keyword address: the node's address, in case we know it, so that we
don't need to resolve it; testing shows that httplib has high
overhead in resolving addresses (even when speficied in /etc/hosts)
"""
self.parent = parent
self.node = node
if address is None:
address = node
self.failed = False
self.http_conn = hc = httplib.HTTPConnection(address, parent.port)
try:
hc.connect()
hc.putrequest('PUT', "/%s" % parent.procedure,
skip_accept_encoding=True)
hc.putheader('Content-Length', parent.body_length)
hc.endheaders()
hc.send(parent.body)
except socket.error:
logging.exception("Error connecting to node %s", node)
self.failed = True
def GetResponse(self):
"""Try to process the response from the node.
"""
if self.failed:
# we already failed in connect
return False
resp = self.http_conn.getresponse()
if resp.status != 200:
return False
try:
length = int(resp.getheader('Content-Length', '0'))
except ValueError:
return False
if not length:
logging.error("Zero-length reply from node %s", self.node)
return False
payload = resp.read(length)
unload = simplejson.loads(payload)
return unload
from ganeti import http
class Client:
......@@ -115,25 +53,15 @@ class Client:
'False' result, which is not good. This overloading of values can
cause bugs.
@var body_length: cached string value of the length of the body (so that
individual C{NodeController} instances don't have to recompute it)
"""
result_set = False
result = False
allresult = []
def __init__(self, procedure, args):
self.port = utils.GetNodeDaemonPort()
self.nodepw = utils.GetNodeDaemonPassword()
self.nc = {}
self.results = {}
self.procedure = procedure
self.args = args
self.body = simplejson.dumps(args)
self.body_length = str(len(self.body))
#--- generic connector -------------
self.port = utils.GetNodeDaemonPort()
self.nodepw = utils.GetNodeDaemonPassword()
self.nc = {}
def ConnectList(self, node_list, address_list=None):
"""Add a list of nodes to the target nodes.
......@@ -162,23 +90,43 @@ class Client:
@keyword address: the node address, if known
"""
self.nc[name] = NodeController(self, name, address)
if address is None:
address = name
self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
"/%s" % self.procedure,
post_data=self.body)
def GetResults(self):
"""Return the results of the call.
"""Call nodes and return results.
@rtype: list
@returns: List of RPC results
"""
return self.results
# TODO: Shared and reused manager
mgr = http.HttpClientManager()
try:
mgr.ExecRequests(self.nc.values())
finally:
mgr.Shutdown()
def Run(self):
"""Gather results from the node controllers.
results = {}
This function simply calls GetResponse() for each of our node
controllers.
for name, req in self.nc.iteritems():
if req.success and req.resp_status == http.HTTP_OK:
results[name] = simplejson.loads(req.resp_body)
continue
"""
for node, nc in self.nc.items():
self.results[node] = nc.GetResponse()
if req.error:
msg = req.error
else:
msg = req.resp_body
logging.error("RPC error from node %s: %s", name, msg)
results[name] = False
return results
class RpcRunner(object):
......@@ -256,7 +204,6 @@ class RpcRunner(object):
"""
c = Client("volume_list", [vg_name])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
def call_vg_list(self, node_list):
......@@ -267,7 +214,6 @@ class RpcRunner(object):
"""
c = Client("vg_list", [])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
def call_bridges_exist(self, node, bridges_list):
......@@ -282,7 +228,6 @@ class RpcRunner(object):
"""
c = Client("bridges_exist", [bridges_list])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_instance_start(self, node, instance, extra_args):
......@@ -293,7 +238,6 @@ class RpcRunner(object):
"""
c = Client("instance_start", [self._InstDict(instance), extra_args])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_instance_shutdown(self, node, instance):
......@@ -304,7 +248,6 @@ class RpcRunner(object):
"""
c = Client("instance_shutdown", [self._InstDict(instance)])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_instance_migrate(self, node, instance, target, live):
......@@ -325,7 +268,6 @@ class RpcRunner(object):
"""
c = Client("instance_migrate", [self._InstDict(instance), target, live])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_instance_reboot(self, node, instance, reboot_type, extra_args):
......@@ -337,7 +279,6 @@ class RpcRunner(object):
c = Client("instance_reboot", [self._InstDict(instance),
reboot_type, extra_args])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_instance_os_add(self, node, inst):
......@@ -349,7 +290,6 @@ class RpcRunner(object):
params = [self._InstDict(inst)]
c = Client("instance_os_add", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_instance_run_rename(self, node, inst, old_name):
......@@ -361,7 +301,6 @@ class RpcRunner(object):
params = [self._InstDict(inst), old_name]
c = Client("instance_run_rename", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_instance_info(self, node, instance, hname):
......@@ -379,7 +318,6 @@ class RpcRunner(object):
"""
c = Client("instance_info", [instance, hname])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_all_instances_info(self, node_list, hypervisor_list):
......@@ -395,7 +333,6 @@ class RpcRunner(object):
"""
c = Client("all_instances_info", [hypervisor_list])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
def call_instance_list(self, node_list, hypervisor_list):
......@@ -411,7 +348,6 @@ class RpcRunner(object):
"""
c = Client("instance_list", [hypervisor_list])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
def call_node_tcp_ping(self, node, source, target, port, timeout,
......@@ -424,7 +360,6 @@ class RpcRunner(object):
c = Client("node_tcp_ping", [source, target, port, timeout,
live_port_needed])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_node_has_ip_address(self, node, address):
......@@ -435,7 +370,6 @@ class RpcRunner(object):
"""
c = Client("node_has_ip_address", [address])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_node_info(self, node_list, vg_name, hypervisor_type):
......@@ -458,7 +392,6 @@ class RpcRunner(object):
"""
c = Client("node_info", [vg_name, hypervisor_type])
self._ConnectList(c, node_list)
c.Run()
retux = c.GetResults()
for node_name in retux:
......@@ -486,7 +419,6 @@ class RpcRunner(object):
params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
c = Client("node_add", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_node_verify(self, node_list, checkdict, cluster_name):
......@@ -497,7 +429,6 @@ class RpcRunner(object):
"""
c = Client("node_verify", [checkdict, cluster_name])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
@staticmethod
......@@ -509,7 +440,6 @@ class RpcRunner(object):
"""
c = Client("node_start_master", [start_daemons])
c.ConnectNode(node)
c.Run()
return c.GetResults().get(node, False)
@staticmethod
......@@ -521,7 +451,6 @@ class RpcRunner(object):
"""
c = Client("node_stop_master", [stop_daemons])
c.ConnectNode(node)
c.Run()
return c.GetResults().get(node, False)
@staticmethod
......@@ -534,7 +463,6 @@ class RpcRunner(object):
# TODO: should this method query down nodes?
c = Client("master_info", [])
c.ConnectList(node_list)
c.Run()
return c.GetResults()
def call_version(self, node_list):
......@@ -545,7 +473,6 @@ class RpcRunner(object):
"""
c = Client("version", [])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
......@@ -557,7 +484,6 @@ class RpcRunner(object):
params = [bdev.ToDict(), size, owner, on_primary, info]
c = Client("blockdev_create", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_remove(self, node, bdev):
......@@ -568,7 +494,6 @@ class RpcRunner(object):
"""
c = Client("blockdev_remove", [bdev.ToDict()])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_rename(self, node, devlist):
......@@ -580,7 +505,6 @@ class RpcRunner(object):
params = [(d.ToDict(), uid) for d, uid in devlist]
c = Client("blockdev_rename", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
......@@ -592,7 +516,6 @@ class RpcRunner(object):
params = [disk.ToDict(), owner, on_primary]
c = Client("blockdev_assemble", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_shutdown(self, node, disk):
......@@ -603,7 +526,6 @@ class RpcRunner(object):
"""
c = Client("blockdev_shutdown", [disk.ToDict()])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_addchildren(self, node, bdev, ndevs):
......@@ -615,7 +537,6 @@ class RpcRunner(object):
params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
c = Client("blockdev_addchildren", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_removechildren(self, node, bdev, ndevs):
......@@ -627,7 +548,6 @@ class RpcRunner(object):
params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
c = Client("blockdev_removechildren", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_getmirrorstatus(self, node, disks):
......@@ -639,7 +559,6 @@ class RpcRunner(object):
params = [dsk.ToDict() for dsk in disks]
c = Client("blockdev_getmirrorstatus", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_find(self, node, disk):
......@@ -650,7 +569,6 @@ class RpcRunner(object):
"""
c = Client("blockdev_find", [disk.ToDict()])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_close(self, node, disks):
......@@ -662,7 +580,6 @@ class RpcRunner(object):
params = [cf.ToDict() for cf in disks]
c = Client("blockdev_close", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
@staticmethod
......@@ -693,7 +610,6 @@ class RpcRunner(object):
st.st_atime, st.st_mtime]
c = Client("upload_file", params)
c.ConnectList(node_list, address_list=address_list)
c.Run()
return c.GetResults()
def call_os_diagnose(self, node_list):
......@@ -704,7 +620,6 @@ class RpcRunner(object):
"""
c = Client("os_diagnose", [])
self._ConnectList(c, node_list)
c.Run()
result = c.GetResults()
new_result = {}
for node_name in result:
......@@ -723,7 +638,6 @@ class RpcRunner(object):
"""
c = Client("os_get", [name])
self._ConnectNode(c, node)
c.Run()
result = c.GetResults().get(node, False)
if isinstance(result, dict):
return objects.OS.FromDict(result)
......@@ -743,7 +657,6 @@ class RpcRunner(object):
params = [hpath, phase, env]
c = Client("hooks_runner", params)
self._ConnectList(c, node_list)
c.Run()
result = c.GetResults()
return result
......@@ -760,7 +673,6 @@ class RpcRunner(object):
params = [name, idata]
c = Client("iallocator_runner", params)
self._ConnectNode(c, node)
c.Run()
result = c.GetResults().get(node, False)
return result
......@@ -772,7 +684,6 @@ class RpcRunner(object):
"""
c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_blockdev_snapshot(self, node, cf_bdev):
......@@ -783,7 +694,6 @@ class RpcRunner(object):
"""
c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
......@@ -797,7 +707,6 @@ class RpcRunner(object):
self._InstDict(instance), cluster_name, idx]
c = Client("snapshot_export", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_finalize_export(self, node, instance, snap_disks):
......@@ -814,7 +723,6 @@ class RpcRunner(object):
params = [self._InstDict(instance), flat_disks]
c = Client("finalize_export", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_export_info(self, node, path):
......@@ -825,7 +733,6 @@ class RpcRunner(object):
"""
c = Client("export_info", [path])
self._ConnectNode(c, node)
c.Run()
result = c.GetResults().get(node, False)
if not result:
return result
......@@ -841,7 +748,6 @@ class RpcRunner(object):
params = [self._InstDict(inst), src_node, src_images, cluster_name]
c = Client("instance_os_import", params)
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_export_list(self, node_list):
......@@ -852,7 +758,6 @@ class RpcRunner(object):
"""
c = Client("export_list", [])
self._ConnectList(c, node_list)
c.Run()
result = c.GetResults()
return result
......@@ -864,7 +769,6 @@ class RpcRunner(object):
"""
c = Client("export_remove", [export])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
@staticmethod
......@@ -879,7 +783,6 @@ class RpcRunner(object):
"""
c = Client("node_leave_cluster", [])
c.ConnectNode(node)
c.Run()
return c.GetResults().get(node, False)
def call_node_volumes(self, node_list):
......@@ -890,7 +793,6 @@ class RpcRunner(object):
"""
c = Client("node_volumes", [])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
def call_test_delay(self, node_list, duration):
......@@ -901,7 +803,6 @@ class RpcRunner(object):
"""
c = Client("test_delay", [duration])
self._ConnectList(c, node_list)
c.Run()
return c.GetResults()
def call_file_storage_dir_create(self, node, file_storage_dir):
......@@ -912,7 +813,6 @@ class RpcRunner(object):
"""
c = Client("file_storage_dir_create", [file_storage_dir])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_file_storage_dir_remove(self, node, file_storage_dir):
......@@ -923,7 +823,6 @@ class RpcRunner(object):
"""
c = Client("file_storage_dir_remove", [file_storage_dir])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
......@@ -936,7 +835,6 @@ class RpcRunner(object):
c = Client("file_storage_dir_rename",
[old_file_storage_dir, new_file_storage_dir])
self._ConnectNode(c, node)
c.Run()
return c.GetResults().get(node, False)
@staticmethod
......@@ -948,7 +846,6 @@ class RpcRunner(object):
"""
c = Client("jobqueue_update", [file_name, content])
c.ConnectList(node_list, address_list=address_list)
c.Run()
result = c.GetResults()
return result
......@@ -961,7 +858,6 @@ class RpcRunner(object):
"""
c = Client("jobqueue_purge", [])
c.ConnectNode(node)
c.Run()
return c.GetResults().get(node, False)
@staticmethod
......@@ -973,7 +869,6 @@ class RpcRunner(object):
"""
c = Client("jobqueue_rename", [old, new])
c.ConnectList(node_list, address_list=address_list)
c.Run()
result = c.GetResults()
return result
......@@ -992,7 +887,6 @@ class RpcRunner(object):
"""
c = Client("jobqueue_set_drain", [drain_flag])
c.ConnectList(node_list)
c.Run()
result = c.GetResults()
return result
......@@ -1014,6 +908,5 @@ class RpcRunner(object):
hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
c = Client("hypervisor_validate_params", [hvname, hv_full])
self._ConnectList(c, node_list)
c.Run()
result = c.GetResults()
return result
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment