diff --git a/lib/locking.py b/lib/locking.py index 440eecf1e41e6aafd87cc85d3e9c197875fd4575..b164ff33fde5d1a32b544e6f069edfbd7a813973 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -23,6 +23,9 @@ # pylint: disable-msg=W0613,W0201 import threading +# Wouldn't it be better to define LockingError in the locking module? +# Well, for now that's how the rest of the code does it... +from ganeti import errors class SharedLock: @@ -52,6 +55,9 @@ class SharedLock: self.__nwait_exc = 0 self.__nwait_shr = 0 + # is this lock in the deleted state? + self.__deleted = False + def __is_sharer(self): """Is the current thread sharing the lock at this time?""" return threading.currentThread() in self.__shr @@ -92,6 +98,41 @@ class SharedLock: return result + def __wait(self,c): + """Wait on the given condition, and raise an exception if the current lock + is declared deleted in the meantime. + + Args: + c: condition to wait on + + """ + c.wait() + if self.__deleted: + raise errors.LockError('deleted lock') + + def __exclusive_acquire(self): + """Acquire the lock exclusively. + + This is a private function that presumes you are already holding the + internal lock. It's defined separately to avoid code duplication between + acquire() and delete() + + """ + self.__nwait_exc += 1 + try: + # This is to save ourselves from a nasty race condition that could + # theoretically make the sharers starve. + if self.__nwait_shr > 0 or self.__nwait_exc > 1: + self.__wait(self.__turn_exc) + + while len(self.__shr) > 0 or self.__exc is not None: + self.__wait(self.__turn_exc) + + self.__exc = threading.currentThread() + finally: + self.__nwait_exc -= 1 + + def acquire(self, blocking=1, shared=0): """Acquire a shared lock. @@ -108,6 +149,9 @@ class SharedLock: self.__lock.acquire() try: + if self.__deleted: + raise errors.LockError('deleted lock') + # We cannot acquire the lock if we already have it assert not self.__is_owned(), "double acquire() on a non-recursive lock" @@ -119,32 +163,20 @@ class SharedLock: # we'll just wait while there are no exclusive holders. if self.__nwait_exc > 0: # TODO: if !blocking... - self.__turn_shr.wait() + self.__wait(self.__turn_shr) while self.__exc is not None: # TODO: if !blocking... - self.__turn_shr.wait() + self.__wait(self.__turn_shr) self.__shr.add(threading.currentThread()) finally: self.__nwait_shr -= 1 else: - self.__nwait_exc += 1 - try: - # This is to save ourselves from a nasty race condition that could - # theoretically make the sharers starve. - if self.__nwait_shr > 0 or self.__nwait_exc > 1: - # TODO: if !blocking... - self.__turn_exc.wait() - - while len(self.__shr) > 0 or self.__exc is not None: - # TODO: if !blocking... - self.__turn_exc.wait() - - self.__exc = threading.currentThread() - finally: - self.__nwait_exc -= 1 + # TODO: if !blocking... + # (or modify __exclusive_acquire for non-blocking mode) + self.__exclusive_acquire() finally: self.__lock.release() @@ -191,3 +223,39 @@ class SharedLock: finally: self.__lock.release() + def delete(self, blocking=1): + """Delete a Shared Lock. + + This operation will declare the lock for removal. First the lock will be + acquired in exclusive mode if you don't already own it, then the lock + will be put in a state where any future and pending acquire() fail. + + Args: + blocking: whether to block while trying to acquire or to operate in + try-lock mode. this locking mode is not supported yet unless + you are already holding exclusively the lock. + + """ + self.__lock.acquire() + try: + assert not self.__is_sharer(), "cannot delete() a lock while sharing it" + + if self.__deleted: + raise errors.LockError('deleted lock') + + if not self.__is_exclusive(): + if not blocking: + # We don't have non-blocking mode for now + raise NotImplementedError + self.__exclusive_acquire() + + self.__deleted = True + self.__exc = None + # Wake up everybody, they will fail acquiring the lock and + # raise an exception instead. + self.__turn_exc.notifyAll() + self.__turn_shr.notifyAll() + + finally: + self.__lock.release() + diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 4ed355e224be3aa68d3ae6b0140bc7be1bde45fe..89470934a4dad5a573057313af5873a2339b7338 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -28,6 +28,7 @@ import time import Queue from ganeti import locking +from ganeti import errors from threading import Thread @@ -86,14 +87,28 @@ class TestSharedLock(unittest.TestCase): # helper functions: called in a separate thread they acquire the lock, send # their identifier on the done queue, then release it. def _doItSharer(self): - self.sl.acquire(shared=1) - self.done.put('SHR') - self.sl.release() + try: + self.sl.acquire(shared=1) + self.done.put('SHR') + self.sl.release() + except errors.LockError: + self.done.put('ERR') def _doItExclusive(self): - self.sl.acquire() - self.done.put('EXC') - self.sl.release() + try: + self.sl.acquire() + self.done.put('EXC') + self.sl.release() + except errors.LockError: + self.done.put('ERR') + + def _doItDelete(self): + try: + self.sl.acquire() + self.done.put('DEL') + self.sl.release() + except errors.LockError: + self.done.put('ERR') def testSharersCanCoexist(self): self.sl.acquire(shared=1) @@ -109,6 +124,14 @@ class TestSharedLock(unittest.TestCase): self.sl.release() self.assert_(self.done.get(True, 1)) + def testExclusiveBlocksDelete(self): + self.sl.acquire() + Thread(target=self._doItDelete).start() + # give it a bit of time to check that it's not actually doing anything + self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.sl.release() + self.assert_(self.done.get(True, 1)) + def testExclusiveBlocksSharer(self): self.sl.acquire() Thread(target=self._doItSharer).start() @@ -125,6 +148,14 @@ class TestSharedLock(unittest.TestCase): self.sl.release() self.assert_(self.done.get(True, 1)) + def testSharerBlocksDelete(self): + self.sl.acquire(shared=1) + Thread(target=self._doItDelete).start() + time.sleep(0.05) + self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.sl.release() + self.assert_(self.done.get(True, 1)) + def testWaitingExclusiveBlocksSharer(self): self.sl.acquire(shared=1) # the lock is acquired in shared mode... @@ -153,6 +184,47 @@ class TestSharedLock(unittest.TestCase): self.assertEqual(self.done.get(True, 1), 'SHR') self.assertEqual(self.done.get(True, 1), 'EXC') + def testNoNonBlocking(self): + self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) + self.assertRaises(NotImplementedError, self.sl.delete, blocking=0) + self.sl.acquire() + self.sl.delete(blocking=0) # Fine, because the lock is already acquired + + def testDelete(self): + self.sl.delete() + self.assertRaises(errors.LockError, self.sl.acquire) + self.assertRaises(errors.LockError, self.sl.delete) + + def testDeletePendingSharersExclusiveDelete(self): + self.sl.acquire() + Thread(target=self._doItSharer).start() + Thread(target=self._doItSharer).start() + time.sleep(0.05) + Thread(target=self._doItExclusive).start() + Thread(target=self._doItDelete).start() + time.sleep(0.05) + self.sl.delete() + # The two threads who were pending return both ERR + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + + def testDeletePendingDeleteExclusiveSharers(self): + self.sl.acquire() + Thread(target=self._doItDelete).start() + Thread(target=self._doItExclusive).start() + time.sleep(0.05) + Thread(target=self._doItSharer).start() + Thread(target=self._doItSharer).start() + time.sleep(0.05) + self.sl.delete() + # The two threads who were pending return both ERR + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + if __name__ == '__main__': unittest.main()