Commit 6bcb1446 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Convert to utils.Retry


Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 06b78e8b
......@@ -2096,6 +2096,8 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
for dev in instance.disks:
lu.cfg.SetDiskID(dev, node)
# TODO: Convert to utils.Retry
retries = 0
degr_retries = 10 # in seconds, as we sleep 1 second each time
while True:
......
......@@ -1140,21 +1140,13 @@ class JobQueue(object):
as such by the clients
"""
logging.debug("Waiting for changes in job %s", job_id)
job_info = None
log_entries = None
end_time = time.time() + timeout
while True:
delta_time = end_time - time.time()
if delta_time < 0:
return constants.JOB_NOTCHANGED
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
return None
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
break
def _CheckForChanges():
logging.debug("Waiting for changes in job %s", job_id)
status = job.CalcStatus()
job_info = self._GetJobInfoUnlocked(job, fields)
......@@ -1168,28 +1160,24 @@ class JobQueue(object):
job_info = serializer.LoadJson(serializer.DumpJson(job_info))
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
if status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK):
# Don't even try to wait if the job is no longer running, there will be
# no changes.
break
if (prev_job_info != job_info or
# Don't even try to wait if the job is no longer running, there will be
# no changes.
if (status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK) or
prev_job_info != job_info or
(log_entries and prev_log_serial != log_entries[0][0])):
break
logging.debug("Waiting again")
logging.debug("Job %s changed", job_id)
return (job_info, log_entries)
# Release the queue lock while waiting
job.change.wait(delta_time)
raise utils.RetryAgain()
logging.debug("Job %s changed", job_id)
if job_info is None and log_entries is None:
return None
else:
return (job_info, log_entries)
try:
# Setting wait function to release the queue lock while waiting
return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
wait_fn=job.change.wait)
except utils.RetryTimeout:
return constants.JOB_NOTCHANGED
@utils.LockedMethod
@_RequireOpenQueue
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment