diff --git a/lib/jqueue.py b/lib/jqueue.py index 5a352914aa77ea14557414f17f70276327434c1b..c2d1a2b341c7af662bff74cec62d7a9deb034c64 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -975,6 +975,8 @@ class JobQueue(object): """ logging.info("Inspecting job queue") + restartjobs = [] + all_job_ids = self._GetJobIDsUnlocked() jobs_count = len(all_job_ids) lastinfo = time.time() @@ -995,7 +997,7 @@ class JobQueue(object): status = job.CalcStatus() if status in (constants.JOB_STATUS_QUEUED, ): - self._wpool.AddTask((job, )) + restartjobs.append(job) elif status in (constants.JOB_STATUS_RUNNING, constants.JOB_STATUS_WAITLOCK, @@ -1005,6 +1007,10 @@ class JobQueue(object): "Unclean master daemon shutdown") self.UpdateJobUnlocked(job) + if restartjobs: + logging.info("Restarting %s jobs", len(restartjobs)) + self._EnqueueJobs(restartjobs) + logging.info("Job queue inspection finished") @locking.ssynchronized(_LOCK) @@ -1434,7 +1440,7 @@ class JobQueue(object): """ job_id = self._NewSerialsUnlocked(1)[0] - self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), )) + self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) return job_id @locking.ssynchronized(_LOCK) @@ -1446,21 +1452,32 @@ class JobQueue(object): """ results = [] - tasks = [] + added_jobs = [] all_job_ids = self._NewSerialsUnlocked(len(jobs)) for job_id, ops in zip(all_job_ids, jobs): try: - tasks.append((self._SubmitJobUnlocked(job_id, ops), )) + added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) status = True data = job_id except errors.GenericError, err: data = str(err) status = False results.append((status, data)) - self._wpool.AddManyTasks(tasks) + + self._EnqueueJobs(added_jobs) return results + def _EnqueueJobs(self, jobs): + """Helper function to add jobs to worker pool's queue. + + @type jobs: list + @param jobs: List of all jobs + + """ + self._wpool.AddManyTasks([(job, ) for job in jobs], + priority=[job.CalcPriority() for job in jobs]) + @_RequireOpenQueue def UpdateJobUnlocked(self, job, replicate=True): """Update a job's on disk storage.