From 04ab05cefa2b2cee90d18878903317e519a34ae8 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Mon, 4 Aug 2008 12:27:18 +0000 Subject: [PATCH] jqueue: Use new jstore module Reviewed-by: iustinp --- lib/jqueue.py | 84 ++++++--------------------------------------------- 1 file changed, 10 insertions(+), 74 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index 74d8de88a..42ad15f3d 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -35,6 +35,7 @@ from ganeti import opcodes from ganeti import errors from ganeti import mcpu from ganeti import utils +from ganeti import jstore from ganeti import rpc @@ -267,7 +268,7 @@ class JobQueue(object): """ def wrapper(self, *args, **kwargs): - assert self.lock_fd, "Queue should be open" + assert self._queue_lock is not None, "Queue should be open" return fn(self, *args, **kwargs) return wrapper @@ -281,50 +282,13 @@ class JobQueue(object): self.acquire = self._lock.acquire self.release = self._lock.release - # Make sure our directories exists - for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR): - try: - os.mkdir(path, 0700) - except OSError, err: - if err.errno not in (errno.EEXIST, ): - raise - - # Get queue lock - self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w") - try: - utils.LockFile(self.lock_fd) - except: - self.lock_fd.close() - raise - - # Read version - try: - version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") - except IOError, err: - if err.errno not in (errno.ENOENT, ): - raise - - # Setup a new queue - self._InitQueueUnlocked() - - # Try to open again - version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") + # Initialize + self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True) - try: - # Try to read version - version = int(version_fd.read(128)) - - # Verify version - if version != constants.JOB_QUEUE_VERSION: - raise errors.JobQueueError("Found version %s, expected %s", - version, constants.JOB_QUEUE_VERSION) - finally: - version_fd.close() - - self._last_serial = self._ReadSerial() - if self._last_serial is None: - raise errors.ConfigurationError("Can't read/parse the job queue serial" - " file") + # Read serial file + self._last_serial = jstore.ReadSerial() + assert self._last_serial is not None, ("Serial file was modified between" + " check in jstore and here") # Setup worker pool self._wpool = _JobQueueWorkerPool(self) @@ -350,34 +314,6 @@ class JobQueue(object): finally: self.release() - @staticmethod - def _ReadSerial(): - """Try to read the job serial file. - - @rtype: None or int - @return: If the serial can be read, then it is returned. Otherwise None - is returned. - - """ - try: - serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r") - try: - # Read last serial - serial = int(serial_fd.read(1024).strip()) - finally: - serial_fd.close() - except (ValueError, EnvironmentError): - serial = None - - return serial - - def _InitQueueUnlocked(self): - utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, - data="%s\n" % constants.JOB_QUEUE_VERSION) - if self._ReadSerial() is None: - utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, - data="%s\n" % 0) - def _FormatJobID(self, job_id): if not isinstance(job_id, (int, long)): raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) @@ -657,5 +593,5 @@ class JobQueue(object): """ self._wpool.TerminateWorkers() - self.lock_fd.close() - self.lock_fd = None + self._queue_lock.Close() + self._queue_lock = None -- GitLab