Commit dfe57c22 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Add RPC call to wait for job changes

This way clients can react faster to status or message changes and
don't have to poll anymore.

Reviewed-by: ultrotter
parent d5e317ba
......@@ -217,6 +217,10 @@ class ClientOps:
job_id = args
return queue.ArchiveJob(job_id)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
(job_id, fields, previous) = args
return queue.WaitForJobChanges(job_id, fields, previous)
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
return queue.QueryJobs(job_ids, fields)
......
......@@ -121,6 +121,12 @@ class _QueuedJob(object):
This is what we use to track the user-submitted jobs.
"""
def __new__(cls, *args, **kwargs):
obj = object.__new__(cls, *args, **kwargs)
# Condition to wait for changes
obj.change = threading.Condition()
return obj
def __init__(self, queue, job_id, ops):
if not ops:
# TODO
......@@ -204,7 +210,16 @@ class _JobQueueWorker(workerpool.BaseWorker):
finally:
queue.release()
result = proc.ExecOpCode(input_opcode, op.Log)
def _Log(*args):
op.Log(*args)
job.change.acquire()
try:
job.change.notifyAll()
finally:
job.change.release()
result = proc.ExecOpCode(input_opcode, _Log)
queue.acquire()
try:
......@@ -516,6 +531,13 @@ class JobQueue(object):
self._WriteAndReplicateFileUnlocked(filename, data)
self._CleanCacheUnlocked([job.id])
# Notify waiters about potential changes
job.change.acquire()
try:
job.change.notifyAll()
finally:
job.change.release()
def _CleanCacheUnlocked(self, exclude):
"""Clean the memory cache.
......@@ -536,6 +558,43 @@ class JobQueue(object):
except KeyError:
pass
@_RequireOpenQueue
def WaitForJobChanges(self, job_id, fields, previous):
logging.debug("Waiting for changes in job %s", job_id)
while True:
self.acquire()
try:
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
new_state = None
break
new_state = self._GetJobInfoUnlocked(job, fields)
finally:
self.release()
# Serializing and deserializing data can cause type changes (e.g. from
# tuple to list) or precision loss. We're doing it here so that we get
# the same modifications as the data received from the client. Without
# this, the comparison afterwards might fail without the data being
# significantly different.
new_state = serializer.LoadJson(serializer.DumpJson(new_state))
if previous != new_state:
break
job.change.acquire()
try:
job.change.wait()
finally:
job.change.release()
logging.debug("Job %s changed", job_id)
return new_state
@utils.LockedMethod
@_RequireOpenQueue
def CancelJob(self, job_id):
......
......@@ -44,6 +44,7 @@ KEY_SUCCESS = "success"
KEY_RESULT = "result"
REQ_SUBMIT_JOB = "SubmitJob"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_JOBS = "QueryJobs"
......@@ -288,6 +289,10 @@ class Client(object):
def ArchiveJob(self, job_id):
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def WaitForJobChange(self, job_id, fields, previous):
return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
(job_id, fields, previous))
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment