Commit 5d6fb8eb authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jstore: Change to not always require a lock

This way we can do locking when both noded and masterd are running
on the same machine, the latter holding an exclusive lock on the
queue.

Reviewed-by: iustinp
parent aa9075c5
......@@ -283,7 +283,7 @@ class JobQueue(object):
self.release = self._lock.release
# Initialize
self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)
self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
# Read serial file
self._last_serial = jstore.ReadSerial()
......
......@@ -69,14 +69,13 @@ def ReadVersion():
return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE)
def InitAndVerifyQueue(exclusive):
def InitAndVerifyQueue(must_lock):
"""Open and lock job queue.
If necessary, the queue is automatically initialized.
@type exclusive: bool
@param exclusive: Whether to lock the queue in exclusive mode. Shared
mode otherwise.
@type must_lock: bool
@param must_lock: Whether an exclusive lock must be held.
@rtype: utils.FileLock
@return: Lock object for the queue. This can be used to change the
locking mode.
......@@ -93,40 +92,47 @@ def InitAndVerifyQueue(exclusive):
# Lock queue
queue_lock = utils.FileLock(constants.JOB_QUEUE_LOCK_FILE)
try:
# Determine locking function and call it
if exclusive:
fn = queue_lock.Exclusive
# The queue needs to be locked in exclusive mode to write to the serial and
# version files.
if must_lock:
queue_lock.Exclusive(blocking=True)
holding_lock = True
else:
fn = queue_lock.Shared
fn(blocking=False)
# Verify version
version = ReadVersion()
if version is None:
# Write new version file
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
data="%s\n" % constants.JOB_QUEUE_VERSION)
# Read again
try:
queue_lock.Exclusive(blocking=False)
holding_lock = True
except errors.LockError:
# Ignore errors and assume the process keeping the lock checked
# everything.
holding_lock = False
if holding_lock:
# Verify version
version = ReadVersion()
if version is None:
# Write new version file
utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
data="%s\n" % constants.JOB_QUEUE_VERSION)
if version != constants.JOB_QUEUE_VERSION:
raise errors.JobQueueError("Found job queue version %s, expected %s",
version, constants.JOB_QUEUE_VERSION)
# Read again
version = ReadVersion()
serial = ReadSerial()
if serial is None:
# Write new serial file
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % 0)
if version != constants.JOB_QUEUE_VERSION:
raise errors.JobQueueError("Found job queue version %s, expected %s",
version, constants.JOB_QUEUE_VERSION)
# Read again
serial = ReadSerial()
if serial is None:
# Write new serial file
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
data="%s\n" % 0)
# Read again
serial = ReadSerial()
if serial is None:
# There must be a serious problem
raise errors.JobQueueError("Can't read/parse the job queue serial file")
if serial is None:
# There must be a serious problem
raise errors.JobQueueError("Can't read/parse the job queue serial file")
except:
queue_lock.Close()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment