Commit ac0930b9 authored by Iustin Pop's avatar Iustin Pop
Browse files

Cache some jobs in memory

This patch adds a caching mechanisms to the JobStorage. Note that is
does not make the memory cache authoritative.

The algorithm is:
  - all jobs loaded from disks are entered in the cache
  - all new jobs are entered in the cache
  - at each job save (in UpdateJobUnlocked), jobs which are not
    executing or queued are removed from the cache

The end effect is that running jobs will always be in the cache (which
will fix the opcode log changes) and finished jobs will be kept for a
while in the cache after being loaded.

Reviewed-by: imsnah
parent 8a70e415
...@@ -227,6 +227,7 @@ class JobStorage(object): ...@@ -227,6 +227,7 @@ class JobStorage(object):
def __init__(self): def __init__(self):
self._lock = threading.Lock() self._lock = threading.Lock()
self._memcache = {}
# Make sure our directory exists # Make sure our directory exists
try: try:
...@@ -319,6 +320,10 @@ class JobStorage(object): ...@@ -319,6 +320,10 @@ class JobStorage(object):
If the parameter archived is True, archived jobs IDs will be If the parameter archived is True, archived jobs IDs will be
included. Currently this argument is unused. included. Currently this argument is unused.
The method only looks at disk because it's a requirement that all
jobs are present on disk (so in the _memcache we don't have any
extra IDs).
""" """
jfiles = self._ListJobFiles() jfiles = self._ListJobFiles()
return [int(m.group(1)) for m in return [int(m.group(1)) for m in
...@@ -333,6 +338,10 @@ class JobStorage(object): ...@@ -333,6 +338,10 @@ class JobStorage(object):
def _LoadJobUnlocked(self, job_id): def _LoadJobUnlocked(self, job_id):
assert self.lock_fd, "Queue should be open" assert self.lock_fd, "Queue should be open"
if job_id in self._memcache:
logging.debug("Found job %d in memcache", job_id)
return self._memcache[job_id]
filepath = self._GetJobPath(job_id) filepath = self._GetJobPath(job_id)
logging.debug("Loading job from %s", filepath) logging.debug("Loading job from %s", filepath)
try: try:
...@@ -346,7 +355,10 @@ class JobStorage(object): ...@@ -346,7 +355,10 @@ class JobStorage(object):
finally: finally:
fd.close() fd.close()
return _QueuedJob.Restore(self, data) job = _QueuedJob.Restore(self, data)
self._memcache[job_id] = job
logging.debug("Added job %d to the cache", job_id)
return job
def _GetJobsUnlocked(self, job_ids): def _GetJobsUnlocked(self, job_ids):
if not job_ids: if not job_ids:
...@@ -369,6 +381,9 @@ class JobStorage(object): ...@@ -369,6 +381,9 @@ class JobStorage(object):
# Write to disk # Write to disk
self._UpdateJobUnlocked(job) self._UpdateJobUnlocked(job)
logging.debug("Added new job %d to the cache", job_id)
self._memcache[job_id] = job
return job return job
def _UpdateJobUnlocked(self, job): def _UpdateJobUnlocked(self, job):
...@@ -378,6 +393,26 @@ class JobStorage(object): ...@@ -378,6 +393,26 @@ class JobStorage(object):
logging.debug("Writing job %s to %s", job.id, filename) logging.debug("Writing job %s to %s", job.id, filename)
utils.WriteFile(filename, utils.WriteFile(filename,
data=serializer.DumpJson(job.Serialize(), indent=False)) data=serializer.DumpJson(job.Serialize(), indent=False))
self._CleanCacheUnlocked(exceptions=[job.id])
def _CleanCacheUnlocked(self, exceptions=None):
"""Clean the memory cache.
The exceptions argument contains job IDs that should not be
cleaned.
"""
assert isinstance(exceptions, list)
for job in self._memcache.values():
if job.id in exceptions:
continue
if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING):
logging.debug("Cleaning job %d from the cache", job.id)
try:
del self._memcache[job.id]
except KeyError:
pass
@utils.LockedMethod @utils.LockedMethod
def UpdateJob(self, job): def UpdateJob(self, job):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment