From a95fd5d733f4db65a0c9a0777feecdfdb931f1f1 Mon Sep 17 00:00:00 2001 From: Guido Trotter <ultrotter@google.com> Date: Tue, 19 Feb 2008 13:50:57 +0000 Subject: [PATCH] Add the delete() operation to SharedLock This new operation lets a lock be cleanly deleted. The lock will be exclusively held before deletion, and after it pending and future acquires will raise an exception. Other SharedLock operations are modify to deal with delete() and to avoid code duplication. This patch also adds unit testing for the new function and its interaction with the other lock features. The helper threads are sligtly modified to handle and report the condition of a deleted lock. As a bonus a non-related unit test about not supporting non-blocking mode yet has been added as well. This feature will be used by the LockSet in order to support deadlock-free delete of resources. This in turn will be useful to gracefully handle the removal of instances and nodes from the cluster dealing with the fact that other operations may be pending on them. Reviewed-by: iustinp --- lib/locking.py | 102 ++++++++++++++++++++++++++------ test/ganeti.locking_unittest.py | 84 ++++++++++++++++++++++++-- 2 files changed, 163 insertions(+), 23 deletions(-) diff --git a/lib/locking.py b/lib/locking.py index 440eecf1e..b164ff33f 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -23,6 +23,9 @@ # pylint: disable-msg=W0613,W0201 import threading +# Wouldn't it be better to define LockingError in the locking module? +# Well, for now that's how the rest of the code does it... +from ganeti import errors class SharedLock: @@ -52,6 +55,9 @@ class SharedLock: self.__nwait_exc = 0 self.__nwait_shr = 0 + # is this lock in the deleted state? + self.__deleted = False + def __is_sharer(self): """Is the current thread sharing the lock at this time?""" return threading.currentThread() in self.__shr @@ -92,6 +98,41 @@ class SharedLock: return result + def __wait(self,c): + """Wait on the given condition, and raise an exception if the current lock + is declared deleted in the meantime. + + Args: + c: condition to wait on + + """ + c.wait() + if self.__deleted: + raise errors.LockError('deleted lock') + + def __exclusive_acquire(self): + """Acquire the lock exclusively. + + This is a private function that presumes you are already holding the + internal lock. It's defined separately to avoid code duplication between + acquire() and delete() + + """ + self.__nwait_exc += 1 + try: + # This is to save ourselves from a nasty race condition that could + # theoretically make the sharers starve. + if self.__nwait_shr > 0 or self.__nwait_exc > 1: + self.__wait(self.__turn_exc) + + while len(self.__shr) > 0 or self.__exc is not None: + self.__wait(self.__turn_exc) + + self.__exc = threading.currentThread() + finally: + self.__nwait_exc -= 1 + + def acquire(self, blocking=1, shared=0): """Acquire a shared lock. @@ -108,6 +149,9 @@ class SharedLock: self.__lock.acquire() try: + if self.__deleted: + raise errors.LockError('deleted lock') + # We cannot acquire the lock if we already have it assert not self.__is_owned(), "double acquire() on a non-recursive lock" @@ -119,32 +163,20 @@ class SharedLock: # we'll just wait while there are no exclusive holders. if self.__nwait_exc > 0: # TODO: if !blocking... - self.__turn_shr.wait() + self.__wait(self.__turn_shr) while self.__exc is not None: # TODO: if !blocking... - self.__turn_shr.wait() + self.__wait(self.__turn_shr) self.__shr.add(threading.currentThread()) finally: self.__nwait_shr -= 1 else: - self.__nwait_exc += 1 - try: - # This is to save ourselves from a nasty race condition that could - # theoretically make the sharers starve. - if self.__nwait_shr > 0 or self.__nwait_exc > 1: - # TODO: if !blocking... - self.__turn_exc.wait() - - while len(self.__shr) > 0 or self.__exc is not None: - # TODO: if !blocking... - self.__turn_exc.wait() - - self.__exc = threading.currentThread() - finally: - self.__nwait_exc -= 1 + # TODO: if !blocking... + # (or modify __exclusive_acquire for non-blocking mode) + self.__exclusive_acquire() finally: self.__lock.release() @@ -191,3 +223,39 @@ class SharedLock: finally: self.__lock.release() + def delete(self, blocking=1): + """Delete a Shared Lock. + + This operation will declare the lock for removal. First the lock will be + acquired in exclusive mode if you don't already own it, then the lock + will be put in a state where any future and pending acquire() fail. + + Args: + blocking: whether to block while trying to acquire or to operate in + try-lock mode. this locking mode is not supported yet unless + you are already holding exclusively the lock. + + """ + self.__lock.acquire() + try: + assert not self.__is_sharer(), "cannot delete() a lock while sharing it" + + if self.__deleted: + raise errors.LockError('deleted lock') + + if not self.__is_exclusive(): + if not blocking: + # We don't have non-blocking mode for now + raise NotImplementedError + self.__exclusive_acquire() + + self.__deleted = True + self.__exc = None + # Wake up everybody, they will fail acquiring the lock and + # raise an exception instead. + self.__turn_exc.notifyAll() + self.__turn_shr.notifyAll() + + finally: + self.__lock.release() + diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 4ed355e22..89470934a 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -28,6 +28,7 @@ import time import Queue from ganeti import locking +from ganeti import errors from threading import Thread @@ -86,14 +87,28 @@ class TestSharedLock(unittest.TestCase): # helper functions: called in a separate thread they acquire the lock, send # their identifier on the done queue, then release it. def _doItSharer(self): - self.sl.acquire(shared=1) - self.done.put('SHR') - self.sl.release() + try: + self.sl.acquire(shared=1) + self.done.put('SHR') + self.sl.release() + except errors.LockError: + self.done.put('ERR') def _doItExclusive(self): - self.sl.acquire() - self.done.put('EXC') - self.sl.release() + try: + self.sl.acquire() + self.done.put('EXC') + self.sl.release() + except errors.LockError: + self.done.put('ERR') + + def _doItDelete(self): + try: + self.sl.acquire() + self.done.put('DEL') + self.sl.release() + except errors.LockError: + self.done.put('ERR') def testSharersCanCoexist(self): self.sl.acquire(shared=1) @@ -109,6 +124,14 @@ class TestSharedLock(unittest.TestCase): self.sl.release() self.assert_(self.done.get(True, 1)) + def testExclusiveBlocksDelete(self): + self.sl.acquire() + Thread(target=self._doItDelete).start() + # give it a bit of time to check that it's not actually doing anything + self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.sl.release() + self.assert_(self.done.get(True, 1)) + def testExclusiveBlocksSharer(self): self.sl.acquire() Thread(target=self._doItSharer).start() @@ -125,6 +148,14 @@ class TestSharedLock(unittest.TestCase): self.sl.release() self.assert_(self.done.get(True, 1)) + def testSharerBlocksDelete(self): + self.sl.acquire(shared=1) + Thread(target=self._doItDelete).start() + time.sleep(0.05) + self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.sl.release() + self.assert_(self.done.get(True, 1)) + def testWaitingExclusiveBlocksSharer(self): self.sl.acquire(shared=1) # the lock is acquired in shared mode... @@ -153,6 +184,47 @@ class TestSharedLock(unittest.TestCase): self.assertEqual(self.done.get(True, 1), 'SHR') self.assertEqual(self.done.get(True, 1), 'EXC') + def testNoNonBlocking(self): + self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) + self.assertRaises(NotImplementedError, self.sl.delete, blocking=0) + self.sl.acquire() + self.sl.delete(blocking=0) # Fine, because the lock is already acquired + + def testDelete(self): + self.sl.delete() + self.assertRaises(errors.LockError, self.sl.acquire) + self.assertRaises(errors.LockError, self.sl.delete) + + def testDeletePendingSharersExclusiveDelete(self): + self.sl.acquire() + Thread(target=self._doItSharer).start() + Thread(target=self._doItSharer).start() + time.sleep(0.05) + Thread(target=self._doItExclusive).start() + Thread(target=self._doItDelete).start() + time.sleep(0.05) + self.sl.delete() + # The two threads who were pending return both ERR + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + + def testDeletePendingDeleteExclusiveSharers(self): + self.sl.acquire() + Thread(target=self._doItDelete).start() + Thread(target=self._doItExclusive).start() + time.sleep(0.05) + Thread(target=self._doItSharer).start() + Thread(target=self._doItSharer).start() + time.sleep(0.05) + self.sl.delete() + # The two threads who were pending return both ERR + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get(True, 1), 'ERR') + if __name__ == '__main__': unittest.main() -- GitLab