From 6d5ea38581f487a9aa581aca5556d4077ab753ca Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Thu, 17 Nov 2011 11:55:18 +0100 Subject: [PATCH] jqueue: Add code to prepare for queue shutdown Doing so will prevent job submissions (similar to a drained queue), but won't affect currently running jobs. No further jobs will be executed. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/jqueue.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index e21132e61..19d369f47 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1504,6 +1504,10 @@ def _RequireNonDrainedQueue(fn): # Needs access to protected member, pylint: disable=W0212 if self._drained: raise errors.JobQueueDrainError("Job queue is drained, refusing job") + + if not self._accepting_jobs: + raise errors.JobQueueError("Job queue is shutting down, refusing job") + return fn(self, *args, **kwargs) return wrapper @@ -1539,6 +1543,9 @@ class JobQueue(object): self.acquire = self._lock.acquire self.release = self._lock.release + # Accept jobs by default + self._accepting_jobs = True + # Initialize the queue, and acquire the filelock. # This ensures no other process is working on the job queue. self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True) @@ -1558,8 +1565,9 @@ class JobQueue(object): # TODO: Check consistency across nodes - self._queue_size = 0 + self._queue_size = None self._UpdateQueueSizeUnlocked() + assert ht.TInt(self._queue_size) self._drained = jstore.CheckDrainFlag() # Job dependencies @@ -2450,6 +2458,31 @@ class JobQueue(object): return jobs + @locking.ssynchronized(_LOCK) + def PrepareShutdown(self): + """Prepare to stop the job queue. + + Disables execution of jobs in the workerpool and returns whether there are + any jobs currently running. If the latter is the case, the job queue is not + yet ready for shutdown. Once this function returns C{True} L{Shutdown} can + be called without interfering with any job. Queued and unfinished jobs will + be resumed next time. + + Once this function has been called no new job submissions will be accepted + (see L{_RequireNonDrainedQueue}). + + @rtype: bool + @return: Whether there are any running jobs + + """ + if self._accepting_jobs: + self._accepting_jobs = False + + # Tell worker pool to stop processing pending tasks + self._wpool.SetActive(False) + + return self._wpool.HasRunningTasks() + @locking.ssynchronized(_LOCK) @_RequireOpenQueue def Shutdown(self): -- GitLab