Newer
Older
# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 0.0510-1301, USA.
"""Script for unittesting the locking module"""
import os
import unittest
import time
import Queue
import itertools
from ganeti import compat
from ganeti import objects
from ganeti import query
# This is used to test the ssynchronize decorator.
# Since it's passed as input to a decorator it must be declared as a global.
_decoratorlock = locking.SharedLock("decorator lock")
#: List for looping tests
ITERATIONS = range(8)
def _Repeat(fn):
"""Decorator for executing a function many times"""
def wrapper(*args, **kwargs):
for i in ITERATIONS:
fn(*args, **kwargs)
return wrapper
def SafeSleep(duration):
start = time.time()
while True:
delay = start + duration - time.time()
if delay <= 0.0:
break
time.sleep(delay)
class _ThreadedTestCase(unittest.TestCase):
"""Test class that supports adding/waiting on threads"""
def setUp(self):
unittest.TestCase.setUp(self)
self.done = Queue.Queue(0)
self.threads = []
def _addThread(self, *args, **kwargs):
"""Create and remember a new thread"""
t = threading.Thread(*args, **kwargs)
self.threads.append(t)
t.start()
return t
def _waitThreads(self):
"""Wait for all our threads to finish"""
for t in self.threads:
t.join(60)
self.failIf(t.isAlive())
self.threads = []
class _ConditionTestCase(_ThreadedTestCase):
"""Common test case for conditions"""
_ThreadedTestCase.setUp(self)
self.lock = threading.Lock()
self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.acquire()
self.assert_(self.cond._is_owned())
self.cond.notifyAll()
self.assert_(self.cond._is_owned())
self.cond.release()
self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.release()
self.cond.acquire()
self._addThread(target=_NotifyAll)
self.assertEqual(self.done.get(True, 1), "NE")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertEqual(self.done.get(True, 1), "NA")
self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
self.cond.release()
self.assertFalse(self.cond._is_owned())
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class TestSingleNotifyPipeCondition(_ConditionTestCase):
"""SingleNotifyPipeCondition tests"""
def setUp(self):
_ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
def testAcquireRelease(self):
self._testAcquireRelease()
def testNotification(self):
self._testNotification()
def testWaitReuse(self):
self.cond.acquire()
self.cond.wait(0)
self.cond.wait(0.1)
self.cond.release()
def testNoNotifyReuse(self):
self.cond.acquire()
self.cond.notifyAll()
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.release()
class TestPipeCondition(_ConditionTestCase):
_ConditionTestCase.setUp(self, locking.PipeCondition)
def testAcquireRelease(self):
self._testAcquireRelease()
def testNotification(self):
self._testNotification()
threads = [
self._addThread(target=fn),
self._addThread(target=fn),
self._addThread(target=fn),
]
for _ in threads:
self.assertEqual(self.done.get(True, 1), "A")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.acquire()
self.assertEqual(len(self.cond._waiters), 3)
self.assertEqual(self.cond._waiters, set(threads))
# This new thread can't acquire the lock, and thus call wait, before we
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# release it
self._addThread(target=fn)
self.cond.notifyAll()
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.release()
# We should now get 3 W and 1 A (for the new thread) in whatever order
w = 0
a = 0
for i in range(4):
got = self.done.get(True, 1)
if got == "W":
w += 1
elif got == "A":
a += 1
else:
self.fail("Got %s on the done queue" % got)
self.assertEqual(w, 3)
self.assertEqual(a, 1)
self.cond.acquire()
self.cond.notifyAll()
self.cond.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "W")
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testBlockingWait(self):
def _BlockingWait():
self.cond.acquire()
self.done.put("A")
self.cond.wait()
self.cond.release()
self.done.put("W")
self._TestWait(_BlockingWait)
def testLongTimeoutWait(self):
def _Helper():
self.cond.acquire()
self.done.put("A")
self.cond.wait(15.0)
self.cond.release()
self.done.put("W")
self._TestWait(_Helper)
def _TimeoutWait(self, timeout, check):
self.cond.acquire()
self.cond.wait(timeout)
self.cond.release()
self.done.put(check)
def testShortTimeoutWait(self):
self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "T1")
self.assertEqual(self.done.get_nowait(), "T1")
self.assertRaises(Queue.Empty, self.done.get_nowait)
def testZeroTimeoutWait(self):
self._addThread(target=self._TimeoutWait, args=(0, "T0"))
self._addThread(target=self._TimeoutWait, args=(0, "T0"))
self._addThread(target=self._TimeoutWait, args=(0, "T0"))
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "T0")
self.assertEqual(self.done.get_nowait(), "T0")
self.assertEqual(self.done.get_nowait(), "T0")
self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestSharedLock(_ThreadedTestCase):
_ThreadedTestCase.setUp(self)
self.sl = locking.SharedLock("TestSharedLock")
def testSequenceAndOwnership(self):
self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.assertFalse(self.sl._is_owned(shared=0))
self.assertFalse(self.sl._is_owned())
self.sl.acquire()
self.assert_(self.sl._is_owned())
self.assertFalse(self.sl._is_owned(shared=1))
self.assert_(self.sl._is_owned(shared=0))
self.sl.release()
self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.assertFalse(self.sl._is_owned(shared=0))
self.assertFalse(self.sl._is_owned())
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
self.assert_(self.sl.acquire(shared=1))
self.sl.release()
self.assert_(self.sl.acquire())
self.sl.release()
def testDoubleLockingStoE(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.acquire)
def testDoubleLockingEtoS(self):
self.sl.acquire()
self.assertRaises(AssertionError, self.sl.acquire, shared=1)
def testDoubleLockingStoS(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.acquire, shared=1)
def testDoubleLockingEtoE(self):
self.sl.acquire()
self.assertRaises(AssertionError, self.sl.acquire)
# helper functions: called in a separate thread they acquire the lock, send
# their identifier on the done queue, then release it.
def _doItSharer(self):
try:
self.sl.acquire(shared=1)
self.done.put('SHR')
self.sl.release()
except errors.LockError:
self.done.put('ERR')
def _doItExclusive(self):
try:
self.sl.acquire()
self.done.put('EXC')
self.sl.release()
except errors.LockError:
self.done.put('ERR')
def _doItDelete(self):
try:
self.done.put('DEL')
except errors.LockError:
self.done.put('ERR')
def testSharersCanCoexist(self):
self.sl.acquire(shared=1)
threading.Thread(target=self._doItSharer).start()
self.assert_(self.done.get(True, 1))
self.sl.release()
def testExclusiveBlocksExclusive(self):
self.sl.acquire()
self._addThread(target=self._doItExclusive)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
def testExclusiveBlocksDelete(self):
self.sl.acquire()
self._addThread(target=self._doItDelete)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
self.sl = locking.SharedLock(self.sl.name)
def testExclusiveBlocksSharer(self):
self.sl.acquire()
self._addThread(target=self._doItSharer)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'SHR')
def testSharerBlocksExclusive(self):
self.sl.acquire(shared=1)
self._addThread(target=self._doItExclusive)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
def testSharerBlocksDelete(self):
self.sl.acquire(shared=1)
self._addThread(target=self._doItDelete)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'DEL')
self.sl = locking.SharedLock(self.sl.name)
def testWaitingExclusiveBlocksSharer(self):
"""SKIPPED testWaitingExclusiveBlockSharer"""
return
self.sl.acquire(shared=1)
# the lock is acquired in shared mode...
self._addThread(target=self._doItExclusive)
# ...but now an exclusive is waiting...
self._addThread(target=self._doItSharer)
# ...so the sharer should be blocked as well
self.assertRaises(Queue.Empty, self.done.get_nowait)
# The exclusive passed before
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
self.failUnlessEqual(self.done.get_nowait(), 'SHR')
def testWaitingSharerBlocksExclusive(self):
"""SKIPPED testWaitingSharerBlocksExclusive"""
return
self.sl.acquire()
# the lock is acquired in exclusive mode...
self._addThread(target=self._doItSharer)
# ...but now a sharer is waiting...
self._addThread(target=self._doItExclusive)
# ...the exclusive is waiting too...
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.assertEqual(self.done.get_nowait(), 'SHR')
self.assertEqual(self.done.get_nowait(), 'EXC')
def testDelete(self):
self.sl.delete()
self.assertRaises(errors.LockError, self.sl.acquire)
self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
self.assertRaises(errors.LockError, self.sl.delete)
def testDeleteTimeout(self):
self.sl.delete(timeout=60)
def testNoDeleteIfSharer(self):
self.sl.acquire(shared=1)
self.assertRaises(AssertionError, self.sl.delete)
def testDeletePendingSharersExclusiveDelete(self):
self.sl.acquire()
self._addThread(target=self._doItSharer)
self._addThread(target=self._doItSharer)
self._addThread(target=self._doItExclusive)
self._addThread(target=self._doItDelete)
self._waitThreads()
# The threads who were pending return ERR
for _ in range(4):
self.assertEqual(self.done.get_nowait(), 'ERR')
self.sl = locking.SharedLock(self.sl.name)
def testDeletePendingDeleteExclusiveSharers(self):
self.sl.acquire()
self._addThread(target=self._doItDelete)
self._addThread(target=self._doItExclusive)
self._addThread(target=self._doItSharer)
self._addThread(target=self._doItSharer)
# The two threads who were pending return both ERR
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.assertEqual(self.done.get_nowait(), 'ERR')
self.sl = locking.SharedLock(self.sl.name)
@_Repeat
def testExclusiveAcquireTimeout(self):
for shared in [0, 1]:
on_queue = threading.Event()
release_exclusive = threading.Event()
def _LockExclusive():
self.sl.acquire(shared=0, test_notify=on_queue.set)
self.done.put("A: start wait")
release_exclusive.wait()
self.done.put("A: end wait")
self.sl.release()
# Start thread to hold lock in exclusive mode
self._addThread(target=_LockExclusive)
# Wait for wait to begin
self.assertEqual(self.done.get(timeout=60), "A: start wait")
# Wait up to 60s to get lock, but release exclusive lock as soon as we're
# on the queue
self.failUnless(self.sl.acquire(shared=shared, timeout=60,
test_notify=release_exclusive.set))
self.done.put("got 2nd")
self.sl.release()
self._waitThreads()
self.assertEqual(self.done.get_nowait(), "A: end wait")
self.assertEqual(self.done.get_nowait(), "got 2nd")
self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testAcquireExpiringTimeout(self):
def _AcquireWithTimeout(shared, timeout):
if not self.sl.acquire(shared=shared, timeout=timeout):
self.done.put("timeout")
for shared in [0, 1]:
# Lock exclusively
self.sl.acquire()
# Start shared acquires with timeout between 0 and 20 ms
self._addThread(target=_AcquireWithTimeout,
args=(shared, i * 2.0 / 1000.0))
# Wait for threads to finish (makes sure the acquire timeout expires
# before releasing the lock)
self._waitThreads()
# Release lock
self.sl.release()
self.assertEqual(self.done.get_nowait(), "timeout")
self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testSharedSkipExclusiveAcquires(self):
# Tests whether shared acquires jump in front of exclusive acquires in the
# queue.
def _Acquire(shared, name, notify_ev, wait_ev):
if notify_ev:
notify_fn = notify_ev.set
else:
notify_fn = None
if wait_ev:
wait_ev.wait()
if not self.sl.acquire(shared=shared, test_notify=notify_fn):
return
self.done.put(name)
self.sl.release()
# Get exclusive lock while we fill the queue
self.sl.acquire()
shrcnt1 = 5
shrcnt2 = 7
shrcnt3 = 9
shrcnt4 = 2
# Add acquires using threading.Event for synchronization. They'll be
# acquired exactly in the order defined in this list.
acquires = (shrcnt1 * [(1, "shared 1")] +
3 * [(0, "exclusive 1")] +
shrcnt2 * [(1, "shared 2")] +
shrcnt3 * [(1, "shared 3")] +
shrcnt4 * [(1, "shared 4")] +
3 * [(0, "exclusive 2")])
ev_cur = None
ev_prev = None
for args in acquires:
ev_cur = threading.Event()
self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
ev_prev = ev_cur
# Wait for last acquire to start
ev_prev.wait()
# Expect 6 pending exclusive acquires and 1 for all shared acquires
# together
self.assertEqual(self.sl._count_pending(), 7)
# Release exclusive lock and wait
self.sl.release()
self._waitThreads()
# Check sequence
for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
# Shared locks aren't guaranteed to be notified in order, but they'll be
# first
if tmp == "shared 1":
shrcnt1 -= 1
elif tmp == "shared 2":
shrcnt2 -= 1
elif tmp == "shared 3":
shrcnt3 -= 1
elif tmp == "shared 4":
shrcnt4 -= 1
self.assertEqual(shrcnt1, 0)
self.assertEqual(shrcnt2, 0)
self.assertEqual(shrcnt3, 0)
self.assertEqual(shrcnt3, 0)
self.assertEqual(self.done.get_nowait(), "exclusive 1")
self.assertEqual(self.done.get_nowait(), "exclusive 2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testMixedAcquireTimeout(self):
sync = threading.Event()
def _AcquireShared(ev):
if not self.sl.acquire(shared=1, timeout=None):
return
self.done.put("shared")
# Notify main thread
ev.set()
# Wait for notification from main thread
sync.wait()
# Release lock
self.sl.release()
acquires = []
ev = threading.Event()
self._addThread(target=_AcquireShared, args=(ev, ))
acquires.append(ev)
# Wait for all acquires to finish
for i in acquires:
i.wait()
self.assertEqual(self.sl._count_pending(), 0)
# Try to get exclusive lock
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Acquire exclusive without timeout
exclsync = threading.Event()
exclev = threading.Event()
def _AcquireExclusive():
if not self.sl.acquire(shared=0):
return
self.done.put("exclusive")
# Notify main thread
exclev.set()
# Wait for notification from main thread
exclsync.wait()
self.sl.release()
self._addThread(target=_AcquireExclusive)
# Try to get exclusive lock
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
# Make all shared holders release their locks
sync.set()
# Wait for exclusive acquire to succeed
exclev.wait()
self.assertEqual(self.sl._count_pending(), 0)
# Try to get exclusive lock
self.failIf(self.sl.acquire(shared=0, timeout=0.02))
def _AcquireSharedSimple():
if self.sl.acquire(shared=1, timeout=None):
self.done.put("shared2")
self.sl.release()
self._addThread(target=_AcquireSharedSimple)
# Tell exclusive lock to release
exclsync.set()
# Wait for everything to finish
self._waitThreads()
self.assertEqual(self.sl._count_pending(), 0)
# Check sequence
self.assertEqual(self.done.get_nowait(), "shared")
self.assertEqual(self.done.get_nowait(), "exclusive")
self.assertEqual(self.done.get_nowait(), "shared2")
self.assertRaises(Queue.Empty, self.done.get_nowait)
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
def testPriority(self):
# Acquire in exclusive mode
self.assert_(self.sl.acquire(shared=0))
# Queue acquires
def _Acquire(prev, next, shared, priority, result):
prev.wait()
self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
try:
self.done.put(result)
finally:
self.sl.release()
counter = itertools.count(0)
priorities = range(-20, 30)
first = threading.Event()
prev = first
# Data structure:
# {
# priority:
# [(shared/exclusive, set(acquire names), set(pending threads)),
# (shared/exclusive, ...),
# ...,
# ],
# }
perprio = {}
# References shared acquire per priority in L{perprio}. Data structure:
# {
# priority: (shared=1, set(acquire names), set(pending threads)),
# }
prioshared = {}
for seed in [4979, 9523, 14902, 32440]:
# Use a deterministic random generator
rnd = random.Random(seed)
for priority in [rnd.choice(priorities) for _ in range(30)]:
modes = [0, 1]
rnd.shuffle(modes)
for shared in modes:
# Unique name
acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
ev = threading.Event()
thread = self._addThread(target=_Acquire,
args=(prev, ev, shared, priority, acqname))
prev = ev
# Record expected aqcuire, see above for structure
data = (shared, set([acqname]), set([thread]))
priolist = perprio.setdefault(priority, [])
if shared:
priosh = prioshared.get(priority, None)
if priosh:
# Shared acquires are merged
for i, j in zip(priosh[1:], data[1:]):
i.update(j)
assert data[0] == priosh[0]
else:
prioshared[priority] = data
priolist.append(data)
else:
priolist.append(data)
# Start all acquires and wait for them
first.set()
prev.wait()
# Check lock information
self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
(self.sl.name, "exclusive",
[threading.currentThread().getName()], None))
self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
# Let threads acquire the lock
self.sl.release()
# Wait for everything to finish
self._waitThreads()
self.assert_(self.sl._check_empty())
# Check acquires by priority
for acquires in [perprio[i] for i in sorted(perprio.keys())]:
for (_, names, _) in acquires:
# For shared acquires, the set will contain 1..n entries. For exclusive
# acquires only one.
while names:
names.remove(self.done.get_nowait())
self.assertFalse(compat.any(names for (_, names, _) in acquires))
self.assertRaises(Queue.Empty, self.done.get_nowait)
def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
self.assertEqual(name, self.sl.name)
self.assert_(mode is None)
self.assert_(owner is None)
self.assertEqual([(pendmode, sorted(waiting))
for (pendmode, waiting) in pending],
[(["exclusive", "shared"][int(bool(shared))],
sorted(t.getName() for t in threads))
for acquires in [perprio[i]
for i in sorted(perprio.keys())]
for (shared, _, threads) in acquires])
class TestSharedLockInCondition(_ThreadedTestCase):
"""SharedLock as a condition lock tests"""
def setUp(self):
_ThreadedTestCase.setUp(self)
self.sl = locking.SharedLock("TestSharedLockInCondition")
self.setCondition()
def setCondition(self):
self.cond = threading.Condition(self.sl)
def testKeepMode(self):
self.cond.acquire(shared=1)
self.assert_(self.sl._is_owned(shared=1))
self.cond.wait(0)
self.assert_(self.sl._is_owned(shared=1))
self.cond.release()
self.cond.acquire(shared=0)
self.assert_(self.sl._is_owned(shared=0))
self.cond.wait(0)
self.assert_(self.sl._is_owned(shared=0))
self.cond.release()
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
"""SharedLock as a pipe condition lock tests"""
def setCondition(self):
self.cond = locking.PipeCondition(self.sl)
class TestSSynchronizedDecorator(_ThreadedTestCase):
"""Shared Lock Synchronized decorator test"""
def setUp(self):
_ThreadedTestCase.setUp(self)
@locking.ssynchronized(_decoratorlock)
def _doItExclusive(self):
self.assert_(_decoratorlock._is_owned())
self.done.put('EXC')
@locking.ssynchronized(_decoratorlock, shared=1)
def _doItSharer(self):
self.assert_(_decoratorlock._is_owned(shared=1))
self.done.put('SHR')
def testDecoratedFunctions(self):
self._doItExclusive()
self.assertFalse(_decoratorlock._is_owned())
self.assertFalse(_decoratorlock._is_owned())
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
threading.Thread(target=self._doItSharer).start()
self.assert_(self.done.get(True, 1))
_decoratorlock.release()
def testExclusiveBlocksExclusive(self):
_decoratorlock.acquire()
self._addThread(target=self._doItExclusive)
# give it a bit of time to check that it's not actually doing anything
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
def testExclusiveBlocksSharer(self):
_decoratorlock.acquire()
self._addThread(target=self._doItSharer)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'SHR')
def testSharerBlocksExclusive(self):
_decoratorlock.acquire(shared=1)
self._addThread(target=self._doItExclusive)
self.assertRaises(Queue.Empty, self.done.get_nowait)
self._waitThreads()
self.failUnlessEqual(self.done.get_nowait(), 'EXC')
class TestLockSet(_ThreadedTestCase):
"""LockSet tests"""
def setUp(self):
_ThreadedTestCase.setUp(self)
self._setUpLS()
def _setUpLS(self):
"""Helper to (re)initialize the lock set"""
self.resources = ['one', 'two', 'three']
self.ls = locking.LockSet(self.resources, "TestLockSet")
def testResources(self):
self.assertEquals(self.ls._names(), set(self.resources))
newls = locking.LockSet([], "TestLockSet.testResources")
self.assertEquals(newls._names(), set())
def testAcquireRelease(self):
self.assert_(self.ls.acquire('one'))
self.assertEquals(self.ls._list_owned(), set(['one']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
self.assertEquals(self.ls.acquire(['one']), set(['one']))
self.assertEquals(self.ls._list_owned(), set(['one']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
self.ls.acquire(['one', 'two', 'three'])
self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
self.ls.release('one')
self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
self.ls.release(['three'])
self.assertEquals(self.ls._list_owned(), set(['two']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
self.ls.release()
self.assertEquals(self.ls._list_owned(), set())
def testNoDoubleAcquire(self):
self.ls.acquire('one')
self.assertRaises(AssertionError, self.ls.acquire, 'one')
self.assertRaises(AssertionError, self.ls.acquire, ['two'])
self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
self.ls.release()
self.ls.acquire(['one', 'three'])
self.ls.release('one')
self.assertRaises(AssertionError, self.ls.acquire, ['two'])
self.ls.release('three')
def testNoWrongRelease(self):
self.assertRaises(AssertionError, self.ls.release)
self.ls.acquire('one')
self.assertRaises(AssertionError, self.ls.release, 'two')
def testAddRemove(self):
self.ls.add('four')
self.assertEquals(self.ls._list_owned(), set())
self.assert_('four' in self.ls._names())
self.ls.add(['five', 'six', 'seven'], acquired=1)
self.assert_('five' in self.ls._names())
self.assert_('six' in self.ls._names())
self.assert_('seven' in self.ls._names())
self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
self.assert_('five' not in self.ls._names())
self.assert_('six' not in self.ls._names())
self.assertEquals(self.ls._list_owned(), set(['seven']))
self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
self.ls.remove('seven')
self.assert_('seven' not in self.ls._names())
self.assertEquals(self.ls._list_owned(), set([]))
self.ls.acquire(None, shared=1)
self.assertRaises(AssertionError, self.ls.add, 'eight')
self.ls.release()
self.ls.acquire(None)
self.ls.add('eight', acquired=1)
self.assert_('eight' in self.ls._names())
self.assert_('eight' in self.ls._list_owned())
self.ls.add('nine')
self.assert_('nine' in self.ls._names())
self.assert_('nine' not in self.ls._list_owned())
self.ls.release()
self.ls.remove(['two'])
self.assert_('two' not in self.ls._names())
self.ls.acquire('three')
self.assertEquals(self.ls.remove(['three']), ['three'])
self.assert_('three' not in self.ls._names())
self.assertEquals(self.ls.remove('three'), [])
self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
self.assert_('one' not in self.ls._names())
def testRemoveNonBlocking(self):
self.ls.acquire('one')