diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index f49bc43c1ef5215d5b242644131c3ebcf9b78fc3..06886f3f49928bf1bd43b91d9834a74a33d9ffc8 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -381,7 +381,7 @@ def CheckAgreement(): # either single node cluster, or a misconfiguration, but I won't # break any other node, so I can proceed return True - results = rpc.call_master_info(node_list) + results = rpc.RpcRunner.call_master_info(node_list) if not isinstance(results, dict): # this should not happen (unless internal error in rpc) logging.critical("Can't complete rpc call, aborting master startup") @@ -445,7 +445,7 @@ def main(): # activate ip master_node = ssconf.SimpleConfigReader().GetMasterNode() - if not rpc.call_node_start_master(master_node, False): + if not rpc.RpcRunner.call_node_start_master(master_node, False): logging.error("Can't activate master IP address") master.setup_queue() diff --git a/lib/bootstrap.py b/lib/bootstrap.py index 45cd260e794d155af2eeabe37f0941a44e2fcfd1..b69819a17056769feb903f1acc7f49fb5b93bf74 100644 --- a/lib/bootstrap.py +++ b/lib/bootstrap.py @@ -38,6 +38,7 @@ from ganeti import constants from ganeti import objects from ganeti import ssconf +from ganeti.rpc import RpcRunner def _InitSSHSetup(node): """Setup the SSH configuration for the cluster. @@ -236,7 +237,7 @@ def InitCluster(cluster_name, hypervisor_type, mac_prefix, def_bridge, # start the master ip # TODO: Review rpc call from bootstrap - rpc.call_node_start_master(hostname.name, True) + RpcRunner.call_node_start_master(hostname.name, True) def InitConfig(version, cluster_config, master_node_config, @@ -281,9 +282,9 @@ def FinalizeClusterDestroy(master): begun in cmdlib.LUDestroyOpcode. """ - if not rpc.call_node_stop_master(master, True): + if not RpcRunner.call_node_stop_master(master, True): logging.warning("Could not disable the master role") - if not rpc.call_node_leave_cluster(master): + if not RpcRunner.call_node_leave_cluster(master): logging.warning("Could not shutdown the node daemon and cleanup the node") @@ -365,7 +366,7 @@ def MasterFailover(): logging.info("setting master to %s, old master: %s", new_master, old_master) - if not rpc.call_node_stop_master(old_master, True): + if not RpcRunner.call_node_stop_master(old_master, True): logging.error("could disable the master role on the old master" " %s, please disable manually", old_master) @@ -374,12 +375,12 @@ def MasterFailover(): # Here we have a phase where no master should be running - if not rpc.call_upload_file(cfg.GetNodeList(), - constants.CLUSTER_CONF_FILE): + if not RpcRunner.call_upload_file(cfg.GetNodeList(), + constants.CLUSTER_CONF_FILE): logging.error("could not distribute the new simple store master file" " to the other nodes, please check.") - if not rpc.call_node_start_master(new_master, True): + if not RpcRunner.call_node_start_master(new_master, True): logging.error("could not start the master role on the new master" " %s, please check", new_master) rcode = 1 diff --git a/lib/cmdlib.py b/lib/cmdlib.py index f947f5e5aba6198dfbd4418bf7bb5c14367450da..33da97958dfb2da203f42f5728571ec83ebb1f5f 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -32,7 +32,6 @@ import re import platform import logging -from ganeti import rpc from ganeti import ssh from ganeti import logger from ganeti import utils @@ -67,7 +66,7 @@ class LogicalUnit(object): REQ_MASTER = True REQ_BGL = True - def __init__(self, processor, op, context): + def __init__(self, processor, op, context, rpc): """Constructor for LogicalUnit. This needs to be overriden in derived classes in order to check op @@ -78,6 +77,7 @@ class LogicalUnit(object): self.op = op self.cfg = context.cfg self.context = context + self.rpc = rpc # Dicts used to declare locking needs to mcpu self.needed_locks = None self.acquired_locks = {} @@ -448,7 +448,7 @@ def _CheckInstanceBridgesExist(lu, instance): """ # check bridges existance brlist = [nic.bridge for nic in instance.nics] - if not rpc.call_bridges_exist(instance.primary_node, brlist): + if not lu.rpc.call_bridges_exist(instance.primary_node, brlist): raise errors.OpPrereqError("one or more target bridges %s does not" " exist on destination node '%s'" % (brlist, instance.primary_node)) @@ -484,7 +484,7 @@ class LUDestroyCluster(NoHooksLU): """ master = self.cfg.GetMasterNode() - if not rpc.call_node_stop_master(master, False): + if not self.rpc.call_node_stop_master(master, False): raise errors.OpExecError("Could not disable the master role") priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) utils.CreateBackup(priv_key) @@ -742,9 +742,9 @@ class LUVerifyCluster(LogicalUnit): local_checksums = utils.FingerprintFiles(file_names) feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) - all_volumeinfo = rpc.call_volume_list(nodelist, vg_name) - all_instanceinfo = rpc.call_instance_list(nodelist, hypervisors) - all_vglist = rpc.call_vg_list(nodelist) + all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name) + all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors) + all_vglist = self.rpc.call_vg_list(nodelist) node_verify_param = { 'filelist': file_names, 'nodelist': nodelist, @@ -752,11 +752,11 @@ class LUVerifyCluster(LogicalUnit): 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip) for node in nodeinfo] } - all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param, - self.cfg.GetClusterName()) - all_rversion = rpc.call_version(nodelist) - all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(), - self.cfg.GetHypervisorType()) + all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, + self.cfg.GetClusterName()) + all_rversion = self.rpc.call_version(nodelist) + all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(), + self.cfg.GetHypervisorType()) for node in nodelist: feedback_fn("* Verifying node %s" % node) @@ -970,7 +970,7 @@ class LUVerifyDisks(NoHooksLU): if not nv_dict: return result - node_lvs = rpc.call_volume_list(nodes, vg_name) + node_lvs = self.rpc.call_volume_list(nodes, vg_name) to_act = set() for node in nodes: @@ -1051,7 +1051,7 @@ class LURenameCluster(LogicalUnit): # shutdown the master IP master = self.cfg.GetMasterNode() - if not rpc.call_node_stop_master(master, False): + if not self.rpc.call_node_stop_master(master, False): raise errors.OpExecError("Could not disable the master role") try: @@ -1069,13 +1069,13 @@ class LURenameCluster(LogicalUnit): logger.Debug("Copying updated ssconf data to all nodes") for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]: fname = ss.KeyToFilename(keyname) - result = rpc.call_upload_file(dist_nodes, fname) + result = self.rpc.call_upload_file(dist_nodes, fname) for to_node in dist_nodes: if not result[to_node]: logger.Error("copy of file %s to node %s failed" % (fname, to_node)) finally: - if not rpc.call_node_start_master(master, False): + if not self.rpc.call_node_start_master(master, False): logger.Error("Could not re-enable the master role on the master," " please restart manually.") @@ -1145,7 +1145,7 @@ class LUSetClusterParams(LogicalUnit): # if vg_name not None, checks given volume group on all nodes if self.op.vg_name: node_list = self.acquired_locks[locking.LEVEL_NODE] - vglist = rpc.call_vg_list(node_list) + vglist = self.rpc.call_vg_list(node_list) for node in node_list: vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name, constants.MIN_VG_SIZE) @@ -1184,7 +1184,7 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): max_time = 0 done = True cumul_degraded = False - rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks) + rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks) if not rstats: lu.proc.LogWarning("Can't get any data from node %s" % node) retries += 1 @@ -1238,7 +1238,7 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): result = True if on_primary or dev.AssembleOnSecondary(): - rstats = rpc.call_blockdev_find(node, dev) + rstats = lu.rpc.call_blockdev_find(node, dev) if not rstats: logger.ToStderr("Node %s: Disk degraded, not found or node down" % node) result = False @@ -1313,7 +1313,7 @@ class LUDiagnoseOS(NoHooksLU): """ node_list = self.acquired_locks[locking.LEVEL_NODE] - node_data = rpc.call_os_diagnose(node_list) + node_data = self.rpc.call_os_diagnose(node_list) if node_data == False: raise errors.OpExecError("Can't gather the list of OSes") pol = self._DiagnoseByOS(node_list, node_data) @@ -1403,7 +1403,7 @@ class LURemoveNode(LogicalUnit): self.context.RemoveNode(node.name) - rpc.call_node_leave_cluster(node.name) + self.rpc.call_node_leave_cluster(node.name) class LUQueryNodes(NoHooksLU): @@ -1475,8 +1475,8 @@ class LUQueryNodes(NoHooksLU): if self.dynamic_fields.intersection(self.op.output_fields): live_data = {} - node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName(), - self.cfg.GetHypervisorType()) + node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), + self.cfg.GetHypervisorType()) for name in nodenames: nodeinfo = node_data.get(name, None) if nodeinfo: @@ -1577,7 +1577,7 @@ class LUQueryNodeVolumes(NoHooksLU): """ nodenames = self.nodes - volumes = rpc.call_node_volumes(nodenames) + volumes = self.rpc.call_node_volumes(nodenames) ilist = [self.cfg.GetInstanceInfo(iname) for iname in self.cfg.GetInstanceList()] @@ -1732,7 +1732,7 @@ class LUAddNode(LogicalUnit): node = new_node.name # check connectivity - result = rpc.call_version([node])[node] + result = self.rpc.call_version([node])[node] if result: if constants.PROTOCOL_VERSION == result: logger.Info("communication to node %s fine, sw version %s match" % @@ -1759,8 +1759,9 @@ class LUAddNode(LogicalUnit): finally: f.close() - result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2], - keyarray[3], keyarray[4], keyarray[5]) + result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], + keyarray[2], + keyarray[3], keyarray[4], keyarray[5]) if not result: raise errors.OpExecError("Cannot transfer ssh keys to the new node") @@ -1769,11 +1770,11 @@ class LUAddNode(LogicalUnit): utils.AddHostToEtcHosts(new_node.name) if new_node.secondary_ip != new_node.primary_ip: - if not rpc.call_node_tcp_ping(new_node.name, - constants.LOCALHOST_IP_ADDRESS, - new_node.secondary_ip, - constants.DEFAULT_NODED_PORT, - 10, False): + if not self.rpc.call_node_tcp_ping(new_node.name, + constants.LOCALHOST_IP_ADDRESS, + new_node.secondary_ip, + constants.DEFAULT_NODED_PORT, + 10, False): raise errors.OpExecError("Node claims it doesn't have the secondary ip" " you gave (%s). Please fix and re-run this" " command." % new_node.secondary_ip) @@ -1784,8 +1785,8 @@ class LUAddNode(LogicalUnit): # TODO: do a node-net-test as well? } - result = rpc.call_node_verify(node_verify_list, node_verify_param, - self.cfg.GetClusterName()) + result = self.rpc.call_node_verify(node_verify_list, node_verify_param, + self.cfg.GetClusterName()) for verifier in node_verify_list: if not result[verifier]: raise errors.OpExecError("Cannot communicate with %s's node daemon" @@ -1807,7 +1808,7 @@ class LUAddNode(LogicalUnit): logger.Debug("Copying hosts and known_hosts to all nodes") for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE): - result = rpc.call_upload_file(dist_nodes, fname) + result = self.rpc.call_upload_file(dist_nodes, fname) for to_node in dist_nodes: if not result[to_node]: logger.Error("copy of file %s to node %s failed" % @@ -1817,7 +1818,7 @@ class LUAddNode(LogicalUnit): if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors: to_copy.append(constants.VNC_PASSWORD_FILE) for fname in to_copy: - result = rpc.call_upload_file([node], fname) + result = self.rpc.call_upload_file([node], fname) if not result[node]: logger.Error("could not copy file %s to node %s" % (fname, node)) @@ -1968,7 +1969,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): for inst_disk in instance.disks: for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(node_disk, node) - result = rpc.call_blockdev_assemble(node, node_disk, iname, False) + result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False) if not result: logger.Error("could not prepare block device %s on node %s" " (is_primary=False, pass=1)" % (inst_disk.iv_name, node)) @@ -1983,7 +1984,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): if node != instance.primary_node: continue lu.cfg.SetDiskID(node_disk, node) - result = rpc.call_blockdev_assemble(node, node_disk, iname, True) + result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True) if not result: logger.Error("could not prepare block device %s on node %s" " (is_primary=True, pass=2)" % (inst_disk.iv_name, node)) @@ -2054,8 +2055,8 @@ def _SafeShutdownInstanceDisks(lu, instance): _ShutdownInstanceDisks. """ - ins_l = rpc.call_instance_list([instance.primary_node], - [instance.hypervisor]) + ins_l = lu.rpc.call_instance_list([instance.primary_node], + [instance.hypervisor]) ins_l = ins_l[instance.primary_node] if not type(ins_l) is list: raise errors.OpExecError("Can't contact node '%s'" % @@ -2081,7 +2082,7 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False): for disk in instance.disks: for node, top_disk in disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(top_disk, node) - if not rpc.call_blockdev_shutdown(node, top_disk): + if not lu.rpc.call_blockdev_shutdown(node, top_disk): logger.Error("could not shutdown block device %s on node %s" % (disk.iv_name, node)) if not ignore_primary or node != instance.primary_node: @@ -2111,7 +2112,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor): we cannot check the node """ - nodeinfo = rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor) + nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor) if not nodeinfo or not isinstance(nodeinfo, dict): raise errors.OpPrereqError("Could not contact node %s for resource" " information" % (node,)) @@ -2189,7 +2190,7 @@ class LUStartupInstance(LogicalUnit): _StartInstanceDisks(self, instance, force) - if not rpc.call_instance_start(node_current, instance, extra_args): + if not self.rpc.call_instance_start(node_current, instance, extra_args): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance") @@ -2260,15 +2261,15 @@ class LURebootInstance(LogicalUnit): if reboot_type in [constants.INSTANCE_REBOOT_SOFT, constants.INSTANCE_REBOOT_HARD]: - if not rpc.call_instance_reboot(node_current, instance, - reboot_type, extra_args): + if not self.rpc.call_instance_reboot(node_current, instance, + reboot_type, extra_args): raise errors.OpExecError("Could not reboot instance") else: - if not rpc.call_instance_shutdown(node_current, instance): + if not self.rpc.call_instance_shutdown(node_current, instance): raise errors.OpExecError("could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) - if not rpc.call_instance_start(node_current, instance, extra_args): + if not self.rpc.call_instance_start(node_current, instance, extra_args): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance for full reboot") @@ -2321,7 +2322,7 @@ class LUShutdownInstance(LogicalUnit): instance = self.instance node_current = instance.primary_node self.cfg.MarkInstanceDown(instance.name) - if not rpc.call_instance_shutdown(node_current, instance): + if not self.rpc.call_instance_shutdown(node_current, instance): logger.Error("could not shutdown instance") _ShutdownInstanceDisks(self, instance) @@ -2372,8 +2373,9 @@ class LUReinstallInstance(LogicalUnit): if instance.status != "down": raise errors.OpPrereqError("Instance '%s' is marked to be up" % self.op.instance_name) - remote_info = rpc.call_instance_info(instance.primary_node, instance.name, - instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) if remote_info: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, @@ -2387,7 +2389,7 @@ class LUReinstallInstance(LogicalUnit): if pnode is None: raise errors.OpPrereqError("Primary node '%s' is unknown" % self.op.pnode) - os_obj = rpc.call_os_get(pnode.name, self.op.os_type) + os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type) if not os_obj: raise errors.OpPrereqError("OS '%s' not in supported OS list for" " primary node" % self.op.os_type) @@ -2408,7 +2410,8 @@ class LUReinstallInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: feedback_fn("Running the instance OS create scripts...") - if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"): + if not self.rpc.call_instance_os_add(inst.primary_node, inst, + "sda", "sdb"): raise errors.OpExecError("Could not install OS for instance %s" " on node %s" % (inst.name, inst.primary_node)) @@ -2450,8 +2453,9 @@ class LURenameInstance(LogicalUnit): if instance.status != "down": raise errors.OpPrereqError("Instance '%s' is marked to be up" % self.op.instance_name) - remote_info = rpc.call_instance_info(instance.primary_node, instance.name, - instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) if remote_info: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, @@ -2493,9 +2497,9 @@ class LURenameInstance(LogicalUnit): if inst.disk_template == constants.DT_FILE: new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) - result = rpc.call_file_storage_dir_rename(inst.primary_node, - old_file_storage_dir, - new_file_storage_dir) + result = self.rpc.call_file_storage_dir_rename(inst.primary_node, + old_file_storage_dir, + new_file_storage_dir) if not result: raise errors.OpExecError("Could not connect to node '%s' to rename" @@ -2512,8 +2516,9 @@ class LURenameInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: - if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name, - "sda", "sdb"): + if not self.rpc.call_instance_run_rename(inst.primary_node, inst, + old_name, + "sda", "sdb"): msg = ("Could not run OS rename script for instance %s on node %s" " (but the instance has been renamed in Ganeti)" % (inst.name, inst.primary_node)) @@ -2568,7 +2573,7 @@ class LURemoveInstance(LogicalUnit): logger.Info("shutting down instance %s on node %s" % (instance.name, instance.primary_node)) - if not rpc.call_instance_shutdown(instance.primary_node, instance): + if not self.rpc.call_instance_shutdown(instance.primary_node, instance): if self.op.ignore_failures: feedback_fn("Warning: can't shutdown instance") else: @@ -2664,7 +2669,7 @@ class LUQueryInstances(NoHooksLU): bad_nodes = [] if self.dynamic_fields.intersection(self.op.output_fields): live_data = {} - node_data = rpc.call_all_instances_info(nodes, hv_list) + node_data = self.rpc.call_all_instances_info(nodes, hv_list) for name in nodes: result = node_data[name] if result: @@ -2820,7 +2825,7 @@ class LUFailoverInstance(LogicalUnit): # check bridge existance brlist = [nic.bridge for nic in instance.nics] - if not rpc.call_bridges_exist(target_node, brlist): + if not self.rpc.call_bridges_exist(target_node, brlist): raise errors.OpPrereqError("One or more target bridges %s does not" " exist on destination node '%s'" % (brlist, target_node)) @@ -2849,7 +2854,7 @@ class LUFailoverInstance(LogicalUnit): logger.Info("Shutting down instance %s on node %s" % (instance.name, source_node)) - if not rpc.call_instance_shutdown(source_node, instance): + if not self.rpc.call_instance_shutdown(source_node, instance): if self.op.ignore_consistency: logger.Error("Could not shutdown instance %s on node %s. Proceeding" " anyway. Please make sure node %s is down" % @@ -2879,7 +2884,7 @@ class LUFailoverInstance(LogicalUnit): raise errors.OpExecError("Can't activate the instance's disks") feedback_fn("* starting the instance on the target node") - if not rpc.call_instance_start(target_node, instance, None): + if not self.rpc.call_instance_start(target_node, instance, None): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance %s on node %s." % (instance.name, target_node)) @@ -2897,8 +2902,8 @@ def _CreateBlockDevOnPrimary(lu, node, instance, device, info): return False lu.cfg.SetDiskID(device, node) - new_id = rpc.call_blockdev_create(node, device, device.size, - instance.name, True, info) + new_id = lu.rpc.call_blockdev_create(node, device, device.size, + instance.name, True, info) if not new_id: return False if device.physical_id is None: @@ -2926,8 +2931,8 @@ def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info): if not force: return True lu.cfg.SetDiskID(device, node) - new_id = rpc.call_blockdev_create(node, device, device.size, - instance.name, False, info) + new_id = lu.rpc.call_blockdev_create(node, device, device.size, + instance.name, False, info) if not new_id: return False if device.physical_id is None: @@ -3049,8 +3054,8 @@ def _CreateDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - result = rpc.call_file_storage_dir_create(instance.primary_node, - file_storage_dir) + result = lu.rpc.call_file_storage_dir_create(instance.primary_node, + file_storage_dir) if not result: logger.Error("Could not connect to node '%s'" % instance.primary_node) @@ -3101,7 +3106,7 @@ def _RemoveDisks(lu, instance): for device in instance.disks: for node, disk in device.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(disk, node) - if not rpc.call_blockdev_remove(node, disk): + if not lu.rpc.call_blockdev_remove(node, disk): logger.Error("could not remove block device %s on node %s," " continuing anyway" % (device.iv_name, node)) @@ -3109,8 +3114,8 @@ def _RemoveDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - if not rpc.call_file_storage_dir_remove(instance.primary_node, - file_storage_dir): + if not lu.rpc.call_file_storage_dir_remove(instance.primary_node, + file_storage_dir): logger.Error("could not remove directory '%s'" % file_storage_dir) result = False @@ -3288,7 +3293,7 @@ class LUCreateInstance(LogicalUnit): {"size": self.op.swap_size, "mode": "w"}] nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None), "bridge": self.op.bridge}] - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_ALLOC, name=self.op.instance_name, disk_template=self.op.disk_template, @@ -3365,7 +3370,7 @@ class LUCreateInstance(LogicalUnit): src_node = self.op.src_node src_path = self.op.src_path - export_info = rpc.call_export_info(src_node, src_path) + export_info = self.rpc.call_export_info(src_node, src_path) if not export_info: raise errors.OpPrereqError("No export found in dir %s" % src_path) @@ -3435,8 +3440,8 @@ class LUCreateInstance(LogicalUnit): # Check lv size requirements if req_size is not None: nodenames = [pnode.name] + self.secondaries - nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(), - self.op.hypervisor) + nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), + self.op.hypervisor) for node in nodenames: info = nodeinfo.get(node, None) if not info: @@ -3452,7 +3457,7 @@ class LUCreateInstance(LogicalUnit): (node, info['vg_free'], req_size)) # os verification - os_obj = rpc.call_os_get(pnode.name, self.op.os_type) + os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type) if not os_obj: raise errors.OpPrereqError("OS '%s' not in supported os list for" " primary node" % self.op.os_type) @@ -3461,7 +3466,7 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("Can't set instance kernel to none") # bridge check on primary node - if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]): + if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]): raise errors.OpPrereqError("target bridge '%s' does not exist on" " destination node '%s'" % (self.op.bridge, pnode.name)) @@ -3610,7 +3615,7 @@ class LUCreateInstance(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS: if self.op.mode == constants.INSTANCE_CREATE: feedback_fn("* running the instance OS create scripts...") - if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"): + if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"): raise errors.OpExecError("could not add os for instance %s" " on node %s" % (instance, pnode_name)) @@ -3620,8 +3625,9 @@ class LUCreateInstance(LogicalUnit): src_node = self.op.src_node src_image = self.src_image cluster_name = self.cfg.GetClusterName() - if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb", - src_node, src_image, cluster_name): + if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb", + src_node, src_image, + cluster_name): raise errors.OpExecError("Could not import os for instance" " %s on node %s" % (instance, pnode_name)) @@ -3633,7 +3639,7 @@ class LUCreateInstance(LogicalUnit): if self.op.start: logger.Info("starting instance %s on node %s" % (instance, pnode_name)) feedback_fn("* starting instance...") - if not rpc.call_instance_start(pnode_name, iobj, None): + if not self.rpc.call_instance_start(pnode_name, iobj, None): raise errors.OpExecError("Could not start instance") @@ -3668,8 +3674,8 @@ class LUConnectConsole(NoHooksLU): instance = self.instance node = instance.primary_node - node_insts = rpc.call_instance_list([node], - [instance.hypervisor])[node] + node_insts = self.rpc.call_instance_list([node], + [instance.hypervisor])[node] if node_insts is False: raise errors.OpExecError("Can't connect to node %s." % node) @@ -3729,7 +3735,7 @@ class LUReplaceDisks(LogicalUnit): """Compute a new secondary node using an IAllocator. """ - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC, name=self.op.instance_name, relocate_from=[self.sec_node]) @@ -3871,7 +3877,7 @@ class LUReplaceDisks(LogicalUnit): self.proc.LogStep(1, steps_total, "check device existence") info("checking volume groups") my_vg = cfg.GetVGName() - results = rpc.call_vg_list([oth_node, tgt_node]) + results = self.rpc.call_vg_list([oth_node, tgt_node]) if not results: raise errors.OpExecError("Can't list volume groups on the nodes") for node in oth_node, tgt_node: @@ -3885,7 +3891,7 @@ class LUReplaceDisks(LogicalUnit): for node in tgt_node, oth_node: info("checking %s on %s" % (dev.iv_name, node)) cfg.SetDiskID(dev, node) - if not rpc.call_blockdev_find(node, dev): + if not self.rpc.call_blockdev_find(node, dev): raise errors.OpExecError("Can't find device %s on node %s" % (dev.iv_name, node)) @@ -3933,7 +3939,7 @@ class LUReplaceDisks(LogicalUnit): self.proc.LogStep(4, steps_total, "change drbd configuration") for dev, old_lvs, new_lvs in iv_names.itervalues(): info("detaching %s drbd from local storage" % dev.iv_name) - if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs): + if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs): raise errors.OpExecError("Can't detach drbd from local storage on node" " %s for device %s" % (tgt_node, dev.iv_name)) #dev.children = [] @@ -3952,17 +3958,17 @@ class LUReplaceDisks(LogicalUnit): # build the rename list based on what LVs exist on the node rlist = [] for to_ren in old_lvs: - find_res = rpc.call_blockdev_find(tgt_node, to_ren) + find_res = self.rpc.call_blockdev_find(tgt_node, to_ren) if find_res is not None: # device exists rlist.append((to_ren, ren_fn(to_ren, temp_suffix))) info("renaming the old LVs on the target node") - if not rpc.call_blockdev_rename(tgt_node, rlist): + if not self.rpc.call_blockdev_rename(tgt_node, rlist): raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node) # now we rename the new LVs to the old LVs info("renaming the new LVs on the target node") rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] - if not rpc.call_blockdev_rename(tgt_node, rlist): + if not self.rpc.call_blockdev_rename(tgt_node, rlist): raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node) for old, new in zip(old_lvs, new_lvs): @@ -3975,9 +3981,9 @@ class LUReplaceDisks(LogicalUnit): # now that the new lvs have the old name, we can add them to the device info("adding new mirror component on %s" % tgt_node) - if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs): + if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs): for new_lv in new_lvs: - if not rpc.call_blockdev_remove(tgt_node, new_lv): + if not self.rpc.call_blockdev_remove(tgt_node, new_lv): warning("Can't rollback device %s", hint="manually cleanup unused" " logical volumes") raise errors.OpExecError("Can't add local storage to drbd") @@ -3996,7 +4002,7 @@ class LUReplaceDisks(LogicalUnit): # so check manually all the devices for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): cfg.SetDiskID(dev, instance.primary_node) - is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5] + is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5] if is_degr: raise errors.OpExecError("DRBD device %s is degraded!" % name) @@ -4006,7 +4012,7 @@ class LUReplaceDisks(LogicalUnit): info("remove logical volumes for %s" % name) for lv in old_lvs: cfg.SetDiskID(lv, tgt_node) - if not rpc.call_blockdev_remove(tgt_node, lv): + if not self.rpc.call_blockdev_remove(tgt_node, lv): warning("Can't remove old LV", hint="manually remove unused LVs") continue @@ -4044,7 +4050,7 @@ class LUReplaceDisks(LogicalUnit): self.proc.LogStep(1, steps_total, "check device existence") info("checking volume groups") my_vg = cfg.GetVGName() - results = rpc.call_vg_list([pri_node, new_node]) + results = self.rpc.call_vg_list([pri_node, new_node]) if not results: raise errors.OpExecError("Can't list volume groups on the nodes") for node in pri_node, new_node: @@ -4057,7 +4063,7 @@ class LUReplaceDisks(LogicalUnit): continue info("checking %s on %s" % (dev.iv_name, pri_node)) cfg.SetDiskID(dev, pri_node) - if not rpc.call_blockdev_find(pri_node, dev): + if not self.rpc.call_blockdev_find(pri_node, dev): raise errors.OpExecError("Can't find device %s on node %s" % (dev.iv_name, pri_node)) @@ -4124,7 +4130,7 @@ class LUReplaceDisks(LogicalUnit): # we have new devices, shutdown the drbd on the old secondary info("shutting down drbd for %s on old node" % dev.iv_name) cfg.SetDiskID(dev, old_node) - if not rpc.call_blockdev_shutdown(old_node, dev): + if not self.rpc.call_blockdev_shutdown(old_node, dev): warning("Failed to shutdown drbd for %s on old node" % dev.iv_name, hint="Please cleanup this device manually as soon as possible") @@ -4137,7 +4143,7 @@ class LUReplaceDisks(LogicalUnit): dev.physical_id = (None, None, None, None) + dev.physical_id[4:] # and 'find' the device, which will 'fix' it to match the # standalone state - if rpc.call_blockdev_find(pri_node, dev): + if self.rpc.call_blockdev_find(pri_node, dev): done += 1 else: warning("Failed to detach drbd %s from network, unusual case" % @@ -4169,7 +4175,7 @@ class LUReplaceDisks(LogicalUnit): # is correct cfg.SetDiskID(dev, pri_node) logging.debug("Disk to attach: %s", dev) - if not rpc.call_blockdev_find(pri_node, dev): + if not self.rpc.call_blockdev_find(pri_node, dev): warning("can't attach drbd %s to new secondary!" % dev.iv_name, "please do a gnt-instance info to see the status of disks") @@ -4182,7 +4188,7 @@ class LUReplaceDisks(LogicalUnit): # so check manually all the devices for name, (dev, old_lvs, _) in iv_names.iteritems(): cfg.SetDiskID(dev, pri_node) - is_degr = rpc.call_blockdev_find(pri_node, dev)[5] + is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5] if is_degr: raise errors.OpExecError("DRBD device %s is degraded!" % name) @@ -4191,7 +4197,7 @@ class LUReplaceDisks(LogicalUnit): info("remove logical volumes for %s" % name) for lv in old_lvs: cfg.SetDiskID(lv, old_node) - if not rpc.call_blockdev_remove(old_node, lv): + if not self.rpc.call_blockdev_remove(old_node, lv): warning("Can't remove LV on old secondary", hint="Cleanup stale volumes by hand") @@ -4280,8 +4286,8 @@ class LUGrowDisk(LogicalUnit): (self.op.disk, instance.name)) nodenames = [instance.primary_node] + list(instance.secondary_nodes) - nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(), - instance.hypervisor) + nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), + instance.hypervisor) for node in nodenames: info = nodeinfo.get(node, None) if not info: @@ -4304,8 +4310,9 @@ class LUGrowDisk(LogicalUnit): disk = instance.FindDisk(self.op.disk) for node in (instance.secondary_nodes + (instance.primary_node,)): self.cfg.SetDiskID(disk, node) - result = rpc.call_blockdev_grow(node, disk, self.op.amount) - if not result or not isinstance(result, (list, tuple)) or len(result) != 2: + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) + if (not result or not isinstance(result, (list, tuple)) or + len(result) != 2): raise errors.OpExecError("grow request failed to node %s" % node) elif not result[0]: raise errors.OpExecError("grow request failed to node %s: %s" % @@ -4367,7 +4374,7 @@ class LUQueryInstanceData(NoHooksLU): """ self.cfg.SetDiskID(dev, instance.primary_node) - dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev) + dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev) if dev.dev_type in constants.LDS_DRBD: # we change the snode then (otherwise we use the one passed in) if dev.logical_id[0] == instance.primary_node: @@ -4377,7 +4384,7 @@ class LUQueryInstanceData(NoHooksLU): if snode: self.cfg.SetDiskID(dev, snode) - dev_sstatus = rpc.call_blockdev_find(snode, dev) + dev_sstatus = self.rpc.call_blockdev_find(snode, dev) else: dev_sstatus = None @@ -4403,9 +4410,9 @@ class LUQueryInstanceData(NoHooksLU): """Gather and return data""" result = {} for instance in self.wanted_instances: - remote_info = rpc.call_instance_info(instance.primary_node, - instance.name, - instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) if remote_info and "state" in remote_info: remote_state = "up" else: @@ -4629,10 +4636,10 @@ class LUSetInstanceParams(LogicalUnit): pnode = self.instance.primary_node nodelist = [pnode] nodelist.extend(instance.secondary_nodes) - instance_info = rpc.call_instance_info(pnode, instance.name, - instance.hypervisor) - nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(), - instance.hypervisor) + instance_info = self.rpc.call_instance_info(pnode, instance.name, + instance.hypervisor) + nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(), + instance.hypervisor) if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict): # Assume the primary node is unreachable and go ahead @@ -4768,7 +4775,7 @@ class LUQueryExports(NoHooksLU): that node. """ - return rpc.call_export_list(self.nodes) + return self.rpc.call_export_list(self.nodes) class LUExportInstance(LogicalUnit): @@ -4843,7 +4850,7 @@ class LUExportInstance(LogicalUnit): src_node = instance.primary_node if self.op.shutdown: # shutdown the instance, but not the disks - if not rpc.call_instance_shutdown(src_node, instance): + if not self.rpc.call_instance_shutdown(src_node, instance): raise errors.OpExecError("Could not shutdown instance %s on node %s" % (instance.name, src_node)) @@ -4855,7 +4862,7 @@ class LUExportInstance(LogicalUnit): for disk in instance.disks: if disk.iv_name == "sda": # new_dev_name will be a snapshot of an lvm leaf of the one we passed - new_dev_name = rpc.call_blockdev_snapshot(src_node, disk) + new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk) if not new_dev_name: logger.Error("could not snapshot block device %s on node %s" % @@ -4869,7 +4876,7 @@ class LUExportInstance(LogicalUnit): finally: if self.op.shutdown and instance.status == "up": - if not rpc.call_instance_start(src_node, instance, None): + if not self.rpc.call_instance_start(src_node, instance, None): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance") @@ -4877,15 +4884,15 @@ class LUExportInstance(LogicalUnit): cluster_name = self.cfg.GetClusterName() for dev in snap_disks: - if not rpc.call_snapshot_export(src_node, dev, dst_node.name, + if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name, instance, cluster_name): logger.Error("could not export block device %s from node %s to node %s" % (dev.logical_id[1], src_node, dst_node.name)) - if not rpc.call_blockdev_remove(src_node, dev): + if not self.rpc.call_blockdev_remove(src_node, dev): logger.Error("could not remove snapshot block device %s from node %s" % (dev.logical_id[1], src_node)) - if not rpc.call_finalize_export(dst_node.name, instance, snap_disks): + if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks): logger.Error("could not finalize export for instance %s on node %s" % (instance.name, dst_node.name)) @@ -4896,10 +4903,10 @@ class LUExportInstance(LogicalUnit): # if we proceed the backup would be removed because OpQueryExports # substitutes an empty list with the full cluster node list. if nodelist: - exportlist = rpc.call_export_list(nodelist) + exportlist = self.rpc.call_export_list(nodelist) for node in exportlist: if instance.name in exportlist[node]: - if not rpc.call_export_remove(node, instance.name): + if not self.rpc.call_export_remove(node, instance.name): logger.Error("could not remove older export for instance %s" " on node %s" % (instance.name, node)) @@ -4935,12 +4942,13 @@ class LURemoveExport(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE]) + exportlist = self.rpc.call_export_list(self.acquired_locks[ + locking.LEVEL_NODE]) found = False for node in exportlist: if instance_name in exportlist[node]: found = True - if not rpc.call_export_remove(node, instance_name): + if not self.rpc.call_export_remove(node, instance_name): logger.Error("could not remove export for instance %s" " on node %s" % (instance_name, node)) @@ -5153,7 +5161,7 @@ class LUTestDelay(NoHooksLU): if not utils.TestDelay(self.op.duration): raise errors.OpExecError("Error during master delay test") if self.op.on_nodes: - result = rpc.call_test_delay(self.op.on_nodes, self.op.duration) + result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration) if not result: raise errors.OpExecError("Complete failure from rpc call") for node, node_result in result.items(): @@ -5183,8 +5191,8 @@ class IAllocator(object): "relocate_from", ] - def __init__(self, cfg, mode, name, **kwargs): - self.cfg = cfg + def __init__(self, lu, mode, name, **kwargs): + self.lu = lu # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy @@ -5221,12 +5229,12 @@ class IAllocator(object): This is the data that is independent of the actual operation. """ - cfg = self.cfg + cfg = self.lu.cfg cluster_info = cfg.GetClusterInfo() # cluster data data = { "version": 1, - "cluster_name": self.cfg.GetClusterName(), + "cluster_name": cfg.GetClusterName(), "cluster_tags": list(cluster_info.GetTags()), "enable_hypervisors": list(cluster_info.enabled_hypervisors), # we don't have job IDs @@ -5239,8 +5247,8 @@ class IAllocator(object): node_list = cfg.GetNodeList() # FIXME: here we have only one hypervisor information, but # instance can belong to different hypervisors - node_data = rpc.call_node_info(node_list, cfg.GetVGName(), - cfg.GetHypervisorType()) + node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(), + cfg.GetHypervisorType()) for nname in node_list: ninfo = cfg.GetNodeInfo(nname) if nname not in node_data or not isinstance(node_data[nname], dict): @@ -5350,7 +5358,7 @@ class IAllocator(object): done. """ - instance = self.cfg.GetInstanceInfo(self.name) + instance = self.lu.cfg.GetInstanceInfo(self.name) if instance is None: raise errors.ProgrammerError("Unknown instance '%s' passed to" " IAllocator" % self.name) @@ -5389,13 +5397,15 @@ class IAllocator(object): self.in_text = serializer.Dump(self.in_data) - def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner): + def Run(self, name, validate=True, call_fn=None): """Run an instance allocator and return the results. """ + if call_fn is None: + call_fn = self.lu.rpc.call_iallocator_runner data = self.in_text - result = call_fn(self.cfg.GetMasterNode(), name, self.in_text) + result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text) if not isinstance(result, (list, tuple)) or len(result) != 4: raise errors.OpExecError("Invalid result from master iallocator runner") @@ -5508,7 +5518,7 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=self.op.mode, name=self.op.name, mem_size=self.op.mem_size, @@ -5520,7 +5530,7 @@ class LUTestAllocator(NoHooksLU): vcpus=self.op.vcpus, ) else: - ial = IAllocator(self.cfg, + ial = IAllocator(self, mode=self.op.mode, name=self.op.name, relocate_from=list(self.relocate_from), diff --git a/lib/config.py b/lib/config.py index e4c055c0bf08caf2f5de9743305d31f41de89d24..2b30e608bc8196d67d393619d77f3d135f448049 100644 --- a/lib/config.py +++ b/lib/config.py @@ -821,7 +821,7 @@ class ConfigWriter: except ValueError: pass - result = rpc.call_upload_file(nodelist, self._cfg_file) + result = rpc.RpcRunner.call_upload_file(nodelist, self._cfg_file) for node in nodelist: if not result[node]: logging.error("copy of file %s to node %s failed", diff --git a/lib/jqueue.py b/lib/jqueue.py index a4c7b1d8462d61ef0f3155e188402a6ffe8eb9cc..54a3562dcf477f5acb1ad38c7b274a787cb56009 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -45,6 +45,7 @@ from ganeti import utils from ganeti import jstore from ganeti import rpc +from ganeti.rpc import RpcRunner JOBQUEUE_THREADS = 25 @@ -404,7 +405,7 @@ class JobQueue(object): assert node_name != self._my_hostname # Clean queue directory on added node - rpc.call_jobqueue_purge(node_name) + RpcRunner.call_jobqueue_purge(node_name) # Upload the whole queue excluding archived jobs files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] @@ -420,7 +421,7 @@ class JobQueue(object): finally: fd.close() - result = rpc.call_jobqueue_update([node_name], file_name, content) + result = RpcRunner.call_jobqueue_update([node_name], file_name, content) if not result[node_name]: logging.error("Failed to upload %s to %s", file_name, node_name) @@ -459,14 +460,14 @@ class JobQueue(object): """ utils.WriteFile(file_name, data=data) - result = rpc.call_jobqueue_update(self._nodes, file_name, data) + result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data) self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFileUnlocked(self, old, new): os.rename(old, new) - result = rpc.call_jobqueue_rename(self._nodes, old, new) + result = RpcRunner.call_jobqueue_rename(self._nodes, old, new) self._CheckRpcResult(result, self._nodes, "Moving %s to %s" % (old, new)) diff --git a/lib/mcpu.py b/lib/mcpu.py index af0202abe56867725772f2d74363545656917169..949b1e6c507cc9cf760174eeeb3bb3b78c994801 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -97,6 +97,7 @@ class Processor(object): self.context = context self._feedback_fn = None self.exclusive_BGL = False + self.rpc = rpc.RpcRunner(context.cfg) def _ExecLU(self, lu): """Logical Unit execution sequence. @@ -104,7 +105,7 @@ class Processor(object): """ write_count = self.context.cfg.write_count lu.CheckPrereq() - hm = HooksMaster(rpc.call_hooks_runner, self, lu) + hm = HooksMaster(self.rpc.call_hooks_runner, self, lu) h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, self._feedback_fn, None) @@ -202,7 +203,7 @@ class Processor(object): shared=not lu_class.REQ_BGL) try: self.exclusive_BGL = lu_class.REQ_BGL - lu = lu_class(self, op, self.context) + lu = lu_class(self, op, self.context, self.rpc) lu.ExpandNames() assert lu.needed_locks is not None, "needed_locks not set by LU" result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE) diff --git a/lib/rpc.py b/lib/rpc.py index 02f770856db86b76e21d63724230727d01a68127..bb880dcb9d871e7f7efef0f5b153db64379155e4 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -23,7 +23,12 @@ """ -# pylint: disable-msg=C0103 +# pylint: disable-msg=C0103,R0201,R0904 +# C0103: Invalid name, since call_ are not valid +# R0201: Method could be a function, we keep all rpcs instance methods +# as not to change them back and forth between static/instance methods +# if they need to start using instance attributes +# R0904: Too many public methods import os import socket @@ -140,748 +145,793 @@ class Client: self.results[node] = nc.get_response() -def call_volume_list(node_list, vg_name): - """Gets the logical volumes present in a given volume group. +class RpcRunner(object): + """RPC runner class""" - This is a multi-node call. + def __init__(self, cfg): + """Initialized the rpc runner. - """ - c = Client("volume_list", [vg_name]) - c.connect_list(node_list) - c.run() - return c.getresult() + @type cfg: C{config.ConfigWriter} + @param cfg: the configuration object that will be used to get data + about the cluster + """ + self._cfg = cfg -def call_vg_list(node_list): - """Gets the volume group list. + def call_volume_list(self, node_list, vg_name): + """Gets the logical volumes present in a given volume group. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("vg_list", []) - c.connect_list(node_list) - c.run() - return c.getresult() + """ + c = Client("volume_list", [vg_name]) + c.connect_list(node_list) + c.run() + return c.getresult() + def call_vg_list(self, node_list): + """Gets the volume group list. -def call_bridges_exist(node, bridges_list): - """Checks if a node has all the bridges given. + This is a multi-node call. - This method checks if all bridges given in the bridges_list are - present on the remote node, so that an instance that uses interfaces - on those bridges can be started. + """ + c = Client("vg_list", []) + c.connect_list(node_list) + c.run() + return c.getresult() - This is a single-node call. - """ - c = Client("bridges_exist", [bridges_list]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_bridges_exist(self, node, bridges_list): + """Checks if a node has all the bridges given. + This method checks if all bridges given in the bridges_list are + present on the remote node, so that an instance that uses interfaces + on those bridges can be started. -def call_instance_start(node, instance, extra_args): - """Starts an instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("bridges_exist", [bridges_list]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("instance_start", [instance.ToDict(), extra_args]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_start(self, node, instance, extra_args): + """Starts an instance. -def call_instance_shutdown(node, instance): - """Stops an instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("instance_start", [instance.ToDict(), extra_args]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("instance_shutdown", [instance.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_shutdown(self, node, instance): + """Stops an instance. -def call_instance_migrate(node, instance, target, live): - """Migrate an instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("instance_shutdown", [instance.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - @type node: string - @param node: the node on which the instance is currently running - @type instance: C{objects.Instance} - @param instance: the instance definition - @type target: string - @param target: the target node name - @type live: boolean - @param live: whether the migration should be done live or not (the - interpretation of this parameter is left to the hypervisor) - """ - c = Client("instance_migrate", [instance.ToDict(), target, live]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_migrate(self, node, instance, target, live): + """Migrate an instance. + This is a single-node call. -def call_instance_reboot(node, instance, reboot_type, extra_args): - """Reboots an instance. + @type node: string + @param node: the node on which the instance is currently running + @type instance: C{objects.Instance} + @param instance: the instance definition + @type target: string + @param target: the target node name + @type live: boolean + @param live: whether the migration should be done live or not (the + interpretation of this parameter is left to the hypervisor) - This is a single-node call. + """ + c = Client("instance_migrate", [instance.ToDict(), target, live]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_reboot(self, node, instance, reboot_type, extra_args): + """Reboots an instance. -def call_instance_os_add(node, inst, osdev, swapdev): - """Installs an OS on the given instance. + This is a single-node call. - This is a single-node call. + """ + c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [inst.ToDict(), osdev, swapdev] - c = Client("instance_os_add", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_os_add(self, node, inst, osdev, swapdev): + """Installs an OS on the given instance. -def call_instance_run_rename(node, inst, old_name, osdev, swapdev): - """Run the OS rename script for an instance. + This is a single-node call. - This is a single-node call. + """ + params = [inst.ToDict(), osdev, swapdev] + c = Client("instance_os_add", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [inst.ToDict(), old_name, osdev, swapdev] - c = Client("instance_run_rename", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev): + """Run the OS rename script for an instance. -def call_instance_info(node, instance, hname): - """Returns information about a single instance. + This is a single-node call. - This is a single-node call. + """ + params = [inst.ToDict(), old_name, osdev, swapdev] + c = Client("instance_run_rename", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - @type node_list: list - @param node_list: the list of nodes to query - @type instance: string - @param instance: the instance name - @type hname: string - @param hname: the hypervisor type of the instance - """ - c = Client("instance_info", [instance]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_instance_info(self, node, instance, hname): + """Returns information about a single instance. + This is a single-node call. -def call_all_instances_info(node_list, hypervisor_list): - """Returns information about all instances on the given nodes. + @type node_list: list + @param node_list: the list of nodes to query + @type instance: string + @param instance: the instance name + @type hname: string + @param hname: the hypervisor type of the instance - This is a multi-node call. + """ + c = Client("instance_info", [instance]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - @type node_list: list - @param node_list: the list of nodes to query - @type hypervisor_list: list - @param hypervisor_list: the hypervisors to query for instances - """ - c = Client("all_instances_info", [hypervisor_list]) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_all_instances_info(self, node_list, hypervisor_list): + """Returns information about all instances on the given nodes. + This is a multi-node call. -def call_instance_list(node_list, hypervisor_list): - """Returns the list of running instances on a given node. + @type node_list: list + @param node_list: the list of nodes to query + @type hypervisor_list: list + @param hypervisor_list: the hypervisors to query for instances - This is a multi-node call. + """ + c = Client("all_instances_info", [hypervisor_list]) + c.connect_list(node_list) + c.run() + return c.getresult() - @type node_list: list - @param node_list: the list of nodes to query - @type hypervisor_list: list - @param hypervisor_list: the hypervisors to query for instances - """ - c = Client("instance_list", [hypervisor_list]) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_instance_list(self, node_list, hypervisor_list): + """Returns the list of running instances on a given node. + This is a multi-node call. -def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed): - """Do a TcpPing on the remote node + @type node_list: list + @param node_list: the list of nodes to query + @type hypervisor_list: list + @param hypervisor_list: the hypervisors to query for instances - This is a single-node call. - """ - c = Client("node_tcp_ping", [source, target, port, timeout, - live_port_needed]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("instance_list", [hypervisor_list]) + c.connect_list(node_list) + c.run() + return c.getresult() -def call_node_info(node_list, vg_name, hypervisor_type): - """Return node information. + def call_node_tcp_ping(self, node, source, target, port, timeout, + live_port_needed): + """Do a TcpPing on the remote node - This will return memory information and volume group size and free - space. + This is a single-node call. + """ + c = Client("node_tcp_ping", [source, target, port, timeout, + live_port_needed]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - This is a multi-node call. - @type node_list: list - @param node_list: the list of nodes to query - @type vgname: C{string} - @param vgname: the name of the volume group to ask for disk space information - @type hypervisor_type: C{str} - @param hypervisor_type: the name of the hypervisor to ask for - memory information + def call_node_info(self, node_list, vg_name, hypervisor_type): + """Return node information. - """ - c = Client("node_info", [vg_name, hypervisor_type]) - c.connect_list(node_list) - c.run() - retux = c.getresult() + This will return memory information and volume group size and free + space. - for node_name in retux: - ret = retux.get(node_name, False) - if type(ret) != dict: - logger.Error("could not connect to node %s" % (node_name)) - ret = {} + This is a multi-node call. - utils.CheckDict(ret, - { 'memory_total' : '-', - 'memory_dom0' : '-', - 'memory_free' : '-', - 'vg_size' : 'node_unreachable', - 'vg_free' : '-' }, - "call_node_info", - ) - return retux + @type node_list: list + @param node_list: the list of nodes to query + @type vgname: C{string} + @param vgname: the name of the volume group to ask for disk space + information + @type hypervisor_type: C{str} + @param hypervisor_type: the name of the hypervisor to ask for + memory information + """ + c = Client("node_info", [vg_name, hypervisor_type]) + c.connect_list(node_list) + c.run() + retux = c.getresult() -def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub): - """Add a node to the cluster. + for node_name in retux: + ret = retux.get(node_name, False) + if type(ret) != dict: + logger.Error("could not connect to node %s" % (node_name)) + ret = {} - This is a single-node call. + utils.CheckDict(ret, + { 'memory_total' : '-', + 'memory_dom0' : '-', + 'memory_free' : '-', + 'vg_size' : 'node_unreachable', + 'vg_free' : '-' }, + "call_node_info", + ) + return retux - """ - params = [dsa, dsapub, rsa, rsapub, ssh, sshpub] - c = Client("node_add", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub): + """Add a node to the cluster. -def call_node_verify(node_list, checkdict, cluster_name): - """Request verification of given parameters. + This is a single-node call. - This is a multi-node call. + """ + params = [dsa, dsapub, rsa, rsapub, ssh, sshpub] + c = Client("node_add", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("node_verify", [checkdict, cluster_name]) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_node_verify(self, node_list, checkdict, cluster_name): + """Request verification of given parameters. -def call_node_start_master(node, start_daemons): - """Tells a node to activate itself as a master. + This is a multi-node call. - This is a single-node call. + """ + c = Client("node_verify", [checkdict, cluster_name]) + c.connect_list(node_list) + c.run() + return c.getresult() - """ - c = Client("node_start_master", [start_daemons]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + @staticmethod + def call_node_start_master(node, start_daemons): + """Tells a node to activate itself as a master. -def call_node_stop_master(node, stop_daemons): - """Tells a node to demote itself from master status. + This is a single-node call. - This is a single-node call. + """ + c = Client("node_start_master", [start_daemons]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("node_stop_master", [stop_daemons]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + @staticmethod + def call_node_stop_master(node, stop_daemons): + """Tells a node to demote itself from master status. -def call_master_info(node_list): - """Query master info. + This is a single-node call. - This is a multi-node call. + """ + c = Client("node_stop_master", [stop_daemons]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("master_info", []) - c.connect_list(node_list) - c.run() - return c.getresult() + @staticmethod + def call_master_info(node_list): + """Query master info. -def call_version(node_list): - """Query node version. + This is a multi-node call. - This is a multi-node call. + """ + # TODO: should this method query down nodes? + c = Client("master_info", []) + c.connect_list(node_list) + c.run() + return c.getresult() - """ - c = Client("version", []) - c.connect_list(node_list) - c.run() - return c.getresult() + def call_version(self, node_list): + """Query node version. -def call_blockdev_create(node, bdev, size, owner, on_primary, info): - """Request creation of a given block device. + This is a multi-node call. - This is a single-node call. + """ + c = Client("version", []) + c.connect_list(node_list) + c.run() + return c.getresult() - """ - params = [bdev.ToDict(), size, owner, on_primary, info] - c = Client("blockdev_create", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_create(self, node, bdev, size, owner, on_primary, info): + """Request creation of a given block device. -def call_blockdev_remove(node, bdev): - """Request removal of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [bdev.ToDict(), size, owner, on_primary, info] + c = Client("blockdev_create", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("blockdev_remove", [bdev.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_remove(self, node, bdev): + """Request removal of a given block device. -def call_blockdev_rename(node, devlist): - """Request rename of the given block devices. + This is a single-node call. - This is a single-node call. + """ + c = Client("blockdev_remove", [bdev.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [(d.ToDict(), uid) for d, uid in devlist] - c = Client("blockdev_rename", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_rename(self, node, devlist): + """Request rename of the given block devices. -def call_blockdev_assemble(node, disk, owner, on_primary): - """Request assembling of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [(d.ToDict(), uid) for d, uid in devlist] + c = Client("blockdev_rename", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [disk.ToDict(), owner, on_primary] - c = Client("blockdev_assemble", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_assemble(self, node, disk, owner, on_primary): + """Request assembling of a given block device. -def call_blockdev_shutdown(node, disk): - """Request shutdown of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [disk.ToDict(), owner, on_primary] + c = Client("blockdev_assemble", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("blockdev_shutdown", [disk.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_shutdown(self, node, disk): + """Request shutdown of a given block device. -def call_blockdev_addchildren(node, bdev, ndevs): - """Request adding a list of children to a (mirroring) device. + This is a single-node call. - This is a single-node call. + """ + c = Client("blockdev_shutdown", [disk.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] - c = Client("blockdev_addchildren", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_addchildren(self, node, bdev, ndevs): + """Request adding a list of children to a (mirroring) device. -def call_blockdev_removechildren(node, bdev, ndevs): - """Request removing a list of children from a (mirroring) device. + This is a single-node call. - This is a single-node call. + """ + params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] + c = Client("blockdev_addchildren", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] - c = Client("blockdev_removechildren", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_removechildren(self, node, bdev, ndevs): + """Request removing a list of children from a (mirroring) device. -def call_blockdev_getmirrorstatus(node, disks): - """Request status of a (mirroring) device. + This is a single-node call. - This is a single-node call. + """ + params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]] + c = Client("blockdev_removechildren", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - params = [dsk.ToDict() for dsk in disks] - c = Client("blockdev_getmirrorstatus", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_getmirrorstatus(self, node, disks): + """Request status of a (mirroring) device. -def call_blockdev_find(node, disk): - """Request identification of a given block device. + This is a single-node call. - This is a single-node call. + """ + params = [dsk.ToDict() for dsk in disks] + c = Client("blockdev_getmirrorstatus", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - """ - c = Client("blockdev_find", [disk.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_find(self, node, disk): + """Request identification of a given block device. + + This is a single-node call. -def call_blockdev_close(node, disks): - """Closes the given block devices. + """ + c = Client("blockdev_find", [disk.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) - This is a single-node call. - """ - params = [cf.ToDict() for cf in disks] - c = Client("blockdev_close", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + def call_blockdev_close(self, node, disks): + """Closes the given block devices. + This is a single-node call. -def call_upload_file(node_list, file_name): - """Upload a file. + """ + params = [cf.ToDict() for cf in disks] + c = Client("blockdev_close", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) - The node will refuse the operation in case the file is not on the - approved file list. - This is a multi-node call. + @staticmethod + def call_upload_file(node_list, file_name): + """Upload a file. - """ - fh = file(file_name) - try: - data = fh.read() - finally: - fh.close() - st = os.stat(file_name) - params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, - st.st_atime, st.st_mtime] - c = Client("upload_file", params) - c.connect_list(node_list) - c.run() - return c.getresult() + The node will refuse the operation in case the file is not on the + approved file list. + This is a multi-node call. -def call_os_diagnose(node_list): - """Request a diagnose of OS definitions. + """ + fh = file(file_name) + try: + data = fh.read() + finally: + fh.close() + st = os.stat(file_name) + params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, + st.st_atime, st.st_mtime] + c = Client("upload_file", params) + c.connect_list(node_list) + c.run() + return c.getresult() + + @staticmethod + def call_upload_file(node_list, file_name): + """Upload a file. + + The node will refuse the operation in case the file is not on the + approved file list. + + This is a multi-node call. - This is a multi-node call. + """ + fh = file(file_name) + try: + data = fh.read() + finally: + fh.close() + st = os.stat(file_name) + params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, + st.st_atime, st.st_mtime] + c = Client("upload_file", params) + c.connect_list(node_list) + c.run() + return c.getresult() + + def call_os_diagnose(self, node_list): + """Request a diagnose of OS definitions. + + This is a multi-node call. - """ - c = Client("os_diagnose", []) - c.connect_list(node_list) - c.run() - result = c.getresult() - new_result = {} - for node_name in result: - if result[node_name]: - nr = [objects.OS.FromDict(oss) for oss in result[node_name]] - else: - nr = [] - new_result[node_name] = nr - return new_result + """ + c = Client("os_diagnose", []) + c.connect_list(node_list) + c.run() + result = c.getresult() + new_result = {} + for node_name in result: + if result[node_name]: + nr = [objects.OS.FromDict(oss) for oss in result[node_name]] + else: + nr = [] + new_result[node_name] = nr + return new_result -def call_os_get(node, name): - """Returns an OS definition. + def call_os_get(self, node, name): + """Returns an OS definition. - This is a single-node call. + This is a single-node call. - """ - c = Client("os_get", [name]) - c.connect(node) - c.run() - result = c.getresult().get(node, False) - if isinstance(result, dict): - return objects.OS.FromDict(result) - else: - return result + """ + c = Client("os_get", [name]) + c.connect(node) + c.run() + result = c.getresult().get(node, False) + if isinstance(result, dict): + return objects.OS.FromDict(result) + else: + return result -def call_hooks_runner(node_list, hpath, phase, env): - """Call the hooks runner. + def call_hooks_runner(self, node_list, hpath, phase, env): + """Call the hooks runner. - Args: - - op: the OpCode instance - - env: a dictionary with the environment + Args: + - op: the OpCode instance + - env: a dictionary with the environment - This is a multi-node call. + This is a multi-node call. - """ - params = [hpath, phase, env] - c = Client("hooks_runner", params) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + params = [hpath, phase, env] + c = Client("hooks_runner", params) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result -def call_iallocator_runner(node, name, idata): - """Call an iallocator on a remote node + def call_iallocator_runner(self, node, name, idata): + """Call an iallocator on a remote node - Args: - - name: the iallocator name - - input: the json-encoded input string + Args: + - name: the iallocator name + - input: the json-encoded input string - This is a single-node call. + This is a single-node call. - """ - params = [name, idata] - c = Client("iallocator_runner", params) - c.connect(node) - c.run() - result = c.getresult().get(node, False) - return result + """ + params = [name, idata] + c = Client("iallocator_runner", params) + c.connect(node) + c.run() + result = c.getresult().get(node, False) + return result -def call_blockdev_grow(node, cf_bdev, amount): - """Request a snapshot of the given block device. + def call_blockdev_grow(self, node, cf_bdev, amount): + """Request a snapshot of the given block device. - This is a single-node call. + This is a single-node call. - """ - c = Client("blockdev_grow", [cf_bdev.ToDict(), amount]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("blockdev_grow", [cf_bdev.ToDict(), amount]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_blockdev_snapshot(node, cf_bdev): - """Request a snapshot of the given block device. + def call_blockdev_snapshot(self, node, cf_bdev): + """Request a snapshot of the given block device. - This is a single-node call. + This is a single-node call. - """ - c = Client("blockdev_snapshot", [cf_bdev.ToDict()]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("blockdev_snapshot", [cf_bdev.ToDict()]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name): - """Request the export of a given snapshot. + def call_snapshot_export(self, node, snap_bdev, dest_node, instance, + cluster_name): + """Request the export of a given snapshot. - This is a single-node call. + This is a single-node call. - """ - params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name] - c = Client("snapshot_export", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name] + c = Client("snapshot_export", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_finalize_export(node, instance, snap_disks): - """Request the completion of an export operation. + def call_finalize_export(self, node, instance, snap_disks): + """Request the completion of an export operation. - This writes the export config file, etc. + This writes the export config file, etc. - This is a single-node call. + This is a single-node call. - """ - flat_disks = [] - for disk in snap_disks: - flat_disks.append(disk.ToDict()) - params = [instance.ToDict(), flat_disks] - c = Client("finalize_export", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + flat_disks = [] + for disk in snap_disks: + flat_disks.append(disk.ToDict()) + params = [instance.ToDict(), flat_disks] + c = Client("finalize_export", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_export_info(node, path): - """Queries the export information in a given path. + def call_export_info(self, node, path): + """Queries the export information in a given path. - This is a single-node call. + This is a single-node call. - """ - c = Client("export_info", [path]) - c.connect(node) - c.run() - result = c.getresult().get(node, False) - if not result: - return result - return objects.SerializableConfigParser.Loads(str(result)) + """ + c = Client("export_info", [path]) + c.connect(node) + c.run() + result = c.getresult().get(node, False) + if not result: + return result + return objects.SerializableConfigParser.Loads(str(result)) -def call_instance_os_import(node, inst, osdev, swapdev, - src_node, src_image, cluster_name): - """Request the import of a backup into an instance. + def call_instance_os_import(self, node, inst, osdev, swapdev, + src_node, src_image, cluster_name): + """Request the import of a backup into an instance. - This is a single-node call. + This is a single-node call. - """ - params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name] - c = Client("instance_os_import", params) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name] + c = Client("instance_os_import", params) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_export_list(node_list): - """Gets the stored exports list. + def call_export_list(self, node_list): + """Gets the stored exports list. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("export_list", []) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + c = Client("export_list", []) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result -def call_export_remove(node, export): - """Requests removal of a given export. + def call_export_remove(self, node, export): + """Requests removal of a given export. - This is a single-node call. + This is a single-node call. - """ - c = Client("export_remove", [export]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("export_remove", [export]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_node_leave_cluster(node): - """Requests a node to clean the cluster information it has. + def call_node_leave_cluster(self, node): + """Requests a node to clean the cluster information it has. - This will remove the configuration information from the ganeti data - dir. + This will remove the configuration information from the ganeti data + dir. - This is a single-node call. + This is a single-node call. - """ - c = Client("node_leave_cluster", []) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("node_leave_cluster", []) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_node_volumes(node_list): - """Gets all volumes on node(s). + def call_node_volumes(self, node_list): + """Gets all volumes on node(s). - This is a multi-node call. + This is a multi-node call. - """ - c = Client("node_volumes", []) - c.connect_list(node_list) - c.run() - return c.getresult() + """ + c = Client("node_volumes", []) + c.connect_list(node_list) + c.run() + return c.getresult() -def call_test_delay(node_list, duration): - """Sleep for a fixed time on given node(s). + def call_test_delay(self, node_list, duration): + """Sleep for a fixed time on given node(s). - This is a multi-node call. + This is a multi-node call. - """ - c = Client("test_delay", [duration]) - c.connect_list(node_list) - c.run() - return c.getresult() + """ + c = Client("test_delay", [duration]) + c.connect_list(node_list) + c.run() + return c.getresult() -def call_file_storage_dir_create(node, file_storage_dir): - """Create the given file storage directory. + def call_file_storage_dir_create(self, node, file_storage_dir): + """Create the given file storage directory. - This is a single-node call. + This is a single-node call. - """ - c = Client("file_storage_dir_create", [file_storage_dir]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("file_storage_dir_create", [file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_file_storage_dir_remove(node, file_storage_dir): - """Remove the given file storage directory. + def call_file_storage_dir_remove(self, node, file_storage_dir): + """Remove the given file storage directory. - This is a single-node call. + This is a single-node call. - """ - c = Client("file_storage_dir_remove", [file_storage_dir]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("file_storage_dir_remove", [file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_file_storage_dir_rename(node, old_file_storage_dir, - new_file_storage_dir): - """Rename file storage directory. + def call_file_storage_dir_rename(self, node, old_file_storage_dir, + new_file_storage_dir): + """Rename file storage directory. - This is a single-node call. + This is a single-node call. - """ - c = Client("file_storage_dir_rename", - [old_file_storage_dir, new_file_storage_dir]) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("file_storage_dir_rename", + [old_file_storage_dir, new_file_storage_dir]) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_jobqueue_update(node_list, file_name, content): - """Update job queue. + @staticmethod + def call_jobqueue_update(node_list, file_name, content): + """Update job queue. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("jobqueue_update", [file_name, content]) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + c = Client("jobqueue_update", [file_name, content]) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result -def call_jobqueue_purge(node): - """Purge job queue. + @staticmethod + def call_jobqueue_purge(node): + """Purge job queue. - This is a single-node call. + This is a single-node call. - """ - c = Client("jobqueue_purge", []) - c.connect(node) - c.run() - return c.getresult().get(node, False) + """ + c = Client("jobqueue_purge", []) + c.connect(node) + c.run() + return c.getresult().get(node, False) -def call_jobqueue_rename(node_list, old, new): - """Rename a job queue file. + @staticmethod + def call_jobqueue_rename(node_list, old, new): + """Rename a job queue file. - This is a multi-node call. + This is a multi-node call. - """ - c = Client("jobqueue_rename", [old, new]) - c.connect_list(node_list) - c.run() - result = c.getresult() - return result + """ + c = Client("jobqueue_rename", [old, new]) + c.connect_list(node_list) + c.run() + result = c.getresult() + return result diff --git a/test/ganeti.hooks_unittest.py b/test/ganeti.hooks_unittest.py index 2f5f8b0a269edac352876c74a044026756f54ace..02522b9053edcae3e579774ce1ddd011de403276 100755 --- a/test/ganeti.hooks_unittest.py +++ b/test/ganeti.hooks_unittest.py @@ -228,7 +228,9 @@ class TestHooksMaster(unittest.TestCase): def setUp(self): self.op = opcodes.OpCode() self.context = FakeContext() - self.lu = FakeLU(None, self.op, self.context) + # WARNING: here we pass None as RpcRunner instance since we know + # our usage via HooksMaster will not use lu.rpc + self.lu = FakeLU(None, self.op, self.context, None) def testTotalFalse(self): """Test complete rpc failure"""