diff --git a/lib/jqueue.py b/lib/jqueue.py index 285a1bfe505cf1f23c19d37af521c761b48e0579..0b86f33eec93d05e94e4dde37888dc9a42e71d3a 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -227,6 +227,7 @@ class JobStorage(object): def __init__(self): self._lock = threading.Lock() + self._memcache = {} # Make sure our directory exists try: @@ -319,6 +320,10 @@ class JobStorage(object): If the parameter archived is True, archived jobs IDs will be included. Currently this argument is unused. + The method only looks at disk because it's a requirement that all + jobs are present on disk (so in the _memcache we don't have any + extra IDs). + """ jfiles = self._ListJobFiles() return [int(m.group(1)) for m in @@ -333,6 +338,10 @@ class JobStorage(object): def _LoadJobUnlocked(self, job_id): assert self.lock_fd, "Queue should be open" + if job_id in self._memcache: + logging.debug("Found job %d in memcache", job_id) + return self._memcache[job_id] + filepath = self._GetJobPath(job_id) logging.debug("Loading job from %s", filepath) try: @@ -346,7 +355,10 @@ class JobStorage(object): finally: fd.close() - return _QueuedJob.Restore(self, data) + job = _QueuedJob.Restore(self, data) + self._memcache[job_id] = job + logging.debug("Added job %d to the cache", job_id) + return job def _GetJobsUnlocked(self, job_ids): if not job_ids: @@ -369,6 +381,9 @@ class JobStorage(object): # Write to disk self._UpdateJobUnlocked(job) + logging.debug("Added new job %d to the cache", job_id) + self._memcache[job_id] = job + return job def _UpdateJobUnlocked(self, job): @@ -378,6 +393,26 @@ class JobStorage(object): logging.debug("Writing job %s to %s", job.id, filename) utils.WriteFile(filename, data=serializer.DumpJson(job.Serialize(), indent=False)) + self._CleanCacheUnlocked(exceptions=[job.id]) + + def _CleanCacheUnlocked(self, exceptions=None): + """Clean the memory cache. + + The exceptions argument contains job IDs that should not be + cleaned. + + """ + assert isinstance(exceptions, list) + for job in self._memcache.values(): + if job.id in exceptions: + continue + if job.GetStatus() not in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_RUNNING): + logging.debug("Cleaning job %d from the cache", job.id) + try: + del self._memcache[job.id] + except KeyError: + pass @utils.LockedMethod def UpdateJob(self, job):