Commit 686d7433 authored by Iustin Pop's avatar Iustin Pop
Implement the job queue drain flag

We add a (per-node) queue drain flag that blocks new job submission.
There is not yet an interface to add/remove the flag (will come in next

Reviewed-by: imsnah
parent 6797ec29
......@@ -655,6 +655,9 @@ def FormatError(err):
obuf.write("Failure: command execution error:\n%s" % msg)
elif isinstance(err, errors.TagError):
obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
elif isinstance(err, errors.JobQueueDrainError):
obuf.write("Failure: the job queue is marked for drain and doesn't"
" accept new requests\n")
elif isinstance(err, errors.GenericError):
obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
......@@ -333,6 +333,7 @@ JOB_QUEUE_LOCK_FILE = QUEUE_DIR + "/lock"
......@@ -243,6 +243,15 @@ class JobQueueError(GenericError):
class JobQueueDrainError(JobQueueError):
"""Job queue is marked for drain error.
This is raised when a job submission attempt is made but the queue
is marked for drain.
# errors should be added above
......@@ -564,6 +564,16 @@ class JobQueue(object):
return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
def _IsQueueMarkedDrain():
"""Check if the queue is marked from drain.
This currently uses the queue drain file, which makes it a
per-node flag. In the future this can be moved to the config file.
return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
def SubmitJob(self, ops):
......@@ -576,6 +586,8 @@ class JobQueue(object):
@param ops: The list of OpCodes that will become the new job.
if self._IsQueueMarkedDrain():
raise errors.JobQueueDrainError()
# Get job identifier
job_id = self._NewSerialUnlocked()
job = _QueuedJob(self, job_id, ops)
