Commit 3d8548c4 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Make luxi RPC more flexible

- Use constants for dict entries
- Handle exceptions on server side
- Rename client function to CallMethod to match server side naming

Reviewed-by: iustinp
parent 50a3fbb2
......@@ -175,21 +175,36 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
while True:
msg = self.read_message()
if msg is None:
print "client closed connection"
logging.info("client closed connection")
break
request = simplejson.loads(msg)
logging.debug("request: %s", request)
if not isinstance(request, dict):
print "wrong request received: %s" % msg
logging.error("wrong request received: %s", msg)
break
method = request.get('request', None)
data = request.get('data', None)
if method is None or data is None:
print "no method or data in request"
method = request.get(luxi.KEY_METHOD, None)
args = request.get(luxi.KEY_ARGS, None)
if method is None or args is None:
logging.error("no method or args in request")
break
print "request:", method, data
result = self._ops.handle_request(method, data)
print "result:", result
self.send_message(simplejson.dumps({'success': True, 'result': result}))
success = False
try:
result = self._ops.handle_request(method, args)
success = True
except:
logging.error("Unexpected exception", exc_info=True)
err = sys.exc_info()
result = "Caught exception: %s" % str(err[1])
response = {
luxi.KEY_SUCCESS: success,
luxi.KEY_RESULT: result,
}
logging.debug("response: %s", response)
self.send_message(simplejson.dumps(response))
def read_message(self):
while not self._msgs:
......
......@@ -40,8 +40,11 @@ from ganeti import serializer
from ganeti import constants
KEY_REQUEST = 'request'
KEY_DATA = 'data'
KEY_METHOD = 'method'
KEY_ARGS = 'args'
KEY_SUCCESS = "success"
KEY_RESULT = "result"
REQ_SUBMIT = 'submit'
REQ_ABORT = 'abort'
REQ_QUERY = 'query'
......@@ -82,6 +85,7 @@ class RequestError(ProtocolError):
"""
class NoMasterError(ProtocolError):
"""The master cannot be reached
......@@ -261,35 +265,42 @@ class Client(object):
address = constants.MASTER_SOCKET
self.transport = transport(address, timeouts=timeouts)
def SendRequest(self, request, data):
def CallMethod(self, method, args):
"""Send a generic request and return the response.
"""
msg = {KEY_REQUEST: request, KEY_DATA: data}
result = self.transport.Call(serializer.DumpJson(msg, indent=False))
# Build request
request = {
KEY_METHOD: method,
KEY_ARGS: args,
}
# Send request and wait for response
result = self.transport.Call(serializer.DumpJson(request, indent=False))
try:
data = serializer.LoadJson(result)
except Exception, err:
raise ProtocolError("Error while deserializing response: %s" % str(err))
# Validate response
if (not isinstance(data, dict) or
'success' not in data or
'result' not in data):
KEY_SUCCESS not in data or
KEY_RESULT not in data):
raise DecodingError("Invalid response from server: %s" % str(data))
return data
if not data[KEY_SUCCESS]:
# TODO: decide on a standard exception
raise RequestError(data[KEY_RESULT])
return data[KEY_RESULT]
def SubmitJob(self, job):
"""Submit a job"""
result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
if not result['success']:
raise RequestError(result['result'])
return result['result']
return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
def Query(self, data):
"""Make a query"""
result = self.SendRequest(REQ_QUERY, data)
if not result['success']:
raise RequestError(result[result])
result = result['result']
result = self.CallMethod(REQ_QUERY, data)
if data["object"] == "jobs":
# custom job processing of query values
for row in result:
......@@ -297,3 +308,5 @@ class Client(object):
if field == "op_list":
row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
return result
# TODO: class Server(object)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment