Commit 3dbe3ddf authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

SharedLock: Implement downgrade from exclusive to shared mode



If a job needs to modify a resource and then wait for a result, it must
acquire the resource lock in exclusive mode. In some cases it would be
possible to only have a shared lock for waiting. Until now it was not
possible to change a lock's mode once it'd been acquired. Releasing and
re-acquiring might have been possible, but would require many more
checks and can introduce new issues.

With this patch a new method, named “downgrade”, is added to Ganeti's
own SharedLock class. It can only be called when the lock is held in
exclusive mode and changes it to shared. If there are any pending shared
acquires on the same priority, they're moved to the front of the queue
and notified (jumping ahead of exclusive acquires).

In a lockset the internal lock will be downgraded if, and only if, all
individual locks owned by the current thread are either released or
acquired in shared mode.

Unittests are provided.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 9133387e
......@@ -727,6 +727,48 @@ class SharedLock(object):
finally:
self.__lock.release()
def downgrade(self):
"""Changes the lock mode from exclusive to shared.
Pending acquires in shared mode on the same priority will go ahead.
"""
self.__lock.acquire()
try:
assert self.__is_owned(), "Lock must be owned"
if self.__is_exclusive():
# Do nothing if the lock is already acquired in shared mode
self.__exc = None
self.__do_acquire(1)
# Important: pending shared acquires should only jump ahead if there
# was a transition from exclusive to shared, otherwise an owner of a
# shared lock can keep calling this function to push incoming shared
# acquires
(priority, prioqueue) = self.__find_first_pending_queue()
if prioqueue:
# Is there a pending shared acquire on this priority?
cond = self.__pending_shared.pop(priority, None)
if cond:
assert cond.shared
assert cond in prioqueue
# Ensure shared acquire is on top of queue
if len(prioqueue) > 1:
prioqueue.remove(cond)
prioqueue.insert(0, cond)
# Notify
cond.notifyAll()
assert not self.__is_exclusive()
assert self.__is_sharer()
return True
finally:
self.__lock.release()
def release(self):
"""Release a Shared Lock.
......@@ -881,6 +923,21 @@ class LockSet:
"""
return "%s/%s" % (self.name, mname)
def _get_lock(self):
"""Returns the lockset-internal lock.
"""
return self.__lock
def _get_lockdict(self):
"""Returns the lockset-internal lock dictionary.
Accessing this structure is only safe in single-thread usage or when the
lockset-internal lock is held.
"""
return self.__lockdict
def _is_owned(self):
"""Is the current thread a current level owner?"""
return threading.currentThread() in self.__owners
......@@ -1122,6 +1179,42 @@ class LockSet:
return acquired
def downgrade(self, names=None):
"""Downgrade a set of resource locks from exclusive to shared mode.
The locks must have been acquired in exclusive mode.
"""
assert self._is_owned(), ("downgrade on lockset %s while not owning any"
" lock" % self.name)
# Support passing in a single resource to downgrade rather than many
if isinstance(names, basestring):
names = [names]
owned = self._list_owned()
if names is None:
names = owned
else:
names = set(names)
assert owned.issuperset(names), \
("downgrade() on unheld resources %s (set %s)" %
(names.difference(owned), self.name))
for lockname in names:
self.__lockdict[lockname].downgrade()
# Do we own the lockset in exclusive mode?
if self.__lock._is_owned(shared=0):
# Have all locks been downgraded?
if not compat.any(lock._is_owned(shared=0)
for lock in self.__lockdict.values()):
self.__lock.downgrade()
assert self.__lock._is_owned(shared=1)
return True
def release(self, names=None):
"""Release a set of resource locks, at the same level.
......@@ -1458,6 +1551,22 @@ class GanetiLockManager:
return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
priority=priority)
def downgrade(self, level, names=None):
"""Downgrade a set of resource locks from exclusive to shared mode.
You must have acquired the locks in exclusive mode.
@type level: member of locking.LEVELS
@param level: the level at which the locks shall be downgraded
@type names: list of strings, or None
@param names: the names of the locks which shall be downgraded
(defaults to all the locks acquired at the level)
"""
assert level in LEVELS, "Invalid locking level %s" % level
return self.__keyring[level].downgrade(names=names)
def release(self, level, names=None):
"""Release a set of resource locks, at the same level.
......
......@@ -612,6 +612,116 @@ class TestSharedLock(_ThreadedTestCase):
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testIllegalDowngrade(self):
# Not yet acquired
self.assertRaises(AssertionError, self.sl.downgrade)
# Acquire in shared mode, downgrade should be no-op
self.assertTrue(self.sl.acquire(shared=1))
self.assertTrue(self.sl._is_owned(shared=1))
self.assertTrue(self.sl.downgrade())
self.assertTrue(self.sl._is_owned(shared=1))
self.sl.release()
def testDowngrade(self):
self.assertTrue(self.sl.acquire())
self.assertTrue(self.sl._is_owned(shared=0))
self.assertTrue(self.sl.downgrade())
self.assertTrue(self.sl._is_owned(shared=1))
self.sl.release()
@_Repeat
def testDowngradeJumpsAheadOfExclusive(self):
def _KeepExclusive(ev_got, ev_downgrade, ev_release):
self.assertTrue(self.sl.acquire())
self.assertTrue(self.sl._is_owned(shared=0))
ev_got.set()
ev_downgrade.wait()
self.assertTrue(self.sl._is_owned(shared=0))
self.assertTrue(self.sl.downgrade())
self.assertTrue(self.sl._is_owned(shared=1))
ev_release.wait()
self.assertTrue(self.sl._is_owned(shared=1))
self.sl.release()
def _KeepExclusive2(ev_started, ev_release):
self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
self.assertTrue(self.sl._is_owned(shared=0))
ev_release.wait()
self.assertTrue(self.sl._is_owned(shared=0))
self.sl.release()
def _KeepShared(ev_started, ev_got, ev_release):
self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
self.assertTrue(self.sl._is_owned(shared=1))
ev_got.set()
ev_release.wait()
self.assertTrue(self.sl._is_owned(shared=1))
self.sl.release()
# Acquire lock in exclusive mode
ev_got_excl1 = threading.Event()
ev_downgrade_excl1 = threading.Event()
ev_release_excl1 = threading.Event()
th_excl1 = self._addThread(target=_KeepExclusive,
args=(ev_got_excl1, ev_downgrade_excl1,
ev_release_excl1))
ev_got_excl1.wait()
# Start a second exclusive acquire
ev_started_excl2 = threading.Event()
ev_release_excl2 = threading.Event()
th_excl2 = self._addThread(target=_KeepExclusive2,
args=(ev_started_excl2, ev_release_excl2))
ev_started_excl2.wait()
# Start shared acquires, will jump ahead of second exclusive acquire when
# first exclusive acquire downgrades
ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
ev_release_shared = threading.Event()
th_shared = [self._addThread(target=_KeepShared,
args=(ev_started, ev_got, ev_release_shared))
for (ev_started, ev_got) in ev_shared]
# Wait for all shared acquires to start
for (ev, _) in ev_shared:
ev.wait()
# Check lock information
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
(self.sl.name, "exclusive", [th_excl1.getName()], None))
(_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING]))
self.assertEqual([(pendmode, sorted(waiting))
for (pendmode, waiting) in pending],
[("exclusive", [th_excl2.getName()]),
("shared", sorted(th.getName() for th in th_shared))])
# Shared acquires won't start until the exclusive lock is downgraded
ev_downgrade_excl1.set()
# Wait for all shared acquires to be successful
for (_, ev) in ev_shared:
ev.wait()
# Check lock information again
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])),
(self.sl.name, "shared", None,
[("exclusive", [th_excl2.getName()])]))
(_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER]))
self.assertEqual(set(owner), set([th_excl1.getName()] +
[th.getName() for th in th_shared]))
ev_release_excl1.set()
ev_release_excl2.set()
ev_release_shared.set()
self._waitThreads()
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER,
query.LQ_PENDING])),
(self.sl.name, None, None, []))
@_Repeat
def testMixedAcquireTimeout(self):
sync = threading.Event()
......@@ -1374,6 +1484,61 @@ class TestLockSet(_ThreadedTestCase):
self.assertEqual(self.done.get_nowait(), 'DONE')
self._setUpLS()
def testAcquireWithNamesDowngrade(self):
self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
self.assertTrue(self.ls._is_owned())
self.assertFalse(self.ls._get_lock()._is_owned())
self.ls.release()
self.assertFalse(self.ls._is_owned())
self.assertFalse(self.ls._get_lock()._is_owned())
# Can't downgrade after releasing
self.assertRaises(AssertionError, self.ls.downgrade, "two")
def testDowngrade(self):
# Not owning anything, must raise an exception
self.assertFalse(self.ls._is_owned())
self.assertRaises(AssertionError, self.ls.downgrade)
self.assertFalse(compat.any(i._is_owned()
for i in self.ls._get_lockdict().values()))
self.assertEquals(self.ls.acquire(None, shared=0),
set(["one", "two", "three"]))
self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
self.assertTrue(compat.all(i._is_owned(shared=0)
for i in self.ls._get_lockdict().values()))
# Start downgrading locks
self.assertTrue(self.ls.downgrade(names=["one"]))
self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
for name, lock in
self.ls._get_lockdict().items()))
self.assertTrue(self.ls.downgrade(names="two"))
self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
should_share = lambda name: [0, 1][int(name in ("one", "two"))]
self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
for name, lock in
self.ls._get_lockdict().items()))
# Downgrading the last exclusive lock to shared must downgrade the
# lockset-internal lock too
self.assertTrue(self.ls.downgrade(names="three"))
self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
self.assertTrue(compat.all(i._is_owned(shared=1)
for i in self.ls._get_lockdict().values()))
# Downgrading a shared lock must be a no-op
self.assertTrue(self.ls.downgrade(names=["one", "three"]))
self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
self.assertTrue(compat.all(i._is_owned(shared=1)
for i in self.ls._get_lockdict().values()))
self.ls.release()
def testPriority(self):
def _Acquire(prev, next, name, priority, success_fn):
prev.wait()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment