Skip to content
Snippets Groups Projects
Commit c8d0be94 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Factorize code checking for drained queue


This is in preparation for a clean(er) shutdown of masterd.

Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 69f0340a
No related branches found
No related tags found
No related merge requests found
...@@ -1487,6 +1487,27 @@ def _RequireOpenQueue(fn): ...@@ -1487,6 +1487,27 @@ def _RequireOpenQueue(fn):
return wrapper return wrapper
def _RequireNonDrainedQueue(fn):
"""Decorator checking for a non-drained queue.
To be used with functions submitting new jobs.
"""
def wrapper(self, *args, **kwargs):
"""Wrapper function.
@raise errors.JobQueueDrainError: if the job queue is marked for draining
"""
# Ok when sharing the big job queue lock, as the drain file is created when
# the lock is exclusive.
# Needs access to protected member, pylint: disable=W0212
if self._drained:
raise errors.JobQueueDrainError("Job queue is drained, refusing job")
return fn(self, *args, **kwargs)
return wrapper
class JobQueue(object): class JobQueue(object):
"""Queue used to manage the jobs. """Queue used to manage the jobs.
...@@ -2013,16 +2034,10 @@ class JobQueue(object): ...@@ -2013,16 +2034,10 @@ class JobQueue(object):
@param ops: The list of OpCodes that will become the new job. @param ops: The list of OpCodes that will become the new job.
@rtype: L{_QueuedJob} @rtype: L{_QueuedJob}
@return: the job object to be queued @return: the job object to be queued
@raise errors.JobQueueDrainError: if the job queue is marked for draining
@raise errors.JobQueueFull: if the job queue has too many jobs in it @raise errors.JobQueueFull: if the job queue has too many jobs in it
@raise errors.GenericError: If an opcode is not valid @raise errors.GenericError: If an opcode is not valid
""" """
# Ok when sharing the big job queue lock, as the drain file is created when
# the lock is exclusive.
if self._drained:
raise errors.JobQueueDrainError("Job queue is drained, refusing job")
if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
raise errors.JobQueueFull() raise errors.JobQueueFull()
...@@ -2054,6 +2069,7 @@ class JobQueue(object): ...@@ -2054,6 +2069,7 @@ class JobQueue(object):
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue @_RequireOpenQueue
@_RequireNonDrainedQueue
def SubmitJob(self, ops): def SubmitJob(self, ops):
"""Create and store a new job. """Create and store a new job.
...@@ -2066,6 +2082,7 @@ class JobQueue(object): ...@@ -2066,6 +2082,7 @@ class JobQueue(object):
@locking.ssynchronized(_LOCK) @locking.ssynchronized(_LOCK)
@_RequireOpenQueue @_RequireOpenQueue
@_RequireNonDrainedQueue
def SubmitManyJobs(self, jobs): def SubmitManyJobs(self, jobs):
"""Create and store multiple jobs. """Create and store multiple jobs.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment