Commit 5685c1a5 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Replace normal cache dict with weakref dict

A job should only exist once in memory. After the cache is cleaned,
there can still be references to a job somewhere else. If there
are multiple instances, one can get updated while a function is
waiting for changes on another instance. By using
weakref.WeakValueDictionary, which automatically removes instances as
soon as there are no strong references to it anymore, we can solve
this problem.

Reviewed-by: iustinp
parent 70552c46
# #
# #
# Copyright (C) 2006, 2007 Google Inc. # Copyright (C) 2006, 2007, 2008 Google Inc.
# #
# This program is free software; you can redistribute it and/or modify # This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
...@@ -33,6 +33,7 @@ import threading ...@@ -33,6 +33,7 @@ import threading
import errno import errno
import re import re
import time import time
import weakref
from ganeti import constants from ganeti import constants
from ganeti import serializer from ganeti import serializer
...@@ -312,7 +313,7 @@ class JobQueue(object): ...@@ -312,7 +313,7 @@ class JobQueue(object):
def __init__(self, context): def __init__(self, context):
self.context = context self.context = context
self._memcache = {} self._memcache = weakref.WeakValueDictionary()
self._my_hostname = utils.HostInfo().name self._my_hostname = utils.HostInfo().name
# Locking # Locking
...@@ -489,9 +490,10 @@ class JobQueue(object): ...@@ -489,9 +490,10 @@ class JobQueue(object):
if self._RE_JOB_FILE.match(name)] if self._RE_JOB_FILE.match(name)]
def _LoadJobUnlocked(self, job_id): def _LoadJobUnlocked(self, job_id):
if job_id in self._memcache: job = self._memcache.get(job_id, None)
if job:
logging.debug("Found job %s in memcache", job_id) logging.debug("Found job %s in memcache", job_id)
return self._memcache[job_id] return job
filepath = self._GetJobPath(job_id) filepath = self._GetJobPath(job_id)
logging.debug("Loading job from %s", filepath) logging.debug("Loading job from %s", filepath)
...@@ -536,7 +538,7 @@ class JobQueue(object): ...@@ -536,7 +538,7 @@ class JobQueue(object):
# Write to disk # Write to disk
self.UpdateJobUnlocked(job) self.UpdateJobUnlocked(job)
logging.debug("Added new job %s to the cache", job_id) logging.debug("Adding new job %s to the cache", job_id)
self._memcache[job_id] = job self._memcache[job_id] = job
# Add to worker pool # Add to worker pool
...@@ -550,31 +552,10 @@ class JobQueue(object): ...@@ -550,31 +552,10 @@ class JobQueue(object):
data = serializer.DumpJson(job.Serialize(), indent=False) data = serializer.DumpJson(job.Serialize(), indent=False)
logging.debug("Writing job %s to %s", job.id, filename) logging.debug("Writing job %s to %s", job.id, filename)
self._WriteAndReplicateFileUnlocked(filename, data) self._WriteAndReplicateFileUnlocked(filename, data)
self._CleanCacheUnlocked([job.id])
# Notify waiters about potential changes # Notify waiters about potential changes
job.change.notifyAll() job.change.notifyAll()
def _CleanCacheUnlocked(self, exclude):
"""Clean the memory cache.
The exceptions argument contains job IDs that should not be
cleaned.
"""
assert isinstance(exclude, list)
for job in self._memcache.values():
if job.id in exclude:
continue
if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING):
logging.debug("Cleaning job %s from the cache", job.id)
try:
del self._memcache[job.id]
except KeyError:
pass
@utils.LockedMethod @utils.LockedMethod
@_RequireOpenQueue @_RequireOpenQueue
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial): def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
...@@ -679,17 +660,12 @@ class JobQueue(object): ...@@ -679,17 +660,12 @@ class JobQueue(object):
logging.debug("Job %s is not yet done", job.id) logging.debug("Job %s is not yet done", job.id)
return return
try: old = self._GetJobPath(job.id)
old = self._GetJobPath(job.id) new = self._GetArchivedJobPath(job.id)
new = self._GetArchivedJobPath(job.id)
self._RenameFileUnlocked(old, new) self._RenameFileUnlocked(old, new)
logging.debug("Successfully archived job %s", job.id) logging.debug("Successfully archived job %s", job.id)
finally:
# Cleaning the cache because we don't know what os.rename actually did
# and to be on the safe side.
self._CleanCacheUnlocked([])
def _GetJobInfoUnlocked(self, job, fields): def _GetJobInfoUnlocked(self, job, fields):
row = [] row = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment