Commit a95c53ea authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

locking: Implement opportunistic locking in LockSet

This patch adds a new parameter to “LockSet.acquire” named
“opportunistic”. When enabled the lockset will try to acquire as many
locks as possible, but it won't wait for them (with the exception of the
lockset-internal lock in case the whole set is acquired). This is
implemented by using a timeout of 0 seconds when acquiring individual
locks. Commit 03c5291c

 made such acquisitions significantly cheaper.

The most complicated code included in this patch is probably the helper
function used to determine which mode to use and which timeout functions
are needed.

Full unit tests are included for the new and changed code
(“locking.py”'s overall coverage is at 97%).
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 9b4329e9
......@@ -53,7 +53,14 @@ _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
# Internal lock acquisition modes for L{LockSet}
(_LS_ACQUIRE_EXACT,
_LS_ACQUIRE_ALL) = range(1, 3)
_LS_ACQUIRE_ALL,
_LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
_LS_ACQUIRE_MODES = frozenset([
_LS_ACQUIRE_EXACT,
_LS_ACQUIRE_ALL,
_LS_ACQUIRE_OPPORTUNISTIC,
])
def ssynchronized(mylock, shared=0):
......@@ -917,6 +924,52 @@ class SharedLock(object):
ALL_SET = None
def _TimeoutZero():
"""Returns the number zero.
"""
return 0
def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
"""Determines modes and timeouts for L{LockSet.acquire}.
@type want_all: boolean
@param want_all: Whether all locks in set should be acquired
@param timeout: Timeout in seconds or C{None}
@param opportunistic: Whther locks should be acquired opportunistically
@rtype: tuple
@return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
(one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
acquiring the lockset-internal lock (might be C{None}) and a function to
calculate the timeout for acquiring individual locks
"""
# Short circuit when no running timeout is needed
if opportunistic and not want_all:
assert timeout is None, "Got timeout for an opportunistic acquisition"
return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquisitions.
running_timeout = utils.RunningTimeout(timeout, False)
if want_all:
mode = _LS_ACQUIRE_ALL
ls_timeout_fn = running_timeout.Remaining
else:
mode = _LS_ACQUIRE_EXACT
ls_timeout_fn = None
if opportunistic:
mode = _LS_ACQUIRE_OPPORTUNISTIC
timeout_fn = _TimeoutZero
else:
timeout_fn = running_timeout.Remaining
return (mode, ls_timeout_fn, timeout_fn)
class _AcquireTimeout(Exception):
"""Internal exception to abort an acquire on a timeout.
......@@ -1114,9 +1167,12 @@ class LockSet:
return set(result)
def acquire(self, names, timeout=None, shared=0, priority=None,
test_notify=None):
opportunistic=False, test_notify=None):
"""Acquire a set of resource locks.
@note: When acquiring locks opportunistically, any number of locks might
actually be acquired, even zero.
@type names: list of strings (or string)
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
......@@ -1124,9 +1180,16 @@ class LockSet:
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float or None
@param timeout: Maximum time to acquire all locks
@param timeout: Maximum time to acquire all locks; for opportunistic
acquisitions, a timeout can only be given when C{names} is C{None}, in
which case it is exclusively used for acquiring the L{LockSet}-internal
lock; opportunistic acquisitions don't use a timeout for acquiring
individual locks
@type priority: integer
@param priority: Priority for acquiring locks
@type opportunistic: boolean
@param opportunistic: Acquire locks opportunistically; use the return value
to determine which locks were actually acquired
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
......@@ -1146,20 +1209,26 @@ class LockSet:
if priority is None:
priority = _DEFAULT_PRIORITY
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquires.
running_timeout = utils.RunningTimeout(timeout, False)
try:
if names is not None:
assert timeout is None or not opportunistic, \
("Opportunistic acquisitions can only use a timeout if no"
" names are given; see docstring for details")
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
return self.__acquire_inner(names, _LS_ACQUIRE_EXACT, shared, priority,
running_timeout.Remaining, test_notify)
(mode, _, timeout_fn) = \
_GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
return self.__acquire_inner(names, mode, shared, priority,
timeout_fn, test_notify)
else:
(mode, ls_timeout_fn, timeout_fn) = \
_GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
# If no names are given acquire the whole set by not letting new names
# being added before we release, and getting the current list of names.
# Some of them may then be deleted later, but we'll cope with this.
......@@ -1170,15 +1239,15 @@ class LockSet:
# anyway, though, so we'll get the list lock exclusively as well in
# order to be able to do add() on the set while owning it.
if not self.__lock.acquire(shared=shared, priority=priority,
timeout=running_timeout.Remaining()):
timeout=ls_timeout_fn()):
raise _AcquireTimeout()
try:
# note we own the set-lock
self._add_owned()
return self.__acquire_inner(self.__names(), _LS_ACQUIRE_ALL, shared,
priority, running_timeout.Remaining,
test_notify)
return self.__acquire_inner(self.__names(), mode, shared,
priority, timeout_fn, test_notify)
except:
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
......@@ -1194,15 +1263,27 @@ class LockSet:
timeout_fn, test_notify):
"""Inner logic for acquiring a number of locks.
Acquisition modes:
- C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
deleted locks can be ignored as the whole set is being acquired with
its internal lock held
- C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
timeouts and deleted locks are fatal
- C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
all within the set) which should be acquired opportunistically, that is
failures are ignored
@param names: Names of the locks to be acquired
@param mode: Lock acquisition mode
@param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
@param shared: Whether to acquire in shared mode
@param timeout_fn: Function returning remaining timeout
@param timeout_fn: Function returning remaining timeout (C{None} for
opportunistic acquisitions)
@param priority: Priority for acquiring locks
@param test_notify: Special callback function for unittesting
"""
assert mode in (_LS_ACQUIRE_EXACT, _LS_ACQUIRE_ALL)
assert mode in _LS_ACQUIRE_MODES
acquire_list = []
......@@ -1246,7 +1327,7 @@ class LockSet:
priority=priority,
test_notify=test_notify_fn)
except errors.LockError:
if mode == _LS_ACQUIRE_ALL:
if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
# We are acquiring the whole set, it doesn't matter if this
# particular element is not there anymore.
continue
......@@ -1256,6 +1337,10 @@ class LockSet:
if not acq_success:
# Couldn't get lock or timeout occurred
if mode == _LS_ACQUIRE_OPPORTUNISTIC:
# Ignore timeouts on opportunistic acquisitions
continue
if timeout is None:
# This shouldn't happen as SharedLock.acquire(timeout=None) is
# blocking.
......
......@@ -1780,6 +1780,235 @@ class TestLockSet(_ThreadedTestCase):
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertRaises(Queue.Empty, done_two.get_nowait)
def testNamesWithOpportunisticAndTimeout(self):
self.assertRaises(AssertionError, self.ls.acquire,
["one", "two"], timeout=1.0, opportunistic=True)
def testOpportunisticWithUnknownName(self):
name = "unknown"
self.assertFalse(name in self.ls._names())
result = self.ls.acquire(name, opportunistic=True)
self.assertFalse(result)
self.assertFalse(self.ls.list_owned())
result = self.ls.acquire(["two", name], opportunistic=True)
self.assertEqual(result, set(["two"]))
self.assertEqual(self.ls.list_owned(), set(["two"]))
self.ls.release()
def testSimpleOpportunisticAcquisition(self):
self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
# Hold a lock in main thread
self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
def fn():
# The lock "two" is held by the main thread
result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
self.assertEqual(result, set(["one"]))
self.assertEqual(self.ls.list_owned(), set(["one"]))
self.assertFalse(self.ls._get_lock().is_owned())
self.ls.release()
self.assertFalse(self.ls.list_owned())
# Try to acquire the lock held by the main thread
result = self.ls.acquire(["two"], shared=0, opportunistic=True)
self.assertFalse(self.ls._get_lock().is_owned())
self.assertFalse(result)
self.assertFalse(self.ls.list_owned())
# Try to acquire all locks
result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
self.assertTrue(self.ls._get_lock().is_owned(),
msg="Internal lock is not owned")
self.assertEqual(result, set(["one", "three"]))
self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
self.ls.release()
self.assertFalse(self.ls.list_owned())
self.done.put(True)
self._addThread(target=fn)
# Wait for threads to finish
self._waitThreads()
self.assertEqual(self.ls.list_owned(), set(["two"]))
self.ls.release()
self.assertFalse(self.ls.list_owned())
self.assertFalse(self.ls._get_lock().is_owned())
self.assertTrue(self.done.get_nowait())
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testOpportunisticAcquisitionWithoutNamesExpires(self):
self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
# Hold all locks in main thread
self.ls.acquire(locking.ALL_SET, shared=0)
self.assertTrue(self.ls._get_lock().is_owned())
def fn():
# Try to acquire all locks in separate thread
result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
timeout=0.1)
self.assertFalse(result)
self.assertFalse(self.ls._get_lock().is_owned())
self.assertFalse(self.ls.list_owned())
# Try once more without a timeout
self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
self.done.put(True)
self._addThread(target=fn)
# Wait for threads to finish
self._waitThreads()
self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
self.ls.release()
self.assertFalse(self.ls.list_owned())
self.assertFalse(self.ls._get_lock().is_owned(shared=0))
self.assertTrue(self.done.get_nowait())
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testSharedOpportunisticAcquisitionWithoutNames(self):
self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
# Hold all locks in main thread
self.ls.acquire(locking.ALL_SET, shared=1)
self.assertTrue(self.ls._get_lock().is_owned(shared=1))
def fn():
# Try to acquire all locks in separate thread in shared mode
result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
timeout=0.1)
self.assertEqual(result, set(["one", "two", "three"]))
self.assertTrue(self.ls._get_lock().is_owned(shared=1))
self.ls.release()
self.assertFalse(self.ls._get_lock().is_owned())
# Try one in exclusive mode
self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
self.done.put(True)
self._addThread(target=fn)
# Wait for threads to finish
self._waitThreads()
self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
self.ls.release()
self.assertFalse(self.ls.list_owned())
self.assertFalse(self.ls._get_lock().is_owned())
self.assertTrue(self.done.get_nowait())
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testLockDeleteWithOpportunisticAcquisition(self):
# This test exercises some code handling LockError on acquisition, that is
# after all lock names have been gathered. This shouldn't happen in reality
# as removing locks from the set requires the lockset-internal lock, but
# the code should handle the situation anyway.
ready = threading.Event()
finished = threading.Event()
self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
# Thread function to delete lock
def fn():
# Wait for notification
ready.wait()
# Delete lock named "two" by accessing lockset-internal data
ld = self.ls._get_lockdict()
self.assertTrue(ld["two"].delete())
self.done.put("deleted.two")
# Notify helper
finished.set()
self._addThread(target=fn)
# Notification helper, called when lock already holds internal lock.
# Therefore only one of the locks not yet locked can be deleted.
def notify(name):
self.done.put("notify.%s" % name)
if name == "one":
# Tell helper thread to delete lock "two"
ready.set()
finished.wait()
# Hold all locks in main thread
self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify)
self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
# Wait for threads to finish
self._waitThreads()
# Release all locks
self.ls.release()
self.assertFalse(self.ls.list_owned())
self.assertFalse(self.ls._get_lock().is_owned())
self.assertEqual(self.done.get_nowait(), "notify.one")
self.assertEqual(self.done.get_nowait(), "deleted.two")
self.assertEqual(self.done.get_nowait(), "notify.three")
self.assertEqual(self.done.get_nowait(), "notify.two")
self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
def setUp(self):
self.fn = locking._GetLsAcquireModeAndTimeouts
def testOpportunisticWithoutNames(self):
(mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
self.assertTrue(ls_timeout_fn is None)
self.assertEqual(timeout_fn(), 0)
def testAllInputCombinations(self):
for want_all in [False, True]:
for timeout in [None, 0, 100]:
for opportunistic in [False, True]:
if (opportunistic and
not want_all and
timeout is not None):
# Can't accept a timeout when acquiring opportunistically
self.assertRaises(AssertionError, self.fn,
want_all, timeout, opportunistic)
else:
(mode, ls_timeout_fn, timeout_fn) = \
self.fn(want_all, timeout, opportunistic)
if opportunistic:
self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
self.assertEqual(timeout_fn(), 0)
else:
self.assertTrue(callable(timeout_fn))
if want_all:
self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
else:
self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
if want_all:
self.assertTrue(callable(ls_timeout_fn))
else:
self.assertTrue(ls_timeout_fn is None)
class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment