-
Iustin Pop authored
The waiting sharer blocks exclusive is another not-possible to test right now. Sorry for missing it the first time; there are no other testWaiting... tests. Reviewed-by: ultrotter
a143be68
ganeti.locking_unittest.py 32.00 KiB
#!/usr/bin/python
#
# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 0.0510-1301, USA.
"""Script for unittesting the locking module"""
import os
import unittest
import time
import Queue
from ganeti import locking
from ganeti import errors
from threading import Thread
# This is used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
_decoratorlock = locking.SharedLock()
#: List for looping tests
ITERATIONS = range(8)
def _Repeat(fn):
"""Decorator for executing a function many times"""
def wrapper(*args, **kwargs):
for i in ITERATIONS:
fn(*args, **kwargs)
return wrapper
class _ThreadedTestCase(unittest.TestCase):
"""Test class that supports adding/waiting on threads"""
def setUp(self):
unittest.TestCase.setUp(self)
self.threads = []
def _addThread(self, *args, **kwargs):
"""Create and remember a new thread"""
t = Thread(*args, **kwargs)
self.threads.append(t)
t.start()
return t
def _waitThreads(self):
"""Wait for all our threads to finish"""
for t in self.threads:
t.join(60)
self.failIf(t.isAlive())
self.threads = []
class TestSharedLock(_ThreadedTestCase):
"""SharedLock tests"""
def setUp(self):
_ThreadedTestCase.setUp(self)
self.sl = locking.SharedLock()
# helper threads use the 'done' queue to tell the master they finished.
self.done = Queue.Queue(0)
def testSequenceAndOwnership(self):
self.assert_(not self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.assert_(not self.sl._is_owned(shared=0))
self.sl.release()
self.assert_(not self.sl._is_owned())
self.sl.acquire()
self.assert_(self.sl._is_owned())
self.assert_(not self.sl._is_owned(shared=1))
self.assert_(self.sl._is_owned(shared=0))
self.sl.release()
self.assert_(not self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.assert_(not self.sl._is_owned(shared=0))
self.sl.release()
self.assert_(not self.sl._is_owned())
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
self.assert_(self.sl.acquire(shared=1))
self.sl.release()
self.assert_(self.sl.acquire())
self.sl.release()
def testDoubleLockingStoE(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.acquire)
def testDoubleLockingEtoS(self):
self.sl.acquire()
self.assertRaises(AssertionError, self.sl.acquire, shared=1)
def testDoubleLockingStoS(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.acquire, shared=1)
def testDoubleLockingEtoE(self):
self.sl.acquire()
self.assertRaises(AssertionError, self.sl.acquire)
# helper functions: called in a separate thread they acquire the lock, send
# their identifier on the done queue, then release it.
def _doItSharer(self):
try:
self.sl.acquire(shared=1)
self.done.put('SHR')
self.sl.release()
except errors.LockError:
self.done.put('ERR')
def _doItExclusive(self):
try:
self.sl.acquire()
self.done.put('EXC')
self.sl.release()
except errors.LockError:
self.done.put('ERR')
def _doItDelete(self):
try:
self.sl.delete()
self.done.put('DEL')
except errors.LockError:
self.done.put('ERR')
def testSharersCanCoexist(self):
self.sl.acquire(shared=1)
Thread(target=self._doItSharer).start()
self.assert_(self.done.get(True, 1))
self.sl.release()
@_Repeat
def testExclusiveBlocksExclusive(self):
self.sl.acquire()
self._addThread(target=self._doItExclusive)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
@_Repeat
def testExclusiveBlocksDelete(self):
self.sl.acquire()
self._addThread(target=self._doItDelete)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
self.sl = locking.SharedLock()
@_Repeat
def testExclusiveBlocksSharer(self):
self.sl.acquire()
self._addThread(target=self._doItSharer)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'SHR')
@_Repeat
def testSharerBlocksExclusive(self):
self.sl.acquire(shared=1)
self._addThread(target=self._doItExclusive)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
@_Repeat
def testSharerBlocksDelete(self):
self.sl.acquire(shared=1)
self._addThread(target=self._doItDelete)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
self.sl = locking.SharedLock()
@_Repeat
def testWaitingExclusiveBlocksSharer(self):
"""SKIPPED testWaitingExclusiveBlockSharer"""
return
self.sl.acquire(shared=1)
# the lock is acquired in shared mode...
self._addThread(target=self._doItExclusive)
# ...but now an exclusive is waiting...
self._addThread(target=self._doItSharer)
# ...so the sharer should be blocked as well
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
# The exclusive passed before
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
self.failUnlessEqual(self.done.get_nowait(), 'SHR')
@_Repeat
def testWaitingSharerBlocksExclusive(self):
"""SKIPPED testWaitingSharerBlocksExclusive"""
return
self.sl.acquire()
# the lock is acquired in exclusive mode...
self._addThread(target=self._doItSharer)
# ...but now a sharer is waiting...
self._addThread(target=self._doItExclusive)
# ...the exclusive is waiting too...
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.sl.release()
self._waitThreads()
# The sharer passed before
self.assertEqual(self.done.get_nowait(), 'SHR')
self.assertEqual(self.done.get_nowait(), 'EXC')
def testNoNonBlocking(self):
self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
self.sl.acquire()
self.sl.delete(blocking=0) # Fine, because the lock is already acquired
def testDelete(self):
self.sl.delete()
self.assertRaises(errors.LockError, self.sl.acquire)
self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
self.assertRaises(errors.LockError, self.sl.delete)
def testNoDeleteIfSharer(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.delete)
@_Repeat
def testDeletePendingSharersExclusiveDelete(self):
self.sl.acquire()
self._addThread(target=self._doItSharer)
self._addThread(target=self._doItSharer)
self._addThread(target=self._doItExclusive)
self._addThread(target=self._doItDelete)
self.sl.delete()
self._waitThreads()
# The threads who were pending return ERR
for _ in range(4):
self.assertEqual(self.done.get_nowait(), 'ERR')
self.sl = locking.SharedLock()
@_Repeat
def testDeletePendingDeleteExclusiveSharers(self):
self.sl.acquire()
self._addThread(target=self._doItDelete)
self._addThread(target=self._doItExclusive)
self._addThread(target=self._doItSharer)
self._addThread(target=self._doItSharer)
self.sl.delete()
self._waitThreads()
# The two threads who were pending return both ERR
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.sl = locking.SharedLock()
class TestSSynchronizedDecorator(_ThreadedTestCase):
"""Shared Lock Synchronized decorator test"""
def setUp(self):
_ThreadedTestCase.setUp(self)
# helper threads use the 'done' queue to tell the master they finished.
self.done = Queue.Queue(0)
@locking.ssynchronized(_decoratorlock)
def _doItExclusive(self):
self.assert_(_decoratorlock._is_owned())
self.done.put('EXC')
@locking.ssynchronized(_decoratorlock, shared=1)
def _doItSharer(self):
self.assert_(_decoratorlock._is_owned(shared=1))
self.done.put('SHR')
def testDecoratedFunctions(self):
self._doItExclusive()
self.assert_(not _decoratorlock._is_owned())
self._doItSharer()
self.assert_(not _decoratorlock._is_owned())
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
Thread(target=self._doItSharer).start()
self.assert_(self.done.get(True, 1))
_decoratorlock.release()
@_Repeat
def testExclusiveBlocksExclusive(self):
_decoratorlock.acquire()
self._addThread(target=self._doItExclusive)
# give it a bit of time to check that it's not actually doing anything
self.assertRaises(Queue.Empty, self.done.get_nowait)
_decoratorlock.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
@_Repeat
def testExclusiveBlocksSharer(self):
_decoratorlock.acquire()
self._addThread(target=self._doItSharer)
self.assertRaises(Queue.Empty, self.done.get_nowait)
_decoratorlock.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'SHR')
@_Repeat
def testSharerBlocksExclusive(self):
_decoratorlock.acquire(shared=1)
self._addThread(target=self._doItExclusive)
self.assertRaises(Queue.Empty, self.done.get_nowait)
_decoratorlock.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
class TestLockSet(_ThreadedTestCase):
"""LockSet tests"""
def setUp(self):
_ThreadedTestCase.setUp(self)
self._setUpLS()
# helper threads use the 'done' queue to tell the master they finished.
self.done = Queue.Queue(0)
def _setUpLS(self):
"""Helper to (re)initialize the lock set"""
self.resources = ['one', 'two', 'three']
self.ls = locking.LockSet(members=self.resources)
def testResources(self):
self.assertEquals(self.ls._names(), set(self.resources))
newls = locking.LockSet()
self.assertEquals(newls._names(), set())
def testAcquireRelease(self):
self.assert_(self.ls.acquire('one'))
self.assertEquals(self.ls._list_owned(), set(['one']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
self.assertEquals(self.ls.acquire(['one']), set(['one']))
self.assertEquals(self.ls._list_owned(), set(['one']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
self.ls.acquire(['one', 'two', 'three'])
self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
self.ls.release('one')
self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
self.ls.release(['three'])
self.assertEquals(self.ls._list_owned(), set(['two']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
def testNoDoubleAcquire(self):
self.ls.acquire('one')
self.assertRaises(AssertionError, self.ls.acquire, 'one')
self.assertRaises(AssertionError, self.ls.acquire, ['two'])
self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
self.ls.release()
self.ls.acquire(['one', 'three'])
self.ls.release('one')
self.assertRaises(AssertionError, self.ls.acquire, ['two'])
self.ls.release('three')
def testNoWrongRelease(self):
self.assertRaises(AssertionError, self.ls.release)
self.ls.acquire('one')
self.assertRaises(AssertionError, self.ls.release, 'two')
def testAddRemove(self):
self.ls.add('four')
self.assertEquals(self.ls._list_owned(), set())
self.assert_('four' in self.ls._names())
self.ls.add(['five', 'six', 'seven'], acquired=1)
self.assert_('five' in self.ls._names())
self.assert_('six' in self.ls._names())
self.assert_('seven' in self.ls._names())
self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
self.assert_('five' not in self.ls._names())
self.assert_('six' not in self.ls._names())
self.assertEquals(self.ls._list_owned(), set(['seven']))
self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
self.ls.remove('seven')
self.assert_('seven' not in self.ls._names())
self.assertEquals(self.ls._list_owned(), set([]))
self.ls.acquire(None, shared=1)
self.assertRaises(AssertionError, self.ls.add, 'eight')
self.ls.release()
self.ls.acquire(None)
self.ls.add('eight', acquired=1)
self.assert_('eight' in self.ls._names())
self.assert_('eight' in self.ls._list_owned())
self.ls.add('nine')
self.assert_('nine' in self.ls._names())
self.assert_('nine' not in self.ls._list_owned())
self.ls.release()
self.ls.remove(['two'])
self.assert_('two' not in self.ls._names())
self.ls.acquire('three')
self.assertEquals(self.ls.remove(['three']), ['three'])
self.assert_('three' not in self.ls._names())
self.assertEquals(self.ls.remove('three'), [])
self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
self.assert_('one' not in self.ls._names())
def testRemoveNonBlocking(self):
self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
self.ls.acquire('one')
self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
self.ls.acquire(['two', 'three'])
self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
['two', 'three'])
def testNoDoubleAdd(self):
self.assertRaises(errors.LockError, self.ls.add, 'two')
self.ls.add('four')
self.assertRaises(errors.LockError, self.ls.add, 'four')
def testNoWrongRemoves(self):
self.ls.acquire(['one', 'three'], shared=1)
# Cannot remove 'two' while holding something which is not a superset
self.assertRaises(AssertionError, self.ls.remove, 'two')
# Cannot remove 'three' as we are sharing it
self.assertRaises(AssertionError, self.ls.remove, 'three')
def testAcquireSetLock(self):
# acquire the set-lock exclusively
self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
self.assertEquals(self.ls._is_owned(), True)
self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
# I can still add/remove elements...
self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
self.assert_(self.ls.add('six'))
self.ls.release()
# share the set-lock
self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
# adding new elements is not possible
self.assertRaises(AssertionError, self.ls.add, 'five')
self.ls.release()
def testAcquireWithRepetitions(self):
self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
set(['two', 'two', 'three']))
self.ls.release(['two', 'two'])
self.assertEquals(self.ls._list_owned(), set(['three']))
def testEmptyAcquire(self):
# Acquire an empty list of locks...
self.assertEquals(self.ls.acquire([]), set())
self.assertEquals(self.ls._list_owned(), set())
# New locks can still be addded
self.assert_(self.ls.add('six'))
# "re-acquiring" is not an issue, since we had really acquired nothing
self.assertEquals(self.ls.acquire([], shared=1), set())
self.assertEquals(self.ls._list_owned(), set())
# We haven't really acquired anything, so we cannot release
self.assertRaises(AssertionError, self.ls.release)
def _doLockSet(self, set, shared):
try:
self.ls.acquire(set, shared=shared)
self.done.put('DONE')
self.ls.release()
except errors.LockError:
self.done.put('ERR')
def _doAddSet(self, set):
try:
self.ls.add(set, acquired=1)
self.done.put('DONE')
self.ls.release()
except errors.LockError:
self.done.put('ERR')
def _doRemoveSet(self, set):
self.done.put(self.ls.remove(set))
@_Repeat
def testConcurrentSharedAcquire(self):
self.ls.acquire(['one', 'two'], shared=1)
self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doLockSet, args=('three', 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self.assertEqual(self.done.get_nowait(), 'DONE')
@_Repeat
def testConcurrentExclusiveAcquire(self):
self.ls.acquire(['one', 'two'])
self._addThread(target=self._doLockSet, args=('three', 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doLockSet, args=('three', 0))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
self._addThread(target=self._doLockSet, args=('one', 0))
self._addThread(target=self._doLockSet, args=('one', 1))
self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
for _ in range(6):
self.failUnlessEqual(self.done.get_nowait(), 'DONE')
@_Repeat
def testConcurrentRemove(self):
self.ls.add('four')
self.ls.acquire(['one', 'two', 'four'])
self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.remove('one')
self.ls.release()
self._waitThreads()
for i in range(4):
self.failUnlessEqual(self.done.get_nowait(), 'ERR')
self.ls.add(['five', 'six'], acquired=1)
self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
self.ls.remove('five')
self.ls.release()
self._waitThreads()
for i in range(4):
self.failUnlessEqual(self.done.get_nowait(), 'DONE')
self.ls.acquire(['three', 'four'])
self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.remove('four')
self._waitThreads()
self.assertEqual(self.done.get_nowait(), ['six'])
self._addThread(target=self._doRemoveSet, args=(['two']))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), ['two'])
self.ls.release()
# reset lockset
self._setUpLS()
@_Repeat
def testConcurrentSharedSetLock(self):
# share the set-lock...
self.ls.acquire(None, shared=1)
# ...another thread can share it too
self._addThread(target=self._doLockSet, args=(None, 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
# ...or just share some elements
self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
# ...but not add new ones or remove any
t = self._addThread(target=self._doAddSet, args=(['nine']))
self._addThread(target=self._doRemoveSet, args=(['two'], ))
self.assertRaises(Queue.Empty, self.done.get_nowait)
# this just releases the set-lock
self.ls.release([])
t.join(60)
self.assertEqual(self.done.get_nowait(), 'DONE')
# release the lock on the actual elements so remove() can proceed too
self.ls.release()
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), ['two'])
# reset lockset
self._setUpLS()
@_Repeat
def testConcurrentExclusiveSetLock(self):
# acquire the set-lock...
self.ls.acquire(None, shared=0)
# ...no one can do anything else
self._addThread(target=self._doLockSet, args=(None, 1))
self._addThread(target=self._doLockSet, args=(None, 0))
self._addThread(target=self._doLockSet, args=(['three'], 0))
self._addThread(target=self._doLockSet, args=(['two'], 1))
self._addThread(target=self._doAddSet, args=(['nine']))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
for _ in range(5):
self.assertEqual(self.done.get(True, 1), 'DONE')
# cleanup
self._setUpLS()
@_Repeat
def testConcurrentSetLockAdd(self):
self.ls.acquire('one')
# Another thread wants the whole SetLock
self._addThread(target=self._doLockSet, args=(None, 0))
self._addThread(target=self._doLockSet, args=(None, 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertRaises(AssertionError, self.ls.add, 'four')
self.ls.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self.assertEqual(self.done.get_nowait(), 'DONE')
self.ls.acquire(None)
self._addThread(target=self._doLockSet, args=(None, 0))
self._addThread(target=self._doLockSet, args=(None, 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.add('four')
self.ls.add('five', acquired=1)
self.ls.add('six', acquired=1, shared=1)
self.assertEquals(self.ls._list_owned(),
set(['one', 'two', 'three', 'five', 'six']))
self.assertEquals(self.ls._is_owned(), True)
self.assertEquals(self.ls._names(),
set(['one', 'two', 'three', 'four', 'five', 'six']))
self.ls.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self.assertEqual(self.done.get_nowait(), 'DONE')
self._setUpLS()
@_Repeat
def testEmptyLockSet(self):
# get the set-lock
self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
# now empty it...
self.ls.remove(['one', 'two', 'three'])
# and adds/locks by another thread still wait
self._addThread(target=self._doAddSet, args=(['nine']))
self._addThread(target=self._doLockSet, args=(None, 1))
self._addThread(target=self._doLockSet, args=(None, 0))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
for _ in range(3):
self.assertEqual(self.done.get_nowait(), 'DONE')
# empty it again...
self.assertEqual(self.ls.remove(['nine']), ['nine'])
# now share it...
self.assertEqual(self.ls.acquire(None, shared=1), set())
# other sharers can go, adds still wait
self._addThread(target=self._doLockSet, args=(None, 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doAddSet, args=(['nine']))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.ls.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._setUpLS()
class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):
_ThreadedTestCase.setUp(self)
self.nodes=['n1', 'n2']
self.instances=['i1', 'i2', 'i3']
self.GL = locking.GanetiLockManager(nodes=self.nodes,
instances=self.instances)
self.done = Queue.Queue(0)
def tearDown(self):
# Don't try this at home...
locking.GanetiLockManager._instance = None
def testLockingConstants(self):
# The locking library internally cheats by assuming its constants have some
# relationships with each other. Check those hold true.
# This relationship is also used in the Processor to recursively acquire
# the right locks. Again, please don't break it.
for i in range(len(locking.LEVELS)):
self.assertEqual(i, locking.LEVELS[i])
def testDoubleGLFails(self):
self.assertRaises(AssertionError, locking.GanetiLockManager)
def testLockNames(self):
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
def testInitAndResources(self):
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager()
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager(nodes=self.nodes)
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager(instances=self.instances)
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
def testAcquireRelease(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
self.GL.release(locking.LEVEL_NODE, ['n2'])
self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
self.GL.release(locking.LEVEL_NODE)
self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
self.GL.release(locking.LEVEL_INSTANCE)
self.assertRaises(errors.LockError, self.GL.acquire,
locking.LEVEL_INSTANCE, ['i5'])
self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
def testAcquireWholeSets(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
set(self.instances))
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
set(self.instances))
self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
set(self.nodes))
self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
set(self.nodes))
self.GL.release(locking.LEVEL_NODE)
self.GL.release(locking.LEVEL_INSTANCE)
self.GL.release(locking.LEVEL_CLUSTER)
def testAcquireWholeAndPartial(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
set(self.instances))
self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
set(self.instances))
self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
set(['n2']))
self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
set(['n2']))
self.GL.release(locking.LEVEL_NODE)
self.GL.release(locking.LEVEL_INSTANCE)
self.GL.release(locking.LEVEL_CLUSTER)
def testBGLDependency(self):
self.assertRaises(AssertionError, self.GL.acquire,
locking.LEVEL_NODE, ['n1', 'n2'])
self.assertRaises(AssertionError, self.GL.acquire,
locking.LEVEL_INSTANCE, ['i3'])
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.GL.acquire(locking.LEVEL_NODE, ['n1'])
self.assertRaises(AssertionError, self.GL.release,
locking.LEVEL_CLUSTER, ['BGL'])
self.assertRaises(AssertionError, self.GL.release,
locking.LEVEL_CLUSTER)
self.GL.release(locking.LEVEL_NODE)
self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
self.assertRaises(AssertionError, self.GL.release,
locking.LEVEL_CLUSTER, ['BGL'])
self.assertRaises(AssertionError, self.GL.release,
locking.LEVEL_CLUSTER)
self.GL.release(locking.LEVEL_INSTANCE)
def testWrongOrder(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.GL.acquire(locking.LEVEL_NODE, ['n2'])
self.assertRaises(AssertionError, self.GL.acquire,
locking.LEVEL_NODE, ['n1'])
self.assertRaises(AssertionError, self.GL.acquire,
locking.LEVEL_INSTANCE, ['i2'])
# Helper function to run as a thread that shared the BGL and then acquires
# some locks at another level.
def _doLock(self, level, names, shared):
try:
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self.GL.acquire(level, names, shared=shared)
self.done.put('DONE')
self.GL.release(level)
self.GL.release(locking.LEVEL_CLUSTER)
except errors.LockError:
self.done.put('ERR')
@_Repeat
def testConcurrency(self):
self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
self._addThread(target=self._doLock,
args=(locking.LEVEL_INSTANCE, 'i1', 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
self._addThread(target=self._doLock,
args=(locking.LEVEL_INSTANCE, 'i1', 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doLock,
args=(locking.LEVEL_INSTANCE, 'i3', 1))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.GL.release(locking.LEVEL_INSTANCE)
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
self._addThread(target=self._doLock,
args=(locking.LEVEL_INSTANCE, 'i2', 1))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), 'DONE')
self._addThread(target=self._doLock,
args=(locking.LEVEL_INSTANCE, 'i2', 0))
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.GL.release(locking.LEVEL_INSTANCE)
self._waitThreads()
self.assertEqual(self.done.get(True, 1), 'DONE')
self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
if __name__ == '__main__':
unittest.main()
#suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
#unittest.TextTestRunner(verbosity=2).run(suite)