Commit 5c735209 authored by Iustin Pop's avatar Iustin Pop
Browse files

Make WaitForJobChanges deal with long jobs

This patch alters the WaitForJobChanges luxi-RPC call to have a
configurable timeout, so that the call behaves nicely with long jobs
that have no update.

We do this by adding a timeout parameter in the RPC call, and returning
a special constant when the timeout is reached without an update. The
luxi client will repeatedly call the WaitForJobChanges until it gets a
real change. The timeout is hardcoded as half the RWTO value.

The patch also removes an unused variable (new_state) from the
WaitForJobChanges method.

Reviewed-by: imsnah,ultrotter
parent 3fc175f0
......@@ -217,9 +217,9 @@ class ClientOps:
return queue.ArchiveJob(job_id)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
(job_id, fields, prev_job_info, prev_log_serial) = args
(job_id, fields, prev_job_info, prev_log_serial, timeout) = args
return queue.WaitForJobChanges(job_id, fields, prev_job_info,
prev_log_serial)
prev_log_serial, timeout)
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
......
......@@ -279,6 +279,9 @@ JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive"
JOB_ID_TEMPLATE = r"\d+"
# unchanged job return
JOB_NOTCHANGED = "nochange"
# Job status
JOB_STATUS_QUEUED = "queued"
JOB_STATUS_RUNNING = "running"
......
......@@ -558,7 +558,8 @@ class JobQueue(object):
@utils.LockedMethod
@_RequireOpenQueue
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
timeout):
"""Waits for changes in a job.
@type job_id: string
......@@ -569,15 +570,20 @@ class JobQueue(object):
@param prev_job_info: Last job information returned
@type prev_log_serial: int
@param prev_log_serial: Last job message serial number
@type timeout: float
@param timeout: maximum time to wait
"""
logging.debug("Waiting for changes in job %s", job_id)
end_time = time.time() + timeout
while True:
delta_time = end_time - time.time()
if delta_time < 0:
return constants.JOB_NOTCHANGED
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
new_state = None
break
status = job.CalcStatus()
......@@ -605,7 +611,7 @@ class JobQueue(object):
logging.debug("Waiting again")
# Release the queue lock while waiting
job.change.wait()
job.change.wait(delta_time)
logging.debug("Job %s changed", job_id)
......
......@@ -290,8 +290,14 @@ class Client(object):
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
(job_id, fields, prev_job_info, prev_log_serial))
timeout = (DEF_RWTO - 1) / 2
while True:
result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
(job_id, fields, prev_job_info,
prev_log_serial, timeout))
if result != constants.JOB_NOTCHANGED:
break
return result
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment