Commit 793a8f7c authored by Michael Hanselmann's avatar Michael Hanselmann

RAPI: Allow waiting for job changes

Signed-off-by: default avatarMichael Hanselmann <>
Reviewed-by: default avatarIustin Pop <>
parent cf9ada49
...@@ -651,6 +651,30 @@ while by a resource we refer to an instance's disk, or NIC, etc. ...@@ -651,6 +651,30 @@ while by a resource we refer to an instance's disk, or NIC, etc.
Cancel a not-yet-started job. Cancel a not-yet-started job.
Waits for changes on a job. Takes the following body parameters in a
The job fields on which to watch for changes.
Previously received field values or None if not yet available.
Highest log serial number received so far or None if not yet
Returns None if no changes have been detected and a dict with two keys,
``job_info`` and ``log_entries`` otherwise.
``/2/nodes`` ``/2/nodes``
++++++++++++ ++++++++++++
...@@ -64,6 +64,9 @@ REQ_SET_WATCHER_PAUSE = "SetWatcherPause" ...@@ -64,6 +64,9 @@ REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
# WaitForJobChange timeout
class ProtocolError(Exception): class ProtocolError(Exception):
"""Denotes an error in the server communication""" """Denotes an error in the server communication"""
...@@ -373,11 +376,27 @@ class Client(object): ...@@ -373,11 +376,27 @@ class Client(object):
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout)) return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
def WaitForJobChangeOnce(self, job_id, fields, def WaitForJobChangeOnce(self, job_id, fields,
prev_job_info, prev_log_serial): prev_job_info, prev_log_serial,
timeout = (DEF_RWTO - 1) / 2 timeout=WFJC_TIMEOUT):
"""Waits for changes on a job.
@param job_id: Job ID
@type fields: list
@param fields: List of field names to be observed
@type prev_job_info: None or list
@param prev_job_info: Previously received job information
@type prev_log_serial: None or int/long
@param prev_log_serial: Highest log serial number previously received
@type timeout: int/float
@param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
be capped to that value)
assert timeout >= 0, "Timeout can not be negative"
return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
(job_id, fields, prev_job_info, (job_id, fields, prev_job_info,
prev_log_serial, timeout)) prev_log_serial,
min(WFJC_TIMEOUT, timeout)))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial): def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
while True: while True:
...@@ -209,6 +209,8 @@ def GetHandlers(node_name_pattern, instance_name_pattern, job_id_pattern): ...@@ -209,6 +209,8 @@ def GetHandlers(node_name_pattern, instance_name_pattern, job_id_pattern):
"/2/jobs": rlib2.R_2_jobs, "/2/jobs": rlib2.R_2_jobs,
re.compile(r'/2/jobs/(%s)$' % job_id_pattern): re.compile(r'/2/jobs/(%s)$' % job_id_pattern):
rlib2.R_2_jobs_id, rlib2.R_2_jobs_id,
re.compile(r'/2/jobs/(%s)/wait$' % job_id_pattern):
"/2/tags": rlib2.R_2_tags, "/2/tags": rlib2.R_2_tags,
"/2/info": rlib2.R_2_info, "/2/info": rlib2.R_2_info,
...@@ -83,6 +83,9 @@ _NR_MAP = { ...@@ -83,6 +83,9 @@ _NR_MAP = {
} }
# Timeout for /2/jobs/[job_id]/wait. Gives job up to 10 seconds to change.
class R_version(baserlib.R_Generic): class R_version(baserlib.R_Generic):
"""/version resource. """/version resource.
...@@ -211,6 +214,55 @@ class R_2_jobs_id(baserlib.R_Generic): ...@@ -211,6 +214,55 @@ class R_2_jobs_id(baserlib.R_Generic):
return result return result
class R_2_jobs_id_wait(baserlib.R_Generic):
"""/2/jobs/[job_id]/wait resource.
# WaitForJobChange provides access to sensitive information and blocks
# machine resources (it's a blocking RAPI call), hence restricting access.
def GET(self):
"""Waits for job changes.
job_id = self.items[0]
fields = self.getBodyParameter("fields")
prev_job_info = self.getBodyParameter("previous_job_info", None)
prev_log_serial = self.getBodyParameter("previous_log_serial", None)
if not isinstance(fields, list):
raise http.HttpBadRequest("The 'fields' parameter should be a list")
if not (prev_job_info is None or isinstance(prev_job_info, list)):
raise http.HttpBadRequest("The 'previous_job_info' parameter should"
" be a list")
if not (prev_log_serial is None or
isinstance(prev_log_serial, (int, long))):
raise http.HttpBadRequest("The 'previous_log_serial' parameter should"
" be a number")
client = baserlib.GetClient()
result = client.WaitForJobChangeOnce(job_id, fields,
prev_job_info, prev_log_serial,
if not result:
raise http.HttpNotFound()
if result == constants.JOB_NOTCHANGED:
# No changes
return None
(job_info, log_entries) = result
return {
"job_info": job_info,
"log_entries": log_entries,
class R_2_nodes(baserlib.R_Generic): class R_2_nodes(baserlib.R_Generic):
"""/2/nodes resource. """/2/nodes resource.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment