diff --git a/lib/constants.py b/lib/constants.py index 3949f56b7634841e0cd498a7c1dffad91c86510b..05226c03fe0686f88ab84b8f6b4ad7ccf29d9378 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -453,13 +453,19 @@ JOB_STATUS_CANCELED = "canceled" JOB_STATUS_SUCCESS = "success" JOB_STATUS_ERROR = "error" +# OpCode status +# not yet finalized OP_STATUS_QUEUED = "queued" OP_STATUS_WAITLOCK = "waiting" OP_STATUS_CANCELING = "canceling" OP_STATUS_RUNNING = "running" +# finalized OP_STATUS_CANCELED = "canceled" OP_STATUS_SUCCESS = "success" OP_STATUS_ERROR = "error" +OPS_FINALIZED = frozenset([OP_STATUS_CANCELED, + OP_STATUS_SUCCESS, + OP_STATUS_ERROR]) # Execution log types ELOG_MESSAGE = "message" diff --git a/lib/jqueue.py b/lib/jqueue.py index 083031c077011a5ab339c3699932225ad307d216..20fa1c7b9546cd9252d30222824b1718b1f814d8 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -313,6 +313,26 @@ class _QueuedJob(object): return entries + def MarkUnfinishedOps(self, status, result): + """Mark unfinished opcodes with a given status and result. + + This is an utility function for marking all running or waiting to + be run opcodes with a given status. Opcodes which are already + finalised are not changed. + + @param status: a given opcode status + @param result: the opcode result + + """ + not_marked = True + for op in self.ops: + if op.status in constants.OPS_FINALIZED: + assert not_marked, "Finalized opcodes found after non-finalized ones" + continue + op.status = status + op.result = result + not_marked = False + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. @@ -593,9 +613,8 @@ class JobQueue(object): constants.JOB_STATUS_CANCELING): logging.warning("Unfinished job %s found: %s", job.id, job) try: - for op in job.ops: - op.status = constants.OP_STATUS_ERROR - op.result = "Unclean master daemon shutdown" + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") finally: self.UpdateJobUnlocked(job) @@ -1157,8 +1176,7 @@ class JobQueue(object): elif job_status == constants.JOB_STATUS_WAITLOCK: # The worker will notice the new status and cancel the job try: - for op in job.ops: - op.status = constants.OP_STATUS_CANCELING + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) finally: self.UpdateJobUnlocked(job) return (True, "Job %s will be canceled" % job.id) @@ -1169,9 +1187,8 @@ class JobQueue(object): """ try: - for op in job.ops: - op.status = constants.OP_STATUS_CANCELED - op.result = "Job canceled by request" + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") finally: self.UpdateJobUnlocked(job)