From fb1ffbca53e1fa9f7167ab04ec9c8299b64c43d6 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Tue, 25 Oct 2011 17:30:33 +0200
Subject: [PATCH] Convert job queue's RPC to generated code

With these changes job queue RPC will finally show up on the lock
monitor. See below for an example. A job queue-specific class is used
to restrict the use of a static list for name resolution to the job
queue. Further improvements can be made to not re-create the whole RPC
client for every call (e.g. by using a more dynamic resolver), but for
now this works.

  rpc/node8.example.com/jobqueue_update  Jq8/Job9/TEST_DELAY

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/build/rpc_definitions.py | 10 ++++++
 lib/jqueue.py                | 20 +++++++----
 lib/rpc.py                   | 64 +++++++++++++++++-------------------
 lib/server/noded.py          |  2 +-
 4 files changed, 56 insertions(+), 40 deletions(-)

diff --git a/lib/build/rpc_definitions.py b/lib/build/rpc_definitions.py
index 5400b33d8..930200b98 100644
--- a/lib/build/rpc_definitions.py
+++ b/lib/build/rpc_definitions.py
@@ -361,4 +361,14 @@ CALLS = {
   "RpcClientDefault": (_IMPEXP_CALLS + _X509_CALLS + _OS_CALLS + _NODE_CALLS +
                        _FILE_STORAGE_CALLS + _MISC_CALLS + _INSTANCE_CALLS +
                        _BLOCKDEV_CALLS + _STORAGE_CALLS),
+  "RpcClientJobQueue": [
+    ("jobqueue_update", MULTI, TMO_URGENT, [
+      ("file_name", None, None),
+      ("content", "self._Compress(%s)", None),
+      ], None, "Update job queue file"),
+    ("jobqueue_purge", SINGLE, TMO_NORMAL, [], None, "Purge job queue"),
+    ("jobqueue_rename", MULTI, TMO_URGENT, [
+      ("rename", None, None),
+      ], None, "Rename job queue file"),
+    ],
   }
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 3cf3b4287..3038ef41c 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -1611,6 +1611,12 @@ class JobQueue(object):
 
     logging.info("Job queue inspection finished")
 
+  def _GetRpc(self, address_list):
+    """Gets RPC runner with context.
+
+    """
+    return rpc.JobQueueRunner(self.context, address_list)
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def AddNode(self, node):
@@ -1624,7 +1630,7 @@ class JobQueue(object):
     assert node_name != self._my_hostname
 
     # Clean queue directory on added node
-    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
+    result = self._GetRpc(None).call_jobqueue_purge(node_name)
     msg = result.fail_msg
     if msg:
       logging.warning("Cannot cleanup queue directory on node %s: %s",
@@ -1642,13 +1648,15 @@ class JobQueue(object):
     # Upload current serial file
     files.append(constants.JOB_QUEUE_SERIAL_FILE)
 
+    # Static address list
+    addrs = [node.primary_ip]
+
     for file_name in files:
       # Read file content
       content = utils.ReadFile(file_name)
 
-      result = rpc.RpcRunner.call_jobqueue_update([node_name],
-                                                  [node.primary_ip],
-                                                  file_name, content)
+      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
+                                                        content)
       msg = result[node_name].fail_msg
       if msg:
         logging.error("Failed to upload file %s to node %s: %s",
@@ -1732,7 +1740,7 @@ class JobQueue(object):
 
     if replicate:
       names, addrs = self._GetNodeIp()
-      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
+      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
 
   def _RenameFilesUnlocked(self, rename):
@@ -1751,7 +1759,7 @@ class JobQueue(object):
 
     # ... and on all nodes
     names, addrs = self._GetNodeIp()
-    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
+    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
 
   @staticmethod
diff --git a/lib/rpc.py b/lib/rpc.py
index 6f36ceb8b..bffd0d9ac 100644
--- a/lib/rpc.py
+++ b/lib/rpc.py
@@ -781,39 +781,6 @@ class RpcRunner(_generated_rpc.RpcClientDefault):
     return self.call_test_delay(node_list, duration,
                                 read_timeout=int(duration + 5))
 
-  @classmethod
-  @_RpcTimeout(_TMO_URGENT)
-  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
-    """Update job queue.
-
-    This is a multi-node call.
-
-    """
-    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
-                                    [file_name, _Compress(content)],
-                                    address_list=address_list)
-
-  @classmethod
-  @_RpcTimeout(_TMO_NORMAL)
-  def call_jobqueue_purge(cls, node):
-    """Purge job queue.
-
-    This is a single-node call.
-
-    """
-    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
-
-  @classmethod
-  @_RpcTimeout(_TMO_URGENT)
-  def call_jobqueue_rename(cls, node_list, address_list, rename):
-    """Rename a job queue file.
-
-    This is a multi-node call.
-
-    """
-    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
-                                    address_list=address_list)
-
   @_RpcTimeout(_TMO_NORMAL)
   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
     """Validate the hypervisor params.
@@ -832,3 +799,34 @@ class RpcRunner(_generated_rpc.RpcClientDefault):
     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                                [hvname, hv_full])
+
+
+class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
+  """RPC wrappers for job queue.
+
+  """
+  _Compress = staticmethod(_Compress)
+
+  def __init__(self, context, address_list):
+    """Initializes this class.
+
+    """
+    _generated_rpc.RpcClientJobQueue.__init__(self)
+
+    if address_list is None:
+      resolver = _SsconfResolver
+    else:
+      # Caller provided an address list
+      resolver = _StaticResolver(address_list)
+
+    self._proc = _RpcProcessor(resolver,
+                               netutils.GetDaemonPort(constants.NODED),
+                               lock_monitor_cb=context.glm.AddToLockMonitor)
+
+  def _Call(self, node_list, procedure, timeout, args):
+    """Entry point for automatically generated RPC wrappers.
+
+    """
+    body = serializer.DumpJson(args, indent=False)
+
+    return self._proc(node_list, procedure, body, read_timeout=timeout)
diff --git a/lib/server/noded.py b/lib/server/noded.py
index 86fff5ad8..384807bce 100644
--- a/lib/server/noded.py
+++ b/lib/server/noded.py
@@ -914,7 +914,7 @@ class NodeHttpServer(http.server.HttpServer):
 
     """
     # TODO: What if a file fails to rename?
-    return [backend.JobQueueRename(old, new) for old, new in params]
+    return [backend.JobQueueRename(old, new) for old, new in params[0]]
 
   # hypervisor ---------------
 
-- 
GitLab
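
For illustration only, and not part of the patch above: a minimal sketch of the
wrapper shape the build-time generator produces from the "RpcClientJobQueue"
definitions in lib/build/rpc_definitions.py. The _Call stub stands in for the
real _RpcProcessor dispatch done by JobQueueRunner, and the class name, timeout
values and compression framing shown here are assumptions rather than Ganeti's
actual generated code.

# Illustrative sketch, not part of the patch: approximates the wrappers the
# generator emits for "RpcClientJobQueue".  Values marked "assumed" are not
# taken from the Ganeti source.
import base64
import json
import zlib

TMO_URGENT = 60     # assumed numeric values; the real constants live in lib/rpc.py
TMO_NORMAL = 300


class JobQueueClientSketch(object):
  """Rough stand-in for the generated RpcClientJobQueue class."""

  @staticmethod
  def _Compress(data):
    # Stand-in for rpc._Compress; zlib + base64 framing is an assumption here.
    compressed = zlib.compress(data.encode("utf-8"), 3)
    return (1, base64.b64encode(compressed).decode("ascii"))

  def _Call(self, node_list, procedure, timeout, args):
    # JobQueueRunner._Call serializes the arguments and hands them to
    # _RpcProcessor; this stub only shows the request that would be sent.
    body = json.dumps(args)
    print("%s -> %s (timeout=%ss): %s" % (procedure, node_list, timeout, body))

  # One wrapper per entry in the "RpcClientJobQueue" definition list:

  def call_jobqueue_update(self, node_list, file_name, content):
    # "content" goes through the "self._Compress(%s)" argument transform.
    return self._Call(node_list, "jobqueue_update", TMO_URGENT,
                      [file_name, self._Compress(content)])

  def call_jobqueue_purge(self, node):
    return self._Call([node], "jobqueue_purge", TMO_NORMAL, [])

  def call_jobqueue_rename(self, node_list, rename):
    return self._Call(node_list, "jobqueue_rename", TMO_URGENT, [rename])


if __name__ == "__main__":
  client = JobQueueClientSketch()
  client.call_jobqueue_update(["node8.example.com"], "queue/serial", "123\n")
  client.call_jobqueue_rename(["node8.example.com"],
                              [("queue/job-9", "queue/archive/job-9")])

This calling convention also explains the lib/server/noded.py hunk: the
generated wrapper passes the rename list as a single argument, so on the daemon
side it arrives wrapped and must be read as params[0].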