Commit 123cb248 authored by Helga Velroyen's avatar Helga Velroyen

Fix lint error in 'masterd'

... and by that fixing a TODO as well.
Signed-off-by: default avatarHelga Velroyen <helgav@google.com>
Reviewed-by: default avatarJose Lopes <jabolopes@google.com>
parent f07b075e
......@@ -266,6 +266,152 @@ class ClientOps(object):
def __init__(self, server):
self.server = server
@staticmethod
def _SubmitJob(args, queue):
logging.info("Receiving new job")
(job_def, ) = args
ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
job_id = queue.SubmitJob(ops)
_LogNewJob(True, job_id, ops)
return job_id
@staticmethod
def _PickupJob(args, queue):
logging.info("Picking up new job from queue")
(job_id, ) = args
queue.PickupJob(job_id)
return job_id
@staticmethod
def _SubmitJobToDrainedQueue(args, queue):
logging.info("Forcefully receiving new job")
(job_def, ) = args
ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
job_id = queue.SubmitJobToDrainedQueue(ops)
_LogNewJob(True, job_id, ops)
return job_id
@staticmethod
def _SubmitManyJobs(args, queue):
logging.info("Receiving multiple jobs")
(job_defs, ) = args
jobs = []
for ops in job_defs:
jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
job_ids = queue.SubmitManyJobs(jobs)
for ((status, job_id), ops) in zip(job_ids, jobs):
_LogNewJob(status, job_id, ops)
return job_ids
@staticmethod
def _CancelJob(args, queue):
(job_id, ) = args
logging.info("Received job cancel request for %s", job_id)
return queue.CancelJob(job_id)
@staticmethod
def _ChangeJobPriority(args, queue):
(job_id, priority) = args
logging.info("Received request to change priority for job %s to %s",
job_id, priority)
return queue.ChangeJobPriority(job_id, priority)
@staticmethod
def _ArchiveJob(args, queue):
(job_id, ) = args
logging.info("Received job archive request for %s", job_id)
return queue.ArchiveJob(job_id)
@staticmethod
def _AutoArchiveJobs(args, queue):
(age, timeout) = args
logging.info("Received job autoarchive request for age %s, timeout %s",
age, timeout)
return queue.AutoArchiveJobs(age, timeout)
@staticmethod
def _WaitForJobChange(args, queue):
(job_id, fields, prev_job_info, prev_log_serial, timeout) = args
logging.info("Received job poll request for %s", job_id)
return queue.WaitForJobChanges(job_id, fields, prev_job_info,
prev_log_serial, timeout)
def _PerformQuery(self, args, queue):
(what, fields, qfilter) = args
if what in constants.QR_VIA_OP:
result = self._Query(opcodes.OpQuery(what=what, fields=fields,
qfilter=qfilter))
elif what == constants.QR_LOCK:
if qfilter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered",
errors.ECODE_INVAL)
result = self.server.context.glm.QueryLocks(fields)
elif what == constants.QR_JOB:
result = queue.QueryJobs(fields, qfilter)
elif what in constants.QR_VIA_LUXI:
luxi_client = runtime.GetClient()
result = luxi_client.Query(what, fields, qfilter).ToDict()
else:
raise errors.OpPrereqError("Resource type '%s' unknown" % what,
errors.ECODE_INVAL)
return result
@staticmethod
def _QueryFields(args):
(what, fields) = args
req = objects.QueryFieldsRequest(what=what, fields=fields)
try:
fielddefs = query.ALL_FIELDS[req.what]
except KeyError:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
errors.ECODE_INVAL)
return query.QueryFields(fielddefs, req.fields)
@staticmethod
def _QueryJobs(args, queue):
(job_ids, fields) = args
if isinstance(job_ids, (tuple, list)) and job_ids:
msg = utils.CommaJoin(job_ids)
else:
msg = str(job_ids)
logging.info("Received job query request for %s", msg)
return queue.OldStyleQueryJobs(job_ids, fields)
def _QueryConfigValues(self, args):
(fields, ) = args
logging.info("Received config values query request for %s", fields)
op = opcodes.OpClusterConfigQuery(output_fields=fields)
return self._Query(op)
def _QueryClusterInfo(self):
logging.info("Received cluster info query request")
op = opcodes.OpClusterQuery()
return self._Query(op)
def _QueryTags(self, args):
(kind, name) = args
logging.info("Received tags query request")
op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
return self._Query(op)
@staticmethod
def _SetDrainFlag(args, queue):
(drain_flag, ) = args
logging.info("Received queue drain flag change request to %s",
drain_flag)
return queue.SetDrainFlag(drain_flag)
@staticmethod
def _SetWatcherPause(args, context):
(until, ) = args
# FIXME!
ec_id = None
return _SetWatcherPause(context, ec_id, until)
def handle_request(self, method, args): # pylint: disable=R0911
context = self.server.context
queue = context.jobqueue
......@@ -279,146 +425,48 @@ class ClientOps(object):
logging.info("Received invalid request '%s'", method)
raise ValueError("Invalid operation '%s'" % method)
# TODO: Rewrite to not exit in each 'if/elif' branch
job_id = None
if method == luxi.REQ_SUBMIT_JOB:
logging.info("Receiving new job")
(job_def, ) = args
ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
job_id = queue.SubmitJob(ops)
_LogNewJob(True, job_id, ops)
return job_id
job_id = self._SubmitJob(args, queue)
elif method == luxi.REQ_PICKUP_JOB:
logging.info("Picking up new job from queue")
(job_id, ) = args
queue.PickupJob(job_id)
job_id = self._PickupJob(args, queue)
elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
logging.info("Forcefully receiving new job")
(job_def, ) = args
ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
job_id = queue.SubmitJobToDrainedQueue(ops)
_LogNewJob(True, job_id, ops)
return job_id
job_id = self._SubmitJobToDrainedQueue(args, queue)
elif method == luxi.REQ_SUBMIT_MANY_JOBS:
logging.info("Receiving multiple jobs")
(job_defs, ) = args
jobs = []
for ops in job_defs:
jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
job_ids = queue.SubmitManyJobs(jobs)
for ((status, job_id), ops) in zip(job_ids, jobs):
_LogNewJob(status, job_id, ops)
return job_ids
job_id = self._SubmitManyJobs(args, queue)
elif method == luxi.REQ_CANCEL_JOB:
(job_id, ) = args
logging.info("Received job cancel request for %s", job_id)
return queue.CancelJob(job_id)
job_id = self._CancelJob(args, queue)
elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
(job_id, priority) = args
logging.info("Received request to change priority for job %s to %s",
job_id, priority)
return queue.ChangeJobPriority(job_id, priority)
job_id = self._ChangeJobPriority(args, queue)
elif method == luxi.REQ_ARCHIVE_JOB:
(job_id, ) = args
logging.info("Received job archive request for %s", job_id)
return queue.ArchiveJob(job_id)
job_id = self._ArchiveJob(args, queue)
elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
(age, timeout) = args
logging.info("Received job autoarchive request for age %s, timeout %s",
age, timeout)
return queue.AutoArchiveJobs(age, timeout)
job_id = self._AutoArchiveJobs(args, queue)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
(job_id, fields, prev_job_info, prev_log_serial, timeout) = args
logging.info("Received job poll request for %s", job_id)
return queue.WaitForJobChanges(job_id, fields, prev_job_info,
prev_log_serial, timeout)
job_id = self._WaitForJobChange(args, queue)
elif method == luxi.REQ_QUERY:
(what, fields, qfilter) = args
if what in constants.QR_VIA_OP:
result = self._Query(opcodes.OpQuery(what=what, fields=fields,
qfilter=qfilter))
elif what == constants.QR_LOCK:
if qfilter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered",
errors.ECODE_INVAL)
return context.glm.QueryLocks(fields)
elif what == constants.QR_JOB:
return queue.QueryJobs(fields, qfilter)
elif what in constants.QR_VIA_LUXI:
luxi_client = runtime.GetClient()
result = luxi_client.Query(what, fields, qfilter).ToDict()
else:
raise errors.OpPrereqError("Resource type '%s' unknown" % what,
errors.ECODE_INVAL)
return result
job_id = self._PerformQuery(args, queue)
elif method == luxi.REQ_QUERY_FIELDS:
(what, fields) = args
req = objects.QueryFieldsRequest(what=what, fields=fields)
try:
fielddefs = query.ALL_FIELDS[req.what]
except KeyError:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
errors.ECODE_INVAL)
return query.QueryFields(fielddefs, req.fields)
job_id = self._QueryFields(args)
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
if isinstance(job_ids, (tuple, list)) and job_ids:
msg = utils.CommaJoin(job_ids)
else:
msg = str(job_ids)
logging.info("Received job query request for %s", msg)
return queue.OldStyleQueryJobs(job_ids, fields)
job_id = self._QueryJobs(args, queue)
elif method == luxi.REQ_QUERY_CONFIG_VALUES:
(fields, ) = args
logging.info("Received config values query request for %s", fields)
op = opcodes.OpClusterConfigQuery(output_fields=fields)
return self._Query(op)
job_id = self._QueryConfigValues(args)
elif method == luxi.REQ_QUERY_CLUSTER_INFO:
logging.info("Received cluster info query request")
op = opcodes.OpClusterQuery()
return self._Query(op)
job_id = self._QueryClusterInfo()
elif method == luxi.REQ_QUERY_TAGS:
(kind, name) = args
logging.info("Received tags query request")
op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
return self._Query(op)
job_id = self._QueryTags(args)
elif method == luxi.REQ_SET_DRAIN_FLAG:
(drain_flag, ) = args
logging.info("Received queue drain flag change request to %s",
drain_flag)
return queue.SetDrainFlag(drain_flag)
job_id = self._SetDrainFlag(args, queue)
elif method == luxi.REQ_SET_WATCHER_PAUSE:
(until, ) = args
# FIXME!
ec_id = None
return _SetWatcherPause(context, ec_id, until)
job_id = self._SetWatcherPause(args, context)
else:
logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
" but not implemented" % method)
return job_id
def _Query(self, op):
"""Runs the specified opcode and returns the result.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment