Commit 84e344d4 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

SharedLock: implement timeouts



This patch greatly simplifies the SharedLock code and implements
timeouts for the acquire() and delete() functions. A wrapper around
Python's threading.Condition class must be used to ensure thread
safety when checking whether there are any waiters left.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 23824641
......@@ -20,11 +20,8 @@
"""Module implementing the Ganeti locking code."""
# pylint: disable-msg=W0613,W0201
import threading
# Wouldn't it be better to define LockingError in the locking module?
# Well, for now that's how the rest of the code does it...
from ganeti import errors
from ganeti import utils
......@@ -48,7 +45,55 @@ def ssynchronized(lock, shared=0):
return wrap
class SharedLock:
class _CountingCondition(object):
    """Condition variable that tracks how many threads are waiting on it.

    Thin wrapper around threading.Condition. The number of threads
    currently inside wait() is kept in a counter of our own, because the
    internal "__waiters" attribute of threading.Condition can't be accessed
    in a thread-safe way.

    """
    __slots__ = ["_cond", "_nwaiters"]

    def __init__(self, lock):
        """Creates the wrapped condition on top of the given lock.

        @param lock: lock object handed to threading.Condition

        """
        object.__init__(self)
        self._cond = threading.Condition(lock=lock)
        self._nwaiters = 0

    def notifyAll(self):
        """Wakes up every thread waiting on the wrapped condition.

        """
        return self._cond.notifyAll()

    def wait(self, timeout=None):
        """Blocks until the condition is notified or the timeout expires.

        @type timeout: float or None
        @param timeout: Timeout in seconds
        @return: whatever the wrapped condition's wait() returns

        """
        assert self._nwaiters >= 0
        self._nwaiters += 1
        try:
            result = self._cond.wait(timeout=timeout)
        finally:
            # Runs whether wait() returned or raised, so the counter
            # never stays inflated
            self._nwaiters -= 1
        return result

    def has_waiting(self):
        """Tells whether at least one thread is currently inside wait().

        @rtype: bool

        """
        return self._nwaiters != 0
class SharedLock(object):
"""Implements a shared lock.
Multiple threads can acquire the lock in a shared way, calling
......@@ -60,31 +105,58 @@ class SharedLock:
eventually do so.
"""
__slots__ = [
"__active_shr_c",
"__inactive_shr_c",
"__deleted",
"__exc",
"__lock",
"__pending",
"__shr",
]
__condition_class = _CountingCondition
def __init__(self):
    """Construct a new SharedLock.

    Sets up the internal lock, the queue of pending acquires, the two
    condition objects used by shared acquires and the (initially empty)
    owner bookkeeping.

    """
    object.__init__(self)

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []

    # Active and inactive conditions for shared locks; both are built on
    # top of the internal lock
    self.__active_shr_c = self.__condition_class(self.__lock)
    self.__inactive_shr_c = self.__condition_class(self.__lock)

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # Is this lock in the deleted state?
    self.__deleted = False
def __check_deleted(self):
"""Raises an exception if the lock has been deleted.
"""
if self.__deleted:
raise errors.LockError("Deleted lock")
def __is_sharer(self):
"""Is the current thread sharing the lock at this time?"""
"""Is the current thread sharing the lock at this time?
"""
return threading.currentThread() in self.__shr
def __is_exclusive(self):
"""Is the current thread holding the lock exclusively at this time?"""
"""Is the current thread holding the lock exclusively at this time?
"""
return threading.currentThread() == self.__exc
def __is_owned(self, shared=-1):
......@@ -112,116 +184,118 @@ class SharedLock:
"""
self.__lock.acquire()
try:
result = self.__is_owned(shared=shared)
return self.__is_owned(shared=shared)
finally:
self.__lock.release()
return result
def __wait(self, c):
"""Wait on the given condition, and raise an exception if the current lock
is declared deleted in the meantime.
def _count_pending(self):
"""Returns the number of pending acquires.
@param c: the condition to wait on
@rtype: int
"""
c.wait()
if self.__deleted:
raise errors.LockError('deleted lock')
self.__lock.acquire()
try:
return len(self.__pending)
finally:
self.__lock.release()
def __exclusive_acquire(self):
"""Acquire the lock exclusively.
def __do_acquire(self, shared):
"""Actually acquire the lock.
This is a private function that presumes you are already holding the
internal lock. It's defined separately to avoid code duplication between
acquire() and delete()
"""
if shared:
self.__shr.add(threading.currentThread())
else:
self.__exc = threading.currentThread()
def __can_acquire(self, shared):
"""Determine whether lock can be acquired.
"""
self.__nwait_exc += 1
try:
# This is to save ourselves from a nasty race condition that could
# theoretically make the sharers starve.
if self.__nwait_shr > 0 or self.__nwait_exc > 1:
self.__wait(self.__turn_exc)
if shared:
return self.__exc is None
else:
return len(self.__shr) == 0 and self.__exc is None
while len(self.__shr) > 0 or self.__exc is not None:
self.__wait(self.__turn_exc)
def __is_on_top(self, cond):
"""Checks whether the passed condition is on top of the queue.
self.__exc = threading.currentThread()
finally:
self.__nwait_exc -= 1
The caller must make sure the queue isn't empty.
assert self.__npass_shr == 0, "SharedLock: internal fairness violation"
"""
return self.__pending[0] == cond
def __acquire_unlocked(self, shared=0, timeout=None):
    """Acquire the lock in shared or exclusive mode.

    This is a private function that presumes you are already holding the
    internal lock.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: True if the lock was acquired, False otherwise
    @raise errors.LockError: if the lock is found deleted, either on entry
        or after waking up from a wait

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
        # Apparently not, can acquire lock directly.
        self.__do_acquire(shared)
        return True

    if shared:
        # All shared acquires queued at the same time wait on one condition
        wait_condition = self.__active_shr_c

        # Check if we're not yet in the queue
        if wait_condition not in self.__pending:
            self.__pending.append(wait_condition)
    else:
        # Every exclusive acquire gets a queue entry of its own
        wait_condition = self.__condition_class(self.__lock)
        # Always add to queue
        self.__pending.append(wait_condition)

    try:
        # Wait until we become the topmost acquire in the queue or the
        # timeout expires.
        while not (self.__is_on_top(wait_condition) and
                   self.__can_acquire(shared)):
            # Wait for notification
            wait_condition.wait(timeout)
            self.__check_deleted()

            # A lot of code assumes blocking acquires always succeed. Loop
            # internally for that case.
            if timeout is not None:
                break

        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
            self.__do_acquire(shared)
            return True
    finally:
        # Remove condition from queue if there are no more waiters. The
        # queue is left alone once the lock is flagged as deleted.
        if not wait_condition.has_waiting() and not self.__deleted:
            self.__pending.remove(wait_condition)

    return False
def acquire(self, shared=0, timeout=None):
    """Acquire a shared lock.

    Takes the internal lock and delegates the actual work to
    __acquire_unlocked(), whose result and exceptions are passed on.

    @type shared: int
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @return: the result of __acquire_unlocked()

    """
    self.__lock.acquire()
    try:
        return self.__acquire_unlocked(shared, timeout)
    finally:
        self.__lock.release()
def release(self):
"""Release a Shared Lock.
......@@ -231,76 +305,59 @@ class SharedLock:
"""
self.__lock.acquire()
try:
assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
assert self.__is_exclusive() or self.__is_sharer(), \
"Cannot release non-owned lock"
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
# An exclusive holder has just had the lock, time to put it in shared
# mode if there are shared holders waiting. Otherwise wake up the next
# exclusive holder.
if self.__nwait_shr > 0:
# Make sure at least the ones which were blocked pass.
self.__npass_shr = self.__nwait_shr
self.__turn_shr.notifyAll()
elif self.__nwait_exc > 0:
self.__turn_exc.notify()
elif self.__is_sharer():
else:
self.__shr.remove(threading.currentThread())
# If there are shared holders waiting (and not just scheduled to pass)
# there *must* be an exclusive holder waiting as well; otherwise what
# were they waiting for?
assert (self.__nwait_exc > 0 or
self.__npass_shr == self.__nwait_shr), \
"Lock sharers waiting while no exclusive is queueing"
# If there are no more shared holders either in or scheduled to pass,
# and some exclusive holders are waiting let's wake one up.
if (len(self.__shr) == 0 and
self.__nwait_exc > 0 and
not self.__npass_shr > 0):
self.__turn_exc.notify()
# Notify topmost condition in queue
if self.__pending:
first_condition = self.__pending[0]
first_condition.notifyAll()
else:
assert False, "Cannot release non-owned lock"
if first_condition == self.__active_shr_c:
self.__active_shr_c = self.__inactive_shr_c
self.__inactive_shr_c = first_condition
finally:
self.__lock.release()
def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @return: True if the lock was acquired (or already held exclusively)
        and has been flagged as deleted, False otherwise
    @raise errors.LockError: if the lock is already flagged as deleted

    """
    self.__lock.acquire()
    try:
        assert not self.__is_sharer(), \
            "Cannot delete() a lock while sharing it"

        self.__check_deleted()

        # The caller is allowed to hold the lock exclusively already.
        acquired = self.__is_exclusive()

        if not acquired:
            # The acquire has to be exclusive. Passing the timeout as the
            # only positional argument would bind it to "shared", turning
            # any non-zero timeout into a shared acquire with no timeout
            # at all, hence the explicit keywords.
            acquired = self.__acquire_unlocked(shared=0, timeout=timeout)

        if acquired:
            self.__deleted = True
            self.__exc = None

            # Notify all acquires. They'll throw an error.
            while self.__pending:
                self.__pending.pop().notifyAll()

        return acquired
    finally:
        self.__lock.release()
......
......@@ -26,10 +26,10 @@ import os
import unittest
import time
import Queue
import threading
from ganeti import locking
from ganeti import errors
from threading import Thread
# This is used to test the ssynchronize decorator.
......@@ -39,6 +39,7 @@ _decoratorlock = locking.SharedLock()
#: List for looping tests
ITERATIONS = range(8)
def _Repeat(fn):
"""Decorator for executing a function many times"""
def wrapper(*args, **kwargs):
......@@ -46,6 +47,7 @@ def _Repeat(fn):
fn(*args, **kwargs)
return wrapper
class _ThreadedTestCase(unittest.TestCase):
"""Test class that supports adding/waiting on threads"""
def setUp(self):
......@@ -54,7 +56,7 @@ class _ThreadedTestCase(unittest.TestCase):
def _addThread(self, *args, **kwargs):
    """Create, remember and start a new thread.

    All arguments are passed on to threading.Thread. The thread is appended
    to self.threads before it is started.

    @return: the started thread

    """
    t = threading.Thread(*args, **kwargs)
    self.threads.append(t)
    t.start()
    return t
......@@ -147,7 +149,7 @@ class TestSharedLock(_ThreadedTestCase):
def testSharersCanCoexist(self):
self.sl.acquire(shared=1)
Thread(target=self._doItSharer).start()
threading.Thread(target=self._doItSharer).start()
self.assert_(self.done.get(True, 1))
self.sl.release()
......@@ -234,12 +236,6 @@ class TestSharedLock(_ThreadedTestCase):
self.assertEqual(self.done.get_nowait(), 'SHR')
self.assertEqual(self.done.get_nowait(), 'EXC')
def testNoNonBlocking(self):
self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
self.sl.acquire()
self.sl.delete(blocking=0) # Fine, because the lock is already acquired
def testDelete(self):
self.sl.delete()
self.assertRaises(errors.LockError, self.sl.acquire)
......@@ -280,6 +276,222 @@ class TestSharedLock(_ThreadedTestCase):
self.assertEqual(self.done.get_nowait(), 'ERR')
self.sl = locking.SharedLock()
@_Repeat
def testExclusiveAcquireTimeout(self):
    """Acquire with a timeout while another thread briefly holds the lock."""
    def _HoldExclusive(duration):
        self.sl.acquire(shared=0)
        self.done.put("A: start sleep")
        time.sleep(duration)
        self.done.put("A: end sleep")
        self.sl.release()

    for shared in [0, 1]:
        # Start thread to hold lock for 20 ms
        self._addThread(target=_HoldExclusive, args=(20.0 / 1000.0, ))

        # Wait up to 100 ms to get lock
        got_lock = self.sl.acquire(shared=shared, timeout=0.1)
        self.failUnless(got_lock)
        self.done.put("got 2nd")
        self.sl.release()

        self._waitThreads()

        # The holder's messages are expected before ours
        for expected in ("A: start sleep", "A: end sleep", "got 2nd"):
            self.assertEqual(self.done.get_nowait(), expected)

        self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testAcquireExpiringTimeout(self):
    """Acquires with short timeouts fail while the lock is held."""
    def _TryAcquire(shared, timeout):
        acquired = self.sl.acquire(shared=shared, timeout=timeout)
        if not acquired:
            self.done.put("timeout")

    nthreads = 11

    for shared in [0, 1]:
        # Lock exclusively
        self.sl.acquire()

        # Start acquires (mode given by the outer loop) with timeouts
        # between 0 and 20 ms
        for i in xrange(nthreads):
            self._addThread(target=_TryAcquire,
                            args=(shared, i * 2.0 / 1000.0))

        # Wait for threads to finish (makes sure the acquire timeout expires
        # before releasing the lock)
        self._waitThreads()

        # Release lock
        self.sl.release()

        # Every thread must have reported a timeout, and nothing else
        for _ in xrange(nthreads):
            self.assertEqual(self.done.get_nowait(), "timeout")

        self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testSharedSkipExclusiveAcquires(self):
    """Shared acquires jump in front of exclusive acquires in the queue."""
    # Get exclusive lock while we fill the queue
    self.sl.acquire()

    def _Acquire(shared, name):
        if self.sl.acquire(shared=shared):
            self.done.put(name)
            self.sl.release()

    # Batches are started in this order: shared, exclusive, more shared,
    # more exclusive
    batches = [
        (5, 1, "shared A"),
        (3, 0, "exclusive B"),
        (5, 1, "shared C"),
        (3, 0, "exclusive D"),
    ]
    for (count, shared, name) in batches:
        for _ in xrange(count):
            self._addThread(target=_Acquire, args=(shared, name))

    # Expect 6 pending exclusive acquires and 1 for all shared acquires
    # together
    self.assertEqual(self.sl._count_pending(), 7)

    # Release exclusive lock and wait
    self.sl.release()
    self._waitThreads()

    # Check sequence
    shared_names = ("shared A", "shared C")
    for _ in xrange(10):
        # Shared locks aren't guaranteed to be notified in order, but
        # they'll be first
        self.assert_(self.done.get_nowait() in shared_names)

    # Exclusive acquires follow in the order they were queued
    for name in ("exclusive B", "exclusive D"):
        for _ in xrange(3):
            self.assertEqual(self.done.get_nowait(), name)

    self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testMixedAcquireTimeout(self):
sync = threading.Condition()
def _AcquireShared(ev):
if not self.sl.acquire(shared=1, timeout=None):
return
self.done.put("shared")
# Notify main thread
ev.set()
# Wait for notification
sync.acquire()
try:
sync.wait()
finally:
sync.release()
# Release lock
self.sl.release()