Commit 04ab05ce authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Use new jstore module

Reviewed-by: iustinp
parent 8b537bb0
......@@ -35,6 +35,7 @@ from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
......@@ -267,7 +268,7 @@ class JobQueue(object):
"""
def wrapper(self, *args, **kwargs):
assert self.lock_fd, "Queue should be open"
assert self._queue_lock is not None, "Queue should be open"
return fn(self, *args, **kwargs)
return wrapper
......@@ -281,50 +282,13 @@ class JobQueue(object):
self.acquire = self._lock.acquire
self.release = self._lock.release
# Make sure our directories exists
for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
try:
os.mkdir(path, 0700)
except OSError, err:
if err.errno not in (errno.EEXIST, ):
raise
# Get queue lock
self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
try:
utils.LockFile(self.lock_fd)
except:
self.lock_fd.close()
raise
# Read version
try:
version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
except IOError, err:
if err.errno not in (errno.ENOENT, ):
raise
# Setup a new queue
self._InitQueueUnlocked()
# Try to open again
version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
# Initialize
self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)
try:
# Try to read version
version = int(version_fd.read(128))
# Verify version
if version != constants.JOB_QUEUE_VERSION:
raise errors.JobQueueError("Found version %s, expected %s",
version, constants.JOB_QUEUE_VERSION)
finally:
version_fd.close()
self._last_serial = self._ReadSerial()
if self._last_serial is None:
raise errors.ConfigurationError("Can't read/parse the job queue serial"
" file")
# Read serial file
self._last_serial = jstore.ReadSerial()
assert self._last_serial is not None, ("Serial file was modified between"
" check in jstore and here")
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
......@@ -350,34 +314,6 @@ class JobQueue(object):
finally:
self.release()
@staticmethod
def _ReadSerial():
"""Try to read the job serial file.
@rtype: None or int
@return: If the serial can be read, then it is returned. Otherwise None
is returned.
"""
try:
serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
try:
# Read last serial
serial = int(serial_fd.read(1024).strip())
finally:
serial_fd.close()
except (ValueError, EnvironmentError):
serial = None
return serial
def _InitQueueUnlocked(self):
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
data="%s\n" % constants.JOB_QUEUE_VERSION)
if self._ReadSerial() is None:
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % 0)
def _FormatJobID(self, job_id):
if not isinstance(job_id, (int, long)):
raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
......@@ -657,5 +593,5 @@ class JobQueue(object):
"""
self._wpool.TerminateWorkers()
self.lock_fd.close()
self.lock_fd = None
self._queue_lock.Close()
self._queue_lock = None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment