Commit fcd70b89 authored by Klaus Aehlig's avatar Klaus Aehlig

Factor out functionality to pickup a job from the queue

The current restart procedure for masterd includes functionality
to pick up a job from the queue and restart it, if it hasn't been
started before. Move this functionality into a separate function
to be able to have the enqueuing be done by luxid.
Signed-off-by: default avatarKlaus Aehlig <aehlig@google.com>
Reviewed-by: default avatarMichele Tartara <mtartara@google.com>
parent 32b07c5f
......@@ -1712,6 +1712,44 @@ class JobQueue(object):
self._wpool.TerminateWorkers()
raise
def _PickupJobUnlocked(self, job_id):
"""Load a job from the job queue
Pick up a job that already is in the job queue and start/resume it.
"""
job = self._LoadJobUnlocked(job_id)
if job is None:
logging.warning("Job %s could not be read", job_id)
return
status = job.CalcStatus()
if status == constants.JOB_STATUS_QUEUED:
self._EnqueueJobsUnlocked([job])
logging.info("Restarting job %s", job.id)
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITING,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
if status == constants.JOB_STATUS_WAITING:
job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
self._EnqueueJobsUnlocked([job])
logging.info("Restarting job %s", job.id)
else:
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
job.Finalize()
self.UpdateJobUnlocked(job)
@locking.ssynchronized(_LOCK)
def PickupJob(self, job_id):
self._PickupJobUnlocked(job_id)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def _InspectQueue(self):
......@@ -1723,8 +1761,6 @@ class JobQueue(object):
"""
logging.info("Inspecting job queue")
restartjobs = []
all_job_ids = self._GetJobIDsUnlocked()
jobs_count = len(all_job_ids)
lastinfo = time.time()
......@@ -1736,36 +1772,7 @@ class JobQueue(object):
idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
lastinfo = time.time()
job = self._LoadJobUnlocked(job_id)
# a failure in loading the job can cause 'None' to be returned
if job is None:
continue
status = job.CalcStatus()
if status == constants.JOB_STATUS_QUEUED:
restartjobs.append(job)
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITING,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
if status == constants.JOB_STATUS_WAITING:
# Restart job
job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
restartjobs.append(job)
else:
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
job.Finalize()
self.UpdateJobUnlocked(job)
if restartjobs:
logging.info("Restarting %s jobs", len(restartjobs))
self._EnqueueJobsUnlocked(restartjobs)
self._PickupJobUnlocked(job_id)
logging.info("Job queue inspection finished")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment