Commit fb1ffbca authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Convert job queue's RPC to generated code



With these changes job queue RPC will finally show up on the lock
monitor. See below for an example. A job queue-specific class is used to
restrict the use of a static list for name resolution to the job queue.
Further improvements can be made to not re-create the whole RPC client
for every call (e.g. by using a more dynamic resolver), but for now this
works.

rpc/node8.example.com/jobqueue_update Jq8/Job9/TEST_DELAY
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 46c293f0
......@@ -361,4 +361,14 @@ CALLS = {
"RpcClientDefault": (_IMPEXP_CALLS + _X509_CALLS + _OS_CALLS + _NODE_CALLS +
_FILE_STORAGE_CALLS + _MISC_CALLS + _INSTANCE_CALLS + _BLOCKDEV_CALLS +
_STORAGE_CALLS),
"RpcClientJobQueue": [
("jobqueue_update", MULTI, TMO_URGENT, [
("file_name", None, None),
("content", "self._Compress(%s)", None),
], None, "Update job queue file"),
("jobqueue_purge", SINGLE, TMO_NORMAL, [], None, "Purge job queue"),
("jobqueue_rename", MULTI, TMO_URGENT, [
("rename", None, None),
], None, "Rename job queue file"),
],
}
......@@ -1611,6 +1611,12 @@ class JobQueue(object):
logging.info("Job queue inspection finished")
def _GetRpc(self, address_list):
"""Gets RPC runner with context.
"""
return rpc.JobQueueRunner(self.context, address_list)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def AddNode(self, node):
......@@ -1624,7 +1630,7 @@ class JobQueue(object):
assert node_name != self._my_hostname
# Clean queue directory on added node
result = rpc.RpcRunner.call_jobqueue_purge(node_name)
result = self._GetRpc(None).call_jobqueue_purge(node_name)
msg = result.fail_msg
if msg:
logging.warning("Cannot cleanup queue directory on node %s: %s",
......@@ -1642,13 +1648,15 @@ class JobQueue(object):
# Upload current serial file
files.append(constants.JOB_QUEUE_SERIAL_FILE)
# Static address list
addrs = [node.primary_ip]
for file_name in files:
# Read file content
content = utils.ReadFile(file_name)
result = rpc.RpcRunner.call_jobqueue_update([node_name],
[node.primary_ip],
file_name, content)
result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
content)
msg = result[node_name].fail_msg
if msg:
logging.error("Failed to upload file %s to node %s: %s",
......@@ -1732,7 +1740,7 @@ class JobQueue(object):
if replicate:
names, addrs = self._GetNodeIp()
result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
def _RenameFilesUnlocked(self, rename):
......@@ -1751,7 +1759,7 @@ class JobQueue(object):
# ... and on all nodes
names, addrs = self._GetNodeIp()
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
@staticmethod
......
......@@ -781,39 +781,6 @@ class RpcRunner(_generated_rpc.RpcClientDefault):
return self.call_test_delay(node_list, duration,
read_timeout=int(duration + 5))
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
"""Update job queue.
This is a multi-node call.
"""
return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
[file_name, _Compress(content)],
address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
"""Purge job queue.
This is a single-node call.
"""
return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_jobqueue_rename(cls, node_list, address_list, rename):
"""Rename a job queue file.
This is a multi-node call.
"""
return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
"""Validate the hypervisor params.
......@@ -832,3 +799,34 @@ class RpcRunner(_generated_rpc.RpcClientDefault):
hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
return self._MultiNodeCall(node_list, "hypervisor_validate_params",
[hvname, hv_full])
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
"""RPC wrappers for job queue.
"""
_Compress = staticmethod(_Compress)
def __init__(self, context, address_list):
"""Initializes this class.
"""
_generated_rpc.RpcClientJobQueue.__init__(self)
if address_list is None:
resolver = _SsconfResolver
else:
# Caller provided an address list
resolver = _StaticResolver(address_list)
self._proc = _RpcProcessor(resolver,
netutils.GetDaemonPort(constants.NODED),
lock_monitor_cb=context.glm.AddToLockMonitor)
def _Call(self, node_list, procedure, timeout, args):
"""Entry point for automatically generated RPC wrappers.
"""
body = serializer.DumpJson(args, indent=False)
return self._proc(node_list, procedure, body, read_timeout=timeout)
......@@ -914,7 +914,7 @@ class NodeHttpServer(http.server.HttpServer):
"""
# TODO: What if a file fails to rename?
return [backend.JobQueueRename(old, new) for old, new in params]
return [backend.JobQueueRename(old, new) for old, new in params[0]]
# hypervisor ---------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment