diff --git a/lib/build/rpc_definitions.py b/lib/build/rpc_definitions.py index 5400b33d8aecd448163860c94328d30748b70651..930200b9857c552bb62a7e83f420678597ed6420 100644 --- a/lib/build/rpc_definitions.py +++ b/lib/build/rpc_definitions.py @@ -361,4 +361,14 @@ CALLS = { "RpcClientDefault": (_IMPEXP_CALLS + _X509_CALLS + _OS_CALLS + _NODE_CALLS + _FILE_STORAGE_CALLS + _MISC_CALLS + _INSTANCE_CALLS + _BLOCKDEV_CALLS + _STORAGE_CALLS), + "RpcClientJobQueue": [ + ("jobqueue_update", MULTI, TMO_URGENT, [ + ("file_name", None, None), + ("content", "self._Compress(%s)", None), + ], None, "Update job queue file"), + ("jobqueue_purge", SINGLE, TMO_NORMAL, [], None, "Purge job queue"), + ("jobqueue_rename", MULTI, TMO_URGENT, [ + ("rename", None, None), + ], None, "Rename job queue file"), + ], } diff --git a/lib/jqueue.py b/lib/jqueue.py index 3cf3b428762be1b26b4e509fc63fb34c2e819d33..3038ef41ce2d564005cb36429733b00beda97852 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1611,6 +1611,12 @@ class JobQueue(object): logging.info("Job queue inspection finished") + def _GetRpc(self, address_list): + """Gets RPC runner with context. + + """ + return rpc.JobQueueRunner(self.context, address_list) + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def AddNode(self, node): @@ -1624,7 +1630,7 @@ class JobQueue(object): assert node_name != self._my_hostname # Clean queue directory on added node - result = rpc.RpcRunner.call_jobqueue_purge(node_name) + result = self._GetRpc(None).call_jobqueue_purge(node_name) msg = result.fail_msg if msg: logging.warning("Cannot cleanup queue directory on node %s: %s", @@ -1642,13 +1648,15 @@ class JobQueue(object): # Upload current serial file files.append(constants.JOB_QUEUE_SERIAL_FILE) + # Static address list + addrs = [node.primary_ip] + for file_name in files: # Read file content content = utils.ReadFile(file_name) - result = rpc.RpcRunner.call_jobqueue_update([node_name], - [node.primary_ip], - file_name, content) + result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name, + content) msg = result[node_name].fail_msg if msg: logging.error("Failed to upload file %s to node %s: %s", @@ -1732,7 +1740,7 @@ class JobQueue(object): if replicate: names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data) + result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data) self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) def _RenameFilesUnlocked(self, rename): @@ -1751,7 +1759,7 @@ class JobQueue(object): # ... and on all nodes names, addrs = self._GetNodeIp() - result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename) + result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) @staticmethod diff --git a/lib/rpc.py b/lib/rpc.py index 6f36ceb8b446d75957cb8302dcc0c0d16c83b077..bffd0d9ac8959ac160953fbbec8a6214a859780c 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -781,39 +781,6 @@ class RpcRunner(_generated_rpc.RpcClientDefault): return self.call_test_delay(node_list, duration, read_timeout=int(duration + 5)) - @classmethod - @_RpcTimeout(_TMO_URGENT) - def call_jobqueue_update(cls, node_list, address_list, file_name, content): - """Update job queue. - - This is a multi-node call. - - """ - return cls._StaticMultiNodeCall(node_list, "jobqueue_update", - [file_name, _Compress(content)], - address_list=address_list) - - @classmethod - @_RpcTimeout(_TMO_NORMAL) - def call_jobqueue_purge(cls, node): - """Purge job queue. - - This is a single-node call. - - """ - return cls._StaticSingleNodeCall(node, "jobqueue_purge", []) - - @classmethod - @_RpcTimeout(_TMO_URGENT) - def call_jobqueue_rename(cls, node_list, address_list, rename): - """Rename a job queue file. - - This is a multi-node call. - - """ - return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename, - address_list=address_list) - @_RpcTimeout(_TMO_NORMAL) def call_hypervisor_validate_params(self, node_list, hvname, hvparams): """Validate the hypervisor params. @@ -832,3 +799,34 @@ class RpcRunner(_generated_rpc.RpcClientDefault): hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) return self._MultiNodeCall(node_list, "hypervisor_validate_params", [hvname, hv_full]) + + +class JobQueueRunner(_generated_rpc.RpcClientJobQueue): + """RPC wrappers for job queue. + + """ + _Compress = staticmethod(_Compress) + + def __init__(self, context, address_list): + """Initializes this class. + + """ + _generated_rpc.RpcClientJobQueue.__init__(self) + + if address_list is None: + resolver = _SsconfResolver + else: + # Caller provided an address list + resolver = _StaticResolver(address_list) + + self._proc = _RpcProcessor(resolver, + netutils.GetDaemonPort(constants.NODED), + lock_monitor_cb=context.glm.AddToLockMonitor) + + def _Call(self, node_list, procedure, timeout, args): + """Entry point for automatically generated RPC wrappers. + + """ + body = serializer.DumpJson(args, indent=False) + + return self._proc(node_list, procedure, body, read_timeout=timeout) diff --git a/lib/server/noded.py b/lib/server/noded.py index 86fff5ad8409418e9cbd9e905553a1e574e46a05..384807bcecad142213ebf1d023f32d93df430c52 100644 --- a/lib/server/noded.py +++ b/lib/server/noded.py @@ -914,7 +914,7 @@ class NodeHttpServer(http.server.HttpServer): """ # TODO: What if a file fails to rename? - return [backend.JobQueueRename(old, new) for old, new in params] + return [backend.JobQueueRename(old, new) for old, new in params[0]] # hypervisor ---------------