Commit 20571a26 authored by Guido Trotter

Cache a few bits of status in jqueue



Currently, each time we submit a job, we check the job queue size and the
drained file. With this change we keep both pieces of information in memory
and don't read them from the filesystem on each submission.

Significant changes include:
  - The drained value can only be properly set by calling the
    appropriate cluster command "gnt-cluster queue drain/undrain" and
    not by removing/creating the file in the job queue directory. Not
    that anybody would have done it in this undocumented way before.
  - We get rid of the soft limit for the job queue, which we haven't
    ever used anyway.
Signed-off-by: Guido Trotter <ultrotter@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>
parent 6358dbc2
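
The caching pattern this change introduces, as a minimal standalone sketch
(the names MiniJobQueue and _CountJobFiles, and the paths, are illustrative
assumptions, not the actual Ganeti code):

import os

QUEUE_DIR = "queue"          # stand-in for the real queue directory
DRAIN_FILE = os.path.join(QUEUE_DIR, "drain")
HARD_LIMIT = 5000            # mirrors JOB_QUEUE_SIZE_HARD_LIMIT


class MiniJobQueue(object):
  """Toy queue: read drain flag and size from disk once, serve from memory."""

  def __init__(self):
    if not os.path.isdir(QUEUE_DIR):
      os.makedirs(QUEUE_DIR)
    # Both values are read from the filesystem exactly once, at startup.
    self._drained = os.path.exists(DRAIN_FILE)
    self._queue_size = self._CountJobFiles()

  @staticmethod
  def _CountJobFiles():
    try:
      names = os.listdir(QUEUE_DIR)
    except OSError:
      return 0  # queue directory missing in this toy setup
    return len([n for n in names if n.startswith("job-")])

  def SetDrainFlag(self, drain_flag):
    # Keep the on-disk marker and the cached flag in sync; in Ganeti this
    # runs with the queue lock held exclusively.
    if drain_flag:
      open(DRAIN_FILE, "w").close()
    elif os.path.exists(DRAIN_FILE):
      os.unlink(DRAIN_FILE)
    self._drained = drain_flag

  def SubmitJob(self):
    # Submission now checks memory only: no stat() of the drain file and no
    # directory listing per job.
    if self._drained:
      raise RuntimeError("Job queue is drained, refusing job")
    if self._queue_size >= HARD_LIMIT:
      raise RuntimeError("Job queue full")
    self._queue_size += 1

The trade-off is that the cached values stay trustworthy only as long as
every writer goes through the same code paths, which is exactly why the
commit message insists on "gnt-cluster queue drain/undrain" rather than
touching the drain file by hand.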
lib/constants.py
@@ -695,7 +695,6 @@ JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"
 JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive"
 JOB_QUEUE_DRAIN_FILE = QUEUE_DIR + "/drain"
 JOB_QUEUE_SIZE_HARD_LIMIT = 5000
-JOB_QUEUE_SIZE_SOFT_LIMIT = JOB_QUEUE_SIZE_HARD_LIMIT * 0.8
 JOB_QUEUE_DIRS = [QUEUE_DIR, JOB_QUEUE_ARCHIVE_DIR]
 JOB_QUEUE_DIRS_MODE = SECURE_DIR_MODE
lib/jqueue.py
@@ -628,6 +628,10 @@ class JobQueue(object):
     # TODO: Check consistency across nodes
+    self._queue_size = 0
+    self._UpdateQueueSizeUnlocked()
+    self._drained = self._IsQueueMarkedDrain()
+
     # Setup worker pool
     self._wpool = _JobQueueWorkerPool(self)
     try:
@@ -997,8 +1001,15 @@ class JobQueue(object):
     """
     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
 
-  @staticmethod
-  def SetDrainFlag(drain_flag):
+  def _UpdateQueueSizeUnlocked(self):
+    """Update the queue size.
+
+    """
+    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SetDrainFlag(self, drain_flag):
     """Sets the drain flag for the queue.
 
     @type drain_flag: boolean
@@ -1009,6 +1020,9 @@ class JobQueue(object):
       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
     else:
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+    self._drained = drain_flag
+
     return True
 
   @_RequireOpenQueue
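
Why SetDrainFlag gains the locking decorators: the cached flag is only ever
written with the queue lock held, so the submission path can read it without
re-checking the filesystem. A toy illustration, with a plain threading.Lock
standing in for Ganeti's shared/exclusive queue lock and locked_method as a
hypothetical stand-in for utils.LockedMethod:

import threading


def locked_method(fn):
  """Hypothetical stand-in for utils.LockedMethod: call fn under self._lock."""
  def wrapper(self, *args, **kwargs):
    self._lock.acquire()
    try:
      return fn(self, *args, **kwargs)
    finally:
      self._lock.release()
  return wrapper


class DrainState(object):
  def __init__(self):
    self._lock = threading.Lock()
    self._drained = False

  @locked_method
  def SetDrainFlag(self, drain_flag):
    # Writer: the cached flag (and, in Ganeti, the drain file) only changes
    # while the lock is held.
    self._drained = drain_flag

  @locked_method
  def IsDrained(self):
    # Reader: always sees a value that was written under the same lock.
    return self._drained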
@@ -1027,18 +1041,12 @@ class JobQueue(object):
     @raise errors.JobQueueDrainError: if the job is marked for draining
 
     """
-    if self._IsQueueMarkedDrain():
+    # Ok when sharing the big job queue lock, as the drain file is created when
+    # the lock is exclusive.
+    if self._drained:
       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
-    # Check job queue size
-    size = len(self._GetJobIDsUnlocked(sort=False))
-    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
-      # TODO: Autoarchive jobs. Make sure it's not done on every job
-      # submission, though.
-      #size = ...
-      pass
-
-    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
+    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops)
@@ -1046,6 +1054,8 @@ class JobQueue(object):
     # Write to disk
     self.UpdateJobUnlocked(job)
 
+    self._queue_size += 1
+
     logging.debug("Adding new job %s to the cache", job_id)
     self._memcache[job_id] = job
@@ -1250,6 +1260,11 @@ class JobQueue(object):
       logging.debug("Successfully archived job(s) %s",
                     utils.CommaJoin(job.id for job in archive_jobs))
 
+    # Since we haven't quite checked, above, if we succeeded or failed renaming
+    # the files, we update the cached queue size from the filesystem. When we
+    # get around to fix the TODO: above, we can use the number of actually
+    # archived jobs to fix this.
+    self._UpdateQueueSizeUnlocked()
     return len(archive_jobs)
 
   @utils.LockedMethod
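
The recount after archiving, sketched standalone (the function names here are
illustrative, not Ganeti's): because the archive loop does not verify every
rename, decrementing the cached counter per job could drift, so the size is
recomputed from the directory once per archive run:

import os


def count_job_files(queue_dir):
  """Recount job files instead of trusting in-memory decrements."""
  return len([n for n in os.listdir(queue_dir) if n.startswith("job-")])


def archive_jobs(queue_dir, archive_dir, job_ids):
  """Move job files aside, then resync the cached queue size from disk."""
  archived = 0
  for job_id in job_ids:
    src = os.path.join(queue_dir, "job-%s" % job_id)
    dst = os.path.join(archive_dir, "job-%s" % job_id)
    try:
      os.rename(src, dst)
      archived += 1
    except OSError:
      pass  # a failed rename is absorbed by the recount below
  # One directory listing per archive run buys back an exact count.
  return archived, count_job_files(queue_dir)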