Commit 781de953 authored by Iustin Pop's avatar Iustin Pop
Browse files

Convert rpc results to a custom type

For a long time we had the problem that both RPC-layer errors and
results from the remote node share the same "valuespace". This is
because we shouldn't raise an exception when only one node failed
(and lose the results from the other nodes).

This patch attempts to address this problem by returning a special
object from RPC calls, which separates the rpc-layer status and the
remote results into different attributes.

All the users of rpc (mainly cmdlib, but also bootstrap and the
HooksMaster in mcpu) have been converted to this new model. The code has
changed from, e.g. for boolean return types:

  if not self.rpc.call_...

to

  result = self.rpc.call_
  if result.failed or not result.data:
     ^ rpc-layer error    |
                          - result payload

While this is slightly more complicated, it will allow cleaner checks in
the future; right now the code is just a plain port, without
optimizations.

There's also a "result.Raise()" which raises an OpExecError if the
rpc-layer had errors.

One side-effect of the patch is that now all return types from the
rpc.call_ functions are of either RpcResult (single-node) or dicts of
(node name, RpcResult); previously, some functions were returning
different object types based on error status.

The code passes burnin (after many retries :).

Reviewed-by: imsnah
parent 00f91f29
......@@ -268,9 +268,11 @@ def FinalizeClusterDestroy(master):
begun in cmdlib.LUDestroyOpcode.
"""
if not rpc.RpcRunner.call_node_stop_master(master, True):
result = rpc.RpcRunner.call_node_stop_master(master, True)
if result.failed or not result.data:
logging.warning("Could not disable the master role")
if not rpc.RpcRunner.call_node_leave_cluster(master):
result = rpc.RpcRunner.call_node_leave_cluster(master)
if result.failed or not result.data:
logging.warning("Could not shutdown the node daemon and cleanup the node")
......@@ -363,7 +365,8 @@ def MasterFailover():
logging.info("Setting master to %s, old master: %s", new_master, old_master)
if not rpc.RpcRunner.call_node_stop_master(old_master, True):
result = rpc.RpcRunner.call_node_stop_master(old_master, True)
if result.failed or not result.data:
logging.error("Could not disable the master role on the old master"
" %s, please disable manually", old_master)
......@@ -379,7 +382,8 @@ def MasterFailover():
# cluster info
cfg.Update(cluster_info)
if not rpc.RpcRunner.call_node_start_master(new_master, True):
result = rpc.RpcRunner.call_node_start_master(new_master, True)
if result.failed or not result.data:
logging.error("Could not start the master role on the new master"
" %s, please check", new_master)
rcode = 1
......@@ -425,13 +429,15 @@ def GatherMasterVotes(node_list):
other_masters = {}
votes = {}
for node in results:
if not isinstance(results[node], (tuple, list)) or len(results[node]) < 3:
nres = results[node]
data = nres.data
if nres.failed or not isinstance(data, (tuple, list)) or len(data) < 3:
# here the rpc layer should have already logged errors
if None not in votes:
votes[None] = 0
votes[None] += 1
continue
master_node = results[node][2]
master_node = data[2]
if master_node not in votes:
votes[master_node] = 0
votes[master_node] += 1
......
This diff is collapsed.
......@@ -337,10 +337,10 @@ class HooksMaster(object):
raise errors.HooksFailure("Communication failure")
for node_name in results:
res = results[node_name]
if res is False or not isinstance(res, list):
if res.failed or res.data is False or not isinstance(res.data, list):
self.proc.LogWarning("Communication failure to node %s" % node_name)
continue
for script, hkr, output in res:
for script, hkr, output in res.data:
if hkr == constants.HKR_FAIL:
output = output.strip().encode("string_escape")
errs.append((node_name, script, output))
......
......@@ -40,6 +40,7 @@ from ganeti import objects
from ganeti import http
from ganeti import serializer
from ganeti import constants
from ganeti import errors
# Module level variable
......@@ -72,6 +73,37 @@ def Shutdown():
_http_manager = None
class RpcResult(object):
"""RPC Result class.
This class holds an RPC result. It is needed since in multi-node
calls we can't raise an exception just because one one out of many
failed, and therefore we use this class to encapsulate the result.
"""
def __init__(self, data, failed=False, call=None, node=None):
self.failed = failed
self.call = None
self.node = None
if failed:
self.error = data
self.data = None
else:
self.data = data
self.error = None
def Raise(self):
"""If the result has failed, raise an OpExecError.
This is used so that LU code doesn't have to check for each
result, but instead can call this function.
"""
if self.failed:
raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
(self.call, self.node, self.error))
class Client:
"""RPC Client class.
......@@ -145,7 +177,8 @@ class Client:
for name, req in self.nc.iteritems():
if req.success and req.resp_status == http.HTTP_OK:
results[name] = serializer.LoadJson(req.resp_body)
results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
node=name, call=self.procedure)
continue
# TODO: Better error reporting
......@@ -155,7 +188,8 @@ class Client:
msg = req.resp_body
logging.error("RPC error from node %s: %s", name, msg)
results[name] = False
results[name] = RpcResult(data=msg, failed=True, node=name,
call=self.procedure)
return results
......@@ -271,6 +305,10 @@ class RpcRunner(object):
c.ConnectNode(node)
return c.GetResults().get(node, False)
#
# Begin RPC calls
#
def call_volume_list(self, node_list, vg_name):
"""Gets the logical volumes present in a given volume group.
......@@ -446,19 +484,17 @@ class RpcRunner(object):
retux = self._MultiNodeCall(node_list, "node_info",
[vg_name, hypervisor_type])
for node_name in retux:
ret = retux.get(node_name, False)
if type(ret) != dict:
logging.error("could not connect to node %s", node_name)
ret = {}
utils.CheckDict(ret, {
'memory_total' : '-',
'memory_dom0' : '-',
'memory_free' : '-',
'vg_size' : 'node_unreachable',
'vg_free' : '-',
}, "call_node_info")
for result in retux.itervalues():
if result.failed or not isinstance(result.data, dict):
result.data = {}
utils.CheckDict(result.data, {
'memory_total' : '-',
'memory_dom0' : '-',
'memory_free' : '-',
'vg_size' : 'node_unreachable',
'vg_free' : '-',
}, "call_node_info")
return retux
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
......@@ -647,14 +683,11 @@ class RpcRunner(object):
"""
result = self._MultiNodeCall(node_list, "os_diagnose", [])
new_result = {}
for node_name in result:
if result[node_name]:
nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
else:
nr = []
new_result[node_name] = nr
return new_result
for node_name, node_result in result.iteritems():
if not node_result.failed and node_result.data:
node_result.data = [objects.OS.FromDict(oss)
for oss in node_result.data]
return result
def call_os_get(self, node, name):
"""Returns an OS definition.
......@@ -663,10 +696,9 @@ class RpcRunner(object):
"""
result = self._SingleNodeCall(node, "os_get", [name])
if isinstance(result, dict):
return objects.OS.FromDict(result)
else:
return result
if not result.failed and isinstance(result.data, dict):
result.data = objects.OS.FromDict(result.data)
return result
def call_hooks_runner(self, node_list, hpath, phase, env):
"""Call the hooks runner.
......@@ -743,9 +775,9 @@ class RpcRunner(object):
"""
result = self._SingleNodeCall(node, "export_info", [path])
if not result:
return result
return objects.SerializableConfigParser.Loads(str(result))
if not result.failed and result.data:
result.data = objects.SerializableConfigParser.Loads(str(result.data))
return result
def call_instance_os_import(self, node, inst, src_node, src_images,
cluster_name):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment