Commit 72737a7f authored by Iustin Pop's avatar Iustin Pop
Browse files

Convert rpc module to RpcRunner

This big patch changes the call model used in internode-rpc from
standalong function calls in the rpc module to via a RpcRunner class,
that holds all the methods. This can be used in the future to enable
smarter processing in the RPC layer itself (some quick examples are not
setting the DiskID from cmdlib code, but only once in each rpc call,
etc.).

There are a few RPC calls that are made outside of the LU code, and
these calls are left as staticmethods, so they can be used without a
class instance (which requires a ConfigWriter instance).

Reviewed-by: imsnah
parent b9bddb6b
......@@ -381,7 +381,7 @@ def CheckAgreement():
# either single node cluster, or a misconfiguration, but I won't
# break any other node, so I can proceed
return True
results = rpc.call_master_info(node_list)
results = rpc.RpcRunner.call_master_info(node_list)
if not isinstance(results, dict):
# this should not happen (unless internal error in rpc)
logging.critical("Can't complete rpc call, aborting master startup")
......@@ -445,7 +445,7 @@ def main():
# activate ip
master_node = ssconf.SimpleConfigReader().GetMasterNode()
if not rpc.call_node_start_master(master_node, False):
if not rpc.RpcRunner.call_node_start_master(master_node, False):
logging.error("Can't activate master IP address")
master.setup_queue()
......
......@@ -38,6 +38,7 @@ from ganeti import constants
from ganeti import objects
from ganeti import ssconf
from ganeti.rpc import RpcRunner
def _InitSSHSetup(node):
"""Setup the SSH configuration for the cluster.
......@@ -236,7 +237,7 @@ def InitCluster(cluster_name, hypervisor_type, mac_prefix, def_bridge,
# start the master ip
# TODO: Review rpc call from bootstrap
rpc.call_node_start_master(hostname.name, True)
RpcRunner.call_node_start_master(hostname.name, True)
def InitConfig(version, cluster_config, master_node_config,
......@@ -281,9 +282,9 @@ def FinalizeClusterDestroy(master):
begun in cmdlib.LUDestroyOpcode.
"""
if not rpc.call_node_stop_master(master, True):
if not RpcRunner.call_node_stop_master(master, True):
logging.warning("Could not disable the master role")
if not rpc.call_node_leave_cluster(master):
if not RpcRunner.call_node_leave_cluster(master):
logging.warning("Could not shutdown the node daemon and cleanup the node")
......@@ -365,7 +366,7 @@ def MasterFailover():
logging.info("setting master to %s, old master: %s", new_master, old_master)
if not rpc.call_node_stop_master(old_master, True):
if not RpcRunner.call_node_stop_master(old_master, True):
logging.error("could disable the master role on the old master"
" %s, please disable manually", old_master)
......@@ -374,12 +375,12 @@ def MasterFailover():
# Here we have a phase where no master should be running
if not rpc.call_upload_file(cfg.GetNodeList(),
if not RpcRunner.call_upload_file(cfg.GetNodeList(),
constants.CLUSTER_CONF_FILE):
logging.error("could not distribute the new simple store master file"
" to the other nodes, please check.")
if not rpc.call_node_start_master(new_master, True):
if not RpcRunner.call_node_start_master(new_master, True):
logging.error("could not start the master role on the new master"
" %s, please check", new_master)
rcode = 1
......
This diff is collapsed.
......@@ -821,7 +821,7 @@ class ConfigWriter:
except ValueError:
pass
result = rpc.call_upload_file(nodelist, self._cfg_file)
result = rpc.RpcRunner.call_upload_file(nodelist, self._cfg_file)
for node in nodelist:
if not result[node]:
logging.error("copy of file %s to node %s failed",
......
......@@ -45,6 +45,7 @@ from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti.rpc import RpcRunner
JOBQUEUE_THREADS = 25
......@@ -404,7 +405,7 @@ class JobQueue(object):
assert node_name != self._my_hostname
# Clean queue directory on added node
rpc.call_jobqueue_purge(node_name)
RpcRunner.call_jobqueue_purge(node_name)
# Upload the whole queue excluding archived jobs
files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
......@@ -420,7 +421,7 @@ class JobQueue(object):
finally:
fd.close()
result = rpc.call_jobqueue_update([node_name], file_name, content)
result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
if not result[node_name]:
logging.error("Failed to upload %s to %s", file_name, node_name)
......@@ -459,14 +460,14 @@ class JobQueue(object):
"""
utils.WriteFile(file_name, data=data)
result = rpc.call_jobqueue_update(self._nodes, file_name, data)
result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
self._CheckRpcResult(result, self._nodes,
"Updating %s" % file_name)
def _RenameFileUnlocked(self, old, new):
os.rename(old, new)
result = rpc.call_jobqueue_rename(self._nodes, old, new)
result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
self._CheckRpcResult(result, self._nodes,
"Moving %s to %s" % (old, new))
......
......@@ -97,6 +97,7 @@ class Processor(object):
self.context = context
self._feedback_fn = None
self.exclusive_BGL = False
self.rpc = rpc.RpcRunner(context.cfg)
def _ExecLU(self, lu):
"""Logical Unit execution sequence.
......@@ -104,7 +105,7 @@ class Processor(object):
"""
write_count = self.context.cfg.write_count
lu.CheckPrereq()
hm = HooksMaster(rpc.call_hooks_runner, self, lu)
hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
self._feedback_fn, None)
......@@ -202,7 +203,7 @@ class Processor(object):
shared=not lu_class.REQ_BGL)
try:
self.exclusive_BGL = lu_class.REQ_BGL
lu = lu_class(self, op, self.context)
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
......
......@@ -23,7 +23,12 @@
"""
# pylint: disable-msg=C0103
# pylint: disable-msg=C0103,R0201,R0904
# C0103: Invalid name, since call_ are not valid
# R0201: Method could be a function, we keep all rpcs instance methods
# as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
# R0904: Too many public methods
import os
import socket
......@@ -140,7 +145,20 @@ class Client:
self.results[node] = nc.get_response()
def call_volume_list(node_list, vg_name):
class RpcRunner(object):
"""RPC runner class"""
def __init__(self, cfg):
"""Initialized the rpc runner.
@type cfg: C{config.ConfigWriter}
@param cfg: the configuration object that will be used to get data
about the cluster
"""
self._cfg = cfg
def call_volume_list(self, node_list, vg_name):
"""Gets the logical volumes present in a given volume group.
This is a multi-node call.
......@@ -151,8 +169,7 @@ def call_volume_list(node_list, vg_name):
c.run()
return c.getresult()
def call_vg_list(node_list):
def call_vg_list(self, node_list):
"""Gets the volume group list.
This is a multi-node call.
......@@ -164,7 +181,7 @@ def call_vg_list(node_list):
return c.getresult()
def call_bridges_exist(node, bridges_list):
def call_bridges_exist(self, node, bridges_list):
"""Checks if a node has all the bridges given.
This method checks if all bridges given in the bridges_list are
......@@ -180,7 +197,7 @@ def call_bridges_exist(node, bridges_list):
return c.getresult().get(node, False)
def call_instance_start(node, instance, extra_args):
def call_instance_start(self, node, instance, extra_args):
"""Starts an instance.
This is a single-node call.
......@@ -192,7 +209,7 @@ def call_instance_start(node, instance, extra_args):
return c.getresult().get(node, False)
def call_instance_shutdown(node, instance):
def call_instance_shutdown(self, node, instance):
"""Stops an instance.
This is a single-node call.
......@@ -204,7 +221,7 @@ def call_instance_shutdown(node, instance):
return c.getresult().get(node, False)
def call_instance_migrate(node, instance, target, live):
def call_instance_migrate(self, node, instance, target, live):
"""Migrate an instance.
This is a single-node call.
......@@ -226,7 +243,7 @@ def call_instance_migrate(node, instance, target, live):
return c.getresult().get(node, False)
def call_instance_reboot(node, instance, reboot_type, extra_args):
def call_instance_reboot(self, node, instance, reboot_type, extra_args):
"""Reboots an instance.
This is a single-node call.
......@@ -238,7 +255,7 @@ def call_instance_reboot(node, instance, reboot_type, extra_args):
return c.getresult().get(node, False)
def call_instance_os_add(node, inst, osdev, swapdev):
def call_instance_os_add(self, node, inst, osdev, swapdev):
"""Installs an OS on the given instance.
This is a single-node call.
......@@ -251,7 +268,7 @@ def call_instance_os_add(node, inst, osdev, swapdev):
return c.getresult().get(node, False)
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
"""Run the OS rename script for an instance.
This is a single-node call.
......@@ -264,7 +281,7 @@ def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
return c.getresult().get(node, False)
def call_instance_info(node, instance, hname):
def call_instance_info(self, node, instance, hname):
"""Returns information about a single instance.
This is a single-node call.
......@@ -283,7 +300,7 @@ def call_instance_info(node, instance, hname):
return c.getresult().get(node, False)
def call_all_instances_info(node_list, hypervisor_list):
def call_all_instances_info(self, node_list, hypervisor_list):
"""Returns information about all instances on the given nodes.
This is a multi-node call.
......@@ -300,7 +317,7 @@ def call_all_instances_info(node_list, hypervisor_list):
return c.getresult()
def call_instance_list(node_list, hypervisor_list):
def call_instance_list(self, node_list, hypervisor_list):
"""Returns the list of running instances on a given node.
This is a multi-node call.
......@@ -317,7 +334,8 @@ def call_instance_list(node_list, hypervisor_list):
return c.getresult()
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
def call_node_tcp_ping(self, node, source, target, port, timeout,
live_port_needed):
"""Do a TcpPing on the remote node
This is a single-node call.
......@@ -329,7 +347,7 @@ def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
return c.getresult().get(node, False)
def call_node_info(node_list, vg_name, hypervisor_type):
def call_node_info(self, node_list, vg_name, hypervisor_type):
"""Return node information.
This will return memory information and volume group size and free
......@@ -340,7 +358,8 @@ def call_node_info(node_list, vg_name, hypervisor_type):
@type node_list: list
@param node_list: the list of nodes to query
@type vgname: C{string}
@param vgname: the name of the volume group to ask for disk space information
@param vgname: the name of the volume group to ask for disk space
information
@type hypervisor_type: C{str}
@param hypervisor_type: the name of the hypervisor to ask for
memory information
......@@ -368,7 +387,7 @@ def call_node_info(node_list, vg_name, hypervisor_type):
return retux
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
"""Add a node to the cluster.
This is a single-node call.
......@@ -381,7 +400,7 @@ def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
return c.getresult().get(node, False)
def call_node_verify(node_list, checkdict, cluster_name):
def call_node_verify(self, node_list, checkdict, cluster_name):
"""Request verification of given parameters.
This is a multi-node call.
......@@ -393,7 +412,8 @@ def call_node_verify(node_list, checkdict, cluster_name):
return c.getresult()
def call_node_start_master(node, start_daemons):
@staticmethod
def call_node_start_master(node, start_daemons):
"""Tells a node to activate itself as a master.
This is a single-node call.
......@@ -405,7 +425,8 @@ def call_node_start_master(node, start_daemons):
return c.getresult().get(node, False)
def call_node_stop_master(node, stop_daemons):
@staticmethod
def call_node_stop_master(node, stop_daemons):
"""Tells a node to demote itself from master status.
This is a single-node call.
......@@ -417,19 +438,21 @@ def call_node_stop_master(node, stop_daemons):
return c.getresult().get(node, False)
def call_master_info(node_list):
@staticmethod
def call_master_info(node_list):
"""Query master info.
This is a multi-node call.
"""
# TODO: should this method query down nodes?
c = Client("master_info", [])
c.connect_list(node_list)
c.run()
return c.getresult()
def call_version(node_list):
def call_version(self, node_list):
"""Query node version.
This is a multi-node call.
......@@ -441,7 +464,7 @@ def call_version(node_list):
return c.getresult()
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
"""Request creation of a given block device.
This is a single-node call.
......@@ -454,7 +477,7 @@ def call_blockdev_create(node, bdev, size, owner, on_primary, info):
return c.getresult().get(node, False)
def call_blockdev_remove(node, bdev):
def call_blockdev_remove(self, node, bdev):
"""Request removal of a given block device.
This is a single-node call.
......@@ -466,7 +489,7 @@ def call_blockdev_remove(node, bdev):
return c.getresult().get(node, False)
def call_blockdev_rename(node, devlist):
def call_blockdev_rename(self, node, devlist):
"""Request rename of the given block devices.
This is a single-node call.
......@@ -479,7 +502,7 @@ def call_blockdev_rename(node, devlist):
return c.getresult().get(node, False)
def call_blockdev_assemble(node, disk, owner, on_primary):
def call_blockdev_assemble(self, node, disk, owner, on_primary):
"""Request assembling of a given block device.
This is a single-node call.
......@@ -492,7 +515,7 @@ def call_blockdev_assemble(node, disk, owner, on_primary):
return c.getresult().get(node, False)
def call_blockdev_shutdown(node, disk):
def call_blockdev_shutdown(self, node, disk):
"""Request shutdown of a given block device.
This is a single-node call.
......@@ -504,7 +527,7 @@ def call_blockdev_shutdown(node, disk):
return c.getresult().get(node, False)
def call_blockdev_addchildren(node, bdev, ndevs):
def call_blockdev_addchildren(self, node, bdev, ndevs):
"""Request adding a list of children to a (mirroring) device.
This is a single-node call.
......@@ -517,7 +540,7 @@ def call_blockdev_addchildren(node, bdev, ndevs):
return c.getresult().get(node, False)
def call_blockdev_removechildren(node, bdev, ndevs):
def call_blockdev_removechildren(self, node, bdev, ndevs):
"""Request removing a list of children from a (mirroring) device.
This is a single-node call.
......@@ -530,7 +553,7 @@ def call_blockdev_removechildren(node, bdev, ndevs):
return c.getresult().get(node, False)
def call_blockdev_getmirrorstatus(node, disks):
def call_blockdev_getmirrorstatus(self, node, disks):
"""Request status of a (mirroring) device.
This is a single-node call.
......@@ -543,7 +566,7 @@ def call_blockdev_getmirrorstatus(node, disks):
return c.getresult().get(node, False)
def call_blockdev_find(node, disk):
def call_blockdev_find(self, node, disk):
"""Request identification of a given block device.
This is a single-node call.
......@@ -555,7 +578,7 @@ def call_blockdev_find(node, disk):
return c.getresult().get(node, False)
def call_blockdev_close(node, disks):
def call_blockdev_close(self, node, disks):
"""Closes the given block devices.
This is a single-node call.
......@@ -568,7 +591,8 @@ def call_blockdev_close(node, disks):
return c.getresult().get(node, False)
def call_upload_file(node_list, file_name):
@staticmethod
def call_upload_file(node_list, file_name):
"""Upload a file.
The node will refuse the operation in case the file is not on the
......@@ -590,8 +614,30 @@ def call_upload_file(node_list, file_name):
c.run()
return c.getresult()
@staticmethod
def call_upload_file(node_list, file_name):
"""Upload a file.
The node will refuse the operation in case the file is not on the
approved file list.
This is a multi-node call.
"""
fh = file(file_name)
try:
data = fh.read()
finally:
fh.close()
st = os.stat(file_name)
params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
st.st_atime, st.st_mtime]
c = Client("upload_file", params)
c.connect_list(node_list)
c.run()
return c.getresult()
def call_os_diagnose(node_list):
def call_os_diagnose(self, node_list):
"""Request a diagnose of OS definitions.
This is a multi-node call.
......@@ -611,7 +657,7 @@ def call_os_diagnose(node_list):
return new_result
def call_os_get(node, name):
def call_os_get(self, node, name):
"""Returns an OS definition.
This is a single-node call.
......@@ -627,7 +673,7 @@ def call_os_get(node, name):
return result
def call_hooks_runner(node_list, hpath, phase, env):
def call_hooks_runner(self, node_list, hpath, phase, env):
"""Call the hooks runner.
Args:
......@@ -645,7 +691,7 @@ def call_hooks_runner(node_list, hpath, phase, env):
return result
def call_iallocator_runner(node, name, idata):
def call_iallocator_runner(self, node, name, idata):
"""Call an iallocator on a remote node
Args:
......@@ -663,7 +709,7 @@ def call_iallocator_runner(node, name, idata):
return result
def call_blockdev_grow(node, cf_bdev, amount):
def call_blockdev_grow(self, node, cf_bdev, amount):
"""Request a snapshot of the given block device.
This is a single-node call.
......@@ -675,7 +721,7 @@ def call_blockdev_grow(node, cf_bdev, amount):
return c.getresult().get(node, False)
def call_blockdev_snapshot(node, cf_bdev):
def call_blockdev_snapshot(self, node, cf_bdev):
"""Request a snapshot of the given block device.
This is a single-node call.
......@@ -687,7 +733,8 @@ def call_blockdev_snapshot(node, cf_bdev):
return c.getresult().get(node, False)
def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name):
def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
cluster_name):
"""Request the export of a given snapshot.
This is a single-node call.
......@@ -700,7 +747,7 @@ def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name):
return c.getresult().get(node, False)
def call_finalize_export(node, instance, snap_disks):
def call_finalize_export(self, node, instance, snap_disks):
"""Request the completion of an export operation.
This writes the export config file, etc.
......@@ -718,7 +765,7 @@ def call_finalize_export(node, instance, snap_disks):
return c.getresult().get(node, False)
def call_export_info(node, path):
def call_export_info(self, node, path):
"""Queries the export information in a given path.
This is a single-node call.
......@@ -733,7 +780,7 @@ def call_export_info(node, path):
return objects.SerializableConfigParser.Loads(str(result))
def call_instance_os_import(node, inst, osdev, swapdev,
def call_instance_os_import(self, node, inst, osdev, swapdev,
src_node, src_image, cluster_name):
"""Request the import of a backup into an instance.
......@@ -747,7 +794,7 @@ def call_instance_os_import(node, inst, osdev, swapdev,
return c.getresult().get(node, False)
def call_export_list(node_list):
def call_export_list(self, node_list):
"""Gets the stored exports list.
This is a multi-node call.
......@@ -760,7 +807,7 @@ def call_export_list(node_list):
return result
def call_export_remove(node, export):
def call_export_remove(self, node, export):
"""Requests removal of a given export.
This is a single-node call.
......@@ -772,7 +819,7 @@ def call_export_remove(node, export):
return c.getresult().get(node, False)
def call_node_leave_cluster(node):
def call_node_leave_cluster(self, node):
"""Requests a node to clean the cluster information it has.
This will remove the configuration information from the ganeti data
......@@ -787,7 +834,7 @@ def call_node_leave_cluster(node):
return c.getresult().get(node, False)
def call_node_volumes(node_list):
def call_node_volumes(self, node_list):
"""Gets all volumes on node(s).
This is a multi-node call.
......@@ -799,7 +846,7 @@ def call_node_volumes(node_list):
return c.getresult()
def call_test_delay(node_list, duration):
def call_test_delay(self, node_list, duration):
"""Sleep for a fixed time on given node(s).
This is a multi-node call.
......@@ -811,7 +858,7 @@ def call_test_delay(node_list, duration):
return c.getresult()
def call_file_storage_dir_create(node, file_storage_dir):
def call_file_storage_dir_create(self, node, file_storage_dir):
"""Create the given file storage directory.
This is a single-node call.
......@@ -823,7 +870,7 @@ def call_file_storage_dir_create(node, file_storage_dir):
return c.getresult().get(node, False)
def call_file_storage_dir_remove(node, file_storage_dir):
def call_file_storage_dir_remove(self, node, file_storage_dir):
"""Remove the given file storage directory.
This is a single-node call.
......@@ -835,7 +882,7 @@ def call_file_storage_dir_remove(node, file_storage_dir):
return c.getresult().get(node, False)
def call_file_storage_dir_rename(node, old_file_storage_dir,
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
new_file_storage_dir):
"""Rename file storage directory.
......@@ -849,7 +896,8 @@ def call_file_storage_dir_rename(node, old_file_storage_dir,
return c.getresult().get(node, False)
def call_jobqueue_update(node_list, file_name, content):
@staticmethod
def call_jobqueue_update(node_list, file_name, content):
"""Update job queue.
This is a multi-node call.
......@@ -862,7 +910,8 @@ def call_jobqueue_update(node_list, file_name, content):
return result