From ac0930b925974d3bb1174cf3922bafc4fbe50596 Mon Sep 17 00:00:00 2001
From: Iustin Pop <iustin@google.com>
Date: Mon, 14 Jul 2008 11:27:40 +0000
Subject: [PATCH] Cache some jobs in memory

This patch adds a caching mechanism to the JobStorage. Note that it
does not make the memory cache authoritative.

The algorithm is:
  - all jobs loaded from disks are entered in the cache
  - all new jobs are entered in the cache
  - at each job save (in UpdateJobUnlocked), jobs which are not
    executing or queued are removed from the cache

The end effect is that running jobs will always be in the cache (which
will fix the opcode log changes) and finished jobs will be kept for a
while in the cache after being loaded.

Reviewed-by: imsnah
---
 lib/jqueue.py | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index 285a1bfe5..0b86f33ee 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -227,6 +227,7 @@ class JobStorage(object):
 
   def __init__(self):
     self._lock = threading.Lock()
+    self._memcache = {}
 
     # Make sure our directory exists
     try:
@@ -319,6 +320,10 @@ class JobStorage(object):
     If the parameter archived is True, archived jobs IDs will be
     included. Currently this argument is unused.
 
+    The method only looks at disk because it's a requirement that all
+    jobs are present on disk (so in the _memcache we don't have any
+    extra IDs).
+
     """
     jfiles = self._ListJobFiles()
     return [int(m.group(1)) for m in
@@ -333,6 +338,10 @@ class JobStorage(object):
   def _LoadJobUnlocked(self, job_id):
     assert self.lock_fd, "Queue should be open"
 
+    if job_id in self._memcache:
+      logging.debug("Found job %d in memcache", job_id)
+      return self._memcache[job_id]
+
     filepath = self._GetJobPath(job_id)
     logging.debug("Loading job from %s", filepath)
     try:
@@ -346,7 +355,10 @@ class JobStorage(object):
     finally:
       fd.close()
 
-    return _QueuedJob.Restore(self, data)
+    job = _QueuedJob.Restore(self, data)
+    self._memcache[job_id] = job
+    logging.debug("Added job %d to the cache", job_id)
+    return job
 
   def _GetJobsUnlocked(self, job_ids):
     if not job_ids:
@@ -369,6 +381,9 @@ class JobStorage(object):
     # Write to disk
     self._UpdateJobUnlocked(job)
 
+    logging.debug("Added new job %d to the cache", job_id)
+    self._memcache[job_id] = job
+
     return job
 
   def _UpdateJobUnlocked(self, job):
@@ -378,6 +393,26 @@ class JobStorage(object):
     logging.debug("Writing job %s to %s", job.id, filename)
     utils.WriteFile(filename,
                     data=serializer.DumpJson(job.Serialize(), indent=False))
+    self._CleanCacheUnlocked(exceptions=[job.id])
+
+  def _CleanCacheUnlocked(self, exceptions=None):
+    """Clean the memory cache.
+
+    The exceptions argument contains job IDs that should not be
+    cleaned.
+
+    """
+    assert isinstance(exceptions, list)
+    for job in self._memcache.values():
+      if job.id in exceptions:
+        continue
+      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
+                                 constants.JOB_STATUS_RUNNING):
+        logging.debug("Cleaning job %d from the cache", job.id)
+        try:
+          del self._memcache[job.id]
+        except KeyError:
+          pass
 
   @utils.LockedMethod
   def UpdateJob(self, job):
-- 
GitLab