Commit 6ea72e43 authored by Michael Hanselmann

jqueue: Work around race condition between job processing and archival

This is a simplified version of a patch I sent earlier to make sure the job
file is only written once with a finalized status.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent dc1e2262
...@@ -754,6 +754,9 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -754,6 +754,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
op.status = constants.OP_STATUS_SUCCESS op.status = constants.OP_STATUS_SUCCESS
op.result = result op.result = result
op.end_timestamp = TimeStampNow() op.end_timestamp = TimeStampNow()
if idx == count - 1:
job.lock_status = None
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job) queue.UpdateJobUnlocked(job)
finally: finally:
queue.release() queue.release()
...@@ -778,6 +781,8 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -778,6 +781,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
logging.info("Op %s/%s: Error in opcode %s: %s", logging.info("Op %s/%s: Error in opcode %s: %s",
idx + 1, count, op_summary, err) idx + 1, count, op_summary, err)
finally: finally:
job.lock_status = None
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job) queue.UpdateJobUnlocked(job)
finally: finally:
queue.release() queue.release()
...@@ -788,6 +793,9 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -788,6 +793,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
try: try:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
"Job canceled by request") "Job canceled by request")
job.lock_status = None
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
finally: finally:
queue.release() queue.release()
except errors.GenericError, err: except errors.GenericError, err:
...@@ -795,19 +803,8 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -795,19 +803,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
except: except:
logging.exception("Unhandled exception") logging.exception("Unhandled exception")
finally: finally:
queue.acquire(shared=1) status = job.CalcStatus()
try: logging.info("Finished job %s, status = %s", job.id, status)
try:
job.lock_status = None
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
finally:
job_id = job.id
status = job.CalcStatus()
finally:
queue.release()
logging.info("Finished job %s, status = %s", job_id, status)
class _JobQueueWorkerPool(workerpool.WorkerPool): class _JobQueueWorkerPool(workerpool.WorkerPool):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment