From 72737a7f7641e4277401fe154e3d75b61623d013 Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Fri, 10 Oct 2008 09:55:02 +0000 Subject: [PATCH] Convert rpc module to RpcRunner This big patch changes the call model used in internode-rpc from standalong function calls in the rpc module to via a RpcRunner class, that holds all the methods. This can be used in the future to enable smarter processing in the RPC layer itself (some quick examples are not setting the DiskID from cmdlib code, but only once in each rpc call, etc.). There are a few RPC calls that are made outside of the LU code, and these calls are left as staticmethods, so they can be used without a class instance (which requires a ConfigWriter instance). Reviewed-by: imsnah --- daemons/ganeti-masterd | 4 +- lib/bootstrap.py | 15 +- lib/cmdlib.py | 288 ++++----- lib/config.py | 2 +- lib/jqueue.py | 9 +- lib/mcpu.py | 5 +- lib/rpc.py | 1104 +++++++++++++++++---------------- test/ganeti.hooks_unittest.py | 4 +- 8 files changed, 748 insertions(+), 683 deletions(-) diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index f49bc43c1..06886f3f4 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -381,7 +381,7 @@ def CheckAgreement(): # either single node cluster, or a misconfiguration, but I won't # break any other node, so I can proceed return True - results = rpc.call_master_info(node_list) + results = rpc.RpcRunner.call_master_info(node_list) if not isinstance(results, dict): # this should not happen (unless internal error in rpc) logging.critical("Can't complete rpc call, aborting master startup") @@ -445,7 +445,7 @@ def main(): # activate ip master_node = ssconf.SimpleConfigReader().GetMasterNode() - if not rpc.call_node_start_master(master_node, False): + if not rpc.RpcRunner.call_node_start_master(master_node, False): logging.error("Can't activate master IP address") master.setup_queue() diff --git a/lib/bootstrap.py b/lib/bootstrap.py index 45cd260e7..b69819a17 100644 --- a/lib/bootstrap.py +++ b/lib/bootstrap.py @@ -38,6 +38,7 @@ from ganeti import constants from ganeti import objects from ganeti import ssconf +from ganeti.rpc import RpcRunner def _InitSSHSetup(node): """Setup the SSH configuration for the cluster. @@ -236,7 +237,7 @@ def InitCluster(cluster_name, hypervisor_type, mac_prefix, def_bridge, # start the master ip # TODO: Review rpc call from bootstrap - rpc.call_node_start_master(hostname.name, True) + RpcRunner.call_node_start_master(hostname.name, True) def InitConfig(version, cluster_config, master_node_config, @@ -281,9 +282,9 @@ def FinalizeClusterDestroy(master): begun in cmdlib.LUDestroyOpcode. """ - if not rpc.call_node_stop_master(master, True): + if not RpcRunner.call_node_stop_master(master, True): logging.warning("Could not disable the master role") - if not rpc.call_node_leave_cluster(master): + if not RpcRunner.call_node_leave_cluster(master): logging.warning("Could not shutdown the node daemon and cleanup the node") @@ -365,7 +366,7 @@ def MasterFailover(): logging.info("setting master to %s, old master: %s", new_master, old_master) - if not rpc.call_node_stop_master(old_master, True): + if not RpcRunner.call_node_stop_master(old_master, True): logging.error("could disable the master role on the old master" " %s, please disable manually", old_master) @@ -374,12 +375,12 @@ def MasterFailover(): # Here we have a phase where no master should be running - if not rpc.call_upload_file(cfg.GetNodeList(), - constants.CLUSTER_CONF_FILE): + if not RpcRunner.call_upload_file(cfg.GetNodeList(), + constants.CLUSTER_CONF_FILE): logging.error("could not distribute the new simple store master file" " to the other nodes, please check.") - if not rpc.call_node_start_master(new_master, True): + if not RpcRunner.call_node_start_master(new_master, True): logging.error("could not start the master role on the new master" " %s, please check", new_master) rcode = 1 diff --git a/lib/cmdlib.py b/lib/cmdlib.py index f947f5e5a..33da97958 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -32,7 +32,6 @@ import re import platform import logging -from ganeti import rpc from ganeti import ssh from ganeti import logger from ganeti import utils @@ -67,7 +66,7 @@ class LogicalUnit(object): REQ_MASTER = True REQ_BGL = True - def __init__(self, processor, op, context): + def __init__(self, processor, op, context, rpc): """Constructor for LogicalUnit. This needs to be overriden in derived classes in order to check op @@ -78,6 +77,7 @@ class LogicalUnit(object): self.op = op self.cfg = context.cfg self.context = context + self.rpc = rpc # Dicts used to declare locking needs to mcpu self.needed_locks = None self.acquired_locks = {} @@ -448,7 +448,7 @@ def _CheckInstanceBridgesExist(lu, instance): """ # check bridges existance brlist = [nic.bridge for nic in instance.nics] - if not rpc.call_bridges_exist(instance.primary_node, brlist): + if not lu.rpc.call_bridges_exist(instance.primary_node, brlist): raise errors.OpPrereqError("one or more target bridges %s does not" " exist on destination node '%s'" % (brlist, instance.primary_node)) @@ -484,7 +484,7 @@ class LUDestroyCluster(NoHooksLU): """ master = self.cfg.GetMasterNode() - if not rpc.call_node_stop_master(master, False): + if not self.rpc.call_node_stop_master(master, False): raise errors.OpExecError("Could not disable the master role") priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) utils.CreateBackup(priv_key) @@ -742,9 +742,9 @@ class LUVerifyCluster(LogicalUnit): local_checksums = utils.FingerprintFiles(file_names) feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) - all_volumeinfo = rpc.call_volume_list(nodelist, vg_name) - all_instanceinfo = rpc.call_instance_list(nodelist, hypervisors) - all_vglist = rpc.call_vg_list(nodelist) + all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name) + all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors) + all_vglist = self.rpc.call_vg_list(nodelist) node_verify_param = { 'filelist': file_names, 'nodelist': nodelist, @@ -752,11 +752,11 @@ class LUVerifyCluster(LogicalUnit): 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip) for node in nodeinfo] } - all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param, - self.cfg.GetClusterName()) - all_rversion = rpc.call_version(nodelist) - all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(), - self.cfg.GetHypervisorType()) + all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, + self.cfg.GetClusterName()) + all_rversion = self.rpc.call_version(nodelist) + all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(), + self.cfg.GetHypervisorType()) for node in nodelist: feedback_fn("* Verifying node %s" % node) @@ -970,7 +970,7 @@ class LUVerifyDisks(NoHooksLU): if not nv_dict: return result - node_lvs = rpc.call_volume_list(nodes, vg_name) + node_lvs = self.rpc.call_volume_list(nodes, vg_name) to_act = set() for node in nodes: @@ -1051,7 +1051,7 @@ class LURenameCluster(LogicalUnit): # shutdown the master IP master = self.cfg.GetMasterNode() - if not rpc.call_node_stop_master(master, False): + if not self.rpc.call_node_stop_master(master, False): raise errors.OpExecError("Could not disable the master role") try: @@ -1069,13 +1069,13 @@ class LURenameCluster(LogicalUnit): logger.Debug("Copying updated ssconf data to all nodes") for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]: fname = ss.KeyToFilename(keyname) - result = rpc.call_upload_file(dist_nodes, fname) + result = self.rpc.call_upload_file(dist_nodes, fname) for to_node in dist_nodes: if not result[to_node]: logger.Error("copy of file %s to node %s failed" % (fname, to_node)) finally: - if not rpc.call_node_start_master(master, False): + if not self.rpc.call_node_start_master(master, False): logger.Error("Could not re-enable the master role on the master," " please restart manually.") @@ -1145,7 +1145,7 @@ class LUSetClusterParams(LogicalUnit): # if vg_name not None, checks given volume group on all nodes if self.op.vg_name: node_list = self.acquired_locks[locking.LEVEL_NODE] - vglist = rpc.call_vg_list(node_list) + vglist = self.rpc.call_vg_list(node_list) for node in node_list: vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name, constants.MIN_VG_SIZE) @@ -1184,7 +1184,7 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): max_time = 0 done = True cumul_degraded = False - rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks) + rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks) if not rstats: lu.proc.LogWarning("Can't get any data from node %s" % node) retries += 1 @@ -1238,7 +1238,7 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): result = True if on_primary or dev.AssembleOnSecondary(): - rstats = rpc.call_blockdev_find(node, dev) + rstats = lu.rpc.call_blockdev_find(node, dev) if not rstats: logger.ToStderr("Node %s: Disk degraded, not found or node down" % node) result = False @@ -1313,7 +1313,7 @@ class LUDiagnoseOS(NoHooksLU): """ node_list = self.acquired_locks[locking.LEVEL_NODE] - node_data = rpc.call_os_diagnose(node_list) + node_data = self.rpc.call_os_diagnose(node_list) if node_data == False: raise errors.OpExecError("Can't gather the list of OSes") pol = self._DiagnoseByOS(node_list, node_data) @@ -1403,7 +1403,7 @@ class LURemoveNode(LogicalUnit): self.context.RemoveNode(node.name) - rpc.call_node_leave_cluster(node.name) + self.rpc.call_node_leave_cluster(node.name) class LUQueryNodes(NoHooksLU): @@ -1475,8 +1475,8 @@ class LUQueryNodes(NoHooksLU): if self.dynamic_fields.intersection(self.op.output_fields): live_data = {} - node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName(), - self.cfg.GetHypervisorType()) + node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), + self.cfg.GetHypervisorType()) for name in nodenames: nodeinfo = node_data.get(name, None) if nodeinfo: @@ -1577,7 +1577,7 @@ class LUQueryNodeVolumes(NoHooksLU): """ nodenames = self.nodes - volumes = rpc.call_node_volumes(nodenames) + volumes = self.rpc.call_node_volumes(nodenames) ilist = [self.cfg.GetInstanceInfo(iname) for iname in self.cfg.GetInstanceList()] @@ -1732,7 +1732,7 @@ class LUAddNode(LogicalUnit): node = new_node.name # check connectivity - result = rpc.call_version([node])[node] + result = self.rpc.call_version([node])[node] if result: if constants.PROTOCOL_VERSION == result: logger.Info("communication to node %s fine, sw version %s match" % @@ -1759,8 +1759,9 @@ class LUAddNode(LogicalUnit): finally: f.close() - result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2], - keyarray[3], keyarray[4], keyarray[5]) + result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], + keyarray[2], + keyarray[3], keyarray[4], keyarray[5]) if not result: raise errors.OpExecError("Cannot transfer ssh keys to the new node") @@ -1769,11 +1770,11 @@ class LUAddNode(LogicalUnit): utils.AddHostToEtcHosts(new_node.name) if new_node.secondary_ip != new_node.primary_ip: - if not rpc.call_node_tcp_ping(new_node.name, - constants.LOCALHOST_IP_ADDRESS, - new_node.secondary_ip, - constants.DEFAULT_NODED_PORT, - 10, False): + if not self.rpc.call_node_tcp_ping(new_node.name, + constants.LOCALHOST_IP_ADDRESS, + new_node.secondary_ip, + constants.DEFAULT_NODED_PORT, + 10, False): raise errors.OpExecError("Node claims it doesn't have the secondary ip" " you gave (%s). Please fix and re-run this" " command." % new_node.secondary_ip) @@ -1784,8 +1785,8 @@ class LUAddNode(LogicalUnit): # TODO: do a node-net-test as well? } - result = rpc.call_node_verify(node_verify_list, node_verify_param, - self.cfg.GetClusterName()) + result = self.rpc.call_node_verify(node_verify_list, node_verify_param, + self.cfg.GetClusterName()) for verifier in node_verify_list: if not result[verifier]: raise errors.OpExecError("Cannot communicate with %s's node daemon" @@ -1807,7 +1808,7 @@ class LUAddNode(LogicalUnit): logger.Debug("Copying hosts and known_hosts to all nodes") for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE): - result = rpc.call_upload_file(dist_nodes, fname) + result = self.rpc.call_upload_file(dist_nodes, fname) for to_node in dist_nodes: if not result[to_node]: logger.Error("copy of file %s to node %s failed" % @@ -1817,7 +1818,7 @@ class LUAddNode(LogicalUnit): if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors: to_copy.append(constants.VNC_PASSWORD_FILE) for fname in to_copy: - result = rpc.call_upload_file([node], fname) + result = self.rpc.call_upload_file([node], fname) if not result[node]: logger.Error("could not copy file %s to node %s" % (fname, node)) @@ -1968,7 +1969,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): for inst_disk in instance.disks: for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(node_disk, node) - result = rpc.call_blockdev_assemble(node, node_disk, iname, False) + result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False) if not result: logger.Error("could not prepare block device %s on node %s" " (is_primary=False, pass=1)" % (inst_disk.iv_name, node)) @@ -1983,7 +1984,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): if node != instance.primary_node: continue lu.cfg.SetDiskID(node_disk, node) - result = rpc.call_blockdev_assemble(node, node_disk, iname, True) + result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True) if not result: logger.Error("could not prepare block device %s on node %s" " (is_primary=True, pass=2)" % (inst_disk.iv_name, node)) @@ -2054,8 +2055,8 @@ def _SafeShutdownInstanceDisks(lu, instance): _ShutdownInstanceDisks. """ - ins_l = rpc.call_instance_list([instance.primary_node], - [instance.hypervisor]) + ins_l = lu.rpc.call_instance_list([instance.primary_node], + [instance.hypervisor]) ins_l = ins_l[instance.primary_node] if not type(ins_l) is list: raise errors.OpExecError("Can't contact node '%s'" % @@ -2081,7 +2082,7 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False): for disk in instance.disks: for node, top_disk in disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(top_disk, node) - if not rpc.call_blockdev_shutdown(node, top_disk): + if not lu.rpc.call_blockdev_shutdown(node, top_disk): logger.Error("could not shutdown block device %s on node %s" % (disk.iv_name, node)) if not ignore_primary or node != instance.primary_node: @@ -2111,7 +2112,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor): we cannot check the node """ - nodeinfo = rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor) + nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor) if not nodeinfo or not isinstance(nodeinfo, dict): raise errors.OpPrereqError("Could not contact node %s for resource" " information" % (node,)) @@ -2189,7 +2190,7 @@ class LUStartupInstance(LogicalUnit): _StartInstanceDisks(self, instance, force) - if not rpc.call_instance_start(node_current, instance, extra_args): + if not self.rpc.call_instance_start(node_current, instance, extra_args): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance") @@ -2260,15 +2261,15 @@ class LURebootInstance(LogicalUnit): if reboot_type in [constants.INSTANCE_REBOOT_SOFT, constants.INSTANCE_REBOOT_HARD]: - if not rpc.call_instance_reboot(node_current, instance, - reboot_type, extra_args): + if not self.rpc.call_instance_reboot(node_current, instance, + reboot_type, extra_args): raise errors.OpExecError("Could not reboot instance") else: - if not rpc.call_instance_shutdown(node_current, instance): + if not self.rpc.call_instance_shutdown(node_current, instance): raise errors.OpExecError("could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) - if not rpc.call_instance_start(node_current, instance, extra_args): + if not self.rpc.call_instance_start(node_current, instance, extra_args): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance for full reboot") @@ -2321,7 +2322,7 @@ class LUShutdownInstance(LogicalUnit): instance = self.instance node_current = instance.primary_node self.cfg.MarkInstanceDown(instance.name) - if not rpc.call_instance_shutdown(node_current, instance): + if not self.rpc.call_instance_shutdown(node_current, instance): logger.Error("could not shutdown instance") _ShutdownInstanceDisks(self, instance) @@ -2372,8 +2373,9 @@ class LUReinstallInstance(LogicalUnit): if instance.status != "down": raise errors.OpPrereqError("Instance '%s' is marked to be up" % self.op.instance_name) - remote_info = rpc.call_instance_info(instance.primary_node, instance.name, - instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) if remote_info: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, @@ -2387,7 +2389,7 @@ class LUReinstallInstance(LogicalUnit): if pnode is None: raise errors.OpPrereqError("Primary node '%s' is unknown" % self.op.pnode) - os_obj = rpc.call_os_get(pnode.name, self.op.os_type) + os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type) if not os_obj: raise errors.OpPrereqError("OS '%s' not in supported OS list for" " primary node" % self.op.os_type) @@ -2408,7 +2410,8 @@ class LUReinstallInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: feedback_fn("Running the instance OS create scripts...") - if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"): + if not self.rpc.call_instance_os_add(inst.primary_node, inst, + "sda", "sdb"): raise errors.OpExecError("Could not install OS for instance %s" " on node %s" % (inst.name, inst.primary_node)) @@ -2450,8 +2453,9 @@ class LURenameInstance(LogicalUnit): if instance.status != "down": raise errors.OpPrereqError("Instance '%s' is marked to be up" % self.op.instance_name) - remote_info = rpc.call_instance_info(instance.primary_node, instance.name, - instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) if remote_info: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, @@ -2493,9 +2497,9 @@ class LURenameInstance(LogicalUnit): if inst.disk_template == constants.DT_FILE: new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) - result = rpc.call_file_storage_dir_rename(inst.primary_node, - old_file_storage_dir, - new_file_storage_dir) + result = self.rpc.call_file_storage_dir_rename(inst.primary_node, + old_file_storage_dir, + new_file_storage_dir) if not result: raise errors.OpExecError("Could not connect to node '%s' to rename" @@ -2512,8 +2516,9 @@ class LURenameInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: - if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name, - "sda", "sdb"): + if not self.rpc.call_instance_run_rename(inst.primary_node, inst, + old_name, + "sda", "sdb"): msg = ("Could not run OS rename script for instance %s on node %s" " (but the instance has been renamed in Ganeti)" % (inst.name, inst.primary_node)) @@ -2568,7 +2573,7 @@ class LURemoveInstance(LogicalUnit): logger.Info("shutting down instance %s on node %s" % (instance.name, instance.primary_node)) - if not rpc.call_instance_shutdown(instance.primary_node, instance): + if not self.rpc.call_instance_shutdown(instance.primary_node, instance): if self.op.ignore_failures: feedback_fn("Warning: can't shutdown instance") else: @@ -2664,7 +2669,7 @@ class LUQueryInstances(NoHooksLU): bad_nodes = [] if self.dynamic_fields.intersection(self.op.output_fields): live_data = {} - node_data = rpc.call_all_instances_info(nodes, hv_list) + node_data = self.rpc.call_all_instances_info(nodes, hv_list) for name in nodes: result = node_data[name] if result: @@ -2820,7 +2825,7 @@ class LUFailoverInstance(LogicalUnit): # check bridge existance brlist = [nic.bridge for nic in instance.nics] - if not rpc.call_bridges_exist(target_node, brlist): + if not self.rpc.call_bridges_exist(target_node, brlist): raise errors.OpPrereqError("One or more target bridges %s does not" " exist on destination node '%s'" % (brlist, target_node)) @@ -2849,7 +2854,7 @@ class LUFailoverInstance(LogicalUnit): logger.Info("Shutting down instance %s on node %s" % (instance.name, source_node)) - if not rpc.call_instance_shutdown(source_node, instance): + if not self.rpc.call_instance_shutdown(source_node, instance): if self.op.ignore_consistency: logger.Error("Could not shutdown instance %s on node %s. Proceeding" " anyway. Please make sure node %s is down" % @@ -2879,7 +2884,7 @@ class LUFailoverInstance(LogicalUnit): raise errors.OpExecError("Can't activate the instance's disks") feedback_fn("* starting the instance on the target node") - if not rpc.call_instance_start(target_node, instance, None): + if not self.rpc.call_instance_start(target_node, instance, None): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance %s on node %s." % (instance.name, target_node)) @@ -2897,8 +2902,8 @@ def _CreateBlockDevOnPrimary(lu, node, instance, device, info): return False lu.cfg.SetDiskID(device, node) - new_id = rpc.call_blockdev_create(node, device, device.size, - instance.name, True, info) + new_id = lu.rpc.call_blockdev_create(node, device, device.size, + instance.name, True, info) if not new_id: return False if device.physical_id is None: @@ -2926,8 +2931,8 @@ def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info): if not force: return True lu.cfg.SetDiskID(device, node) - new_id = rpc.call_blockdev_create(node, device, device.size, - instance.name, False, info) + new_id = lu.rpc.call_blockdev_create(node, device, device.size, + instance.name, False, info) if not new_id: return False if device.physical_id is None: @@ -3049,8 +3054,8 @@ def _CreateDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - result = rpc.call_file_storage_dir_create(instance.primary_node, - file_storage_dir) + result = lu.rpc.call_file_storage_dir_create(instance.primary_node, + file_storage_dir) if not result: logger.Error("Could not connect to node '%s'" % instance.primary_node) @@ -3101,7 +3106,7 @@ def _RemoveDisks(lu, instance): for device in instance.disks: for node, disk in device.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(disk, node) - if not rpc.call_blockdev_remove(node, disk): + if not lu.rpc.call_blockdev_remove(node, disk): logger.Error("could not remove block device %s on node %s," " continuing anyway" % (device.iv_name, node)) @@ -3109,8 +3114,8 @@ def _RemoveDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - if not rpc.call_file_storage_dir_remove(instance.primary_node, - file_storage_dir): + if not lu.rpc.call_file_storage_dir_remove(instance.primary_node, + file_storage_dir): logger.Error("could not remove directory '%s'" % file_storage_dir) result = False @@ -3288,7 +3293,7 @@ class LUCreateInstance(LogicalUnit): {"size": self.op.swap_size, "mode": "w"}] nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None), "bridge": self.op.bridge}] - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_ALLOC, name=self.op.instance_name, disk_template=self.op.disk_template, @@ -3365,7 +3370,7 @@ class LUCreateInstance(LogicalUnit): src_node = self.op.src_node src_path = self.op.src_path - export_info = rpc.call_export_info(src_node, src_path) + export_info = self.rpc.call_export_info(src_node, src_path) if not export_info: raise errors.OpPrereqError("No export found in dir %s" % src_path) @@ -3435,8 +3440,8 @@ class LUCreateInstance(LogicalUnit): # Check lv size requirements if req_size is not None: nodenames = [pnode.name] + self.secondaries - nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(), - self.op.hypervisor) + nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), + self.op.hypervisor) for node in nodenames: info = nodeinfo.get(node, None) if not info: @@ -3452,7 +3457,7 @@ class LUCreateInstance(LogicalUnit): (node, info['vg_free'], req_size)) # os verification - os_obj = rpc.call_os_get(pnode.name, self.op.os_type) + os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type) if not os_obj: raise errors.OpPrereqError("OS '%s' not in supported os list for" " primary node" % self.op.os_type) @@ -3461,7 +3466,7 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("Can't set instance kernel to none") # bridge check on primary node - if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]): + if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]): raise errors.OpPrereqError("target bridge '%s' does not exist on" " destination node '%s'" % (self.op.bridge, pnode.name)) @@ -3610,7 +3615,7 @@ class LUCreateInstance(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS: if self.op.mode == constants.INSTANCE_CREATE: feedback_fn("* running the instance OS create scripts...") - if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"): + if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"): raise errors.OpExecError("could not add os for instance %s" " on node %s" % (instance, pnode_name)) @@ -3620,8 +3625,9 @@ class LUCreateInstance(LogicalUnit): src_node = self.op.src_node src_image = self.src_image cluster_name = self.cfg.GetClusterName() - if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb", - src_node, src_image, cluster_name): + if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb", + src_node, src_image, + cluster_name): raise errors.OpExecError("Could not import os for instance" " %s on node %s" % (instance, pnode_name)) @@ -3633,7 +3639,7 @@ class LUCreateInstance(LogicalUnit): if self.op.start: logger.Info("starting instance %s on node %s" % (instance, pnode_name)) feedback_fn("* starting instance...") - if not rpc.call_instance_start(pnode_name, iobj, None): + if not self.rpc.call_instance_start(pnode_name, iobj, None): raise errors.OpExecError("Could not start instance") @@ -3668,8 +3674,8 @@ class LUConnectConsole(NoHooksLU): instance = self.instance node = instance.primary_node - node_insts = rpc.call_instance_list([node], - [instance.hypervisor])[node] + node_insts = self.rpc.call_instance_list([node], + [instance.hypervisor])[node] if node_insts is False: raise errors.OpExecError("Can't connect to node %s." % node) @@ -3729,7 +3735,7 @@ class LUReplaceDisks(LogicalUnit): """Compute a new secondary node using an IAllocator. """ - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC, name=self.op.instance_name, relocate_from=[self.sec_node]) @@ -3871,7 +3877,7 @@ class LUReplaceDisks(LogicalUnit): self.proc.LogStep(1, steps_total, "check device existence") info("checking volume groups") my_vg = cfg.GetVGName() - results = rpc.call_vg_list([oth_node, tgt_node]) + results = self.rpc.call_vg_list([oth_node, tgt_node]) if not results: raise errors.OpExecError("Can't list volume groups on the nodes") for node in oth_node, tgt_node: @@ -3885,7 +3891,7 @@ class LUReplaceDisks(LogicalUnit): for node in tgt_node, oth_node: info("checking %s on %s" % (dev.iv_name, node)) cfg.SetDiskID(dev, node) - if not rpc.call_blockdev_find(node, dev): + if not self.rpc.call_blockdev_find(node, dev): raise errors.OpExecError("Can't find device %s on node %s" % (dev.iv_name, node)) @@ -3933,7 +3939,7 @@ class LUReplaceDisks(LogicalUnit): self.proc.LogStep(4, steps_total, "change drbd configuration") for dev, old_lvs, new_lvs in iv_names.itervalues(): info("detaching %s drbd from local storage" % dev.iv_name) - if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs): + if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs): raise errors.OpExecError("Can't detach drbd from local storage on node" " %s for device %s" % (tgt_node, dev.iv_name)) #dev.children = [] @@ -3952,17 +3958,17 @@ class LUReplaceDisks(LogicalUnit): # build the rename list based on what LVs exist on the node rlist = [] for to_ren in old_lvs: - find_res = rpc.call_blockdev_find(tgt_node, to_ren) + find_res = self.rpc.call_blockdev_find(tgt_node, to_ren) if find_res is not None: # device exists rlist.append((to_ren, ren_fn(to_ren, temp_suffix))) info("renaming the old LVs on the target node") - if not rpc.call_blockdev_rename(tgt_node, rlist): + if not self.rpc.call_blockdev_rename(tgt_node, rlist): raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node) # now we rename the new LVs to the old LVs info("renaming the new LVs on the target node") rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] - if not rpc.call_blockdev_rename(tgt_node, rlist): + if not self.rpc.call_blockdev_rename(tgt_node, rlist): raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node) for old, new in zip(old_lvs, new_lvs): @@ -3975,9 +3981,9 @@ class LUReplaceDisks(LogicalUnit): # now that the new lvs have the old name, we can add them to the device info("adding new mirror component on %s" % tgt_node) - if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs): + if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs): for new_lv in new_lvs: - if not rpc.call_blockdev_remove(tgt_node, new_lv): + if not self.rpc.call_blockdev_remove(tgt_node, new_lv): warning("Can't rollback device %s", hint="manually cleanup unused" " logical volumes") raise errors.OpExecError("Can't add local storage to drbd") @@ -3996,7 +4002,7 @@ class LUReplaceDisks(LogicalUnit): # so check manually all the devices for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): cfg.SetDiskID(dev, instance.primary_node) - is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5] + is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5] if is_degr: raise errors.OpExecError("DRBD device %s is degraded!" % name) @@ -4006,7 +4012,7 @@ class LUReplaceDisks(LogicalUnit): info("remove logical volumes for %s" % name) for lv in old_lvs: cfg.SetDiskID(lv, tgt_node) - if not rpc.call_blockdev_remove(tgt_node, lv): + if not self.rpc.call_blockdev_remove(tgt_node, lv): warning("Can't remove old LV", hint="manually remove unused LVs") continue @@ -4044,7 +4050,7 @@ class LUReplaceDisks(LogicalUnit): self.proc.LogStep(1, steps_total, "check device existence") info("checking volume groups") my_vg = cfg.GetVGName() - results = rpc.call_vg_list([pri_node, new_node]) + results = self.rpc.call_vg_list([pri_node, new_node]) if not results: raise errors.OpExecError("Can't list volume groups on the nodes") for node in pri_node, new_node: @@ -4057,7 +4063,7 @@ class LUReplaceDisks(LogicalUnit): continue info("checking %s on %s" % (dev.iv_name, pri_node)) cfg.SetDiskID(dev, pri_node) - if not rpc.call_blockdev_find(pri_node, dev): + if not self.rpc.call_blockdev_find(pri_node, dev): raise errors.OpExecError("Can't find device %s on node %s" % (dev.iv_name, pri_node)) @@ -4124,7 +4130,7 @@ class LUReplaceDisks(LogicalUnit): # we have new devices, shutdown the drbd on the old secondary info("shutting down drbd for %s on old node" % dev.iv_name) cfg.SetDiskID(dev, old_node) - if not rpc.call_blockdev_shutdown(old_node, dev): + if not self.rpc.call_blockdev_shutdown(old_node, dev): warning("Failed to shutdown drbd for %s on old node" % dev.iv_name, hint="Please cleanup this device manually as soon as possible") @@ -4137,7 +4143,7 @@ class LUReplaceDisks(LogicalUnit): dev.physical_id = (None, None, None, None) + dev.physical_id[4:] # and 'find' the device, which will 'fix' it to match the # standalone state - if rpc.call_blockdev_find(pri_node, dev): + if self.rpc.call_blockdev_find(pri_node, dev): done += 1 else: warning("Failed to detach drbd %s from network, unusual case" % @@ -4169,7 +4175,7 @@ class LUReplaceDisks(LogicalUnit): # is correct cfg.SetDiskID(dev, pri_node) logging.debug("Disk to attach: %s", dev) - if not rpc.call_blockdev_find(pri_node, dev): + if not self.rpc.call_blockdev_find(pri_node, dev): warning("can't attach drbd %s to new secondary!" % dev.iv_name, "please do a gnt-instance info to see the status of disks") @@ -4182,7 +4188,7 @@ class LUReplaceDisks(LogicalUnit): # so check manually all the devices for name, (dev, old_lvs, _) in iv_names.iteritems(): cfg.SetDiskID(dev, pri_node) - is_degr = rpc.call_blockdev_find(pri_node, dev)[5] + is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5] if is_degr: raise errors.OpExecError("DRBD device %s is degraded!" % name) @@ -4191,7 +4197,7 @@ class LUReplaceDisks(LogicalUnit): info("remove logical volumes for %s" % name) for lv in old_lvs: cfg.SetDiskID(lv, old_node) - if not rpc.call_blockdev_remove(old_node, lv): + if not self.rpc.call_blockdev_remove(old_node, lv): warning("Can't remove LV on old secondary", hint="Cleanup stale volumes by hand") @@ -4280,8 +4286,8 @@ class LUGrowDisk(LogicalUnit): (self.op.disk, instance.name)) nodenames = [instance.primary_node] + list(instance.secondary_nodes) - nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(), - instance.hypervisor) + nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), + instance.hypervisor) for node in nodenames: info = nodeinfo.get(node, None) if not info: @@ -4304,8 +4310,9 @@ class LUGrowDisk(LogicalUnit): disk = instance.FindDisk(self.op.disk) for node in (instance.secondary_nodes + (instance.primary_node,)): self.cfg.SetDiskID(disk, node) - result = rpc.call_blockdev_grow(node, disk, self.op.amount) - if not result or not isinstance(result, (list, tuple)) or len(result) != 2: + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) + if (not result or not isinstance(result, (list, tuple)) or + len(result) != 2): raise errors.OpExecError("grow request failed to node %s" % node) elif not result[0]: raise errors.OpExecError("grow request failed to node %s: %s" % @@ -4367,7 +4374,7 @@ class LUQueryInstanceData(NoHooksLU): """ self.cfg.SetDiskID(dev, instance.primary_node) - dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev) + dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev) if dev.dev_type in constants.LDS_DRBD: # we change the snode then (otherwise we use the one passed in) if dev.logical_id[0] == instance.primary_node: @@ -4377,7 +4384,7 @@ class LUQueryInstanceData(NoHooksLU): if snode: self.cfg.SetDiskID(dev, snode) - dev_sstatus = rpc.call_blockdev_find(snode, dev) + dev_sstatus = self.rpc.call_blockdev_find(snode, dev) else: dev_sstatus = None @@ -4403,9 +4410,9 @@ class LUQueryInstanceData(NoHooksLU): """Gather and return data""" result = {} for instance in self.wanted_instances: - remote_info = rpc.call_instance_info(instance.primary_node, - instance.name, - instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) if remote_info and "state" in remote_info: remote_state = "up" else: @@ -4629,10 +4636,10 @@ class LUSetInstanceParams(LogicalUnit): pnode = self.instance.primary_node nodelist = [pnode] nodelist.extend(instance.secondary_nodes) - instance_info = rpc.call_instance_info(pnode, instance.name, - instance.hypervisor) - nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(), - instance.hypervisor) + instance_info = self.rpc.call_instance_info(pnode, instance.name, + instance.hypervisor) + nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(), + instance.hypervisor) if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict): # Assume the primary node is unreachable and go ahead @@ -4768,7 +4775,7 @@ class LUQueryExports(NoHooksLU): that node. """ - return rpc.call_export_list(self.nodes) + return self.rpc.call_export_list(self.nodes) class LUExportInstance(LogicalUnit): @@ -4843,7 +4850,7 @@ class LUExportInstance(LogicalUnit): src_node = instance.primary_node if self.op.shutdown: # shutdown the instance, but not the disks - if not rpc.call_instance_shutdown(src_node, instance): + if not self.rpc.call_instance_shutdown(src_node, instance): raise errors.OpExecError("Could not shutdown instance %s on node %s" % (instance.name, src_node)) @@ -4855,7 +4862,7 @@ class LUExportInstance(LogicalUnit): for disk in instance.disks: if disk.iv_name == "sda": # new_dev_name will be a snapshot of an lvm leaf of the one we passed - new_dev_name = rpc.call_blockdev_snapshot(src_node, disk) + new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk) if not new_dev_name: logger.Error("could not snapshot block device %s on node %s" % @@ -4869,7 +4876,7 @@ class LUExportInstance(LogicalUnit): finally: if self.op.shutdown and instance.status == "up": - if not rpc.call_instance_start(src_node, instance, None): + if not self.rpc.call_instance_start(src_node, instance, None): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance") @@ -4877,15 +4884,15 @@ class LUExportInstance(LogicalUnit): cluster_name = self.cfg.GetClusterName() for dev in snap_disks: - if not rpc.call_snapshot_export(src_node, dev, dst_node.name, + if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name, instance, cluster_name): logger.Error("could not export block device %s from node %s to node %s" % (dev.logical_id[1], src_node, dst_node.name)) - if not rpc.call_blockdev_remove(src_node, dev): + if not self.rpc.call_blockdev_remove(src_node, dev): logger.Error("could not remove snapshot block device %s from node %s" % (dev.logical_id[1], src_node)) - if not rpc.call_finalize_export(dst_node.name, instance, snap_disks): + if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks): logger.Error("could not finalize export for instance %s on node %s" % (instance.name, dst_node.name)) @@ -4896,10 +4903,10 @@ class LUExportInstance(LogicalUnit): # if we proceed the backup would be removed because OpQueryExports # substitutes an empty list with the full cluster node list. if nodelist: - exportlist = rpc.call_export_list(nodelist) + exportlist = self.rpc.call_export_list(nodelist) for node in exportlist: if instance.name in exportlist[node]: - if not rpc.call_export_remove(node, instance.name): + if not self.rpc.call_export_remove(node, instance.name): logger.Error("could not remove older export for instance %s" " on node %s" % (instance.name, node)) @@ -4935,12 +4942,13 @@ class LURemoveExport(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE]) + exportlist = self.rpc.call_export_list(self.acquired_locks[ + locking.LEVEL_NODE]) found = False for node in exportlist: if instance_name in exportlist[node]: found = True - if not rpc.call_export_remove(node, instance_name): + if not self.rpc.call_export_remove(node, instance_name): logger.Error("could not remove export for instance %s" " on node %s" % (instance_name, node)) @@ -5153,7 +5161,7 @@ class LUTestDelay(NoHooksLU): if not utils.TestDelay(self.op.duration): raise errors.OpExecError("Error during master delay test") if self.op.on_nodes: - result = rpc.call_test_delay(self.op.on_nodes, self.op.duration) + result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration) if not result: raise errors.OpExecError("Complete failure from rpc call") for node, node_result in result.items(): @@ -5183,8 +5191,8 @@ class IAllocator(object): "relocate_from", ] - def __init__(self, cfg, mode, name, **kwargs): - self.cfg = cfg + def __init__(self, lu, mode, name, **kwargs): + self.lu = lu # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy @@ -5221,12 +5229,12 @@ class IAllocator(object): This is the data that is independent of the actual operation. """ - cfg = self.cfg + cfg = self.lu.cfg cluster_info = cfg.GetClusterInfo() # cluster data data = { "version": 1, - "cluster_name": self.cfg.GetClusterName(), + "cluster_name": cfg.GetClusterName(), "cluster_tags": list(cluster_info.GetTags()), "enable_hypervisors": list(cluster_info.enabled_hypervisors), # we don't have job IDs @@ -5239,8 +5247,8 @@ class IAllocator(object): node_list = cfg.GetNodeList() # FIXME: here we have only one hypervisor information, but # instance can belong to different hypervisors - node_data = rpc.call_node_info(node_list, cfg.GetVGName(), - cfg.GetHypervisorType()) + node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(), + cfg.GetHypervisorType()) for nname in node_list: ninfo = cfg.GetNodeInfo(nname) if nname not in node_data or not isinstance(node_data[nname], dict): @@ -5350,7 +5358,7 @@ class IAllocator(object): done. """ - instance = self.cfg.GetInstanceInfo(self.name) + instance = self.lu.cfg.GetInstanceInfo(self.name) if instance is None: raise errors.ProgrammerError("Unknown instance '%s' passed to" " IAllocator" % self.name) @@ -5389,13 +5397,15 @@ class IAllocator(object): self.in_text = serializer.Dump(self.in_data) - def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner): + def Run(self, name, validate=True, call_fn=None): """Run an instance allocator and return the results. """ + if call_fn is None: + call_fn = self.lu.rpc.call_iallocator_runner data = self.in_text - result = call_fn(self.cfg.GetMasterNode(), name, self.in_text) + result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text) if not isinstance(result, (list, tuple)) or len(result) != 4: raise errors.OpExecError("Invalid result from master iallocator runner") @@ -5508,7 +5518,7 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=self.op.mode, name=self.op.name, mem_size=self.op.mem_size, @@ -5520,7 +5530,7 @@ class LUTestAllocator(NoHooksLU): vcpus=self.op.vcpus, ) else: - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=self.op.mode, name=self.op.name, relocate_from=list(self.relocate_from), diff --git a/lib/config.py b/lib/config.py index e4c055c0b..2b30e608b 100644 --- a/lib/config.py +++ b/lib/config.py @@ -821,7 +821,7 @@ class ConfigWriter: except ValueError: pass - result = rpc.call_upload_file(nodelist, self._cfg_file) + result = rpc.RpcRunner.call_upload_file(nodelist, self._cfg_file) for node in nodelist: if not result[node]: logging.error("copy of file %s to node %s failed", diff --git a/lib/jqueue.py b/lib/jqueue.py index a4c7b1d84..54a3562dc 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -45,6 +45,7 @@ from ganeti import utils from ganeti import jstore from ganeti import rpc +from ganeti.rpc import RpcRunner JOBQUEUE_THREADS = 25 @@ -404,7 +405,7 @@ class JobQueue(object): assert node_name != self._my_hostname # Clean queue directory on added node - rpc.call_jobqueue_purge(node_name) + RpcRunner.call_jobqueue_purge(node_name) # Upload the whole queue excluding archived jobs files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] @@ -420,7 +421,7 @@ class JobQueue(object): finally: fd.close() - result = rpc.call_jobqueue_update([node_name], file_name, content) + result = RpcRunner.call_jobqueue_update([node_name], file_name, content) if not result[node_name]: logging.error("Failed to upload %s to %s", file_name, node_name) @@ -459,14 +460,14 @@ class JobQueue(object): """ utils.WriteFile(file_name, data=data) - result = rpc.call_jobqueue_update(self._nodes, file_name, data) + result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data) self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFileUnlocked(self, old, new): os.rename(old, new) - result = rpc.call_jobqueue_rename(self._nodes, old, new) + result = RpcRunner.call_jobqueue_rename(self._nodes, old, new) self._CheckRpcResult(result, self._nodes, "Moving %s to %s" % (old, new)) diff --git a/lib/mcpu.py b/lib/mcpu.py index af0202abe..949b1e6c5 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -97,6 +97,7 @@ class Processor(object): self.context = context self._feedback_fn = None self.exclusive_BGL = False + self.rpc = rpc.RpcRunner(context.cfg) def _ExecLU(self, lu): """Logical Unit execution sequence. @@ -104,7 +105,7 @@ class Processor(object): """ write_count = self.context.cfg.write_count lu.CheckPrereq() - hm = HooksMaster(rpc.call_hooks_runner, self, lu) + hm = HooksMaster(self.rpc.call_hooks_runner, self, lu) h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, self._feedback_fn, None) @@ -202,7 +203,7 @@ class Processor(object): shared=not lu_class.REQ_BGL) try: self.exclusive_BGL = lu_class.REQ_BGL - lu = lu_class(self, op, self.context) + lu = lu_class(self, op, self.context, self.rpc) lu.ExpandNames() assert lu.needed_locks is not None, "needed_locks not set by LU" result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE) diff --git a/lib/rpc.py b/lib/rpc.py index 02f770856..bb880dcb9 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -23,7 +23,12 @@ """ -# pylint: disable-msg=C0103 +# pylint: disable-msg=C0103,R0201,R0904 +# C0103: Invalid name, since call_ are not valid +# R0201: Method could be a function, we keep all rpcs instance methods +# as not to change them back and forth between static/instance methods +# if they need to start using instance attributes +# R0904: Too many public methods import os import socket @@ -140,748 +145,793 @@ class Client: self.results[node] = nc.get_response() -def call_volume_list(node_list, vg_name): - """Gets the logical volumes present in a given volume group. +class RpcRunner(object): + """RPC runner class""" - This is a multi-node call. + def __init__(self, cfg): + """Initialized the rpc runner. - """ - c = Client("volume_list", [vg_name]) - c.connect_list(node_list) - c.run() - return c.getresult() + @type cfg: C{config.ConfigWriter} + @param cfg: the configuration object that will be used to get data + about the cluster + """ + self._cfg = cfg -def call_vg_list(node_list): - """Gets the volume group list. + def call_volume_list(self, node_list, vg_name): + """Gets the logical volumes present in a given volume group. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("vg_list", []) - c.connect_list(node_list) - c.run() - return c.getresult() + """ + c = Client("volume_list", [vg_name]) + c.connect_list(node_list) + c.run() + return c.getresult() + def call_vg_list(self, node_list): + """Gets the volume group list. -def call_bridges_exist(node, bridges_list): - """Checks if a node has all the bridges given. + This is a multi-node call. - This method checks if all bridges given in the bridges_list are - present on the remote node, so that an instance that uses interfaces - on those bridges can be started. + """ + c = Client("vg_list", []) + c.connect_list(node_list) + c.run() + return c.getresult() - This is a single-node call. - """ - c = Client("bridges_exist", [bridges_list]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_bridges_exist(self, node, bridges_list): + """Checks if a node has all the bridges given. + This method checks if all bridges given in the bridges_list are + present on the remote node, so that an instance that uses interfaces + on those bridges can be started. -def call_instance_start(node, instance, extra_args): - """Starts an instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("bridges_exist", [bridges_list]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("instance_start", [instance.ToDict(), extra_args]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_start(self, node, instance, extra_args): + """Starts an instance. -def call_instance_shutdown(node, instance): - """Stops an instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("instance_start", [instance.ToDict(), extra_args]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("instance_shutdown", [instance.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_shutdown(self, node, instance): + """Stops an instance. -def call_instance_migrate(node, instance, target, live): - """Migrate an instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("instance_shutdown", [instance.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - @type node: string - @param node: the node on which the instance is currently running - @type instance: C{objects.Instance} - @param instance: the instance definition - @type target: string - @param target: the target node name - @type live: boolean - @param live: whether the migration should be done live or not (the - interpretation of this parameter is left to the hypervisor) - """ - c = Client("instance_migrate", [instance.ToDict(), target, live]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_migrate(self, node, instance, target, live): + """Migrate an instance. + This is a single-node call. -def call_instance_reboot(node, instance, reboot_type, extra_args): - """Reboots an instance. + @type node: string + @param node: the node on which the instance is currently running + @type instance: C{objects.Instance} + @param instance: the instance definition + @type target: string + @param target: the target node name + @type live: boolean + @param live: whether the migration should be done live or not (the + interpretation of this parameter is left to the hypervisor) - This is a single-node call. + """ + c = Client("instance_migrate", [instance.ToDict(), target, live]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_reboot(self, node, instance, reboot_type, extra_args): + """Reboots an instance. -def call_instance_os_add(node, inst, osdev, swapdev): - """Installs an OS on the given instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [inst.ToDict(), osdev, swapdev] - c = Client("instance_os_add", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_os_add(self, node, inst, osdev, swapdev): + """Installs an OS on the given instance. -def call_instance_run_rename(node, inst, old_name, osdev, swapdev): - """Run the OS rename script for an instance. + This is a single-node call. - This is a single-node call. + """ + params = [inst.ToDict(), osdev, swapdev] + c = Client("instance_os_add", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [inst.ToDict(), old_name, osdev, swapdev] - c = Client("instance_run_rename", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev): + """Run the OS rename script for an instance. -def call_instance_info(node, instance, hname): - """Returns information about a single instance. + This is a single-node call. - This is a single-node call. + """ + params = [inst.ToDict(), old_name, osdev, swapdev] + c = Client("instance_run_rename", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - @type node_list: list - @param node_list: the list of nodes to query - @type instance: string - @param instance: the instance name - @type hname: string - @param hname: the hypervisor type of the instance - """ - c = Client("instance_info", [instance]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_info(self, node, instance, hname): + """Returns information about a single instance. + This is a single-node call. -def call_all_instances_info(node_list, hypervisor_list): - """Returns information about all instances on the given nodes. + @type node_list: list + @param node_list: the list of nodes to query + @type instance: string + @param instance: the instance name + @type hname: string + @param hname: the hypervisor type of the instance - This is a multi-node call. + """ + c = Client("instance_info", [instance]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - @type node_list: list - @param node_list: the list of nodes to query - @type hypervisor_list: list - @param hypervisor_list: the hypervisors to query for instances - """ - c = Client("all_instances_info", [hypervisor_list]) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_all_instances_info(self, node_list, hypervisor_list): + """Returns information about all instances on the given nodes. + This is a multi-node call. -def call_instance_list(node_list, hypervisor_list): - """Returns the list of running instances on a given node. + @type node_list: list + @param node_list: the list of nodes to query + @type hypervisor_list: list + @param hypervisor_list: the hypervisors to query for instances - This is a multi-node call. + """ + c = Client("all_instances_info", [hypervisor_list]) + c.connect_list(node_list) + c.run() + return c.getresult() - @type node_list: list - @param node_list: the list of nodes to query - @type hypervisor_list: list - @param hypervisor_list: the hypervisors to query for instances - """ - c = Client("instance_list", [hypervisor_list]) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_instance_list(self, node_list, hypervisor_list): + """Returns the list of running instances on a given node. + This is a multi-node call. -def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed): - """Do a TcpPing on the remote node + @type node_list: list + @param node_list: the list of nodes to query + @type hypervisor_list: list + @param hypervisor_list: the hypervisors to query for instances - This is a single-node call. - """ - c = Client("node_tcp_ping", [source, target, port, timeout, - live_port_needed]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("instance_list", [hypervisor_list]) + c.connect_list(node_list) + c.run() + return c.getresult() -def call_node_info(node_list, vg_name, hypervisor_type): - """Return node information. + def call_node_tcp_ping(self, node, source, target, port, timeout, + live_port_needed): + """Do a TcpPing on the remote node - This will return memory information and volume group size and free - space. + This is a single-node call. + """ + c = Client("node_tcp_ping", [source, target, port, timeout, + live_port_needed]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - This is a multi-node call. - @type node_list: list - @param node_list: the list of nodes to query - @type vgname: C{string} - @param vgname: the name of the volume group to ask for disk space information - @type hypervisor_type: C{str} - @param hypervisor_type: the name of the hypervisor to ask for - memory information + def call_node_info(self, node_list, vg_name, hypervisor_type): + """Return node information. - """ - c = Client("node_info", [vg_name, hypervisor_type]) - c.connect_list(node_list) - c.run() - retux = c.getresult() + This will return memory information and volume group size and free + space. - for node_name in retux: - ret = retux.get(node_name, False) - if type(ret) != dict: - logger.Error("could not connect to node %s" % (node_name)) - ret = {} + This is a multi-node call. - utils.CheckDict(ret, - { 'memory_total' : '-', - 'memory_dom0' : '-', - 'memory_free' : '-', - 'vg_size' : 'node_unreachable', - 'vg_free' : '-' }, - "call_node_info", - ) - return retux + @type node_list: list + @param node_list: the list of nodes to query + @type vgname: C{string} + @param vgname: the name of the volume group to ask for disk space + information + @type hypervisor_type: C{str} + @param hypervisor_type: the name of the hypervisor to ask for + memory information + """ + c = Client("node_info", [vg_name, hypervisor_type]) + c.connect_list(node_list) + c.run() + retux = c.getresult() -def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub): - """Add a node to the cluster. + for node_name in retux: + ret = retux.get(node_name, False) + if type(ret) != dict: + logger.Error("could not connect to node %s" % (node_name)) + ret = {} - This is a single-node call. + utils.CheckDict(ret, + { 'memory_total' : '-', + 'memory_dom0' : '-', + 'memory_free' : '-', + 'vg_size' : 'node_unreachable', + 'vg_free' : '-' }, + "call_node_info", + ) + return retux - """ - params = [dsa, dsapub, rsa, rsapub, ssh, sshpub] - c = Client("node_add", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub): + """Add a node to the cluster. -def call_node_verify(node_list, checkdict, cluster_name): - """Request verification of given parameters. + This is a single-node call. - This is a multi-node call. + """ + params = [dsa, dsapub, rsa, rsapub, ssh, sshpub] + c = Client("node_add", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("node_verify", [checkdict, cluster_name]) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_node_verify(self, node_list, checkdict, cluster_name): + """Request verification of given parameters. -def call_node_start_master(node, start_daemons): - """Tells a node to activate itself as a master. + This is a multi-node call. - This is a single-node call. + """ + c = Client("node_verify", [checkdict, cluster_name]) + c.connect_list(node_list) + c.run() + return c.getresult() - """ - c = Client("node_start_master", [start_daemons]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + @staticmethod + def call_node_start_master(node, start_daemons): + """Tells a node to activate itself as a master. -def call_node_stop_master(node, stop_daemons): - """Tells a node to demote itself from master status. + This is a single-node call. - This is a single-node call. + """ + c = Client("node_start_master", [start_daemons]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("node_stop_master", [stop_daemons]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + @staticmethod + def call_node_stop_master(node, stop_daemons): + """Tells a node to demote itself from master status. -def call_master_info(node_list): - """Query master info. + This is a single-node call. - This is a multi-node call. + """ + c = Client("node_stop_master", [stop_daemons]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("master_info", []) - c.connect_list(node_list) - c.run() - return c.getresult() + @staticmethod + def call_master_info(node_list): + """Query master info. -def call_version(node_list): - """Query node version. + This is a multi-node call. - This is a multi-node call. + """ + # TODO: should this method query down nodes? + c = Client("master_info", []) + c.connect_list(node_list) + c.run() + return c.getresult() - """ - c = Client("version", []) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_version(self, node_list): + """Query node version. -def call_blockdev_create(node, bdev, size, owner, on_primary, info): - """Request creation of a given block device. + This is a multi-node call. - This is a single-node call. + """ + c = Client("version", []) + c.connect_list(node_list) + c.run() + return c.getresult() - """ - params = [bdev.ToDict(), size, owner, on_primary, info] - c = Client("blockdev_create", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_create(self, node, bdev, size, owner, on_primary, info): + """Request creation of a given block device. -def call_blockdev_remove(node, bdev): - """Request removal of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [bdev.ToDict(), size, owner, on_primary, info] + c = Client("blockdev_create", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("blockdev_remove", [bdev.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_remove(self, node, bdev): + """Request removal of a given block device. -def call_blockdev_rename(node, devlist): - """Request rename of the given block devices. + This is a single-node call. - This is a single-node call. + """ + c = Client("blockdev_remove", [bdev.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [(d.ToDict(), uid) for d, uid in devlist] - c = Client("blockdev_rename", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_rename(self, node, devlist): + """Request rename of the given block devices. -def call_blockdev_assemble(node, disk, owner, on_primary): - """Request assembling of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [(d.ToDict(), uid) for d, uid in devlist] + c = Client("blockdev_rename", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [disk.ToDict(), owner, on_primary] - c = Client("blockdev_assemble", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_assemble(self, node, disk, owner, on_primary): + """Request assembling of a given block device. -def call_blockdev_shutdown(node, disk): - """Request shutdown of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [disk.ToDict(), owner, on_primary] + c = Client("blockdev_assemble", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("blockdev_shutdown", [disk.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_shutdown(self, node, disk): + """Request shutdown of a given block device. -def call_blockdev_addchildren(node, bdev, ndevs): - """Request adding a list of children to a (mirroring) device. + This is a single-node call. - This is a single-node call. + """ + c = Client("blockdev_shutdown", [disk.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] - c = Client("blockdev_addchildren", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_addchildren(self, node, bdev, ndevs): + """Request adding a list of children to a (mirroring) device. -def call_blockdev_removechildren(node, bdev, ndevs): - """Request removing a list of children from a (mirroring) device. + This is a single-node call. - This is a single-node call. + """ + params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] + c = Client("blockdev_addchildren", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] - c = Client("blockdev_removechildren", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_removechildren(self, node, bdev, ndevs): + """Request removing a list of children from a (mirroring) device. -def call_blockdev_getmirrorstatus(node, disks): - """Request status of a (mirroring) device. + This is a single-node call. - This is a single-node call. + """ + params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] + c = Client("blockdev_removechildren", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [dsk.ToDict() for dsk in disks] - c = Client("blockdev_getmirrorstatus", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_getmirrorstatus(self, node, disks): + """Request status of a (mirroring) device. -def call_blockdev_find(node, disk): - """Request identification of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [dsk.ToDict() for dsk in disks] + c = Client("blockdev_getmirrorstatus", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("blockdev_find", [disk.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_find(self, node, disk): + """Request identification of a given block device. + + This is a single-node call. -def call_blockdev_close(node, disks): - """Closes the given block devices. + """ + c = Client("blockdev_find", [disk.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - This is a single-node call. - """ - params = [cf.ToDict() for cf in disks] - c = Client("blockdev_close", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_close(self, node, disks): + """Closes the given block devices. + This is a single-node call. -def call_upload_file(node_list, file_name): - """Upload a file. + """ + params = [cf.ToDict() for cf in disks] + c = Client("blockdev_close", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - The node will refuse the operation in case the file is not on the - approved file list. - This is a multi-node call. + @staticmethod + def call_upload_file(node_list, file_name): + """Upload a file. - """ - fh = file(file_name) - try: - data = fh.read() - finally: - fh.close() - st = os.stat(file_name) - params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, - st.st_atime, st.st_mtime] - c = Client("upload_file", params) - c.connect_list(node_list) - c.run() - return c.getresult() + The node will refuse the operation in case the file is not on the + approved file list. + This is a multi-node call. -def call_os_diagnose(node_list): - """Request a diagnose of OS definitions. + """ + fh = file(file_name) + try: + data = fh.read() + finally: + fh.close() + st = os.stat(file_name) + params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, + st.st_atime, st.st_mtime] + c = Client("upload_file", params) + c.connect_list(node_list) + c.run() + return c.getresult() + + @staticmethod + def call_upload_file(node_list, file_name): + """Upload a file. + + The node will refuse the operation in case the file is not on the + approved file list. + + This is a multi-node call. - This is a multi-node call. + """ + fh = file(file_name) + try: + data = fh.read() + finally: + fh.close() + st = os.stat(file_name) + params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, + st.st_atime, st.st_mtime] + c = Client("upload_file", params) + c.connect_list(node_list) + c.run() + return c.getresult() + + def call_os_diagnose(self, node_list): + """Request a diagnose of OS definitions. + + This is a multi-node call. - """ - c = Client("os_diagnose", []) - c.connect_list(node_list) - c.run() - result = c.getresult() - new_result = {} - for node_name in result: - if result[node_name]: - nr = [objects.OS.FromDict(oss) for oss in result[node_name]] - else: - nr = [] - new_result[node_name] = nr - return new_result + """ + c = Client("os_diagnose", []) + c.connect_list(node_list) + c.run() + result = c.getresult() + new_result = {} + for node_name in result: + if result[node_name]: + nr = [objects.OS.FromDict(oss) for oss in result[node_name]] + else: + nr = [] + new_result[node_name] = nr + return new_result -def call_os_get(node, name): - """Returns an OS definition. + def call_os_get(self, node, name): + """Returns an OS definition. - This is a single-node call. + This is a single-node call. - """ - c = Client("os_get", [name]) - c.connect(node) - c.run() - result = c.getresult().get(node, False) - if isinstance(result, dict): - return objects.OS.FromDict(result) - else: - return result + """ + c = Client("os_get", [name]) + c.connect(node) + c.run() + result = c.getresult().get(node, False) + if isinstance(result, dict): + return objects.OS.FromDict(result) + else: + return result -def call_hooks_runner(node_list, hpath, phase, env): - """Call the hooks runner. + def call_hooks_runner(self, node_list, hpath, phase, env): + """Call the hooks runner. - Args: - - op: the OpCode instance - - env: a dictionary with the environment + Args: + - op: the OpCode instance + - env: a dictionary with the environment - This is a multi-node call. + This is a multi-node call. - """ - params = [hpath, phase, env] - c = Client("hooks_runner", params) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + params = [hpath, phase, env] + c = Client("hooks_runner", params) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result -def call_iallocator_runner(node, name, idata): - """Call an iallocator on a remote node + def call_iallocator_runner(self, node, name, idata): + """Call an iallocator on a remote node - Args: - - name: the iallocator name - - input: the json-encoded input string + Args: + - name: the iallocator name + - input: the json-encoded input string - This is a single-node call. + This is a single-node call. - """ - params = [name, idata] - c = Client("iallocator_runner", params) - c.connect(node) - c.run() - result = c.getresult().get(node, False) - return result + """ + params = [name, idata] + c = Client("iallocator_runner", params) + c.connect(node) + c.run() + result = c.getresult().get(node, False) + return result -def call_blockdev_grow(node, cf_bdev, amount): - """Request a snapshot of the given block device. + def call_blockdev_grow(self, node, cf_bdev, amount): + """Request a snapshot of the given block device. - This is a single-node call. + This is a single-node call. - """ - c = Client("blockdev_grow", [cf_bdev.ToDict(), amount]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("blockdev_grow", [cf_bdev.ToDict(), amount]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_blockdev_snapshot(node, cf_bdev): - """Request a snapshot of the given block device. + def call_blockdev_snapshot(self, node, cf_bdev): + """Request a snapshot of the given block device. - This is a single-node call. + This is a single-node call. - """ - c = Client("blockdev_snapshot", [cf_bdev.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("blockdev_snapshot", [cf_bdev.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name): - """Request the export of a given snapshot. + def call_snapshot_export(self, node, snap_bdev, dest_node, instance, + cluster_name): + """Request the export of a given snapshot. - This is a single-node call. + This is a single-node call. - """ - params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name] - c = Client("snapshot_export", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name] + c = Client("snapshot_export", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_finalize_export(node, instance, snap_disks): - """Request the completion of an export operation. + def call_finalize_export(self, node, instance, snap_disks): + """Request the completion of an export operation. - This writes the export config file, etc. + This writes the export config file, etc. - This is a single-node call. + This is a single-node call. - """ - flat_disks = [] - for disk in snap_disks: - flat_disks.append(disk.ToDict()) - params = [instance.ToDict(), flat_disks] - c = Client("finalize_export", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + flat_disks = [] + for disk in snap_disks: + flat_disks.append(disk.ToDict()) + params = [instance.ToDict(), flat_disks] + c = Client("finalize_export", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_export_info(node, path): - """Queries the export information in a given path. + def call_export_info(self, node, path): + """Queries the export information in a given path. - This is a single-node call. + This is a single-node call. - """ - c = Client("export_info", [path]) - c.connect(node) - c.run() - result = c.getresult().get(node, False) - if not result: - return result - return objects.SerializableConfigParser.Loads(str(result)) + """ + c = Client("export_info", [path]) + c.connect(node) + c.run() + result = c.getresult().get(node, False) + if not result: + return result + return objects.SerializableConfigParser.Loads(str(result)) -def call_instance_os_import(node, inst, osdev, swapdev, - src_node, src_image, cluster_name): - """Request the import of a backup into an instance. + def call_instance_os_import(self, node, inst, osdev, swapdev, + src_node, src_image, cluster_name): + """Request the import of a backup into an instance. - This is a single-node call. + This is a single-node call. - """ - params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name] - c = Client("instance_os_import", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name] + c = Client("instance_os_import", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_export_list(node_list): - """Gets the stored exports list. + def call_export_list(self, node_list): + """Gets the stored exports list. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("export_list", []) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + c = Client("export_list", []) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result -def call_export_remove(node, export): - """Requests removal of a given export. + def call_export_remove(self, node, export): + """Requests removal of a given export. - This is a single-node call. + This is a single-node call. - """ - c = Client("export_remove", [export]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("export_remove", [export]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_node_leave_cluster(node): - """Requests a node to clean the cluster information it has. + def call_node_leave_cluster(self, node): + """Requests a node to clean the cluster information it has. - This will remove the configuration information from the ganeti data - dir. + This will remove the configuration information from the ganeti data + dir. - This is a single-node call. + This is a single-node call. - """ - c = Client("node_leave_cluster", []) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("node_leave_cluster", []) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_node_volumes(node_list): - """Gets all volumes on node(s). + def call_node_volumes(self, node_list): + """Gets all volumes on node(s). - This is a multi-node call. + This is a multi-node call. - """ - c = Client("node_volumes", []) - c.connect_list(node_list) - c.run() - return c.getresult() + """ + c = Client("node_volumes", []) + c.connect_list(node_list) + c.run() + return c.getresult() -def call_test_delay(node_list, duration): - """Sleep for a fixed time on given node(s). + def call_test_delay(self, node_list, duration): + """Sleep for a fixed time on given node(s). - This is a multi-node call. + This is a multi-node call. - """ - c = Client("test_delay", [duration]) - c.connect_list(node_list) - c.run() - return c.getresult() + """ + c = Client("test_delay", [duration]) + c.connect_list(node_list) + c.run() + return c.getresult() -def call_file_storage_dir_create(node, file_storage_dir): - """Create the given file storage directory. + def call_file_storage_dir_create(self, node, file_storage_dir): + """Create the given file storage directory. - This is a single-node call. + This is a single-node call. - """ - c = Client("file_storage_dir_create", [file_storage_dir]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("file_storage_dir_create", [file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_file_storage_dir_remove(node, file_storage_dir): - """Remove the given file storage directory. + def call_file_storage_dir_remove(self, node, file_storage_dir): + """Remove the given file storage directory. - This is a single-node call. + This is a single-node call. - """ - c = Client("file_storage_dir_remove", [file_storage_dir]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("file_storage_dir_remove", [file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_file_storage_dir_rename(node, old_file_storage_dir, - new_file_storage_dir): - """Rename file storage directory. + def call_file_storage_dir_rename(self, node, old_file_storage_dir, + new_file_storage_dir): + """Rename file storage directory. - This is a single-node call. + This is a single-node call. - """ - c = Client("file_storage_dir_rename", - [old_file_storage_dir, new_file_storage_dir]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("file_storage_dir_rename", + [old_file_storage_dir, new_file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_jobqueue_update(node_list, file_name, content): - """Update job queue. + @staticmethod + def call_jobqueue_update(node_list, file_name, content): + """Update job queue. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("jobqueue_update", [file_name, content]) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + c = Client("jobqueue_update", [file_name, content]) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result -def call_jobqueue_purge(node): - """Purge job queue. + @staticmethod + def call_jobqueue_purge(node): + """Purge job queue. - This is a single-node call. + This is a single-node call. - """ - c = Client("jobqueue_purge", []) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("jobqueue_purge", []) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_jobqueue_rename(node_list, old, new): - """Rename a job queue file. + @staticmethod + def call_jobqueue_rename(node_list, old, new): + """Rename a job queue file. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("jobqueue_rename", [old, new]) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + c = Client("jobqueue_rename", [old, new]) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result diff --git a/test/ganeti.hooks_unittest.py b/test/ganeti.hooks_unittest.py index 2f5f8b0a2..02522b905 100755 --- a/test/ganeti.hooks_unittest.py +++ b/test/ganeti.hooks_unittest.py @@ -228,7 +228,9 @@ class TestHooksMaster(unittest.TestCase): def setUp(self): self.op = opcodes.OpCode() self.context = FakeContext() - self.lu = FakeLU(None, self.op, self.context) + # WARNING: here we pass None as RpcRunner instance since we know + # our usage via HooksMaster will not use lu.rpc + self.lu = FakeLU(None, self.op, self.context, None) def testTotalFalse(self): """Test complete rpc failure""" -- GitLab