diff --git a/lib/jqueue.py b/lib/jqueue.py index ed6fc0c2b5d4271424946de1c3ba90251c2444b6..a0402a05f8dc4d5ece41b94a8c60716bb4c22b03 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -381,14 +381,17 @@ class _QueuedJob(object): @param result: the opcode result """ - not_marked = True - for op in self.ops: - if op.status in constants.OPS_FINALIZED: - assert not_marked, "Finalized opcodes found after non-finalized ones" - continue - op.status = status - op.result = result - not_marked = False + try: + not_marked = True + for op in self.ops: + if op.status in constants.OPS_FINALIZED: + assert not_marked, "Finalized opcodes found after non-finalized ones" + continue + op.status = status + op.result = result + not_marked = False + finally: + self.queue.UpdateJobUnlocked(self) class _OpExecCallbacks(mcpu.OpExecCbBase): @@ -670,7 +673,8 @@ class _JobQueueWorker(workerpool.BaseWorker): except CancelJob: queue.acquire() try: - queue.CancelJobUnlocked(job) + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") finally: queue.release() except errors.GenericError, err: @@ -817,11 +821,8 @@ class JobQueue(object): constants.JOB_STATUS_WAITLOCK, constants.JOB_STATUS_CANCELING): logging.warning("Unfinished job %s found: %s", job.id, job) - try: - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - "Unclean master daemon shutdown") - finally: - self.UpdateJobUnlocked(job) + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") logging.info("Job queue inspection finished") finally: @@ -1344,28 +1345,15 @@ class JobQueue(object): return (False, "Job %s is no longer waiting in the queue" % job.id) if job_status == constants.JOB_STATUS_QUEUED: - self.CancelJobUnlocked(job) + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") return (True, "Job %s canceled" % job.id) elif job_status == constants.JOB_STATUS_WAITLOCK: # The worker will notice the new status and cancel the job - try: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) - finally: - self.UpdateJobUnlocked(job) + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) return (True, "Job %s will be canceled" % job.id) - @_RequireOpenQueue - def CancelJobUnlocked(self, job): - """Marks a job as canceled. - - """ - try: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - finally: - self.UpdateJobUnlocked(job) - @_RequireOpenQueue def _ArchiveJobsUnlocked(self, jobs): """Archives jobs.