diff --git a/lib/jqueue.py b/lib/jqueue.py index 40fee0027bbc2c1b48d3736d0a6f04e969e78422..cc988964cfdee711204529a64a18f2e53ca44609 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -506,32 +506,35 @@ class JobQueue(object): # Setup worker pool self._wpool = _JobQueueWorkerPool(self) - - # We need to lock here because WorkerPool.AddTask() may start a job while - # we're still doing our work. - self.acquire() try: - for job in self._GetJobsUnlocked(None): - # a failure in loading the job can cause 'None' to be returned - if job is None: - continue + # We need to lock here because WorkerPool.AddTask() may start a job while + # we're still doing our work. + self.acquire() + try: + for job in self._GetJobsUnlocked(None): + # a failure in loading the job can cause 'None' to be returned + if job is None: + continue - status = job.CalcStatus() + status = job.CalcStatus() - if status in (constants.JOB_STATUS_QUEUED, ): - self._wpool.AddTask(job) + if status in (constants.JOB_STATUS_QUEUED, ): + self._wpool.AddTask(job) - elif status in (constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK): - logging.warning("Unfinished job %s found: %s", job.id, job) - try: - for op in job.ops: - op.status = constants.OP_STATUS_ERROR - op.result = "Unclean master daemon shutdown" - finally: - self.UpdateJobUnlocked(job) - finally: - self.release() + elif status in (constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK): + logging.warning("Unfinished job %s found: %s", job.id, job) + try: + for op in job.ops: + op.status = constants.OP_STATUS_ERROR + op.result = "Unclean master daemon shutdown" + finally: + self.UpdateJobUnlocked(job) + finally: + self.release() + except: + self._wpool.TerminateWorkers() + raise @utils.LockedMethod @_RequireOpenQueue