Commit 34327f51 authored by Iustin Pop's avatar Iustin Pop
Browse files

job queue: fix loss of finalized opcode result



Currently, unclean master daemon shutdown overwrites all of a job's
opcode status and result with error/None. This is incorrect, since the
any already finished opcode(s) should have their status and result
preserved, and only not-yet-processed opcodes should be marked as
‘error’. Cancelling jobs between opcodes does the same (but this is not
allowed currently by the code, so it's not as important as unclean
shutdown).

This patch adds a new _QueuedJob function that only overwrites the
status and result of finalized opcodes, which is then used in job queue
init and in the cancel job functions. The patch also adds some comments
and a new set constants in constants.py highlighting the finalized vs.
non-finalized opcode statuses.
Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent b59252fe
......@@ -453,13 +453,19 @@ JOB_STATUS_CANCELED = "canceled"
JOB_STATUS_SUCCESS = "success"
JOB_STATUS_ERROR = "error"
# OpCode status
# not yet finalized
OP_STATUS_QUEUED = "queued"
OP_STATUS_WAITLOCK = "waiting"
OP_STATUS_CANCELING = "canceling"
OP_STATUS_RUNNING = "running"
# finalized
OP_STATUS_CANCELED = "canceled"
OP_STATUS_SUCCESS = "success"
OP_STATUS_ERROR = "error"
OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
OP_STATUS_SUCCESS,
OP_STATUS_ERROR])
# Execution log types
ELOG_MESSAGE = "message"
......
......@@ -313,6 +313,26 @@ class _QueuedJob(object):
return entries
def MarkUnfinishedOps(self, status, result):
"""Mark unfinished opcodes with a given status and result.
This is an utility function for marking all running or waiting to
be run opcodes with a given status. Opcodes which are already
finalised are not changed.
@param status: a given opcode status
@param result: the opcode result
"""
not_marked = True
for op in self.ops:
if op.status in constants.OPS_FINALIZED:
assert not_marked, "Finalized opcodes found after non-finalized ones"
continue
op.status = status
op.result = result
not_marked = False
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
......@@ -593,9 +613,8 @@ class JobQueue(object):
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
for op in job.ops:
op.status = constants.OP_STATUS_ERROR
op.result = "Unclean master daemon shutdown"
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
finally:
self.UpdateJobUnlocked(job)
......@@ -1157,8 +1176,7 @@ class JobQueue(object):
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
try:
for op in job.ops:
op.status = constants.OP_STATUS_CANCELING
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
finally:
self.UpdateJobUnlocked(job)
return (True, "Job %s will be canceled" % job.id)
......@@ -1169,9 +1187,8 @@ class JobQueue(object):
"""
try:
for op in job.ops:
op.status = constants.OP_STATUS_CANCELED
op.result = "Job canceled by request"
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request")
finally:
self.UpdateJobUnlocked(job)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment