Skip to content
Snippets Groups Projects
Commit 7b5c4a69 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Use priority for worker pool


A small helper function is added to make this easier. Priorities are not
yet used in all necessary places.

Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent a0d2fe2c
No related branches found
No related tags found
No related merge requests found
...@@ -975,6 +975,8 @@ class JobQueue(object): ...@@ -975,6 +975,8 @@ class JobQueue(object):
""" """
logging.info("Inspecting job queue") logging.info("Inspecting job queue")
restartjobs = []
all_job_ids = self._GetJobIDsUnlocked() all_job_ids = self._GetJobIDsUnlocked()
jobs_count = len(all_job_ids) jobs_count = len(all_job_ids)
lastinfo = time.time() lastinfo = time.time()
...@@ -995,7 +997,7 @@ class JobQueue(object): ...@@ -995,7 +997,7 @@ class JobQueue(object):
status = job.CalcStatus() status = job.CalcStatus()
if status in (constants.JOB_STATUS_QUEUED, ): if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask((job, )) restartjobs.append(job)
elif status in (constants.JOB_STATUS_RUNNING, elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK, constants.JOB_STATUS_WAITLOCK,
...@@ -1005,6 +1007,10 @@ class JobQueue(object): ...@@ -1005,6 +1007,10 @@ class JobQueue(object):
"Unclean master daemon shutdown") "Unclean master daemon shutdown")
self.UpdateJobUnlocked(job) self.UpdateJobUnlocked(job)
if restartjobs:
logging.info("Restarting %s jobs", len(restartjobs))
self._EnqueueJobs(restartjobs)
logging.info("Job queue inspection finished") logging.info("Job queue inspection finished")
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
...@@ -1434,7 +1440,7 @@ class JobQueue(object): ...@@ -1434,7 +1440,7 @@ class JobQueue(object):
""" """
job_id = self._NewSerialsUnlocked(1)[0] job_id = self._NewSerialsUnlocked(1)[0]
self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), )) self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
return job_id return job_id
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
...@@ -1446,21 +1452,32 @@ class JobQueue(object): ...@@ -1446,21 +1452,32 @@ class JobQueue(object):
""" """
results = [] results = []
tasks = [] added_jobs = []
all_job_ids = self._NewSerialsUnlocked(len(jobs)) all_job_ids = self._NewSerialsUnlocked(len(jobs))
for job_id, ops in zip(all_job_ids, jobs): for job_id, ops in zip(all_job_ids, jobs):
try: try:
tasks.append((self._SubmitJobUnlocked(job_id, ops), )) added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
status = True status = True
data = job_id data = job_id
except errors.GenericError, err: except errors.GenericError, err:
data = str(err) data = str(err)
status = False status = False
results.append((status, data)) results.append((status, data))
self._wpool.AddManyTasks(tasks)
self._EnqueueJobs(added_jobs)
return results return results
def _EnqueueJobs(self, jobs):
"""Helper function to add jobs to worker pool's queue.
@type jobs: list
@param jobs: List of all jobs
"""
self._wpool.AddManyTasks([(job, ) for job in jobs],
priority=[job.CalcPriority() for job in jobs])
@_RequireOpenQueue @_RequireOpenQueue
def UpdateJobUnlocked(self, job, replicate=True): def UpdateJobUnlocked(self, job, replicate=True):
"""Update a job's on disk storage. """Update a job's on disk storage.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment