Commit db37da70 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Move assert into decorator

This reduces code duplication. A later patch will modify the job queue
a bit more and will need a change of this assert. The assertion is
also removed from all class-internal functions.

Reviewed-by: iustinp
parent 0a1e74d9
......@@ -251,6 +251,26 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
class JobQueue(object):
_RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
def _RequireOpenQueue(fn):
"""Decorator for "public" functions.
This function should be used for all "public" functions. That is, functions
usually called from other classes.
Important: Use this decorator only after utils.LockedMethod!
Example:
@utils.LockedMethod
@_RequireOpenQueue
def Example(self):
pass
"""
def wrapper(self, *args, **kwargs):
assert self.lock_fd, "Queue should be open"
return fn(self, *args, **kwargs)
return wrapper
def __init__(self, context):
self.context = context
self._memcache = {}
......@@ -352,8 +372,6 @@ class JobQueue(object):
return serial
def _InitQueueUnlocked(self):
assert self.lock_fd, "Queue should be open"
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
data="%s\n" % constants.JOB_QUEUE_VERSION)
if self._ReadSerial() is None:
......@@ -376,8 +394,6 @@ class JobQueue(object):
Returns: A string representing the job identifier.
"""
assert self.lock_fd, "Queue should be open"
# New number
serial = self._last_serial + 1
......@@ -433,14 +449,10 @@ class JobQueue(object):
return jlist
def _ListJobFiles(self):
assert self.lock_fd, "Queue should be open"
return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
if self._RE_JOB_FILE.match(name)]
def _LoadJobUnlocked(self, job_id):
assert self.lock_fd, "Queue should be open"
if job_id in self._memcache:
logging.debug("Found job %s in memcache", job_id)
return self._memcache[job_id]
......@@ -470,6 +482,7 @@ class JobQueue(object):
return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
@utils.LockedMethod
@_RequireOpenQueue
def SubmitJob(self, ops, nodes):
"""Create and store a new job.
......@@ -483,8 +496,6 @@ class JobQueue(object):
distributed.
"""
assert self.lock_fd, "Queue should be open"
# Get job identifier
job_id = self._NewSerialUnlocked(nodes)
job = _QueuedJob(self, job_id, ops)
......@@ -500,9 +511,8 @@ class JobQueue(object):
return job.id
@_RequireOpenQueue
def UpdateJobUnlocked(self, job):
assert self.lock_fd, "Queue should be open"
filename = self._GetJobPath(job.id)
logging.debug("Writing job %s to %s", job.id, filename)
utils.WriteFile(filename,
......@@ -530,6 +540,7 @@ class JobQueue(object):
pass
@utils.LockedMethod
@_RequireOpenQueue
def CancelJob(self, job_id):
"""Cancels a job.
......@@ -556,6 +567,7 @@ class JobQueue(object):
self.UpdateJobUnlocked(job)
@utils.LockedMethod
@_RequireOpenQueue
def ArchiveJob(self, job_id):
"""Archives a job.
......@@ -618,6 +630,7 @@ class JobQueue(object):
return row
@utils.LockedMethod
@_RequireOpenQueue
def QueryJobs(self, job_ids, fields):
"""Returns a list of jobs in queue.
......@@ -637,12 +650,11 @@ class JobQueue(object):
return jobs
@utils.LockedMethod
@_RequireOpenQueue
def Shutdown(self):
"""Stops the job queue.
"""
assert self.lock_fd, "Queue should be open"
self._wpool.TerminateWorkers()
self.lock_fd.close()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment