diff --git a/lib/cli.py b/lib/cli.py index 73be9307350860df7259e3030fb6c53539419a7d..956be3d1ef5083a66ba9000cb1b57d0a2a71ea2e 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -655,6 +655,9 @@ def FormatError(err): obuf.write("Failure: command execution error:\n%s" % msg) elif isinstance(err, errors.TagError): obuf.write("Failure: invalid tag(s) given:\n%s" % msg) + elif isinstance(err, errors.JobQueueDrainError): + obuf.write("Failure: the job queue is marked for drain and doesn't" + " accept new requests\n") elif isinstance(err, errors.GenericError): obuf.write("Unhandled Ganeti error: %s" % msg) elif isinstance(err, luxi.NoMasterError): diff --git a/lib/constants.py b/lib/constants.py index 6dae294629b2492b4d49021e246068f205118d25..9873ca7eb08eb385d00e2a0149004f3a726335aa 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -333,6 +333,7 @@ JOB_QUEUE_LOCK_FILE = QUEUE_DIR + "/lock" JOB_QUEUE_VERSION_FILE = QUEUE_DIR + "/version" JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial" JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive" +JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain" JOB_ID_TEMPLATE = r"\d+" diff --git a/lib/errors.py b/lib/errors.py index 006da8e9865ca92550f159b525aa5180ef07824a..6efd9aa052836c38dbcaa3d8da169cec2d7dd2ca 100644 --- a/lib/errors.py +++ b/lib/errors.py @@ -243,6 +243,15 @@ class JobQueueError(GenericError): """ +class JobQueueDrainError(JobQueueError): + """Job queue is marked for drain error. + + This is raised when a job submission attempt is made but the queue + is marked for drain. + + """ + + # errors should be added above diff --git a/lib/jqueue.py b/lib/jqueue.py index 54a3562dcf477f5acb1ad38c7b274a787cb56009..478bd787e71f6c2df430f4a4db793b6e7672fb28 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -564,6 +564,16 @@ class JobQueue(object): return [self._LoadJobUnlocked(job_id) for job_id in job_ids] + @staticmethod + def _IsQueueMarkedDrain(): + """Check if the queue is marked from drain. + + This currently uses the queue drain file, which makes it a + per-node flag. In the future this can be moved to the config file. + + """ + return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + @utils.LockedMethod @_RequireOpenQueue def SubmitJob(self, ops): @@ -576,6 +586,8 @@ class JobQueue(object): @param ops: The list of OpCodes that will become the new job. """ + if self._IsQueueMarkedDrain(): + raise errors.JobQueueDrainError() # Get job identifier job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops)