#!/usr/bin/python # # Copyright (C) 2006, 2007, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 0.0510-1301, USA. """Script for unittesting the locking module""" import os import unittest import time import Queue import threading import random import itertools from ganeti import constants from ganeti import locking from ganeti import errors from ganeti import utils from ganeti import compat from ganeti import objects from ganeti import query import testutils # This is used to test the ssynchronize decorator. # Since it's passed as input to a decorator it must be declared as a global. _decoratorlock = locking.SharedLock("decorator lock") #: List for looping tests ITERATIONS = range(8) def _Repeat(fn): """Decorator for executing a function many times""" def wrapper(*args, **kwargs): for i in ITERATIONS: fn(*args, **kwargs) return wrapper def SafeSleep(duration): start = time.time() while True: delay = start + duration - time.time() if delay <= 0.0: break time.sleep(delay) class _ThreadedTestCase(unittest.TestCase): """Test class that supports adding/waiting on threads""" def setUp(self): unittest.TestCase.setUp(self) self.done = Queue.Queue(0) self.threads = [] def _addThread(self, *args, **kwargs): """Create and remember a new thread""" t = threading.Thread(*args, **kwargs) self.threads.append(t) t.start() return t def _waitThreads(self): """Wait for all our threads to finish""" for t in self.threads: t.join(60) self.failIf(t.isAlive()) self.threads = [] class _ConditionTestCase(_ThreadedTestCase): """Common test case for conditions""" def setUp(self, cls): _ThreadedTestCase.setUp(self) self.lock = threading.Lock() self.cond = cls(self.lock) def _testAcquireRelease(self): self.assertFalse(self.cond._is_owned()) self.assertRaises(RuntimeError, self.cond.wait) self.assertRaises(RuntimeError, self.cond.notifyAll) self.cond.acquire() self.assert_(self.cond._is_owned()) self.cond.notifyAll() self.assert_(self.cond._is_owned()) self.cond.release() self.assertFalse(self.cond._is_owned()) self.assertRaises(RuntimeError, self.cond.wait) self.assertRaises(RuntimeError, self.cond.notifyAll) def _testNotification(self): def _NotifyAll(): self.done.put("NE") self.cond.acquire() self.done.put("NA") self.cond.notifyAll() self.done.put("NN") self.cond.release() self.cond.acquire() self._addThread(target=_NotifyAll) self.assertEqual(self.done.get(True, 1), "NE") self.assertRaises(Queue.Empty, self.done.get_nowait) self.cond.wait() self.assertEqual(self.done.get(True, 1), "NA") self.assertEqual(self.done.get(True, 1), "NN") self.assert_(self.cond._is_owned()) self.cond.release() self.assertFalse(self.cond._is_owned()) class TestSingleNotifyPipeCondition(_ConditionTestCase): """SingleNotifyPipeCondition tests""" def setUp(self): _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition) def testAcquireRelease(self): self._testAcquireRelease() def testNotification(self): self._testNotification() def testWaitReuse(self): self.cond.acquire() self.cond.wait(0) self.cond.wait(0.1) self.cond.release() def testNoNotifyReuse(self): self.cond.acquire() self.cond.notifyAll() self.assertRaises(RuntimeError, self.cond.wait) self.assertRaises(RuntimeError, self.cond.notifyAll) self.cond.release() class TestPipeCondition(_ConditionTestCase): """PipeCondition tests""" def setUp(self): _ConditionTestCase.setUp(self, locking.PipeCondition) def testAcquireRelease(self): self._testAcquireRelease() def testNotification(self): self._testNotification() def _TestWait(self, fn): threads = [ self._addThread(target=fn), self._addThread(target=fn), self._addThread(target=fn), ] # Wait for threads to be waiting for _ in threads: self.assertEqual(self.done.get(True, 1), "A") self.assertRaises(Queue.Empty, self.done.get_nowait) self.cond.acquire() self.assertEqual(len(self.cond._waiters), 3) self.assertEqual(self.cond._waiters, set(threads)) # This new thread can't acquire the lock, and thus call wait, before we # release it self._addThread(target=fn) self.cond.notifyAll() self.assertRaises(Queue.Empty, self.done.get_nowait) self.cond.release() # We should now get 3 W and 1 A (for the new thread) in whatever order w = 0 a = 0 for i in range(4): got = self.done.get(True, 1) if got == "W": w += 1 elif got == "A": a += 1 else: self.fail("Got %s on the done queue" % got) self.assertEqual(w, 3) self.assertEqual(a, 1) self.cond.acquire() self.cond.notifyAll() self.cond.release() self._waitThreads() self.assertEqual(self.done.get_nowait(), "W") self.assertRaises(Queue.Empty, self.done.get_nowait) def testBlockingWait(self): def _BlockingWait(): self.cond.acquire() self.done.put("A") self.cond.wait() self.cond.release() self.done.put("W") self._TestWait(_BlockingWait) def testLongTimeoutWait(self): def _Helper(): self.cond.acquire() self.done.put("A") self.cond.wait(15.0) self.cond.release() self.done.put("W") self._TestWait(_Helper) def _TimeoutWait(self, timeout, check): self.cond.acquire() self.cond.wait(timeout) self.cond.release() self.done.put(check) def testShortTimeoutWait(self): self._addThread(target=self._TimeoutWait, args=(0.1, "T1")) self._addThread(target=self._TimeoutWait, args=(0.1, "T1")) self._waitThreads() self.assertEqual(self.done.get_nowait(), "T1") self.assertEqual(self.done.get_nowait(), "T1") self.assertRaises(Queue.Empty, self.done.get_nowait) def testZeroTimeoutWait(self): self._addThread(target=self._TimeoutWait, args=(0, "T0")) self._addThread(target=self._TimeoutWait, args=(0, "T0")) self._addThread(target=self._TimeoutWait, args=(0, "T0")) self._waitThreads() self.assertEqual(self.done.get_nowait(), "T0") self.assertEqual(self.done.get_nowait(), "T0") self.assertEqual(self.done.get_nowait(), "T0") self.assertRaises(Queue.Empty, self.done.get_nowait) class TestSharedLock(_ThreadedTestCase): """SharedLock tests""" def setUp(self): _ThreadedTestCase.setUp(self) self.sl = locking.SharedLock("TestSharedLock") def testSequenceAndOwnership(self): self.assertFalse(self.sl._is_owned()) self.sl.acquire(shared=1) self.assert_(self.sl._is_owned()) self.assert_(self.sl._is_owned(shared=1)) self.assertFalse(self.sl._is_owned(shared=0)) self.sl.release() self.assertFalse(self.sl._is_owned()) self.sl.acquire() self.assert_(self.sl._is_owned()) self.assertFalse(self.sl._is_owned(shared=1)) self.assert_(self.sl._is_owned(shared=0)) self.sl.release() self.assertFalse(self.sl._is_owned()) self.sl.acquire(shared=1) self.assert_(self.sl._is_owned()) self.assert_(self.sl._is_owned(shared=1)) self.assertFalse(self.sl._is_owned(shared=0)) self.sl.release() self.assertFalse(self.sl._is_owned()) def testBooleanValue(self): # semaphores are supposed to return a true value on a successful acquire self.assert_(self.sl.acquire(shared=1)) self.sl.release() self.assert_(self.sl.acquire()) self.sl.release() def testDoubleLockingStoE(self): self.sl.acquire(shared=1) self.assertRaises(AssertionError, self.sl.acquire) def testDoubleLockingEtoS(self): self.sl.acquire() self.assertRaises(AssertionError, self.sl.acquire, shared=1) def testDoubleLockingStoS(self): self.sl.acquire(shared=1) self.assertRaises(AssertionError, self.sl.acquire, shared=1) def testDoubleLockingEtoE(self): self.sl.acquire() self.assertRaises(AssertionError, self.sl.acquire) # helper functions: called in a separate thread they acquire the lock, send # their identifier on the done queue, then release it. def _doItSharer(self): try: self.sl.acquire(shared=1) self.done.put('SHR') self.sl.release() except errors.LockError: self.done.put('ERR') def _doItExclusive(self): try: self.sl.acquire() self.done.put('EXC') self.sl.release() except errors.LockError: self.done.put('ERR') def _doItDelete(self): try: self.sl.delete() self.done.put('DEL') except errors.LockError: self.done.put('ERR') def testSharersCanCoexist(self): self.sl.acquire(shared=1) threading.Thread(target=self._doItSharer).start() self.assert_(self.done.get(True, 1)) self.sl.release() @_Repeat def testExclusiveBlocksExclusive(self): self.sl.acquire() self._addThread(target=self._doItExclusive) self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'EXC') @_Repeat def testExclusiveBlocksDelete(self): self.sl.acquire() self._addThread(target=self._doItDelete) self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'DEL') self.sl = locking.SharedLock(self.sl.name) @_Repeat def testExclusiveBlocksSharer(self): self.sl.acquire() self._addThread(target=self._doItSharer) self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'SHR') @_Repeat def testSharerBlocksExclusive(self): self.sl.acquire(shared=1) self._addThread(target=self._doItExclusive) self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'EXC') @_Repeat def testSharerBlocksDelete(self): self.sl.acquire(shared=1) self._addThread(target=self._doItDelete) self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'DEL') self.sl = locking.SharedLock(self.sl.name) @_Repeat def testWaitingExclusiveBlocksSharer(self): """SKIPPED testWaitingExclusiveBlockSharer""" return self.sl.acquire(shared=1) # the lock is acquired in shared mode... self._addThread(target=self._doItExclusive) # ...but now an exclusive is waiting... self._addThread(target=self._doItSharer) # ...so the sharer should be blocked as well self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() self._waitThreads() # The exclusive passed before self.failUnlessEqual(self.done.get_nowait(), 'EXC') self.failUnlessEqual(self.done.get_nowait(), 'SHR') @_Repeat def testWaitingSharerBlocksExclusive(self): """SKIPPED testWaitingSharerBlocksExclusive""" return self.sl.acquire() # the lock is acquired in exclusive mode... self._addThread(target=self._doItSharer) # ...but now a sharer is waiting... self._addThread(target=self._doItExclusive) # ...the exclusive is waiting too... self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() self._waitThreads() # The sharer passed before self.assertEqual(self.done.get_nowait(), 'SHR') self.assertEqual(self.done.get_nowait(), 'EXC') def testDelete(self): self.sl.delete() self.assertRaises(errors.LockError, self.sl.acquire) self.assertRaises(errors.LockError, self.sl.acquire, shared=1) self.assertRaises(errors.LockError, self.sl.delete) def testDeleteTimeout(self): self.sl.delete(timeout=60) def testNoDeleteIfSharer(self): self.sl.acquire(shared=1) self.assertRaises(AssertionError, self.sl.delete) @_Repeat def testDeletePendingSharersExclusiveDelete(self): self.sl.acquire() self._addThread(target=self._doItSharer) self._addThread(target=self._doItSharer) self._addThread(target=self._doItExclusive) self._addThread(target=self._doItDelete) self.sl.delete() self._waitThreads() # The threads who were pending return ERR for _ in range(4): self.assertEqual(self.done.get_nowait(), 'ERR') self.sl = locking.SharedLock(self.sl.name) @_Repeat def testDeletePendingDeleteExclusiveSharers(self): self.sl.acquire() self._addThread(target=self._doItDelete) self._addThread(target=self._doItExclusive) self._addThread(target=self._doItSharer) self._addThread(target=self._doItSharer) self.sl.delete() self._waitThreads() # The two threads who were pending return both ERR self.assertEqual(self.done.get_nowait(), 'ERR') self.assertEqual(self.done.get_nowait(), 'ERR') self.assertEqual(self.done.get_nowait(), 'ERR') self.assertEqual(self.done.get_nowait(), 'ERR') self.sl = locking.SharedLock(self.sl.name) @_Repeat def testExclusiveAcquireTimeout(self): for shared in [0, 1]: on_queue = threading.Event() release_exclusive = threading.Event() def _LockExclusive(): self.sl.acquire(shared=0, test_notify=on_queue.set) self.done.put("A: start wait") release_exclusive.wait() self.done.put("A: end wait") self.sl.release() # Start thread to hold lock in exclusive mode self._addThread(target=_LockExclusive) # Wait for wait to begin self.assertEqual(self.done.get(timeout=60), "A: start wait") # Wait up to 60s to get lock, but release exclusive lock as soon as we're # on the queue self.failUnless(self.sl.acquire(shared=shared, timeout=60, test_notify=release_exclusive.set)) self.done.put("got 2nd") self.sl.release() self._waitThreads() self.assertEqual(self.done.get_nowait(), "A: end wait") self.assertEqual(self.done.get_nowait(), "got 2nd") self.assertRaises(Queue.Empty, self.done.get_nowait) @_Repeat def testAcquireExpiringTimeout(self): def _AcquireWithTimeout(shared, timeout): if not self.sl.acquire(shared=shared, timeout=timeout): self.done.put("timeout") for shared in [0, 1]: # Lock exclusively self.sl.acquire() # Start shared acquires with timeout between 0 and 20 ms for i in range(11): self._addThread(target=_AcquireWithTimeout, args=(shared, i * 2.0 / 1000.0)) # Wait for threads to finish (makes sure the acquire timeout expires # before releasing the lock) self._waitThreads() # Release lock self.sl.release() for _ in range(11): self.assertEqual(self.done.get_nowait(), "timeout") self.assertRaises(Queue.Empty, self.done.get_nowait) @_Repeat def testSharedSkipExclusiveAcquires(self): # Tests whether shared acquires jump in front of exclusive acquires in the # queue. def _Acquire(shared, name, notify_ev, wait_ev): if notify_ev: notify_fn = notify_ev.set else: notify_fn = None if wait_ev: wait_ev.wait() if not self.sl.acquire(shared=shared, test_notify=notify_fn): return self.done.put(name) self.sl.release() # Get exclusive lock while we fill the queue self.sl.acquire() shrcnt1 = 5 shrcnt2 = 7 shrcnt3 = 9 shrcnt4 = 2 # Add acquires using threading.Event for synchronization. They'll be # acquired exactly in the order defined in this list. acquires = (shrcnt1 * [(1, "shared 1")] + 3 * [(0, "exclusive 1")] + shrcnt2 * [(1, "shared 2")] + shrcnt3 * [(1, "shared 3")] + shrcnt4 * [(1, "shared 4")] + 3 * [(0, "exclusive 2")]) ev_cur = None ev_prev = None for args in acquires: ev_cur = threading.Event() self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev)) ev_prev = ev_cur # Wait for last acquire to start ev_prev.wait() # Expect 6 pending exclusive acquires and 1 for all shared acquires # together self.assertEqual(self.sl._count_pending(), 7) # Release exclusive lock and wait self.sl.release() self._waitThreads() # Check sequence for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4): # Shared locks aren't guaranteed to be notified in order, but they'll be # first tmp = self.done.get_nowait() if tmp == "shared 1": shrcnt1 -= 1 elif tmp == "shared 2": shrcnt2 -= 1 elif tmp == "shared 3": shrcnt3 -= 1 elif tmp == "shared 4": shrcnt4 -= 1 self.assertEqual(shrcnt1, 0) self.assertEqual(shrcnt2, 0) self.assertEqual(shrcnt3, 0) self.assertEqual(shrcnt3, 0) for _ in range(3): self.assertEqual(self.done.get_nowait(), "exclusive 1") for _ in range(3): self.assertEqual(self.done.get_nowait(), "exclusive 2") self.assertRaises(Queue.Empty, self.done.get_nowait) @_Repeat def testMixedAcquireTimeout(self): sync = threading.Event() def _AcquireShared(ev): if not self.sl.acquire(shared=1, timeout=None): return self.done.put("shared") # Notify main thread ev.set() # Wait for notification from main thread sync.wait() # Release lock self.sl.release() acquires = [] for _ in range(3): ev = threading.Event() self._addThread(target=_AcquireShared, args=(ev, )) acquires.append(ev) # Wait for all acquires to finish for i in acquires: i.wait() self.assertEqual(self.sl._count_pending(), 0) # Try to get exclusive lock self.failIf(self.sl.acquire(shared=0, timeout=0.02)) # Acquire exclusive without timeout exclsync = threading.Event() exclev = threading.Event() def _AcquireExclusive(): if not self.sl.acquire(shared=0): return self.done.put("exclusive") # Notify main thread exclev.set() # Wait for notification from main thread exclsync.wait() self.sl.release() self._addThread(target=_AcquireExclusive) # Try to get exclusive lock self.failIf(self.sl.acquire(shared=0, timeout=0.02)) # Make all shared holders release their locks sync.set() # Wait for exclusive acquire to succeed exclev.wait() self.assertEqual(self.sl._count_pending(), 0) # Try to get exclusive lock self.failIf(self.sl.acquire(shared=0, timeout=0.02)) def _AcquireSharedSimple(): if self.sl.acquire(shared=1, timeout=None): self.done.put("shared2") self.sl.release() for _ in range(10): self._addThread(target=_AcquireSharedSimple) # Tell exclusive lock to release exclsync.set() # Wait for everything to finish self._waitThreads() self.assertEqual(self.sl._count_pending(), 0) # Check sequence for _ in range(3): self.assertEqual(self.done.get_nowait(), "shared") self.assertEqual(self.done.get_nowait(), "exclusive") for _ in range(10): self.assertEqual(self.done.get_nowait(), "shared2") self.assertRaises(Queue.Empty, self.done.get_nowait) def testPriority(self): # Acquire in exclusive mode self.assert_(self.sl.acquire(shared=0)) # Queue acquires def _Acquire(prev, next, shared, priority, result): prev.wait() self.sl.acquire(shared=shared, priority=priority, test_notify=next.set) try: self.done.put(result) finally: self.sl.release() counter = itertools.count(0) priorities = range(-20, 30) first = threading.Event() prev = first # Data structure: # { # priority: # [(shared/exclusive, set(acquire names), set(pending threads)), # (shared/exclusive, ...), # ..., # ], # } perprio = {} # References shared acquire per priority in L{perprio}. Data structure: # { # priority: (shared=1, set(acquire names), set(pending threads)), # } prioshared = {} for seed in [4979, 9523, 14902, 32440]: # Use a deterministic random generator rnd = random.Random(seed) for priority in [rnd.choice(priorities) for _ in range(30)]: modes = [0, 1] rnd.shuffle(modes) for shared in modes: # Unique name acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority) ev = threading.Event() thread = self._addThread(target=_Acquire, args=(prev, ev, shared, priority, acqname)) prev = ev # Record expected aqcuire, see above for structure data = (shared, set([acqname]), set([thread])) priolist = perprio.setdefault(priority, []) if shared: priosh = prioshared.get(priority, None) if priosh: # Shared acquires are merged for i, j in zip(priosh[1:], data[1:]): i.update(j) assert data[0] == priosh[0] else: prioshared[priority] = data priolist.append(data) else: priolist.append(data) # Start all acquires and wait for them first.set() prev.wait() # Check lock information self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None)) self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])), (self.sl.name, "exclusive", [threading.currentThread().getName()], None)) self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio) # Let threads acquire the lock self.sl.release() # Wait for everything to finish self._waitThreads() self.assert_(self.sl._check_empty()) # Check acquires by priority for acquires in [perprio[i] for i in sorted(perprio.keys())]: for (_, names, _) in acquires: # For shared acquires, the set will contain 1..n entries. For exclusive # acquires only one. while names: names.remove(self.done.get_nowait()) self.assertFalse(compat.any(names for (_, names, _) in acquires)) self.assertRaises(Queue.Empty, self.done.get_nowait) def _VerifyPrioPending(self, (name, mode, owner, pending), perprio): self.assertEqual(name, self.sl.name) self.assert_(mode is None) self.assert_(owner is None) self.assertEqual([(pendmode, sorted(waiting)) for (pendmode, waiting) in pending], [(["exclusive", "shared"][int(bool(shared))], sorted(t.getName() for t in threads)) for acquires in [perprio[i] for i in sorted(perprio.keys())] for (shared, _, threads) in acquires]) class TestSharedLockInCondition(_ThreadedTestCase): """SharedLock as a condition lock tests""" def setUp(self): _ThreadedTestCase.setUp(self) self.sl = locking.SharedLock("TestSharedLockInCondition") self.setCondition() def setCondition(self): self.cond = threading.Condition(self.sl) def testKeepMode(self): self.cond.acquire(shared=1) self.assert_(self.sl._is_owned(shared=1)) self.cond.wait(0) self.assert_(self.sl._is_owned(shared=1)) self.cond.release() self.cond.acquire(shared=0) self.assert_(self.sl._is_owned(shared=0)) self.cond.wait(0) self.assert_(self.sl._is_owned(shared=0)) self.cond.release() class TestSharedLockInPipeCondition(TestSharedLockInCondition): """SharedLock as a pipe condition lock tests""" def setCondition(self): self.cond = locking.PipeCondition(self.sl) class TestSSynchronizedDecorator(_ThreadedTestCase): """Shared Lock Synchronized decorator test""" def setUp(self): _ThreadedTestCase.setUp(self) @locking.ssynchronized(_decoratorlock) def _doItExclusive(self): self.assert_(_decoratorlock._is_owned()) self.done.put('EXC') @locking.ssynchronized(_decoratorlock, shared=1) def _doItSharer(self): self.assert_(_decoratorlock._is_owned(shared=1)) self.done.put('SHR') def testDecoratedFunctions(self): self._doItExclusive() self.assertFalse(_decoratorlock._is_owned()) self._doItSharer() self.assertFalse(_decoratorlock._is_owned()) def testSharersCanCoexist(self): _decoratorlock.acquire(shared=1) threading.Thread(target=self._doItSharer).start() self.assert_(self.done.get(True, 1)) _decoratorlock.release() @_Repeat def testExclusiveBlocksExclusive(self): _decoratorlock.acquire() self._addThread(target=self._doItExclusive) # give it a bit of time to check that it's not actually doing anything self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'EXC') @_Repeat def testExclusiveBlocksSharer(self): _decoratorlock.acquire() self._addThread(target=self._doItSharer) self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'SHR') @_Repeat def testSharerBlocksExclusive(self): _decoratorlock.acquire(shared=1) self._addThread(target=self._doItExclusive) self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'EXC') class TestLockSet(_ThreadedTestCase): """LockSet tests""" def setUp(self): _ThreadedTestCase.setUp(self) self._setUpLS() def _setUpLS(self): """Helper to (re)initialize the lock set""" self.resources = ['one', 'two', 'three'] self.ls = locking.LockSet(self.resources, "TestLockSet") def testResources(self): self.assertEquals(self.ls._names(), set(self.resources)) newls = locking.LockSet([], "TestLockSet.testResources") self.assertEquals(newls._names(), set()) def testAcquireRelease(self): self.assert_(self.ls.acquire('one')) self.assertEquals(self.ls._list_owned(), set(['one'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) self.assertEquals(self.ls.acquire(['one']), set(['one'])) self.assertEquals(self.ls._list_owned(), set(['one'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) self.ls.acquire(['one', 'two', 'three']) self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) self.ls.release('one') self.assertEquals(self.ls._list_owned(), set(['two', 'three'])) self.ls.release(['three']) self.assertEquals(self.ls._list_owned(), set(['two'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three'])) self.assertEquals(self.ls._list_owned(), set(['one', 'three'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) def testNoDoubleAcquire(self): self.ls.acquire('one') self.assertRaises(AssertionError, self.ls.acquire, 'one') self.assertRaises(AssertionError, self.ls.acquire, ['two']) self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three']) self.ls.release() self.ls.acquire(['one', 'three']) self.ls.release('one') self.assertRaises(AssertionError, self.ls.acquire, ['two']) self.ls.release('three') def testNoWrongRelease(self): self.assertRaises(AssertionError, self.ls.release) self.ls.acquire('one') self.assertRaises(AssertionError, self.ls.release, 'two') def testAddRemove(self): self.ls.add('four') self.assertEquals(self.ls._list_owned(), set()) self.assert_('four' in self.ls._names()) self.ls.add(['five', 'six', 'seven'], acquired=1) self.assert_('five' in self.ls._names()) self.assert_('six' in self.ls._names()) self.assert_('seven' in self.ls._names()) self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven'])) self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six']) self.assert_('five' not in self.ls._names()) self.assert_('six' not in self.ls._names()) self.assertEquals(self.ls._list_owned(), set(['seven'])) self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1) self.ls.remove('seven') self.assert_('seven' not in self.ls._names()) self.assertEquals(self.ls._list_owned(), set([])) self.ls.acquire(None, shared=1) self.assertRaises(AssertionError, self.ls.add, 'eight') self.ls.release() self.ls.acquire(None) self.ls.add('eight', acquired=1) self.assert_('eight' in self.ls._names()) self.assert_('eight' in self.ls._list_owned()) self.ls.add('nine') self.assert_('nine' in self.ls._names()) self.assert_('nine' not in self.ls._list_owned()) self.ls.release() self.ls.remove(['two']) self.assert_('two' not in self.ls._names()) self.ls.acquire('three') self.assertEquals(self.ls.remove(['three']), ['three']) self.assert_('three' not in self.ls._names()) self.assertEquals(self.ls.remove('three'), []) self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one']) self.assert_('one' not in self.ls._names()) def testRemoveNonBlocking(self): self.ls.acquire('one') self.assertEquals(self.ls.remove('one'), ['one']) self.ls.acquire(['two', 'three']) self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three']) def testNoDoubleAdd(self): self.assertRaises(errors.LockError, self.ls.add, 'two') self.ls.add('four') self.assertRaises(errors.LockError, self.ls.add, 'four') def testNoWrongRemoves(self): self.ls.acquire(['one', 'three'], shared=1) # Cannot remove 'two' while holding something which is not a superset self.assertRaises(AssertionError, self.ls.remove, 'two') # Cannot remove 'three' as we are sharing it self.assertRaises(AssertionError, self.ls.remove, 'three') def testAcquireSetLock(self): # acquire the set-lock exclusively self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three'])) self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) self.assertEquals(self.ls._is_owned(), True) self.assertEquals(self.ls._names(), set(['one', 'two', 'three'])) # I can still add/remove elements... self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three']) self.assert_(self.ls.add('six')) self.ls.release() # share the set-lock self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six'])) # adding new elements is not possible self.assertRaises(AssertionError, self.ls.add, 'five') self.ls.release() def testAcquireWithRepetitions(self): self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1), set(['two', 'two', 'three'])) self.ls.release(['two', 'two']) self.assertEquals(self.ls._list_owned(), set(['three'])) def testEmptyAcquire(self): # Acquire an empty list of locks... self.assertEquals(self.ls.acquire([]), set()) self.assertEquals(self.ls._list_owned(), set()) # New locks can still be addded self.assert_(self.ls.add('six')) # "re-acquiring" is not an issue, since we had really acquired nothing self.assertEquals(self.ls.acquire([], shared=1), set()) self.assertEquals(self.ls._list_owned(), set()) # We haven't really acquired anything, so we cannot release self.assertRaises(AssertionError, self.ls.release) def _doLockSet(self, names, shared): try: self.ls.acquire(names, shared=shared) self.done.put('DONE') self.ls.release() except errors.LockError: self.done.put('ERR') def _doAddSet(self, names): try: self.ls.add(names, acquired=1) self.done.put('DONE') self.ls.release() except errors.LockError: self.done.put('ERR') def _doRemoveSet(self, names): self.done.put(self.ls.remove(names)) @_Repeat def testConcurrentSharedAcquire(self): self.ls.acquire(['one', 'two'], shared=1) self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._addThread(target=self._doLockSet, args=('three', 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self.assertEqual(self.done.get_nowait(), 'DONE') @_Repeat def testConcurrentExclusiveAcquire(self): self.ls.acquire(['one', 'two']) self._addThread(target=self._doLockSet, args=('three', 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._addThread(target=self._doLockSet, args=('three', 0)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self.assertRaises(Queue.Empty, self.done.get_nowait) self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) self._addThread(target=self._doLockSet, args=('one', 0)) self._addThread(target=self._doLockSet, args=('one', 1)) self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) self._addThread(target=self._doLockSet, args=(['two', 'three'], 1)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() self._waitThreads() for _ in range(6): self.failUnlessEqual(self.done.get_nowait(), 'DONE') @_Repeat def testSimpleAcquireTimeoutExpiring(self): names = sorted(self.ls._names()) self.assert_(len(names) >= 3) # Get name of first lock first = names[0] # Get name of last lock last = names.pop() checks = [ # Block first and try to lock it again (first, first), # Block last and try to lock all locks (None, first), # Block last and try to lock it again (last, last), ] for (wanted, block) in checks: # Lock in exclusive mode self.assert_(self.ls.acquire(block, shared=0)) def _AcquireOne(): # Try to get the same lock again with a timeout (should never succeed) acquired = self.ls.acquire(wanted, timeout=0.1, shared=0) if acquired: self.done.put("acquired") self.ls.release() else: self.assert_(acquired is None) self.assertFalse(self.ls._list_owned()) self.assertFalse(self.ls._is_owned()) self.done.put("not acquired") self._addThread(target=_AcquireOne) # Wait for timeout in thread to expire self._waitThreads() # Release exclusive lock again self.ls.release() self.assertEqual(self.done.get_nowait(), "not acquired") self.assertRaises(Queue.Empty, self.done.get_nowait) @_Repeat def testDelayedAndExpiringLockAcquire(self): self._setUpLS() self.ls.add(['five', 'six', 'seven', 'eight', 'nine']) for expire in (False, True): names = sorted(self.ls._names()) self.assertEqual(len(names), 8) lock_ev = dict([(i, threading.Event()) for i in names]) # Lock all in exclusive mode self.assert_(self.ls.acquire(names, shared=0)) if expire: # We'll wait at least 300ms per lock lockwait = len(names) * [0.3] # Fail if we can't acquire all locks in 400ms. There are 8 locks, so # this gives us up to 2.4s to fail. lockall_timeout = 0.4 else: # This should finish rather quickly lockwait = None lockall_timeout = len(names) * 5.0 def _LockAll(): def acquire_notification(name): if not expire: self.done.put("getting %s" % name) # Kick next lock lock_ev[name].set() if self.ls.acquire(names, shared=0, timeout=lockall_timeout, test_notify=acquire_notification): self.done.put("got all") self.ls.release() else: self.done.put("timeout on all") # Notify all locks for ev in lock_ev.values(): ev.set() t = self._addThread(target=_LockAll) for idx, name in enumerate(names): # Wait for actual acquire on this lock to start lock_ev[name].wait(10.0) if expire and t.isAlive(): # Wait some time after getting the notification to make sure the lock # acquire will expire SafeSleep(lockwait[idx]) self.ls.release(names=name) self.assertFalse(self.ls._list_owned()) self._waitThreads() if expire: # Not checking which locks were actually acquired. Doing so would be # too timing-dependant. self.assertEqual(self.done.get_nowait(), "timeout on all") else: for i in names: self.assertEqual(self.done.get_nowait(), "getting %s" % i) self.assertEqual(self.done.get_nowait(), "got all") self.assertRaises(Queue.Empty, self.done.get_nowait) @_Repeat def testConcurrentRemove(self): self.ls.add('four') self.ls.acquire(['one', 'two', 'four']) self._addThread(target=self._doLockSet, args=(['one', 'four'], 0)) self._addThread(target=self._doLockSet, args=(['one', 'four'], 1)) self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('one') self.ls.release() self._waitThreads() for i in range(4): self.failUnlessEqual(self.done.get_nowait(), 'ERR') self.ls.add(['five', 'six'], acquired=1) self._addThread(target=self._doLockSet, args=(['three', 'six'], 1)) self._addThread(target=self._doLockSet, args=(['three', 'six'], 0)) self._addThread(target=self._doLockSet, args=(['four', 'six'], 1)) self._addThread(target=self._doLockSet, args=(['four', 'six'], 0)) self.ls.remove('five') self.ls.release() self._waitThreads() for i in range(4): self.failUnlessEqual(self.done.get_nowait(), 'DONE') self.ls.acquire(['three', 'four']) self._addThread(target=self._doRemoveSet, args=(['four', 'six'], )) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('four') self._waitThreads() self.assertEqual(self.done.get_nowait(), ['six']) self._addThread(target=self._doRemoveSet, args=(['two'])) self._waitThreads() self.assertEqual(self.done.get_nowait(), ['two']) self.ls.release() # reset lockset self._setUpLS() @_Repeat def testConcurrentSharedSetLock(self): # share the set-lock... self.ls.acquire(None, shared=1) # ...another thread can share it too self._addThread(target=self._doLockSet, args=(None, 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') # ...or just share some elements self._addThread(target=self._doLockSet, args=(['one', 'three'], 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') # ...but not add new ones or remove any t = self._addThread(target=self._doAddSet, args=(['nine'])) self._addThread(target=self._doRemoveSet, args=(['two'], )) self.assertRaises(Queue.Empty, self.done.get_nowait) # this just releases the set-lock self.ls.release([]) t.join(60) self.assertEqual(self.done.get_nowait(), 'DONE') # release the lock on the actual elements so remove() can proceed too self.ls.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), ['two']) # reset lockset self._setUpLS() @_Repeat def testConcurrentExclusiveSetLock(self): # acquire the set-lock... self.ls.acquire(None, shared=0) # ...no one can do anything else self._addThread(target=self._doLockSet, args=(None, 1)) self._addThread(target=self._doLockSet, args=(None, 0)) self._addThread(target=self._doLockSet, args=(['three'], 0)) self._addThread(target=self._doLockSet, args=(['two'], 1)) self._addThread(target=self._doAddSet, args=(['nine'])) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() self._waitThreads() for _ in range(5): self.assertEqual(self.done.get(True, 1), 'DONE') # cleanup self._setUpLS() @_Repeat def testConcurrentSetLockAdd(self): self.ls.acquire('one') # Another thread wants the whole SetLock self._addThread(target=self._doLockSet, args=(None, 0)) self._addThread(target=self._doLockSet, args=(None, 1)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.assertRaises(AssertionError, self.ls.add, 'four') self.ls.release() self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self.assertEqual(self.done.get_nowait(), 'DONE') self.ls.acquire(None) self._addThread(target=self._doLockSet, args=(None, 0)) self._addThread(target=self._doLockSet, args=(None, 1)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.add('four') self.ls.add('five', acquired=1) self.ls.add('six', acquired=1, shared=1) self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three', 'five', 'six'])) self.assertEquals(self.ls._is_owned(), True) self.assertEquals(self.ls._names(), set(['one', 'two', 'three', 'four', 'five', 'six'])) self.ls.release() self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self.assertEqual(self.done.get_nowait(), 'DONE') self._setUpLS() @_Repeat def testEmptyLockSet(self): # get the set-lock self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three'])) # now empty it... self.ls.remove(['one', 'two', 'three']) # and adds/locks by another thread still wait self._addThread(target=self._doAddSet, args=(['nine'])) self._addThread(target=self._doLockSet, args=(None, 1)) self._addThread(target=self._doLockSet, args=(None, 0)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() self._waitThreads() for _ in range(3): self.assertEqual(self.done.get_nowait(), 'DONE') # empty it again... self.assertEqual(self.ls.remove(['nine']), ['nine']) # now share it... self.assertEqual(self.ls.acquire(None, shared=1), set()) # other sharers can go, adds still wait self._addThread(target=self._doLockSet, args=(None, 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._addThread(target=self._doAddSet, args=(['nine'])) self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._setUpLS() def testPriority(self): def _Acquire(prev, next, name, priority, success_fn): prev.wait() self.assert_(self.ls.acquire(name, shared=0, priority=priority, test_notify=lambda _: next.set())) try: success_fn() finally: self.ls.release() # Get all in exclusive mode self.assert_(self.ls.acquire(locking.ALL_SET, shared=0)) done_two = Queue.Queue(0) first = threading.Event() prev = first acquires = [("one", prio, self.done) for prio in range(1, 33)] acquires.extend([("two", prio, done_two) for prio in range(1, 33)]) # Use a deterministic random generator random.Random(741).shuffle(acquires) for (name, prio, done) in acquires: ev = threading.Event() self._addThread(target=_Acquire, args=(prev, ev, name, prio, compat.partial(done.put, "Prio%s" % prio))) prev = ev # Start acquires first.set() # Wait for last acquire to start prev.wait() # Let threads acquire locks self.ls.release() # Wait for threads to finish self._waitThreads() for i in range(1, 33): self.assertEqual(self.done.get_nowait(), "Prio%s" % i) self.assertEqual(done_two.get_nowait(), "Prio%s" % i) self.assertRaises(Queue.Empty, self.done.get_nowait) self.assertRaises(Queue.Empty, done_two.get_nowait) class TestGanetiLockManager(_ThreadedTestCase): def setUp(self): _ThreadedTestCase.setUp(self) self.nodes=['n1', 'n2'] self.nodegroups=['g1', 'g2'] self.instances=['i1', 'i2', 'i3'] self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, self.instances) def tearDown(self): # Don't try this at home... locking.GanetiLockManager._instance = None def testLockingConstants(self): # The locking library internally cheats by assuming its constants have some # relationships with each other. Check those hold true. # This relationship is also used in the Processor to recursively acquire # the right locks. Again, please don't break it. for i in range(len(locking.LEVELS)): self.assertEqual(i, locking.LEVELS[i]) def testDoubleGLFails(self): self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], []) def testLockNames(self): self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(self.nodegroups)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) def testInitAndResources(self): locking.GanetiLockManager._instance = None self.GL = locking.GanetiLockManager([], [], []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) locking.GanetiLockManager._instance = None self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(self.nodegroups)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) locking.GanetiLockManager._instance = None self.GL = locking.GanetiLockManager([], [], self.instances) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) def testAcquireRelease(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) self.GL.acquire(locking.LEVEL_INSTANCE, ['i1']) self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2']) self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1) self.GL.release(locking.LEVEL_NODE, ['n2']) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1'])) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODE) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set()) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODEGROUP) self.GL.release(locking.LEVEL_INSTANCE) self.assertRaises(errors.LockError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i5']) self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3'])) def testAcquireWholeSets(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), set(self.instances)) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(self.instances)) self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None), set(self.nodegroups)) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(self.nodegroups)) self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1), set(self.nodes)) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(self.nodes)) self.GL.release(locking.LEVEL_NODE) self.GL.release(locking.LEVEL_NODEGROUP) self.GL.release(locking.LEVEL_INSTANCE) self.GL.release(locking.LEVEL_CLUSTER) def testAcquireWholeAndPartial(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), set(self.instances)) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(self.instances)) self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1), set(['n2'])) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n2'])) self.GL.release(locking.LEVEL_NODE) self.GL.release(locking.LEVEL_INSTANCE) self.GL.release(locking.LEVEL_CLUSTER) def testBGLDependency(self): self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1', 'n2']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i3']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODEGROUP, ['g1']) self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.GL.acquire(locking.LEVEL_NODE, ['n1']) self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER, ['BGL']) self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER) self.GL.release(locking.LEVEL_NODE) self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2']) self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER, ['BGL']) self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER) self.GL.release(locking.LEVEL_INSTANCE) self.GL.acquire(locking.LEVEL_NODEGROUP, None) self.GL.release(locking.LEVEL_NODEGROUP, ['g1']) self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER, ['BGL']) self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER) self.GL.release(locking.LEVEL_NODEGROUP) self.GL.release(locking.LEVEL_CLUSTER) def testWrongOrder(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.GL.acquire(locking.LEVEL_NODE, ['n2']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODEGROUP, ['g1']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i2']) def testModifiableLevels(self): self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER, ['BGL2']) self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL']) self.GL.add(locking.LEVEL_INSTANCE, ['i4']) self.GL.remove(locking.LEVEL_INSTANCE, ['i3']) self.GL.remove(locking.LEVEL_INSTANCE, ['i1']) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4'])) self.GL.add(locking.LEVEL_NODE, ['n3']) self.GL.remove(locking.LEVEL_NODE, ['n1']) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3'])) self.GL.add(locking.LEVEL_NODEGROUP, ['g3']) self.GL.remove(locking.LEVEL_NODEGROUP, ['g2']) self.GL.remove(locking.LEVEL_NODEGROUP, ['g1']) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3'])) self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER, ['BGL2']) # Helper function to run as a thread that shared the BGL and then acquires # some locks at another level. def _doLock(self, level, names, shared): try: self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.GL.acquire(level, names, shared=shared) self.done.put('DONE') self.GL.release(level) self.GL.release(locking.LEVEL_CLUSTER) except errors.LockError: self.done.put('ERR') @_Repeat def testConcurrency(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self._addThread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) self._addThread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._addThread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1) self._addThread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)) self._waitThreads() self.assertEqual(self.done.get_nowait(), 'DONE') self._addThread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)) self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) self._waitThreads() self.assertEqual(self.done.get(True, 1), 'DONE') self.GL.release(locking.LEVEL_CLUSTER, ['BGL']) class TestLockMonitor(_ThreadedTestCase): def setUp(self): _ThreadedTestCase.setUp(self) self.lm = locking.LockMonitor() def testSingleThread(self): locks = [] for i in range(100): name = "TestLock%s" % i locks.append(locking.SharedLock(name, monitor=self.lm)) self.assertEqual(len(self.lm._locks), len(locks)) result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"])) self.assertEqual(len(result.fields), 1) self.assertEqual(len(result.data), 100) # Delete all locks del locks[:] # The garbage collector might needs some time def _CheckLocks(): if self.lm._locks: raise utils.RetryAgain() utils.Retry(_CheckLocks, 0.1, 30.0) self.assertFalse(self.lm._locks) def testMultiThread(self): locks = [] def _CreateLock(prev, next, name): prev.wait() locks.append(locking.SharedLock(name, monitor=self.lm)) if next: next.set() expnames = [] first = threading.Event() prev = first # Use a deterministic random generator for i in random.Random(4263).sample(range(100), 33): name = "MtTestLock%s" % i expnames.append(name) ev = threading.Event() self._addThread(target=_CreateLock, args=(prev, ev, name)) prev = ev # Add locks first.set() self._waitThreads() # Check order in which locks were added self.assertEqual([i.name for i in locks], expnames) # Check query result result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) self.assert_(isinstance(result, dict)) response = objects.QueryResponse.FromDict(result) self.assertEqual(response.data, [[(constants.RS_NORMAL, name), (constants.RS_NORMAL, None), (constants.RS_NORMAL, None), (constants.RS_NORMAL, [])] for name in utils.NiceSort(expnames)]) self.assertEqual(len(response.fields), 4) self.assertEqual(["name", "mode", "owner", "pending"], [fdef.name for fdef in response.fields]) # Test exclusive acquire for tlock in locks[::4]: tlock.acquire(shared=0) try: def _GetExpResult(name): if tlock.name == name: return [(constants.RS_NORMAL, name), (constants.RS_NORMAL, "exclusive"), (constants.RS_NORMAL, [threading.currentThread().getName()]), (constants.RS_NORMAL, [])] return [(constants.RS_NORMAL, name), (constants.RS_NORMAL, None), (constants.RS_NORMAL, None), (constants.RS_NORMAL, [])] result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, [_GetExpResult(name) for name in utils.NiceSort(expnames)]) finally: tlock.release() # Test shared acquire def _Acquire(lock, shared, ev, notify): lock.acquire(shared=shared) try: notify.set() ev.wait() finally: lock.release() for tlock1 in locks[::11]: for tlock2 in locks[::-15]: if tlock2 == tlock1: # Avoid deadlocks continue for tlock3 in locks[::10]: if tlock3 in (tlock2, tlock1): # Avoid deadlocks continue releaseev = threading.Event() # Acquire locks acquireev = [] tthreads1 = [] for i in range(3): ev = threading.Event() tthreads1.append(self._addThread(target=_Acquire, args=(tlock1, 1, releaseev, ev))) acquireev.append(ev) ev = threading.Event() tthread2 = self._addThread(target=_Acquire, args=(tlock2, 1, releaseev, ev)) acquireev.append(ev) ev = threading.Event() tthread3 = self._addThread(target=_Acquire, args=(tlock3, 0, releaseev, ev)) acquireev.append(ev) # Wait for all locks to be acquired for i in acquireev: i.wait() # Check query result result = self.lm.QueryLocks(["name", "mode", "owner"]) response = objects.QueryResponse.FromDict(result) for (name, mode, owner) in response.data: (name_status, name_value) = name (owner_status, owner_value) = owner self.assertEqual(name_status, constants.RS_NORMAL) self.assertEqual(owner_status, constants.RS_NORMAL) if name_value == tlock1.name: self.assertEqual(mode, (constants.RS_NORMAL, "shared")) self.assertEqual(set(owner_value), set(i.getName() for i in tthreads1)) continue if name_value == tlock2.name: self.assertEqual(mode, (constants.RS_NORMAL, "shared")) self.assertEqual(owner_value, [tthread2.getName()]) continue if name_value == tlock3.name: self.assertEqual(mode, (constants.RS_NORMAL, "exclusive")) self.assertEqual(owner_value, [tthread3.getName()]) continue self.assert_(name_value in expnames) self.assertEqual(mode, (constants.RS_NORMAL, None)) self.assert_(owner_value is None) # Release locks again releaseev.set() self._waitThreads() result = self.lm.QueryLocks(["name", "mode", "owner"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, [[(constants.RS_NORMAL, name), (constants.RS_NORMAL, None), (constants.RS_NORMAL, None)] for name in utils.NiceSort(expnames)]) def testDelete(self): lock = locking.SharedLock("TestLock", monitor=self.lm) self.assertEqual(len(self.lm._locks), 1) result = self.lm.QueryLocks(["name", "mode", "owner"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, [[(constants.RS_NORMAL, lock.name), (constants.RS_NORMAL, None), (constants.RS_NORMAL, None)]]) lock.delete() result = self.lm.QueryLocks(["name", "mode", "owner"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, [[(constants.RS_NORMAL, lock.name), (constants.RS_NORMAL, "deleted"), (constants.RS_NORMAL, None)]]) self.assertEqual(len(self.lm._locks), 1) def testPending(self): def _Acquire(lock, shared, prev, next): prev.wait() lock.acquire(shared=shared, test_notify=next.set) try: pass finally: lock.release() lock = locking.SharedLock("ExcLock", monitor=self.lm) for shared in [0, 1]: lock.acquire() try: self.assertEqual(len(self.lm._locks), 1) result = self.lm.QueryLocks(["name", "mode", "owner"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, [[(constants.RS_NORMAL, lock.name), (constants.RS_NORMAL, "exclusive"), (constants.RS_NORMAL, [threading.currentThread().getName()])]]) threads = [] first = threading.Event() prev = first for i in range(5): ev = threading.Event() threads.append(self._addThread(target=_Acquire, args=(lock, shared, prev, ev))) prev = ev # Start acquires first.set() # Wait for last acquire to start waiting prev.wait() # NOTE: This works only because QueryLocks will acquire the # lock-internal lock again and won't be able to get the information # until it has the lock. By then the acquire should be registered in # SharedLock.__pending (otherwise it's a bug). # All acquires are waiting now if shared: pending = [("shared", utils.NiceSort(t.getName() for t in threads))] else: pending = [("exclusive", [t.getName()]) for t in threads] result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, [[(constants.RS_NORMAL, lock.name), (constants.RS_NORMAL, "exclusive"), (constants.RS_NORMAL, [threading.currentThread().getName()]), (constants.RS_NORMAL, pending)]]) self.assertEqual(len(self.lm._locks), 1) finally: lock.release() self._waitThreads() # No pending acquires result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, [[(constants.RS_NORMAL, lock.name), (constants.RS_NORMAL, None), (constants.RS_NORMAL, None), (constants.RS_NORMAL, [])]]) self.assertEqual(len(self.lm._locks), 1) if __name__ == '__main__': testutils.GanetiTestProgram()