Commit 39ed3a98 authored by Guido Trotter's avatar Guido Trotter
Browse files

MarkUnfinishedOps: update job file on disk



Every time we call MarkUnfinishedOps we do it in a try/finally block
that updates the job file. With this patch we move the try/finally
inside. CancelJobUnlocked is removed, because it just becomes a wrapper
over MarkUnfinishedOps with two constant values.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent a1bfdeb1
......@@ -381,14 +381,17 @@ class _QueuedJob(object):
@param result: the opcode result
"""
not_marked = True
for op in self.ops:
if op.status in constants.OPS_FINALIZED:
assert not_marked, "Finalized opcodes found after non-finalized ones"
continue
op.status = status
op.result = result
not_marked = False
try:
not_marked = True
for op in self.ops:
if op.status in constants.OPS_FINALIZED:
assert not_marked, "Finalized opcodes found after non-finalized ones"
continue
op.status = status
op.result = result
not_marked = False
finally:
self.queue.UpdateJobUnlocked(self)
class _OpExecCallbacks(mcpu.OpExecCbBase):
......@@ -670,7 +673,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
except CancelJob:
queue.acquire()
try:
queue.CancelJobUnlocked(job)
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
finally:
queue.release()
except errors.GenericError, err:
......@@ -817,11 +821,8 @@ class JobQueue(object):
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
finally:
self.UpdateJobUnlocked(job)
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
logging.info("Job queue inspection finished")
finally:
......@@ -1344,28 +1345,15 @@ class JobQueue(object):
return (False, "Job %s is no longer waiting in the queue" % job.id)
if job_status == constants.JOB_STATUS_QUEUED:
self.CancelJobUnlocked(job)
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
return (True, "Job %s canceled" % job.id)
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
try:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
finally:
self.UpdateJobUnlocked(job)
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
return (True, "Job %s will be canceled" % job.id)
@_RequireOpenQueue
def CancelJobUnlocked(self, job):
"""Marks a job as canceled.
"""
try:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
finally:
self.UpdateJobUnlocked(job)
@_RequireOpenQueue
def _ArchiveJobsUnlocked(self, jobs):
"""Archives jobs.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment