diff --git a/lib/cli.py b/lib/cli.py index 737c4016eb61e947bb9c1b4d6057e0639e4c995b..dea707801516f634b974d656863b269a47e2914b 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -687,6 +687,9 @@ def FormatError(err): elif isinstance(err, errors.JobQueueDrainError): obuf.write("Failure: the job queue is marked for drain and doesn't" " accept new requests\n") + elif isinstance(err, errors.JobQueueFull): + obuf.write("Failure: the job queue is full and doesn't accept new" + " job submissions until old jobs are archived\n") elif isinstance(err, errors.GenericError): obuf.write("Unhandled Ganeti error: %s" % msg) elif isinstance(err, luxi.NoMasterError): diff --git a/lib/constants.py b/lib/constants.py index a5748c900a26d9bcd32a3a77b203021a28b7cd2b..7dfa2ac41f271d8ab3a1047a7b17737bbd1e9343 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -366,6 +366,8 @@ JOB_QUEUE_VERSION_FILE = QUEUE_DIR + "/version" JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial" JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive" JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain" +JOB_QUEUE_SIZE_HARD_LIMIT = 5000 +JOB_QUEUE_SIZE_SOFT_LIMIT = JOB_QUEUE_SIZE_HARD_LIMIT * 0.8 JOB_ID_TEMPLATE = r"\d+" diff --git a/lib/errors.py b/lib/errors.py index 4f3d86d5ae396448b7478d521827812aa971def6..b54a1493a38291e733172e8764e0d20012bf6c2b 100644 --- a/lib/errors.py +++ b/lib/errors.py @@ -253,6 +253,14 @@ class JobQueueDrainError(JobQueueError): """ +class JobQueueFull(JobQueueError): + """Job queue full error. + + Raised when job queue size reached its hard limit. + + """ + + # errors should be added above diff --git a/lib/jqueue.py b/lib/jqueue.py index 38b13b73b66514970ebaa41c6d7b34b3f8c807ea..3734476070fca51806ad0fcc8662434a09b0e125 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -941,6 +941,18 @@ class JobQueue(object): """ if self._IsQueueMarkedDrain(): raise errors.JobQueueDrainError() + + # Check job queue size + size = len(self._ListJobFiles()) + if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT: + # TODO: Autoarchive jobs. Make sure it's not done on every job + # submission, though. + #size = ... + pass + + if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: + raise errors.JobQueueFull() + # Get job identifier job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops)