Commit fbf0262f authored by Michael Hanselmann

jqueue: Allow jobs waiting for locks to be canceled

- Add new "canceling" status
- Notify clients when job is canceled
- Give a return value from CancelJob
- Handle it in the client library

Reviewed-by: iustinp
parent 87622829
......@@ -560,7 +560,10 @@ def PollJob(job_id, cl=None, feedback_fn=None):
prev_logmsg_serial = max(prev_logmsg_serial, serial)
# TODO: Handle canceled and archived jobs
elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
elif status in (constants.JOB_STATUS_SUCCESS,
constants.JOB_STATUS_ERROR,
constants.JOB_STATUS_CANCELING,
constants.JOB_STATUS_CANCELED):
break
prev_job_info = job_info
......@@ -572,6 +575,9 @@ def PollJob(job_id, cl=None, feedback_fn=None):
status, opstatus, result = jobs[0]
if status == constants.JOB_STATUS_SUCCESS:
return result
elif status in (constants.JOB_STATUS_CANCELING,
constants.JOB_STATUS_CANCELED):
raise errors.OpExecError("Job was canceled")
else:
has_ok = False
for idx, (status, msg) in enumerate(zip(opstatus, result)):
......
......@@ -353,6 +353,7 @@ JOB_NOTCHANGED = "nochange"
# Job status
JOB_STATUS_QUEUED = "queued"
JOB_STATUS_WAITLOCK = "waiting"
JOB_STATUS_CANCELING = "canceling"
JOB_STATUS_RUNNING = "running"
JOB_STATUS_CANCELED = "canceled"
JOB_STATUS_SUCCESS = "success"
......@@ -360,6 +361,7 @@ JOB_STATUS_ERROR = "error"
OP_STATUS_QUEUED = "queued"
OP_STATUS_WAITLOCK = "waiting"
OP_STATUS_CANCELING = "canceling"
OP_STATUS_RUNNING = "running"
OP_STATUS_CANCELED = "canceled"
OP_STATUS_SUCCESS = "success"
......
......@@ -47,9 +47,16 @@ from ganeti import utils
from ganeti import jstore
from ganeti import rpc
JOBQUEUE_THREADS = 25
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised when an opcode is found in the "canceling" state right before it
  would be switched to "running". The job-processing code catches it and
  marks the job as canceled.

  Derives from Exception so that it is a proper exception class: it can be
  raised and caught on any Python version instead of relying on the
  old-style-class raise semantics.

  """
def TimeStampNow():
"""Returns the current timestamp.
......@@ -232,6 +239,7 @@ class _QueuedJob(object):
status will be the same
- otherwise, the last opcode with the status one of:
- waitlock
- canceling
- running
will determine the job status
......@@ -257,6 +265,9 @@ class _QueuedJob(object):
status = constants.JOB_STATUS_WAITLOCK
elif op.status == constants.OP_STATUS_RUNNING:
status = constants.JOB_STATUS_RUNNING
elif op.status == constants.OP_STATUS_CANCELING:
status = constants.JOB_STATUS_CANCELING
break
elif op.status == constants.OP_STATUS_ERROR:
status = constants.JOB_STATUS_ERROR
# The whole job fails if one opcode failed
......@@ -311,6 +322,13 @@ class _JobQueueWorker(workerpool.BaseWorker):
self.queue.acquire()
try:
assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING)
# Cancel here if we were asked to
if self.opcode.status == constants.OP_STATUS_CANCELING:
raise CancelJob()
self.opcode.status = constants.OP_STATUS_RUNNING
finally:
self.queue.release()
......@@ -338,6 +356,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
queue.acquire()
try:
assert op.status == constants.OP_STATUS_QUEUED
job.run_op_index = idx
op.status = constants.OP_STATUS_WAITLOCK
op.result = None
......@@ -390,6 +409,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
logging.debug("Op %s/%s: Successfully finished %s",
idx + 1, count, op)
except CancelJob:
# Will be handled further up
raise
except Exception, err:
queue.acquire()
try:
......@@ -404,6 +426,12 @@ class _JobQueueWorker(workerpool.BaseWorker):
queue.release()
raise
except CancelJob:
queue.acquire()
try:
queue.CancelJobUnlocked(job)
finally:
queue.release()
except errors.GenericError, err:
logging.exception("Ganeti exception")
except:
......@@ -535,7 +563,8 @@ class JobQueue(object):
self._wpool.AddTask(job)
elif status in (constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK):
constants.JOB_STATUS_WAITLOCK,
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
for op in job.ops:
......@@ -1020,21 +1049,42 @@ class JobQueue(object):
@param job_id: job ID of job to be cancelled.
"""
logging.debug("Cancelling job %s", job_id)
logging.info("Cancelling job %s", job_id)
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
return
return (False, "Job %s not found" % job_id)
job_status = job.CalcStatus()
if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
if job_status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_WAITLOCK):
logging.debug("Job %s is no longer in the queue", job.id)
return
return (False, "Job %s is no longer in the queue" % job.id)
if job_status == constants.JOB_STATUS_QUEUED:
self.CancelJobUnlocked(job)
return (True, "Job %s canceled" % job.id)
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
try:
for op in job.ops:
op.status = constants.OP_STATUS_CANCELING
finally:
self.UpdateJobUnlocked(job)
return (True, "Job %s will be canceled" % job.id)
@_RequireOpenQueue
def CancelJobUnlocked(self, job):
"""Marks a job as canceled.
"""
try:
for op in job.ops:
op.status = constants.OP_STATUS_ERROR
op.result = "Job cancelled by request"
op.result = "Job canceled by request"
finally:
self.UpdateJobUnlocked(job)
......
......@@ -38,6 +38,7 @@ _LIST_DEF_FIELDS = ["id", "status", "summary"]
_USER_JOB_STATUS = {
constants.JOB_STATUS_QUEUED: "queued",
constants.JOB_STATUS_WAITLOCK: "waiting",
constants.JOB_STATUS_CANCELING: "canceling",
constants.JOB_STATUS_RUNNING: "running",
constants.JOB_STATUS_CANCELED: "canceled",
constants.JOB_STATUS_SUCCESS: "success",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment