Commit e07f7f7a authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Add job support to query2 via LUXI



This enables the use of filters through query2 when listing jobs.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent dc2879ea
......@@ -2435,7 +2435,42 @@ class JobQueue(object):
return (archived_count, len(all_job_ids) - last_touched)
def QueryJobs(self, job_ids, fields):
def _Query(self, fields, qfilter):
qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
namefield="id")
job_ids = qobj.RequestedNames()
list_all = (job_ids is None)
if list_all:
# Since files are added to/removed from the queue atomically, there's no
# risk of getting the job ids in an inconsistent state.
job_ids = self._GetJobIDsUnlocked()
jobs = []
for job_id in job_ids:
job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
if job is not None or not list_all:
jobs.append((job_id, job))
return (qobj, jobs, list_all)
def QueryJobs(self, fields, qfilter):
"""Returns a list of jobs in queue.
@type fields: sequence
@param fields: List of wanted fields
@type qfilter: None or query2 filter (list)
@param qfilter: Query filter
"""
(qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
def OldStyleQueryJobs(self, job_ids, fields):
"""Returns a list of jobs in queue.
@type job_ids: list
......@@ -2447,22 +2482,11 @@ class JobQueue(object):
the requested fields
"""
jobs = []
list_all = False
if not job_ids:
# Since files are added to/removed from the queue atomically, there's no
# risk of getting the job ids in an inconsistent state.
job_ids = self._GetJobIDsUnlocked()
list_all = True
qfilter = qlang.MakeSimpleFilter("id", job_ids)
for job_id in job_ids:
job = self.SafeLoadJobFromDisk(job_id, True)
if job is not None:
jobs.append(job.GetInfo(fields))
elif not list_all:
jobs.append(None)
(qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
return jobs
return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
@locking.ssynchronized(_LOCK)
def PrepareShutdown(self):
......
......@@ -255,7 +255,8 @@ class ClientOps:
self.server = server
def handle_request(self, method, args): # pylint: disable=R0911
queue = self.server.context.jobqueue
context = self.server.context
queue = context.jobqueue
# TODO: Parameter validation
if not isinstance(args, (tuple, list)):
......@@ -308,7 +309,9 @@ class ClientOps:
elif req.what == constants.QR_LOCK:
if req.qfilter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered")
return self.server.context.glm.QueryLocks(req.fields)
return context.glm.QueryLocks(req.fields)
elif req.what == constants.QR_JOB:
return queue.QueryJobs(req.fields, req.qfilter)
elif req.what in constants.QR_VIA_LUXI:
raise NotImplementedError
else:
......@@ -336,7 +339,7 @@ class ClientOps:
else:
msg = str(job_ids)
logging.info("Received job query request for %s", msg)
return queue.QueryJobs(job_ids, fields)
return queue.OldStyleQueryJobs(job_ids, fields)
elif method == luxi.REQ_QUERY_INSTANCES:
(names, fields, use_locking) = args
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment