From 162c1c1f115d3604c3621c84aeb71753563e25a5 Mon Sep 17 00:00:00 2001
From: Guido Trotter <ultrotter@google.com>
Date: Fri, 8 Feb 2008 11:23:40 +0000
Subject: [PATCH] Shared Lock implementation and unit tests.

Adding a locking.py file for the ganeti locking library. Its first component is
the implementation of a non-recursive blocking shared lock complete with a
testing library.

Reviewed-by: imsnah, iustinp
---
 lib/locking.py                  | 193 ++++++++++++++++++++++++++++++++
 test/Makefile.am                |   3 +-
 test/ganeti.locking_unittest.py | 160 ++++++++++++++++++++++++++
 3 files changed, 355 insertions(+), 1 deletion(-)
 create mode 100644 lib/locking.py
 create mode 100755 test/ganeti.locking_unittest.py

diff --git a/lib/locking.py b/lib/locking.py
new file mode 100644
index 000000000..4e9911026
--- /dev/null
+++ b/lib/locking.py
@@ -0,0 +1,193 @@
+#
+#
+
+# Copyright (C) 2006, 2007 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+"""Module implementing the Ganeti locking code."""
+
+# pylint: disable-msg=W0613,W0201
+
+import threading
+
+
+class SharedLock:
+  """Implements a shared lock.
+
+  Multiple threads can acquire the lock in a shared way, calling
+  acquire_shared().  In order to acquire the lock in an exclusive way threads
+  can call acquire_exclusive().
+
+  The lock prevents starvation but does not guarantee that threads will acquire
+  the shared lock in the order they queued for it, just that they will
+  eventually do so.
+
+  """
+  def __init__(self):
+    """Construct a new Shared Lock"""
+    # we have two conditions, c_shr and c_exc, sharing the same lock.
+    self.__lock = threading.Lock()
+    self.__turn_shr = threading.Condition(self.__lock)
+    self.__turn_exc = threading.Condition(self.__lock)
+
+    # current lock holders
+    self.__shr = set()
+    self.__exc = None
+
+    # lock waiters
+    self.__nwait_exc = 0
+    self.__nwait_shr = 0
+
+  def __is_sharer(self):
+    """Is the current thread sharing the lock at this time?"""
+    return threading.currentThread() in self.__shr
+
+  def __is_exclusive(self):
+    """Is the current thread holding the lock exclusively at this time?"""
+    return threading.currentThread() == self.__exc
+
+  def __is_owned(self, shared=-1):
+    """Is the current thread somehow owning the lock at this time?
+
+    This is a private version of the function, which presumes you're holding
+    the internal lock.
+
+    """
+    if shared < 0:
+      return self.__is_sharer() or self.__is_exclusive()
+    elif shared:
+      return self.__is_sharer()
+    else:
+      return self.__is_exclusive()
+
+  def _is_owned(self, shared=-1):
+    """Is the current thread somehow owning the lock at this time?
+
+    Args:
+      shared:
+        < 0: check for any type of ownership (default)
+        0: check for exclusive ownership
+        > 0: check for shared ownership
+
+    """
+    self.__lock.acquire()
+    try:
+      result = self.__is_owned(shared)
+    finally:
+      self.__lock.release()
+
+    return result
+
+  def acquire(self, blocking=1, shared=0):
+    """Acquire a shared lock.
+
+    Args:
+      shared: whether to acquire in shared mode. By default an exclusive lock
+              will be acquired.
+      blocking: whether to block while trying to acquire or to operate in try-lock mode.
+                this locking mode is not supported yet.
+
+    """
+    if not blocking:
+      # We don't have non-blocking mode for now
+      raise NotImplementedError
+
+    self.__lock.acquire()
+    try:
+      # We cannot acquire the lock if we already have it
+      assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+
+      if shared:
+        self.__nwait_shr += 1
+        try:
+          # If there is an exclusive holder waiting we have to wait.  We'll
+          # only do this once, though, when we start waiting for the lock. Then
+          # we'll just wait while there are no exclusive holders.
+          if self.__nwait_exc > 0:
+            # TODO: if !blocking...
+            self.__turn_shr.wait()
+
+          while self.__exc is not None:
+            # TODO: if !blocking...
+            self.__turn_shr.wait()
+
+          self.__shr.add(threading.currentThread())
+        finally:
+          self.__nwait_shr -= 1
+
+      else:
+        self.__nwait_exc += 1
+        try:
+          # This is to save ourselves from a nasty race condition that could
+          # theoretically make the sharers starve.
+          if self.__nwait_shr > 0 or self.__nwait_exc > 1:
+            # TODO: if !blocking...
+              self.__turn_exc.wait()
+
+          while len(self.__shr) > 0 or self.__exc is not None:
+            # TODO: if !blocking...
+            self.__turn_exc.wait()
+
+          self.__exc = threading.currentThread()
+        finally:
+          self.__nwait_exc -= 1
+
+    finally:
+      self.__lock.release()
+
+    return True
+
+  def release(self):
+    """Release a Shared Lock.
+
+    You must have acquired the lock, either in shared or in exclusive mode,
+    before calling this function.
+
+    """
+    self.__lock.acquire()
+    try:
+      # Autodetect release type
+      if self.__is_exclusive():
+        self.__exc = None
+
+        # An exclusive holder has just had the lock, time to put it in shared
+        # mode if there are shared holders waiting. Otherwise wake up the next
+        # exclusive holder.
+        if self.__nwait_shr > 0:
+          self.__turn_shr.notifyAll()
+        elif self.__nwait_exc > 0:
+         self.__turn_exc.notify()
+
+      elif self.__is_sharer():
+        self.__shr.remove(threading.currentThread())
+
+        # If there are shared holders waiting there *must* be an exclusive holder
+        # waiting as well; otherwise what were they waiting for?
+        assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
+                "Lock sharers waiting while no exclusive is queueing")
+
+        # If there are no more shared holders and some exclusive holders are
+        # waiting let's wake one up.
+        if len(self.__shr) == 0 and self.__nwait_exc > 0:
+          self.__turn_exc.notify()
+
+      else:
+        assert False, "Cannot release non-owned lock"
+
+    finally:
+      self.__lock.release()
+
diff --git a/test/Makefile.am b/test/Makefile.am
index d2af6b35a..3eed00002 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -2,7 +2,8 @@ TESTS = \
   ganeti.config_unittest.py \
   ganeti.hooks_unittest.py \
   ganeti.utils_unittest.py \
-  ganeti.bdev_unittest.py
+  ganeti.bdev_unittest.py \
+  ganeti.locking_unittest.py
 
 TESTS_ENVIRONMENT = PYTHONPATH=.:$(top_builddir)
 
diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py
new file mode 100755
index 000000000..bd2691b36
--- /dev/null
+++ b/test/ganeti.locking_unittest.py
@@ -0,0 +1,160 @@
+#!/usr/bin/python
+#
+
+# Copyright (C) 2006, 2007 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 0.0510-1301, USA.
+
+
+"""Script for unittesting the locking module"""
+
+
+import os
+import unittest
+import time
+import Queue
+
+from ganeti import locking
+from threading import Thread
+
+
+class TestSharedLock(unittest.TestCase):
+  """Shared lock tests"""
+
+  def setUp(self):
+    self.sl = locking.SharedLock()
+    # helper threads use the 'done' queue to tell the master they finished.
+    self.done = Queue.Queue(0)
+
+  def testSequenceAndOwnership(self):
+    self.assert_(not self.sl._is_owned())
+    self.sl.acquire(shared=1)
+    self.assert_(self.sl._is_owned())
+    self.assert_(self.sl._is_owned(shared=1))
+    self.assert_(not self.sl._is_owned(shared=0))
+    self.sl.release()
+    self.assert_(not self.sl._is_owned())
+    self.sl.acquire()
+    self.assert_(self.sl._is_owned())
+    self.assert_(not self.sl._is_owned(shared=1))
+    self.assert_(self.sl._is_owned(shared=0))
+    self.sl.release()
+    self.assert_(not self.sl._is_owned())
+    self.sl.acquire(shared=1)
+    self.assert_(self.sl._is_owned())
+    self.assert_(self.sl._is_owned(shared=1))
+    self.assert_(not self.sl._is_owned(shared=0))
+    self.sl.release()
+    self.assert_(not self.sl._is_owned())
+
+  def testBooleanValue(self):
+    # semaphores are supposed to return a true value on a successful acquire
+    self.assert_(self.sl.acquire(shared=1))
+    self.sl.release()
+    self.assert_(self.sl.acquire())
+    self.sl.release()
+
+  def testDoubleLockingStoE(self):
+    self.sl.acquire(shared=1)
+    self.assertRaises(AssertionError, self.sl.acquire)
+
+  def testDoubleLockingEtoS(self):
+    self.sl.acquire()
+    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
+
+  def testDoubleLockingStoS(self):
+    self.sl.acquire(shared=1)
+    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
+
+  def testDoubleLockingEtoE(self):
+    self.sl.acquire()
+    self.assertRaises(AssertionError, self.sl.acquire)
+
+  # helper functions: called in a separate thread they acquire the lock, send
+  # their identifier on the done queue, then release it.
+  def _doItSharer(self):
+    self.sl.acquire(shared=1)
+    self.done.put('SHR')
+    self.sl.release()
+
+  def _doItExclusive(self):
+    self.sl.acquire()
+    self.done.put('EXC')
+    self.sl.release()
+
+  def testSharersCanCoexist(self):
+    self.sl.acquire(shared=1)
+    Thread(target=self._doItSharer).start()
+    self.assert_(self.done.get(True, 1))
+    self.sl.release()
+
+  def testExclusiveBlocksExclusive(self):
+    self.sl.acquire()
+    Thread(target=self._doItExclusive).start()
+    # give it a bit of time to check that it's not actually doing anything
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.sl.release()
+    self.assert_(self.done.get(True, 1))
+
+  def testExclusiveBlocksSharer(self):
+    self.sl.acquire()
+    Thread(target=self._doItSharer).start()
+    time.sleep(0.05)
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.sl.release()
+    self.assert_(self.done.get(True, 1))
+
+  def testSharerBlocksExclusive(self):
+    self.sl.acquire(shared=1)
+    Thread(target=self._doItExclusive).start()
+    time.sleep(0.05)
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.sl.release()
+    self.assert_(self.done.get(True, 1))
+
+  def testWaitingExclusiveBlocksSharer(self):
+    self.sl.acquire(shared=1)
+    # the lock is acquired in shared mode...
+    Thread(target=self._doItExclusive).start()
+    # ...but now an exclusive is waiting...
+    time.sleep(0.05)
+    Thread(target=self._doItSharer).start()
+    # ...so the sharer should be blocked as well
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.sl.release()
+    # The exclusive passed before
+    self.assertEqual(self.done.get(True, 1), 'EXC')
+    self.assertEqual(self.done.get(True, 1), 'SHR')
+
+  def testWaitingSharerBlocksExclusive(self):
+    self.sl.acquire()
+    # the lock is acquired in exclusive mode...
+    Thread(target=self._doItSharer).start()
+    # ...but now a sharer is waiting...
+    time.sleep(0.05)
+    Thread(target=self._doItExclusive).start()
+    # ...the exclusive is waiting too...
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.sl.release()
+    # The sharer passed before
+    self.assertEqual(self.done.get(True, 1), 'SHR')
+    self.assertEqual(self.done.get(True, 1), 'EXC')
+
+
+if __name__ == '__main__':
+  unittest.main()
+  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
+  #unittest.TextTestRunner(verbosity=2).run(suite)
-- 
GitLab