Commit de9d02c7 authored by Michael Hanselmann's avatar Michael Hanselmann

jqueue: Move queue inspection into separate function

This makes the __init__ function a lot smaller while not changing
functionality.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarRené Nussbaumer <rn@google.com>
parent 747f6113
......@@ -910,48 +910,53 @@ class JobQueue(object):
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
try:
# We need to lock here because WorkerPool.AddTask() may start a job while
# we're still doing our work.
self.acquire()
try:
logging.info("Inspecting job queue")
self._InspectQueue()
except:
self._wpool.TerminateWorkers()
raise
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def _InspectQueue(self):
"""Loads the whole job queue and resumes unfinished jobs.
This function needs the lock here because WorkerPool.AddTask() may start a
job while we're still doing our work.
"""
logging.info("Inspecting job queue")
all_job_ids = self._GetJobIDsUnlocked()
jobs_count = len(all_job_ids)
all_job_ids = self._GetJobIDsUnlocked()
jobs_count = len(all_job_ids)
lastinfo = time.time()
for idx, job_id in enumerate(all_job_ids):
# Give an update every 1000 jobs or 10 seconds
if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
idx == (jobs_count - 1)):
logging.info("Job queue inspection: %d/%d (%0.1f %%)",
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
lastinfo = time.time()
for idx, job_id in enumerate(all_job_ids):
# Give an update every 1000 jobs or 10 seconds
if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
idx == (jobs_count - 1)):
logging.info("Job queue inspection: %d/%d (%0.1f %%)",
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
lastinfo = time.time()
job = self._LoadJobUnlocked(job_id)
# a failure in loading the job can cause 'None' to be returned
if job is None:
continue
status = job.CalcStatus()
job = self._LoadJobUnlocked(job_id)
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask((job, ))
# a failure in loading the job can cause 'None' to be returned
if job is None:
continue
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
self.UpdateJobUnlocked(job)
status = job.CalcStatus()
logging.info("Job queue inspection finished")
finally:
self.release()
except:
self._wpool.TerminateWorkers()
raise
if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask((job, ))
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
self.UpdateJobUnlocked(job)
logging.info("Job queue inspection finished")
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment