diff --git a/lib/jqueue.py b/lib/jqueue.py index 74d8de88a77782467e0618ec4c9974c3a021ac66..42ad15f3d1cc4b832c1c72f83150f292b90d112c 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -35,6 +35,7 @@ from ganeti import opcodes from ganeti import errors from ganeti import mcpu from ganeti import utils +from ganeti import jstore from ganeti import rpc @@ -267,7 +268,7 @@ class JobQueue(object): """ def wrapper(self, *args, **kwargs): - assert self.lock_fd, "Queue should be open" + assert self._queue_lock is not None, "Queue should be open" return fn(self, *args, **kwargs) return wrapper @@ -281,50 +282,13 @@ class JobQueue(object): self.acquire = self._lock.acquire self.release = self._lock.release - # Make sure our directories exists - for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR): - try: - os.mkdir(path, 0700) - except OSError, err: - if err.errno not in (errno.EEXIST, ): - raise - - # Get queue lock - self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w") - try: - utils.LockFile(self.lock_fd) - except: - self.lock_fd.close() - raise - - # Read version - try: - version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") - except IOError, err: - if err.errno not in (errno.ENOENT, ): - raise - - # Setup a new queue - self._InitQueueUnlocked() - - # Try to open again - version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r") + # Initialize + self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True) - try: - # Try to read version - version = int(version_fd.read(128)) - - # Verify version - if version != constants.JOB_QUEUE_VERSION: - raise errors.JobQueueError("Found version %s, expected %s", - version, constants.JOB_QUEUE_VERSION) - finally: - version_fd.close() - - self._last_serial = self._ReadSerial() - if self._last_serial is None: - raise errors.ConfigurationError("Can't read/parse the job queue serial" - " file") + # Read serial file + self._last_serial = jstore.ReadSerial() + assert self._last_serial is not None, ("Serial file was modified between" + " check in jstore and here") # Setup worker pool self._wpool = _JobQueueWorkerPool(self) @@ -350,34 +314,6 @@ class JobQueue(object): finally: self.release() - @staticmethod - def _ReadSerial(): - """Try to read the job serial file. - - @rtype: None or int - @return: If the serial can be read, then it is returned. Otherwise None - is returned. - - """ - try: - serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r") - try: - # Read last serial - serial = int(serial_fd.read(1024).strip()) - finally: - serial_fd.close() - except (ValueError, EnvironmentError): - serial = None - - return serial - - def _InitQueueUnlocked(self): - utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE, - data="%s\n" % constants.JOB_QUEUE_VERSION) - if self._ReadSerial() is None: - utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE, - data="%s\n" % 0) - def _FormatJobID(self, job_id): if not isinstance(job_id, (int, long)): raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) @@ -657,5 +593,5 @@ class JobQueue(object): """ self._wpool.TerminateWorkers() - self.lock_fd.close() - self.lock_fd = None + self._queue_lock.Close() + self._queue_lock = None