From 686d74337a38adc1d666ae372405ac037f4ad5a9 Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Wed, 15 Oct 2008 10:52:07 +0000 Subject: [PATCH] Implement the job queue drain flag We add a (per-node) queue drain flag that blocks new job submission. There is not yet an interface to add/remove the flag (will come in next patches). Reviewed-by: imsnah --- lib/cli.py | 3 +++ lib/constants.py | 1 + lib/errors.py | 9 +++++++++ lib/jqueue.py | 12 ++++++++++++ 4 files changed, 25 insertions(+) diff --git a/lib/cli.py b/lib/cli.py index 73be93073..956be3d1e 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -655,6 +655,9 @@ def FormatError(err): obuf.write("Failure: command execution error:\n%s" % msg) elif isinstance(err, errors.TagError): obuf.write("Failure: invalid tag(s) given:\n%s" % msg) + elif isinstance(err, errors.JobQueueDrainError): + obuf.write("Failure: the job queue is marked for drain and doesn't" + " accept new requests\n") elif isinstance(err, errors.GenericError): obuf.write("Unhandled Ganeti error: %s" % msg) elif isinstance(err, luxi.NoMasterError): diff --git a/lib/constants.py b/lib/constants.py index 6dae29462..9873ca7eb 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -333,6 +333,7 @@ JOB_QUEUE_LOCK_FILE = QUEUE_DIR + "/lock" JOB_QUEUE_VERSION_FILE = QUEUE_DIR + "/version" JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial" JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive" +JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain" JOB_ID_TEMPLATE = r"\d+" diff --git a/lib/errors.py b/lib/errors.py index 006da8e98..6efd9aa05 100644 --- a/lib/errors.py +++ b/lib/errors.py @@ -243,6 +243,15 @@ class JobQueueError(GenericError): """ +class JobQueueDrainError(JobQueueError): + """Job queue is marked for drain error. + + This is raised when a job submission attempt is made but the queue + is marked for drain. + + """ + + # errors should be added above diff --git a/lib/jqueue.py b/lib/jqueue.py index 54a3562dc..478bd787e 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -564,6 +564,16 @@ class JobQueue(object): return [self._LoadJobUnlocked(job_id) for job_id in job_ids] + @staticmethod + def _IsQueueMarkedDrain(): + """Check if the queue is marked from drain. + + This currently uses the queue drain file, which makes it a + per-node flag. In the future this can be moved to the config file. + + """ + return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + @utils.LockedMethod @_RequireOpenQueue def SubmitJob(self, ops): @@ -576,6 +586,8 @@ class JobQueue(object): @param ops: The list of OpCodes that will become the new job. """ + if self._IsQueueMarkedDrain(): + raise errors.JobQueueDrainError() # Get job identifier job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops) -- GitLab