Commit a95fd5d7 authored by Guido Trotter's avatar Guido Trotter
Browse files

Add the delete() operation to SharedLock

This new operation lets a lock be cleanly deleted. The lock will be exclusively
held before deletion, and after it pending and future acquires will raise an
exception. Other SharedLock operations are modify to deal with delete() and to
avoid code duplication.

This patch also adds unit testing for the new function and its interaction with
the other lock features. The helper threads are sligtly modified to handle and
report the condition of a deleted lock. As a bonus a non-related unit test
about not supporting non-blocking mode yet has been added as well.

This feature will be used by the LockSet in order to support deadlock-free
delete of resources. This in turn will be useful to gracefully handle the
removal of instances and nodes from the cluster dealing with the fact that
other operations may be pending on them.

Reviewed-by: iustinp
parent d6646186
......@@ -23,6 +23,9 @@
# pylint: disable-msg=W0613,W0201
import threading
# Wouldn't it be better to define LockingError in the locking module?
# Well, for now that's how the rest of the code does it...
from ganeti import errors
class SharedLock:
......@@ -52,6 +55,9 @@ class SharedLock:
self.__nwait_exc = 0
self.__nwait_shr = 0
# is this lock in the deleted state?
self.__deleted = False
def __is_sharer(self):
"""Is the current thread sharing the lock at this time?"""
return threading.currentThread() in self.__shr
......@@ -92,6 +98,41 @@ class SharedLock:
return result
def __wait(self,c):
"""Wait on the given condition, and raise an exception if the current lock
is declared deleted in the meantime.
Args:
c: condition to wait on
"""
c.wait()
if self.__deleted:
raise errors.LockError('deleted lock')
def __exclusive_acquire(self):
"""Acquire the lock exclusively.
This is a private function that presumes you are already holding the
internal lock. It's defined separately to avoid code duplication between
acquire() and delete()
"""
self.__nwait_exc += 1
try:
# This is to save ourselves from a nasty race condition that could
# theoretically make the sharers starve.
if self.__nwait_shr > 0 or self.__nwait_exc > 1:
self.__wait(self.__turn_exc)
while len(self.__shr) > 0 or self.__exc is not None:
self.__wait(self.__turn_exc)
self.__exc = threading.currentThread()
finally:
self.__nwait_exc -= 1
def acquire(self, blocking=1, shared=0):
"""Acquire a shared lock.
......@@ -108,6 +149,9 @@ class SharedLock:
self.__lock.acquire()
try:
if self.__deleted:
raise errors.LockError('deleted lock')
# We cannot acquire the lock if we already have it
assert not self.__is_owned(), "double acquire() on a non-recursive lock"
......@@ -119,32 +163,20 @@ class SharedLock:
# we'll just wait while there are no exclusive holders.
if self.__nwait_exc > 0:
# TODO: if !blocking...
self.__turn_shr.wait()
self.__wait(self.__turn_shr)
while self.__exc is not None:
# TODO: if !blocking...
self.__turn_shr.wait()
self.__wait(self.__turn_shr)
self.__shr.add(threading.currentThread())
finally:
self.__nwait_shr -= 1
else:
self.__nwait_exc += 1
try:
# This is to save ourselves from a nasty race condition that could
# theoretically make the sharers starve.
if self.__nwait_shr > 0 or self.__nwait_exc > 1:
# TODO: if !blocking...
self.__turn_exc.wait()
while len(self.__shr) > 0 or self.__exc is not None:
# TODO: if !blocking...
self.__turn_exc.wait()
self.__exc = threading.currentThread()
finally:
self.__nwait_exc -= 1
# TODO: if !blocking...
# (or modify __exclusive_acquire for non-blocking mode)
self.__exclusive_acquire()
finally:
self.__lock.release()
......@@ -191,3 +223,39 @@ class SharedLock:
finally:
self.__lock.release()
def delete(self, blocking=1):
"""Delete a Shared Lock.
This operation will declare the lock for removal. First the lock will be
acquired in exclusive mode if you don't already own it, then the lock
will be put in a state where any future and pending acquire() fail.
Args:
blocking: whether to block while trying to acquire or to operate in
try-lock mode. this locking mode is not supported yet unless
you are already holding exclusively the lock.
"""
self.__lock.acquire()
try:
assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
if self.__deleted:
raise errors.LockError('deleted lock')
if not self.__is_exclusive():
if not blocking:
# We don't have non-blocking mode for now
raise NotImplementedError
self.__exclusive_acquire()
self.__deleted = True
self.__exc = None
# Wake up everybody, they will fail acquiring the lock and
# raise an exception instead.
self.__turn_exc.notifyAll()
self.__turn_shr.notifyAll()
finally:
self.__lock.release()
......@@ -28,6 +28,7 @@ import time
import Queue
from ganeti import locking
from ganeti import errors
from threading import Thread
......@@ -86,14 +87,28 @@ class TestSharedLock(unittest.TestCase):
# helper functions: called in a separate thread they acquire the lock, send
# their identifier on the done queue, then release it.
def _doItSharer(self):
self.sl.acquire(shared=1)
self.done.put('SHR')
self.sl.release()
try:
self.sl.acquire(shared=1)
self.done.put('SHR')
self.sl.release()
except errors.LockError:
self.done.put('ERR')
def _doItExclusive(self):
self.sl.acquire()
self.done.put('EXC')
self.sl.release()
try:
self.sl.acquire()
self.done.put('EXC')
self.sl.release()
except errors.LockError:
self.done.put('ERR')
def _doItDelete(self):
try:
self.sl.acquire()
self.done.put('DEL')
self.sl.release()
except errors.LockError:
self.done.put('ERR')
def testSharersCanCoexist(self):
self.sl.acquire(shared=1)
......@@ -109,6 +124,14 @@ class TestSharedLock(unittest.TestCase):
self.sl.release()
self.assert_(self.done.get(True, 1))
def testExclusiveBlocksDelete(self):
self.sl.acquire()
Thread(target=self._doItDelete).start()
# give it a bit of time to check that it's not actually doing anything
self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
self.sl.release()
self.assert_(self.done.get(True, 1))
def testExclusiveBlocksSharer(self):
self.sl.acquire()
Thread(target=self._doItSharer).start()
......@@ -125,6 +148,14 @@ class TestSharedLock(unittest.TestCase):
self.sl.release()
self.assert_(self.done.get(True, 1))
def testSharerBlocksDelete(self):
self.sl.acquire(shared=1)
Thread(target=self._doItDelete).start()
time.sleep(0.05)
self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
self.sl.release()
self.assert_(self.done.get(True, 1))
def testWaitingExclusiveBlocksSharer(self):
self.sl.acquire(shared=1)
# the lock is acquired in shared mode...
......@@ -153,6 +184,47 @@ class TestSharedLock(unittest.TestCase):
self.assertEqual(self.done.get(True, 1), 'SHR')
self.assertEqual(self.done.get(True, 1), 'EXC')
def testNoNonBlocking(self):
self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
self.sl.acquire()
self.sl.delete(blocking=0) # Fine, because the lock is already acquired
def testDelete(self):
self.sl.delete()
self.assertRaises(errors.LockError, self.sl.acquire)
self.assertRaises(errors.LockError, self.sl.delete)
def testDeletePendingSharersExclusiveDelete(self):
self.sl.acquire()
Thread(target=self._doItSharer).start()
Thread(target=self._doItSharer).start()
time.sleep(0.05)
Thread(target=self._doItExclusive).start()
Thread(target=self._doItDelete).start()
time.sleep(0.05)
self.sl.delete()
# The two threads who were pending return both ERR
self.assertEqual(self.done.get(True, 1), 'ERR')
self.assertEqual(self.done.get(True, 1), 'ERR')
self.assertEqual(self.done.get(True, 1), 'ERR')
self.assertEqual(self.done.get(True, 1), 'ERR')
def testDeletePendingDeleteExclusiveSharers(self):
self.sl.acquire()
Thread(target=self._doItDelete).start()
Thread(target=self._doItExclusive).start()
time.sleep(0.05)
Thread(target=self._doItSharer).start()
Thread(target=self._doItSharer).start()
time.sleep(0.05)
self.sl.delete()
# The two threads who were pending return both ERR
self.assertEqual(self.done.get(True, 1), 'ERR')
self.assertEqual(self.done.get(True, 1), 'ERR')
self.assertEqual(self.done.get(True, 1), 'ERR')
self.assertEqual(self.done.get(True, 1), 'ERR')
if __name__ == '__main__':
unittest.main()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment