diff --git a/autotools/build-rpc b/autotools/build-rpc index 0e8d5b93e07c01663a6834dcd6cf23d2eea53684..33e9cfc05a9a68c797915be92179c63b83ac1f3d 100755 --- a/autotools/build-rpc +++ b/autotools/build-rpc @@ -117,7 +117,7 @@ def _WriteBaseClass(sw, clsname, calls): sw.Write("_CALLS = rpc_defs.CALLS[%r]", clsname) sw.Write("") - for (name, kind, timeout, args, postproc, desc) in calls: + for (name, kind, timeout, args, _, desc) in calls: funcargs = ["self"] if kind == _SINGLE: @@ -148,8 +148,6 @@ def _WriteBaseClass(sw, clsname, calls): # In case line gets too long and is wrapped in a bad spot buf.write("( ") - if postproc: - buf.write("%s(" % postproc) buf.write("self._Call(_def, ") if kind == _SINGLE: buf.write("[node]") @@ -162,8 +160,6 @@ def _WriteBaseClass(sw, clsname, calls): if kind == _SINGLE: buf.write("[node]") - if postproc: - buf.write(")") buf.write(")") for line in _WrapCode(buf.getvalue()): diff --git a/lib/rpc.py b/lib/rpc.py index 74c924f408cbb6b7f5bbfe8f16e50c00bf648851..8ccd7111d5d025586b419050966c3a35f607212c 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -434,13 +434,18 @@ class _RpcClientBase: """Entry point for automatically generated RPC wrappers. """ - (procedure, _, _, argdefs, _, _) = cdef + (procedure, _, _, argdefs, postproc_fn, _) = cdef body = serializer.DumpJson(map(self._encoder, zip(map(compat.snd, argdefs), args)), indent=False) - return self._proc(node_list, procedure, body, read_timeout=timeout) + result = self._proc(node_list, procedure, body, read_timeout=timeout) + + if postproc_fn: + return postproc_fn(result) + else: + return result def _ObjectToDict(value): @@ -615,65 +620,6 @@ class RpcRunner(_RpcClientBase, """ return self._InstDict(instance, osp=osparams) - @staticmethod - def _MigrationStatusPostProc(result): - if not result.fail_msg and result.payload is not None: - result.payload = objects.MigrationStatus.FromDict(result.payload) - return result - - @staticmethod - def _BlockdevFindPostProc(result): - if not result.fail_msg and result.payload is not None: - result.payload = objects.BlockDevStatus.FromDict(result.payload) - return result - - @staticmethod - def _BlockdevGetMirrorStatusPostProc(result): - if not result.fail_msg: - result.payload = [objects.BlockDevStatus.FromDict(i) - for i in result.payload] - return result - - @staticmethod - def _BlockdevGetMirrorStatusMultiPostProc(result): - for nres in result.values(): - if nres.fail_msg: - continue - - for idx, (success, status) in enumerate(nres.payload): - if success: - nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) - - return result - - @staticmethod - def _OsGetPostProc(result): - if not result.fail_msg and isinstance(result.payload, dict): - result.payload = objects.OS.FromDict(result.payload) - return result - - @staticmethod - def _ImpExpStatusPostProc(result): - """Post-processor for import/export status. - - @rtype: Payload containing list of L{objects.ImportExportStatus} instances - @return: Returns a list of the state of each named import/export or None if - a status couldn't be retrieved - - """ - if not result.fail_msg: - decoded = [] - - for i in result.payload: - if i is None: - decoded.append(None) - continue - decoded.append(objects.ImportExportStatus.FromDict(i)) - - result.payload = decoded - - return result - # # Begin RPC calls # diff --git a/lib/rpc_defs.py b/lib/rpc_defs.py index 35bd5e64ab835cdbebea854c519f8dbef26d656d..ad0cb3a274f232da92f24c0b02e4365c4c1add53 100644 --- a/lib/rpc_defs.py +++ b/lib/rpc_defs.py @@ -37,6 +37,7 @@ RPC definition fields: """ from ganeti import utils +from ganeti import objects # Guidelines for choosing timeouts: @@ -75,6 +76,79 @@ def _Prepare(calls): return utils.SequenceToDict(calls) +def _MigrationStatusPostProc(result): + """Post-processor for L{rpc.RpcRunner.call_instance_get_migration_status}. + + """ + if not result.fail_msg and result.payload is not None: + result.payload = objects.MigrationStatus.FromDict(result.payload) + return result + + +def _BlockdevFindPostProc(result): + """Post-processor for L{rpc.RpcRunner.call_blockdev_find}. + + """ + if not result.fail_msg and result.payload is not None: + result.payload = objects.BlockDevStatus.FromDict(result.payload) + return result + + +def _BlockdevGetMirrorStatusPostProc(result): + """Post-processor for L{rpc.RpcRunner.call_blockdev_getmirrorstatus}. + + """ + if not result.fail_msg: + result.payload = map(objects.BlockDevStatus.FromDict, result.payload) + return result + + +def _BlockdevGetMirrorStatusMultiPostProc(result): + """Post-processor for L{rpc.RpcRunner.call_blockdev_getmirrorstatus_multi}. + + """ + for nres in result.values(): + if nres.fail_msg: + continue + + for idx, (success, status) in enumerate(nres.payload): + if success: + nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) + + return result + + +def _OsGetPostProc(result): + """Post-processor for L{rpc.RpcRunner.call_os_get}. + + """ + if not result.fail_msg and isinstance(result.payload, dict): + result.payload = objects.OS.FromDict(result.payload) + return result + + +def _ImpExpStatusPostProc(result): + """Post-processor for import/export status. + + @rtype: Payload containing list of L{objects.ImportExportStatus} instances + @return: Returns a list of the state of each named import/export or None if + a status couldn't be retrieved + + """ + if not result.fail_msg: + decoded = [] + + for i in result.payload: + if i is None: + decoded.append(None) + continue + decoded.append(objects.ImportExportStatus.FromDict(i)) + + result.payload = decoded + + return result + + _FILE_STORAGE_CALLS = [ ("file_storage_dir_create", SINGLE, TMO_FAST, [ ("file_storage_dir", None, "File storage directory"), @@ -163,7 +237,7 @@ _INSTANCE_CALLS = [ ], None, "Finalize the instance migration on the source node"), ("instance_get_migration_status", SINGLE, TMO_SLOW, [ ("instance", ED_INST_DICT, "Instance object"), - ], "self._MigrationStatusPostProc", "Report migration status"), + ], _MigrationStatusPostProc, "Report migration status"), ("instance_start", SINGLE, TMO_NORMAL, [ ("instance_hvp_bep", ED_INST_DICT_HVP_BEP, None), ("startup_paused", None, None), @@ -192,7 +266,7 @@ _IMPEXP_CALLS = [ ], None, "Starts an export daemon"), ("impexp_status", SINGLE, TMO_FAST, [ ("names", None, "Import/export names"), - ], "self._ImpExpStatusPostProc", "Gets the status of an import or export"), + ], _ImpExpStatusPostProc, "Gets the status of an import or export"), ("impexp_abort", SINGLE, TMO_NORMAL, [ ("name", None, "Import/export name"), ], None, "Aborts an import or export"), @@ -302,15 +376,15 @@ _BLOCKDEV_CALLS = [ ], None, "Request rename of the given block devices"), ("blockdev_find", SINGLE, TMO_NORMAL, [ ("disk", ED_OBJECT_DICT, None), - ], "self._BlockdevFindPostProc", + ], _BlockdevFindPostProc, "Request identification of a given block device"), ("blockdev_getmirrorstatus", SINGLE, TMO_NORMAL, [ ("disks", ED_OBJECT_DICT_LIST, None), - ], "self._BlockdevGetMirrorStatusPostProc", + ], _BlockdevGetMirrorStatusPostProc, "Request status of a (mirroring) device"), ("blockdev_getmirrorstatus_multi", MULTI, TMO_NORMAL, [ ("node_disks", ED_NODE_TO_DISK_DICT, None), - ], "self._BlockdevGetMirrorStatusMultiPostProc", + ], _BlockdevGetMirrorStatusMultiPostProc, "Request status of (mirroring) devices from multiple nodes"), ] @@ -325,7 +399,7 @@ _OS_CALLS = [ ], None, "Run a validation routine for a given OS"), ("os_get", SINGLE, TMO_FAST, [ ("name", None, None), - ], "self._OsGetPostProc", "Returns an OS definition"), + ], _OsGetPostProc, "Returns an OS definition"), ] _NODE_CALLS = [