Commit 6d5ea385 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Add code to prepare for queue shutdown



Doing so will prevent job submissions (similar to a drained queue),
but won't affect currently running jobs. No further jobs will be
executed.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent ef52306a
......@@ -1504,6 +1504,10 @@ def _RequireNonDrainedQueue(fn):
# Needs access to protected member, pylint: disable=W0212
if self._drained:
raise errors.JobQueueDrainError("Job queue is drained, refusing job")
if not self._accepting_jobs:
raise errors.JobQueueError("Job queue is shutting down, refusing job")
return fn(self, *args, **kwargs)
return wrapper
......@@ -1539,6 +1543,9 @@ class JobQueue(object):
self.acquire = self._lock.acquire
self.release = self._lock.release
# Accept jobs by default
self._accepting_jobs = True
# Initialize the queue, and acquire the filelock.
# This ensures no other process is working on the job queue.
self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
......@@ -1558,8 +1565,9 @@ class JobQueue(object):
# TODO: Check consistency across nodes
self._queue_size = 0
self._queue_size = None
self._UpdateQueueSizeUnlocked()
assert ht.TInt(self._queue_size)
self._drained = jstore.CheckDrainFlag()
# Job dependencies
......@@ -2450,6 +2458,31 @@ class JobQueue(object):
return jobs
@locking.ssynchronized(_LOCK)
def PrepareShutdown(self):
"""Prepare to stop the job queue.
Disables execution of jobs in the workerpool and returns whether there are
any jobs currently running. If the latter is the case, the job queue is not
yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
be called without interfering with any job. Queued and unfinished jobs will
be resumed next time.
Once this function has been called no new job submissions will be accepted
(see L{_RequireNonDrainedQueue}).
@rtype: bool
@return: Whether there are any running jobs
"""
if self._accepting_jobs:
self._accepting_jobs = False
# Tell worker pool to stop processing pending tasks
self._wpool.SetActive(False)
return self._wpool.HasRunningTasks()
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
def Shutdown(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment