diff --git a/lib/constants.py b/lib/constants.py
index 94eac9dd6848e03b872508d8869ced79bd6cfbe9..15a4257efa4258defeb2b8f1d67db260d1d84280 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -695,7 +695,6 @@ JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"
 JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive"
 JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain"
 JOB_QUEUE_SIZE_HARD_LIMIT = 5000
-JOB_QUEUE_SIZE_SOFT_LIMIT = JOB_QUEUE_SIZE_HARD_LIMIT * 0.8
 JOB_QUEUE_DIRS = [QUEUE_DIR, JOB_QUEUE_ARCHIVE_DIR]
 JOB_QUEUE_DIRS_MODE = SECURE_DIR_MODE
 
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 306fdcfdbafe26246b29618ead977e8b6ab6365e..d6b20ea928f03f0a36e6bd6561109981396e3b60 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -628,6 +628,10 @@ class JobQueue(object):
 
     # TODO: Check consistency across nodes
 
+    self._queue_size = 0
+    self._UpdateQueueSizeUnlocked()
+    self._drained = self._IsQueueMarkedDrain()
+
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
@@ -997,8 +1001,15 @@ class JobQueue(object):
     """
     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
 
-  @staticmethod
-  def SetDrainFlag(drain_flag):
+  def _UpdateQueueSizeUnlocked(self):
+    """Update the queue size.
+
+    """
+    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SetDrainFlag(self, drain_flag):
     """Sets the drain flag for the queue.
 
     @type drain_flag: boolean
@@ -1009,6 +1020,9 @@ class JobQueue(object):
       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
     else:
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+    self._drained = drain_flag
+
     return True
 
   @_RequireOpenQueue
@@ -1027,18 +1041,12 @@ class JobQueue(object):
     @raise errors.JobQueueDrainError: if the job is marked for draining
 
     """
-    if self._IsQueueMarkedDrain():
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    if self._drained:
       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
-    # Check job queue size
-    size = len(self._GetJobIDsUnlocked(sort=False))
-    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
-      # TODO: Autoarchive jobs. Make sure it's not done on every job
-      # submission, though.
-      #size = ...
-      pass
-
-    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops)
@@ -1046,6 +1054,8 @@ class JobQueue(object):
     # Write to disk
     self.UpdateJobUnlocked(job)
 
+    self._queue_size += 1
+
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
 
@@ -1250,6 +1260,11 @@ class JobQueue(object):
       logging.debug("Successfully archived job(s) %s",
                     utils.CommaJoin(job.id for job in archive_jobs))
 
+    # Since we haven't quite checked, above, if we succeeded or failed renaming
+    # the files, we update the cached queue size from the filesystem. When we
+    # get around to fix the TODO: above, we can use the number of actually
+    # archived jobs to fix this.
+    self._UpdateQueueSizeUnlocked()
     return len(archive_jobs)
 
   @utils.LockedMethod
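
For readers outside the Ganeti tree, the pattern this patch introduces can be shown in isolation: read the queue size and drain flag from disk once at startup, keep write-through copies in memory under the queue lock, and have job submission consult only the cached values instead of listing the queue directory and stat()ing the drain file on every call. The sketch below is not Ganeti code; every name in it (TinyQueue, SubmitJob, the directory layout) is made up for illustration.

    import os

    QUEUE_SIZE_HARD_LIMIT = 5000  # mirrors constants.JOB_QUEUE_SIZE_HARD_LIMIT


    class QueueDrainedError(Exception):
      pass


    class QueueFullError(Exception):
      pass


    class TinyQueue(object):
      def __init__(self, queue_dir):
        self._jobs_dir = os.path.join(queue_dir, "jobs")
        self._drain_file = os.path.join(queue_dir, "drain")
        if not os.path.isdir(self._jobs_dir):
          os.makedirs(self._jobs_dir)
        # The expensive filesystem reads happen once, at startup, instead
        # of on every job submission.
        self._queue_size = len(os.listdir(self._jobs_dir))
        self._drained = os.path.exists(self._drain_file)

      def SetDrainFlag(self, drain_flag):
        # Write-through: update the on-disk flag first, then the cache.
        if drain_flag:
          open(self._drain_file, "w").close()
        elif os.path.exists(self._drain_file):
          os.unlink(self._drain_file)
        self._drained = drain_flag

      def SubmitJob(self, job_id, payload):
        # Both checks hit only in-memory state; no listdir() or stat().
        if self._drained:
          raise QueueDrainedError("Job queue is drained, refusing job")
        if self._queue_size >= QUEUE_SIZE_HARD_LIMIT:
          raise QueueFullError("Job queue is full")
        with open(os.path.join(self._jobs_dir, str(job_id)), "w") as fd:
          fd.write(payload)
        self._queue_size += 1  # keep the cache in step with the disk

As in the patch, the caching is only sound because every mutation happens under one lock (@utils.LockedMethod in the real code), and because operations whose per-file outcome is not checked individually, such as archiving, fall back to recounting from the filesystem via _UpdateQueueSizeUnlocked() rather than trusting the cached value.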