Commit db04ce5d authored by Michael Hanselmann's avatar Michael Hanselmann

Move bootstrap-related RPC to generated wrappers

With this patch, only 6 RPC are left as old-style code.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent fb1ffbca
......@@ -222,7 +222,7 @@ def _WaitForNodeDaemon(node_name):
"""
def _CheckNodeDaemon():
result = rpc.RpcRunner.call_version([node_name])[node_name]
result = rpc.BootstrapRunner().call_version([node_name])[node_name]
if result.fail_msg:
raise utils.RetryAgain()
......@@ -565,11 +565,12 @@ def FinalizeClusterDestroy(master):
"""
cfg = config.ConfigWriter()
modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
result = rpc.RpcRunner.call_node_stop_master(master)
result = rpc.BootstrapRunner().call_node_stop_master(master)
msg = result.fail_msg
if msg:
logging.warning("Could not disable the master role: %s", msg)
result = rpc.RpcRunner.call_node_leave_cluster(master, modify_ssh_setup)
result = rpc.BootstrapRunner().call_node_leave_cluster(master,
modify_ssh_setup)
msg = result.fail_msg
if msg:
logging.warning("Could not shutdown the node daemon and cleanup"
......@@ -697,7 +698,7 @@ def MasterFailover(no_voting=False):
logging.info("Stopping the master daemon on node %s", old_master)
result = rpc.RpcRunner.call_node_stop_master(old_master)
result = rpc.BootstrapRunner().call_node_stop_master(old_master)
msg = result.fail_msg
if msg:
logging.error("Could not disable the master role on the old master"
......@@ -726,7 +727,8 @@ def MasterFailover(no_voting=False):
logging.info("Starting the master daemons on the new master")
result = rpc.RpcRunner.call_node_start_master_daemons(new_master, no_voting)
result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
no_voting)
msg = result.fail_msg
if msg:
logging.error("Could not start the master role on the new master"
......@@ -782,7 +784,7 @@ def GatherMasterVotes(node_list):
if not node_list:
# no nodes left (eventually after removing myself)
return []
results = rpc.RpcRunner.call_master_info(node_list)
results = rpc.BootstrapRunner().call_master_info(node_list)
if not isinstance(results, dict):
# this should not happen (unless internal error in rpc)
logging.critical("Can't complete rpc call, aborting master startup")
......
......@@ -371,4 +371,23 @@ CALLS = {
("rename", None, None),
], None, "Rename job queue file"),
],
"RpcClientBootstrap": [
("node_start_master_daemons", SINGLE, TMO_FAST, [
("no_voting", None, None),
], None, "Starts master daemons on a node"),
("node_activate_master_ip", SINGLE, TMO_FAST, [], None,
"Activates master IP on a node"),
("node_stop_master", SINGLE, TMO_FAST, [], None,
"Deactivates master IP and stops master daemons on a node"),
("node_deactivate_master_ip", SINGLE, TMO_FAST, [], None,
"Deactivates master IP on a node"),
("node_change_master_netmask", SINGLE, TMO_FAST, [
("netmask", None, None),
], None, "Change master IP netmask"),
("node_leave_cluster", SINGLE, TMO_NORMAL, [
("modify_ssh_setup", None, None),
], None, "Requests a node to clean the cluster information it has"),
("master_info", MULTI, TMO_URGENT, [], None, "Query master info"),
("version", MULTI, TMO_URGENT, [], None, "Query node version"),
],
}
......@@ -437,7 +437,8 @@ class _RpcProcessor:
return self._CombineResults(results, requests, procedure)
class RpcRunner(_generated_rpc.RpcClientDefault):
class RpcRunner(_generated_rpc.RpcClientDefault,
_generated_rpc.RpcClientBootstrap):
"""RPC runner class.
"""
......@@ -448,6 +449,11 @@ class RpcRunner(_generated_rpc.RpcClientDefault):
@param context: Ganeti context
"""
# Pylint doesn't recognize multiple inheritance properly, see
# <http://www.logilab.org/ticket/36586> and
# <http://www.logilab.org/ticket/35642>
# pylint: disable=W0233
_generated_rpc.RpcClientBootstrap.__init__(self)
_generated_rpc.RpcClientDefault.__init__(self)
self._cfg = context.cfg
......@@ -646,79 +652,6 @@ class RpcRunner(_generated_rpc.RpcClientDefault):
[self._InstDict(inst, osp=osparams),
reinstall, debug])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_start_master_daemons(cls, node, no_voting):
"""Starts master daemons on a node.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
[no_voting])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_activate_master_ip(cls, node):
"""Activates master IP on a node.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node):
"""Deactivates master IP and stops master daemons on a node.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "node_stop_master", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_deactivate_master_ip(cls, node):
"""Deactivates master IP on a node.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_change_master_netmask(cls, node, netmask):
"""Change master IP netmask.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
[netmask])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
"""Query master info.
This is a multi-node call.
"""
# TODO: should this method query down nodes?
return cls._StaticMultiNodeCall(node_list, "master_info", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
"""Query node version.
This is a multi-node call.
"""
return cls._StaticMultiNodeCall(node_list, "version", [])
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
......@@ -757,20 +690,6 @@ class RpcRunner(_generated_rpc.RpcClientDefault):
"""
return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
"""Requests a node to clean the cluster information it has.
This will remove the configuration information from the ganeti data
dir.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "node_leave_cluster",
[modify_ssh_setup])
def call_test_delay(self, node_list, duration, read_timeout=None):
"""Sleep for a fixed time on given node(s).
......@@ -830,3 +749,25 @@ class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
body = serializer.DumpJson(args, indent=False)
return self._proc(node_list, procedure, body, read_timeout=timeout)
class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
"""RPC wrappers for bootstrapping.
"""
def __init__(self):
"""Initializes this class.
"""
_generated_rpc.RpcClientBootstrap.__init__(self)
self._proc = _RpcProcessor(_SsconfResolver,
netutils.GetDaemonPort(constants.NODED))
def _Call(self, node_list, procedure, timeout, args):
"""Entry point for automatically generated RPC wrappers.
"""
body = serializer.DumpJson(args, indent=False)
return self._proc(node_list, procedure, body, read_timeout=timeout)
......@@ -532,7 +532,7 @@ def CheckAgreement():
def ActivateMasterIP():
# activate ip
master_node = ssconf.SimpleStore().GetMasterNode()
result = rpc.RpcRunner.call_node_activate_master_ip(master_node)
result = rpc.BootstrapRunner().call_node_activate_master_ip(master_node)
msg = result.fail_msg
if msg:
logging.error("Can't activate master IP address: %s", msg)
......
......@@ -73,7 +73,8 @@ class TestRpcProcessor(unittest.TestCase):
resolver = rpc._StaticResolver(["127.0.0.1"])
http_proc = _FakeRequestProcessor(self._GetVersionResponse)
proc = rpc._RpcProcessor(resolver, 24094)
result = proc(["localhost"], "version", None, _req_process_fn=http_proc)
result = proc(["localhost"], "version", None, _req_process_fn=http_proc,
read_timeout=60)
self.assertEqual(result.keys(), ["localhost"])
lhresp = result["localhost"]
self.assertFalse(lhresp.offline)
......@@ -113,7 +114,8 @@ class TestRpcProcessor(unittest.TestCase):
resolver = rpc._StaticResolver([rpc._OFFLINE])
http_proc = _FakeRequestProcessor(NotImplemented)
proc = rpc._RpcProcessor(resolver, 30668)
result = proc(["n17296"], "version", None, _req_process_fn=http_proc)
result = proc(["n17296"], "version", None, _req_process_fn=http_proc,
read_timeout=60)
self.assertEqual(result.keys(), ["n17296"])
lhresp = result["n17296"]
self.assertTrue(lhresp.offline)
......@@ -143,7 +145,8 @@ class TestRpcProcessor(unittest.TestCase):
resolver = rpc._StaticResolver(nodes)
http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
proc = rpc._RpcProcessor(resolver, 23245)
result = proc(nodes, "version", None, _req_process_fn=http_proc)
result = proc(nodes, "version", None, _req_process_fn=http_proc,
read_timeout=60)
self.assertEqual(sorted(result.keys()), sorted(nodes))
for name in nodes:
......@@ -171,7 +174,7 @@ class TestRpcProcessor(unittest.TestCase):
_FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
errinfo))
result = proc(["aef9ur4i.example.com"], "version", None,
_req_process_fn=http_proc)
_req_process_fn=http_proc, read_timeout=60)
self.assertEqual(result.keys(), ["aef9ur4i.example.com"])
lhresp = result["aef9ur4i.example.com"]
self.assertFalse(lhresp.offline)
......@@ -263,7 +266,7 @@ class TestRpcProcessor(unittest.TestCase):
for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
http_proc = _FakeRequestProcessor(fn)
result = proc(["oqo7lanhly.example.com"], "version", None,
_req_process_fn=http_proc)
_req_process_fn=http_proc, read_timeout=60)
self.assertEqual(result.keys(), ["oqo7lanhly.example.com"])
lhresp = result["oqo7lanhly.example.com"]
self.assertFalse(lhresp.offline)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment