Commit 162c1c1f authored by Guido Trotter's avatar Guido Trotter

Shared Lock implementation and unit tests.

Adding a locking.py file for the ganeti locking library. Its first component is
the implementation of a non-recursive blocking shared lock complete with a
testing library.

Reviewed-by: imsnah, iustinp
parent e2618bc7
#
#
# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the Ganeti locking code."""
# pylint: disable-msg=W0613,W0201
import threading
class SharedLock:
"""Implements a shared lock.
Multiple threads can acquire the lock in a shared way, calling
acquire_shared(). In order to acquire the lock in an exclusive way threads
can call acquire_exclusive().
The lock prevents starvation but does not guarantee that threads will acquire
the shared lock in the order they queued for it, just that they will
eventually do so.
"""
def __init__(self):
"""Construct a new Shared Lock"""
# we have two conditions, c_shr and c_exc, sharing the same lock.
self.__lock = threading.Lock()
self.__turn_shr = threading.Condition(self.__lock)
self.__turn_exc = threading.Condition(self.__lock)
# current lock holders
self.__shr = set()
self.__exc = None
# lock waiters
self.__nwait_exc = 0
self.__nwait_shr = 0
def __is_sharer(self):
"""Is the current thread sharing the lock at this time?"""
return threading.currentThread() in self.__shr
def __is_exclusive(self):
"""Is the current thread holding the lock exclusively at this time?"""
return threading.currentThread() == self.__exc
def __is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
This is a private version of the function, which presumes you're holding
the internal lock.
"""
if shared < 0:
return self.__is_sharer() or self.__is_exclusive()
elif shared:
return self.__is_sharer()
else:
return self.__is_exclusive()
def _is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
Args:
shared:
< 0: check for any type of ownership (default)
0: check for exclusive ownership
> 0: check for shared ownership
"""
self.__lock.acquire()
try:
result = self.__is_owned(shared)
finally:
self.__lock.release()
return result
def acquire(self, blocking=1, shared=0):
"""Acquire a shared lock.
Args:
shared: whether to acquire in shared mode. By default an exclusive lock
will be acquired.
blocking: whether to block while trying to acquire or to operate in try-lock mode.
this locking mode is not supported yet.
"""
if not blocking:
# We don't have non-blocking mode for now
raise NotImplementedError
self.__lock.acquire()
try:
# We cannot acquire the lock if we already have it
assert not self.__is_owned(), "double acquire() on a non-recursive lock"
if shared:
self.__nwait_shr += 1
try:
# If there is an exclusive holder waiting we have to wait. We'll
# only do this once, though, when we start waiting for the lock. Then
# we'll just wait while there are no exclusive holders.
if self.__nwait_exc > 0:
# TODO: if !blocking...
self.__turn_shr.wait()
while self.__exc is not None:
# TODO: if !blocking...
self.__turn_shr.wait()
self.__shr.add(threading.currentThread())
finally:
self.__nwait_shr -= 1
else:
self.__nwait_exc += 1
try:
# This is to save ourselves from a nasty race condition that could
# theoretically make the sharers starve.
if self.__nwait_shr > 0 or self.__nwait_exc > 1:
# TODO: if !blocking...
self.__turn_exc.wait()
while len(self.__shr) > 0 or self.__exc is not None:
# TODO: if !blocking...
self.__turn_exc.wait()
self.__exc = threading.currentThread()
finally:
self.__nwait_exc -= 1
finally:
self.__lock.release()
return True
def release(self):
"""Release a Shared Lock.
You must have acquired the lock, either in shared or in exclusive mode,
before calling this function.
"""
self.__lock.acquire()
try:
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
# An exclusive holder has just had the lock, time to put it in shared
# mode if there are shared holders waiting. Otherwise wake up the next
# exclusive holder.
if self.__nwait_shr > 0:
self.__turn_shr.notifyAll()
elif self.__nwait_exc > 0:
self.__turn_exc.notify()
elif self.__is_sharer():
self.__shr.remove(threading.currentThread())
# If there are shared holders waiting there *must* be an exclusive holder
# waiting as well; otherwise what were they waiting for?
assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
"Lock sharers waiting while no exclusive is queueing")
# If there are no more shared holders and some exclusive holders are
# waiting let's wake one up.
if len(self.__shr) == 0 and self.__nwait_exc > 0:
self.__turn_exc.notify()
else:
assert False, "Cannot release non-owned lock"
finally:
self.__lock.release()
......@@ -2,7 +2,8 @@ TESTS = \
ganeti.config_unittest.py \
ganeti.hooks_unittest.py \
ganeti.utils_unittest.py \
ganeti.bdev_unittest.py
ganeti.bdev_unittest.py \
ganeti.locking_unittest.py
TESTS_ENVIRONMENT = PYTHONPATH=.:$(top_builddir)
......
#!/usr/bin/python
#
# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 0.0510-1301, USA.
"""Script for unittesting the locking module"""
import os
import unittest
import time
import Queue
from ganeti import locking
from threading import Thread
class TestSharedLock(unittest.TestCase):
"""Shared lock tests"""
def setUp(self):
self.sl = locking.SharedLock()
# helper threads use the 'done' queue to tell the master they finished.
self.done = Queue.Queue(0)
def testSequenceAndOwnership(self):
self.assert_(not self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.assert_(not self.sl._is_owned(shared=0))
self.sl.release()
self.assert_(not self.sl._is_owned())
self.sl.acquire()
self.assert_(self.sl._is_owned())
self.assert_(not self.sl._is_owned(shared=1))
self.assert_(self.sl._is_owned(shared=0))
self.sl.release()
self.assert_(not self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.assert_(not self.sl._is_owned(shared=0))
self.sl.release()
self.assert_(not self.sl._is_owned())
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
self.assert_(self.sl.acquire(shared=1))
self.sl.release()
self.assert_(self.sl.acquire())
self.sl.release()
def testDoubleLockingStoE(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.acquire)
def testDoubleLockingEtoS(self):
self.sl.acquire()
self.assertRaises(AssertionError, self.sl.acquire, shared=1)
def testDoubleLockingStoS(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.acquire, shared=1)
def testDoubleLockingEtoE(self):
self.sl.acquire()
self.assertRaises(AssertionError, self.sl.acquire)
# helper functions: called in a separate thread they acquire the lock, send
# their identifier on the done queue, then release it.
def _doItSharer(self):
self.sl.acquire(shared=1)
self.done.put('SHR')
self.sl.release()
def _doItExclusive(self):
self.sl.acquire()
self.done.put('EXC')
self.sl.release()
def testSharersCanCoexist(self):
self.sl.acquire(shared=1)
Thread(target=self._doItSharer).start()
self.assert_(self.done.get(True, 1))
self.sl.release()
def testExclusiveBlocksExclusive(self):
self.sl.acquire()
Thread(target=self._doItExclusive).start()
# give it a bit of time to check that it's not actually doing anything
self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
self.sl.release()
self.assert_(self.done.get(True, 1))
def testExclusiveBlocksSharer(self):
self.sl.acquire()
Thread(target=self._doItSharer).start()
time.sleep(0.05)
self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
self.sl.release()
self.assert_(self.done.get(True, 1))
def testSharerBlocksExclusive(self):
self.sl.acquire(shared=1)
Thread(target=self._doItExclusive).start()
time.sleep(0.05)
self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
self.sl.release()
self.assert_(self.done.get(True, 1))
def testWaitingExclusiveBlocksSharer(self):
self.sl.acquire(shared=1)
# the lock is acquired in shared mode...
Thread(target=self._doItExclusive).start()
# ...but now an exclusive is waiting...
time.sleep(0.05)
Thread(target=self._doItSharer).start()
# ...so the sharer should be blocked as well
self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
self.sl.release()
# The exclusive passed before
self.assertEqual(self.done.get(True, 1), 'EXC')
self.assertEqual(self.done.get(True, 1), 'SHR')
def testWaitingSharerBlocksExclusive(self):
self.sl.acquire()
# the lock is acquired in exclusive mode...
Thread(target=self._doItSharer).start()
# ...but now a sharer is waiting...
time.sleep(0.05)
Thread(target=self._doItExclusive).start()
# ...the exclusive is waiting too...
self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
self.sl.release()
# The sharer passed before
self.assertEqual(self.done.get(True, 1), 'SHR')
self.assertEqual(self.done.get(True, 1), 'EXC')
if __name__ == '__main__':
unittest.main()
#suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
#unittest.TextTestRunner(verbosity=2).run(suite)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment