From 407339d0ff568320621977937fdd2203c863d486 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Wed, 7 Oct 2009 14:58:06 +0200
Subject: [PATCH] mcpu: Implement lock timeouts
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The timeout is always between ~0.1 and ~10.0 seconds. A small
variation of ±5% is added to prevent different jobs from
fighting each other. After 10 attempts to acquire the locks with
a timeout, a blocking acquire is made.

Lock status reporting will be improved in a separate patch.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
---
 Makefile.am                  |   1 +
 lib/mcpu.py                  | 213 +++++++++++++++++++++++++++--------
 test/ganeti.mcpu_unittest.py |  56 +++++++++
 3 files changed, 225 insertions(+), 45 deletions(-)
 create mode 100755 test/ganeti.mcpu_unittest.py

diff --git a/Makefile.am b/Makefile.am
index 9d693566a..51270a059 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -272,6 +272,7 @@ dist_TESTS = \
 	test/ganeti.hooks_unittest.py \
 	test/ganeti.http_unittest.py \
 	test/ganeti.locking_unittest.py \
+	test/ganeti.mcpu_unittest.py \
 	test/ganeti.objects_unittest.py \
 	test/ganeti.rapi.resources_unittest.py \
 	test/ganeti.serializer_unittest.py \
diff --git a/lib/mcpu.py b/lib/mcpu.py
index 6a054f93e..7b910fda7 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -29,6 +29,8 @@ are two kinds of classes defined:
 """
 
 import logging
+import random
+import time
 
 from ganeti import opcodes
 from ganeti import constants
@@ -39,6 +41,92 @@ from ganeti import locking
 from ganeti import utils
 
 
+class _LockAcquireTimeout(Exception):
+  """Internal exception to report timeouts on acquiring locks.
+
+  """
+
+
+class _LockTimeoutStrategy(object):
+  """Class with lock acquire timeout strategy.
+
+  """
+  __slots__ = [
+    "_attempts",
+    "_random_fn",
+    "_start_time",
+    ]
+
+  _MAX_ATTEMPTS = 10
+  """How many retries before going into blocking mode"""
+
+  _ATTEMPT_FACTOR = 1.75
+  """Factor between attempts"""
+
+  def __init__(self, _random_fn=None):
+    """Initializes this class.
+
+    @param _random_fn: Random number generator for unittests
+
+    """
+    object.__init__(self)
+
+    self._start_time = None
+    self._attempts = 0
+
+    if _random_fn is None:
+      self._random_fn = random.random
+    else:
+      self._random_fn = _random_fn
+
+  def NextAttempt(self):
+    """Advances to the next attempt.
+
+    """
+    assert self._attempts >= 0
+    self._attempts += 1
+
+  def CalcRemainingTimeout(self):
+    """Returns the remaining timeout.
+
+    """
+    assert self._attempts >= 0
+
+    if self._attempts == self._MAX_ATTEMPTS:
+      # Only blocking acquires after 10 retries
+      return None
+
+    if self._attempts > self._MAX_ATTEMPTS:
+      raise RuntimeError("Blocking acquire ran into timeout")
+
+    # Get start time on first calculation
+    if self._start_time is None:
+      self._start_time = time.time()
+
+    # Calculate remaining time for this attempt
+    timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
+               time.time())
+
+    if timeout > 10.0:
+      # Cap timeout at 10 seconds. This gives other jobs a chance to run
+      # even if we're still trying to get our locks, before finally moving
+      # to a blocking acquire.
+      timeout = 10.0
+
+    elif timeout < 0.1:
+      # Lower boundary
+      timeout = 0.1
+
+    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
+    # where two or more jobs are fighting for the same lock(s).
+    variation_range = timeout * 0.1
+    timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
+
+    assert timeout >= 0.0, "Timeout must be positive"
+
+    return timeout
+
+
 class OpExecCbBase:
   """Base class for OpCode execution callbacks.
 
@@ -206,7 +294,7 @@ class Processor(object):
 
     return result
 
-  def _LockAndExecLU(self, lu, level):
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -221,45 +309,62 @@ class Processor(object):
         self._cbs.NotifyStart()
 
       result = self._ExecLU(lu)
+
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
       # don't need this, so we'll avoid the complicated code needed.
-      raise NotImplementedError(
-        "Can't declare locks to acquire when adding others")
+      raise NotImplementedError("Can't declare locks to acquire when adding"
+                                " others")
+
     elif adding_locks or acquiring_locks:
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
-      if acquiring_locks:
-        needed_locks = lu.needed_locks[level]
-
-        self._ReportLocks(level, needed_locks, share, False)
-        lu.acquired_locks[level] = self.context.glm.acquire(level,
-                                                            needed_locks,
-                                                            shared=share)
-        self._ReportLocks(level, needed_locks, share, True)
-
-      else: # adding_locks
-        add_locks = lu.add_locks[level]
-        lu.remove_locks[level] = add_locks
-        try:
-          self.context.glm.add(level, add_locks, acquired=1, shared=share)
-        except errors.LockError:
-          raise errors.OpPrereqError(
-            "Couldn't add locks (%s), probably because of a race condition"
-            " with another job, who added them first" % add_locks)
+
       try:
+        assert adding_locks ^ acquiring_locks, \
+          "Locks must be either added or acquired"
+
+        if acquiring_locks:
+          # Acquiring locks
+          needed_locks = lu.needed_locks[level]
+
+          self._ReportLocks(level, needed_locks, share, False)
+          acquired = self.context.glm.acquire(level,
+                                              needed_locks,
+                                              shared=share,
+                                              timeout=calc_timeout())
+          # TODO: Report timeout
+          self._ReportLocks(level, needed_locks, share, True)
+
+          if acquired is None:
+            raise _LockAcquireTimeout()
+
+          lu.acquired_locks[level] = acquired
+
+        else:
+          # Adding locks
+          add_locks = lu.add_locks[level]
+          lu.remove_locks[level] = add_locks
+
+          try:
+            self.context.glm.add(level, add_locks, acquired=1, shared=share)
+          except errors.LockError:
+            raise errors.OpPrereqError(
+              "Couldn't add locks (%s), probably because of a race condition"
+              " with another job, who added them first" % add_locks)
+
+          lu.acquired_locks[level] = add_locks
         try:
-          if adding_locks:
-            lu.acquired_locks[level] = add_locks
-          result = self._LockAndExecLU(lu, level + 1)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
       finally:
         if self.context.glm.is_owned(level):
           self.context.glm.release(level)
+
     else:
-      result = self._LockAndExecLU(lu, level + 1)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
@@ -282,29 +387,47 @@ class Processor(object):
       if lu_class is None:
         raise errors.OpCodeUnknown("Unknown opcode")
 
-      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
-      # shared fashion otherwise (to prevent concurrent run with an exclusive
-      # LU.
-      self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
-                        not lu_class.REQ_BGL, False)
-      try:
-        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
-                                 shared=not lu_class.REQ_BGL)
-      finally:
-        self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
-                          not lu_class.REQ_BGL, True)
-      try:
-        lu = lu_class(self, op, self.context, self.rpc)
-        lu.ExpandNames()
-        assert lu.needed_locks is not None, "needed_locks not set by LU"
-        result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
-      finally:
-        self.context.glm.release(locking.LEVEL_CLUSTER)
+      timeout_strategy = _LockTimeoutStrategy()
+      calc_timeout = timeout_strategy.CalcRemainingTimeout
+
+      while True:
+        try:
+          self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
+                            not lu_class.REQ_BGL, False)
+          try:
+            # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+            # and in a shared fashion otherwise (to prevent concurrent run with
+            # an exclusive LU).
+            acquired_bgl = self.context.glm.acquire(locking.LEVEL_CLUSTER,
+                                                    [locking.BGL],
+                                                    shared=not lu_class.REQ_BGL,
+                                                    timeout=calc_timeout())
+          finally:
+            # TODO: Report timeout
+            self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
+                              not lu_class.REQ_BGL, True)
+
+          if acquired_bgl is None:
+            raise _LockAcquireTimeout()
+
+          try:
+            lu = lu_class(self, op, self.context, self.rpc)
+            lu.ExpandNames()
+            assert lu.needed_locks is not None, "needed_locks not set by LU"
+
+            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
+          finally:
+            self.context.glm.release(locking.LEVEL_CLUSTER)
+
+        except _LockAcquireTimeout:
+          # Timeout while waiting for lock, try again
+          pass
+
+        timeout_strategy.NextAttempt()
+
     finally:
       self._cbs = None
 
-    return result
-
   def _Feedback(self, *args):
     """Forward call to feedback callback function.
 
diff --git a/test/ganeti.mcpu_unittest.py b/test/ganeti.mcpu_unittest.py
new file mode 100755
index 000000000..02183e17b
--- /dev/null
+++ b/test/ganeti.mcpu_unittest.py
@@ -0,0 +1,56 @@
+#!/usr/bin/python
+#
+
+# Copyright (C) 2009 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Script for unittesting the mcpu module"""
+
+
+import unittest
+
+from ganeti import mcpu
+
+
+class TestLockTimeoutStrategy(unittest.TestCase):
+  def testConstants(self):
+    self.assert_(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0)
+    self.assert_(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0)
+
+  def testSimple(self):
+    strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5)
+
+    self.assertEqual(strat._attempts, 0)
+
+    prev = None
+    for _ in range(strat._MAX_ATTEMPTS):
+      timeout = strat.CalcRemainingTimeout()
+      self.assert_(timeout is not None)
+
+      self.assert_(timeout <= 10.0)
+      self.assert_(prev is None or timeout >= prev)
+
+      strat.NextAttempt()
+
+      prev = timeout
+
+    self.assert_(strat.CalcRemainingTimeout() is None)
+
+
+if __name__ == "__main__":
+  unittest.main()
-- 
GitLab