diff --git a/lib/jqueue.py b/lib/jqueue.py index e21132e616aeb08eab70742657dc2b112165684f..19d369f47cabbfea3e2e15f6b4cf4fb526498b92 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1504,6 +1504,10 @@ def _RequireNonDrainedQueue(fn): # Needs access to protected member, pylint: disable=W0212 if self._drained: raise errors.JobQueueDrainError("Job queue is drained, refusing job") + + if not self._accepting_jobs: + raise errors.JobQueueError("Job queue is shutting down, refusing job") + return fn(self, *args, **kwargs) return wrapper @@ -1539,6 +1543,9 @@ class JobQueue(object): self.acquire = self._lock.acquire self.release = self._lock.release + # Accept jobs by default + self._accepting_jobs = True + # Initialize the queue, and acquire the filelock. # This ensures no other process is working on the job queue. self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) @@ -1558,8 +1565,9 @@ class JobQueue(object): # TODO: Check consistency across nodes - self._queue_size = 0 + self._queue_size = None self._UpdateQueueSizeUnlocked() + assert ht.TInt(self._queue_size) self._drained = jstore.CheckDrainFlag() # Job dependencies @@ -2450,6 +2458,31 @@ class JobQueue(object): return jobs + @locking.ssynchronized(_LOCK) + def PrepareShutdown(self): + """Prepare to stop the job queue. + + Disables execution of jobs in the workerpool and returns whether there are + any jobs currently running. If the latter is the case, the job queue is not + yet ready for shutdown. Once this function returns C{True} L{Shutdown} can + be called without interfering with any job. Queued and unfinished jobs will + be resumed next time. + + Once this function has been called no new job submissions will be accepted + (see L{_RequireNonDrainedQueue}). + + @rtype: bool + @return: Whether there are any running jobs + + """ + if self._accepting_jobs: + self._accepting_jobs = False + + # Tell worker pool to stop processing pending tasks + self._wpool.SetActive(False) + + return self._wpool.HasRunningTasks() + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Shutdown(self):