Commit 5aab242c authored by Michael Hanselmann
Browse files

locking.LockSet: Implement acquire timeouts



The timeout passed to LockSet.acquire() is measured over all lock acquires. If
LockSet.acquire fails to acquire all requested locks within the specified
amount of time, all locks are released again and the acquire fails.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
parent 95a5d0fb
......@@ -2396,7 +2396,6 @@ class LUQueryNodes(NoHooksLU):
# if we don't request only static fields, we need to lock the nodes
self.needed_locks[locking.LEVEL_NODE] = self.wanted
def CheckPrereq(self):
"""Check prerequisites.
......
......@@ -632,6 +632,12 @@ class SharedLock(object):
ALL_SET = None
class _AcquireTimeout(Exception):
  """Internal exception to abort an acquire on a timeout.

  Raised inside C{LockSet.acquire} when the overall timeout has run out
  before every requested lock was obtained. C{LockSet.acquire} catches it
  itself: the locks acquired so far are released and C{None} is returned
  to the caller.

  """
class LockSet:
"""Implements a set of locks.
......@@ -702,6 +708,12 @@ class LockSet:
else:
return set()
def _release_and_delete_owned(self):
  """Release every lock owned by the current thread and drop its ownership.

  Walks the names reported by C{_list_owned()}; for each one the matching
  lock in the lock dictionary is released and the name is removed from the
  owner bookkeeping via C{_del_owned()}.

  """
  for owned_name in self._list_owned():
    owned_lock = self.__lockdict[owned_name]
    owned_lock.release()
    self._del_owned(name=owned_name)
def __names(self):
"""Return the current set of names.
......@@ -730,30 +742,43 @@ class LockSet:
self.__lock.release()
return set(result)
def acquire(self, names, timeout=None, shared=0):
def acquire(self, names, timeout=None, shared=0, test_notify=None):
"""Acquire a set of resource locks.
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float
@type timeout: float or None
@param timeout: Maximum time to acquire all locks
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
@return: True when all the locks are successfully acquired
@return: Set of all locks successfully acquired or None in case of timeout
@raise errors.LockError: when any lock we try to acquire has
been deleted before we succeed. In this case none of the
locks requested will be acquired.
"""
if timeout is not None:
raise NotImplementedError
assert timeout is None or timeout >= 0.0
# Check we don't already own locks at this level
assert not self._is_owned(), "Cannot acquire locks in the same set twice"
if names is None:
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquires.
remaining_timeout = timeout
if timeout is None:
start = None
calc_remaining_timeout = lambda: None
else:
start = time.time()
calc_remaining_timeout = lambda: (start + timeout) - time.time()
want_all = names is None
if want_all:
# If no names are given acquire the whole set by not letting new names
# being added before we release, and getting the current list of names.
# Some of them may then be deleted later, but we'll cope with this.
......@@ -763,7 +788,7 @@ class LockSet:
# them exclusively though they won't be able to do this anyway, though,
# so we'll get the list lock exclusively as well in order to be able to
# do add() on the set while owning it.
self.__lock.acquire(shared=shared)
self.__lock.acquire(shared=shared, timeout=remaining_timeout)
try:
# note we own the set-lock
self._add_owned()
......@@ -775,65 +800,103 @@ class LockSet:
self.__lock.release()
raise
# Re-calculate timeout
remaining_timeout = calc_remaining_timeout()
try:
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
else:
names = sorted(names)
acquire_list = []
# First we look the locks up on __lockdict. We have no way of being sure
# they will still be there after, but this makes it a lot faster should
# just one of them be the already wrong
for lname in utils.UniqueSequence(names):
try:
lock = self.__lockdict[lname] # raises KeyError if lock is not there
acquire_list.append((lname, lock))
except (KeyError):
if self.__lock._is_owned():
# We are acquiring all the set, it doesn't matter if this
# particular element is not there anymore.
continue
else:
raise errors.LockError('non-existing lock in set (%s)' % lname)
# This will hold the locknames we effectively acquired.
acquired = set()
# Now acquire_list contains a sorted list of resources and locks we want.
# In order to get them we loop on this (private) list and acquire() them.
# We gave no real guarantee they will still exist till this is done but
# .acquire() itself is safe and will alert us if the lock gets deleted.
for (lname, lock) in acquire_list:
try:
lock.acquire(shared=shared) # raises LockError if the lock is deleted
# now the lock cannot be deleted, we have it!
self._add_owned(name=lname)
acquired.add(lname)
except (errors.LockError):
if self.__lock._is_owned():
# We are acquiring all the set, it doesn't matter if this
# particular element is not there anymore.
continue
try:
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
else:
names = sorted(names)
acquire_list = []
# First we look the locks up on __lockdict. We have no way of being sure
# they will still be there after, but this makes it a lot faster should
# just one of them be the already wrong
for lname in utils.UniqueSequence(names):
try:
lock = self.__lockdict[lname] # raises KeyError if lock is not there
acquire_list.append((lname, lock))
except KeyError:
if want_all:
# We are acquiring all the set, it doesn't matter if this
# particular element is not there anymore.
continue
else:
raise errors.LockError("Non-existing lock in set (%s)" % lname)
# This will hold the locknames we effectively acquired.
acquired = set()
# Now acquire_list contains a sorted list of resources and locks we
# want. In order to get them we loop on this (private) list and
# acquire() them. We gave no real guarantee they will still exist till
# this is done but .acquire() itself is safe and will alert us if the
# lock gets deleted.
for (lname, lock) in acquire_list:
if __debug__ and callable(test_notify):
test_notify_fn = lambda: test_notify(lname)
else:
name_fail = lname
for lname in self._list_owned():
self.__lockdict[lname].release()
self._del_owned(name=lname)
raise errors.LockError('non-existing lock in set (%s)' % name_fail)
except:
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
# Of course something is going to be really wrong, after this.
if lock._is_owned():
lock.release()
raise
test_notify_fn = None
except:
# If something went wrong and we had the set-lock let's release it...
if self.__lock._is_owned():
self.__lock.release()
raise
try:
if timeout is not None and remaining_timeout < 0:
raise _AcquireTimeout()
# raises LockError if the lock was deleted
if not lock.acquire(shared=shared, timeout=remaining_timeout,
test_notify=test_notify_fn):
# Couldn't get lock or timeout occurred
if timeout is None:
# This shouldn't happen as SharedLock.acquire(timeout=None) is
# blocking.
raise errors.LockError("Failed to get lock %s" % lname)
raise _AcquireTimeout()
# Re-calculate timeout
remaining_timeout = calc_remaining_timeout()
# now the lock cannot be deleted, we have it!
self._add_owned(name=lname)
acquired.add(lname)
except _AcquireTimeout:
# Release all acquired locks
self._release_and_delete_owned()
raise
except errors.LockError:
if want_all:
# We are acquiring all the set, it doesn't matter if this
# particular element is not there anymore.
continue
self._release_and_delete_owned()
raise errors.LockError("Non-existing lock in set (%s)" % lname)
except:
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
# Of course something is going to be really wrong, after this.
if lock._is_owned():
lock.release()
raise
except:
# If something went wrong and we had the set-lock let's release it...
if want_all:
self.__lock.release()
raise
except _AcquireTimeout:
if want_all:
self._del_owned()
return None
return acquired
......@@ -939,7 +1002,7 @@ class LockSet:
@param names: names of the resource to remove.
@return:: a list of locks which we removed; the list is always
@return: a list of locks which we removed; the list is always
equal to the names list if we were holding all the locks
exclusively
......
......@@ -48,6 +48,15 @@ def _Repeat(fn):
return wrapper
def SafeSleep(duration):
  """Sleep for at least C{duration} seconds.

  The remaining delay is recomputed after every C{time.sleep} call and the
  sleep is repeated until the deadline has passed, so an early return from
  C{time.sleep} does not shorten the overall wait.

  @type duration: float
  @param duration: Minimum time to sleep, in seconds; zero or negative
      values return immediately

  """
  # Prefer a monotonic clock where the interpreter provides one, so that
  # wall-clock adjustments during the wait can neither cut the sleep short
  # nor stretch it. Older interpreters fall back to time.time().
  clock = getattr(time, "monotonic", time.time)

  deadline = clock() + duration
  while True:
    remaining = deadline - clock()
    if remaining <= 0.0:
      break
    time.sleep(remaining)
class _ThreadedTestCase(unittest.TestCase):
"""Test class that supports adding/waiting on threads"""
def setUp(self):
......@@ -965,6 +974,125 @@ class TestLockSet(_ThreadedTestCase):
for _ in range(6):
self.failUnlessEqual(self.done.get_nowait(), 'DONE')
@_Repeat
def testSimpleAcquireTimeoutExpiring(self):
  # Checks that LockSet.acquire() with a timeout gives up and leaves the
  # calling thread without any owned locks when another thread keeps a
  # conflicting lock held exclusively for the whole duration.
  names = sorted(self.ls._names())
  self.assert_(len(names) >= 3)

  # Get name of first lock
  first = names[0]

  # Get name of last lock (also removes it from the local "names" list)
  last = names.pop()

  # Each entry is (wanted, block): "block" is held exclusively by this
  # thread while a second thread tries to acquire "wanted" with a timeout.
  checks = [
    # Block first and try to lock it again
    (first, first),

    # Block first and try to lock all locks (None requests the whole set)
    (None, first),

    # Block last and try to lock it again
    (last, last),
    ]

  for (wanted, block) in checks:
    # Lock in exclusive mode
    self.assert_(self.ls.acquire(block, shared=0))

    def _AcquireOne():
      # Try to get the same lock again with a timeout (should never succeed)
      if self.ls.acquire(wanted, timeout=0.1, shared=0):
        self.done.put("acquired")
        self.ls.release()
      else:
        # After a timeout nothing may be left owned by this thread
        self.assert_(not self.ls._list_owned())
        self.assert_(not self.ls._is_owned())
        self.done.put("not acquired")

    self._addThread(target=_AcquireOne)

    # Wait for timeout in thread to expire
    self._waitThreads()

    # Release exclusive lock again
    self.ls.release()

    # Exactly one result is expected per check, and it must be the timeout
    self.assertEqual(self.done.get_nowait(), "not acquired")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testDelayedAndExpiringLockAcquire(self):
  # Runs two passes over the same scenario: this thread holds every lock
  # exclusively while a second thread tries to acquire all of them with an
  # overall timeout. In the first pass (expire=False) the locks are handed
  # over quickly and the acquire must succeed; in the second pass
  # (expire=True) each hand-over is delayed so the overall timeout expires.
  self._setUpLS()
  self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])

  for expire in (False, True):
    names = sorted(self.ls._names())
    self.assertEqual(len(names), 8)

    # One event per lock name; set by the acquiring thread's test_notify
    # callback for that lock
    lock_ev = dict([(i, threading.Event()) for i in names])

    # Lock all in exclusive mode
    self.assert_(self.ls.acquire(names, shared=0))

    if expire:
      # We'll wait at least 300ms per lock
      lockwait = len(names) * [0.3]

      # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
      # this gives us up to 2.4s to fail.
      lockall_timeout = 0.4
    else:
      # This should finish rather quickly
      lockwait = None
      lockall_timeout = len(names) * 5.0

    def _LockAll():
      def acquire_notification(name):
        # The exact sequence of notifications is only verified in the
        # non-expiring pass
        if not expire:
          self.done.put("getting %s" % name)

        # Kick next lock
        lock_ev[name].set()

      if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
                         test_notify=acquire_notification):
        self.done.put("got all")
        self.ls.release()
      else:
        self.done.put("timeout on all")

      # Notify all locks, so the main thread never keeps waiting on an
      # event whose lock was not attempted
      for ev in lock_ev.values():
        ev.set()

    t = self._addThread(target=_LockAll)

    for idx, name in enumerate(names):
      # Wait for actual acquire on this lock to start
      lock_ev[name].wait(10.0)

      if expire and t.isAlive():
        # Wait some time after getting the notification to make sure the lock
        # acquire will expire
        SafeSleep(lockwait[idx])

      self.ls.release(names=name)

    self.assert_(not self.ls._list_owned())

    self._waitThreads()

    if expire:
      # Not checking which locks were actually acquired. Doing so would be
      # too timing-dependent.
      self.assertEqual(self.done.get_nowait(), "timeout on all")
    else:
      for i in names:
        self.assertEqual(self.done.get_nowait(), "getting %s" % i)
      self.assertEqual(self.done.get_nowait(), "got all")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testConcurrentRemove(self):
self.ls.add('four')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment