Commit 0bbe448c authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Change masterd/client RPC protocol

- Introduce abstraction class on client side
- Use constants for method names
- Adopt legacy function SubmitOpCode to use it

Reviewed-by: iustinp
parent 3d8548c4
......@@ -225,46 +225,30 @@ class ClientOps:
"""Class holding high-level client operations."""
def __init__(self, server):
self.server = server
self._cpu = None
def _getcpu(self):
if self._cpu is None:
self._cpu = mcpu.Processor(lambda x: None)
return self._cpu
def handle_request(self, operation, args):
print operation, args
if operation == "submit":
return self.put(args)
elif operation == "query":
return self.query(args)
else:
raise ValueError("Invalid operation")
def put(self, args):
job = luxi.UnserializeJob(args)
rid = self.server.queue.put(job)
return rid
def query(self, args):
path = args["object"]
fields = args["fields"]
names = args["names"]
if path == "instances":
opclass = opcodes.OpQueryInstances
elif path == "jobs":
# early exit because job query-ing is special (not via opcodes)
return self.query_jobs(fields, names)
else:
raise ValueError("Invalid object %s" % path)
def handle_request(self, method, args):
queue = self.server.jobqueue
# TODO: Parameter validation
if method == luxi.REQ_SUBMIT_JOB:
ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
return queue.SubmitJob(ops)
op = opclass(output_fields = fields, names=names)
cpu = self._getcpu()
result = cpu.ExecOpCode(op)
return result
elif method == luxi.REQ_CANCEL_JOB:
(job_id, ) = args
return queue.CancelJob(job_id)
def query_jobs(self, fields, names):
return self.server.queue.query_jobs(fields, names)
elif method == luxi.REQ_ARCHIVE_JOB:
(job_id, ) = args
return queue.ArchiveJob(job_id)
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
return queue.QueryJobs(job_ids, fields)
else:
raise ValueError("Invalid operation")
def JobRunner(proc, job, context):
......
......@@ -41,7 +41,7 @@ from optparse import (OptionParser, make_option, TitledHelpFormatter,
Option, OptionValueError, SUPPRESS_HELP)
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
"SubmitOpCode", "SubmitJob", "SubmitQuery",
"SubmitOpCode",
"cli_option", "GenerateTable", "AskUser",
"ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
"USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT",
......@@ -370,7 +370,7 @@ def AskUser(text, choices=None):
def SubmitOpCode(op, proc=None, feedback_fn=None):
"""Function to submit an opcode.
"""Legacy function to submit an opcode.
This is just a simple wrapper over the construction of the processor
instance. It should be extended to better handle feedback and
......@@ -379,46 +379,30 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
"""
# TODO: Fix feedback_fn situation.
cl = luxi.Client()
job = opcodes.Job(op_list=[op])
jid = SubmitJob(job, cl)
query = {
"object": "jobs",
"fields": ["status"],
"names": [jid],
}
job_id = cl.SubmitJob([op])
while True:
jdata = SubmitQuery(query, cl)
if not jdata:
jobs = cl.QueryJobs([job_id], ["status"])
if not jobs:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % jid)
raise errors.JobLost("Job with id %s lost" % job_id)
status = jdata[0][0]
if status in (opcodes.Job.STATUS_SUCCESS, opcodes.Job.STATUS_FAIL):
# TODO: Handle canceled and archived jobs
status = jobs[0][0]
if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
break
time.sleep(1)
query["fields"].extend(["op_list", "op_status", "op_result"])
jdata = SubmitQuery(query, cl)
if not jdata:
raise errors.JobLost("Job with id %s lost" % jid)
status, op_list, op_status, op_result = jdata[0]
if status != opcodes.Job.STATUS_SUCCESS:
raise errors.OpExecError(op_result[0])
return op_result[0]
jobs = cl.QueryJobs([job_id], ["status", "result"])
if not jobs:
raise errors.JobLost("Job with id %s lost" % job_id)
def SubmitJob(job, cl=None):
if cl is None:
cl = luxi.Client()
return cl.SubmitJob(job)
def SubmitQuery(data, cl=None):
if cl is None:
cl = luxi.Client()
return cl.Query(data)
status, result = jobs[0]
if status == constants.JOB_STATUS_SUCCESS:
return result[0]
else:
raise errors.OpExecError(result)
def FormatError(err):
......
......@@ -45,9 +45,10 @@ KEY_ARGS = 'args'
KEY_SUCCESS = "success"
KEY_RESULT = "result"
REQ_SUBMIT = 'submit'
REQ_ABORT = 'abort'
REQ_QUERY = 'query'
REQ_SUBMIT_JOB = "SubmitJob"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_JOBS = "QueryJobs"
DEF_CTMO = 10
DEF_RWTO = 60
......@@ -294,19 +295,17 @@ class Client(object):
return data[KEY_RESULT]
def SubmitJob(self, job):
"""Submit a job"""
return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
def Query(self, data):
"""Make a query"""
result = self.CallMethod(REQ_QUERY, data)
if data["object"] == "jobs":
# custom job processing of query values
for row in result:
for idx, field in enumerate(data["fields"]):
if field == "op_list":
row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
return result
def SubmitJob(self, ops):
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
def CancelJob(self, job_id):
return self.CallMethod(REQ_CANCEL_JOB, job_id)
def ArchiveJob(self, job_id):
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
# TODO: class Server(object)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment