def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  The first attempt is given one second. Every following attempt is given
  C{(previous * 1.05) ** 1.25} seconds, limited to the range of 0.1 to 10.0
  seconds. Entries are added until the timeouts sum up to at least 150
  seconds; callers switch to a blocking acquire once the list is exhausted.

  @rtype: list of float
  @return: Timeout in seconds for each non-blocking attempt, in order

  """
  result = [1.0]

  # Keep a running total instead of summing the whole list on each iteration
  running_sum = result[0]

  # Wait for a total of at least 150s before doing a blocking acquire
  while running_sum < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap timeout at 10 seconds. This gives other jobs a chance to run
    # even if we're still trying to get our locks, before finally moving
    # to a blocking acquire. The lower boundary of 0.1 seconds is only
    # there for safety.
    timeout = max(0.1, min(timeout, 10.0))

    result.append(timeout)
    running_sum += timeout

  return result


class _LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  An instance describes exactly one attempt; L{NextAttempt} returns a fresh
  instance for the following attempt instead of modifying this one.

  """
  __slots__ = [
    "_attempt",
    "_random_fn",
    "_start_time",
    "_time_fn",
    ]

  # Timeout in seconds for each attempt, indexed by attempt number
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  # Lower boundary in seconds for the remaining time of an attempt; used
  # when the attempt's deadline has (almost) passed
  _MIN_REMAINING_TIMEOUT = 0.1

  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @type attempt: int
    @param attempt: Current attempt number
    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests
    @raise ValueError: If C{attempt} is negative

    """
    object.__init__(self)

    if attempt < 0:
      raise ValueError("Attempt must be zero or positive")

    self._attempt = attempt
    self._time_fn = _time_fn
    self._random_fn = _random_fn

    # Set by the first call to CalcRemainingTimeout
    self._start_time = None

  def NextAttempt(self):
    """Returns the strategy for the next attempt.

    @rtype: L{_LockAttemptTimeoutStrategy}
    @return: New instance for attempt number C{attempt + 1}, using the same
        time and random functions; its start time is not yet set

    """
    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
                                       _time_fn=self._time_fn,
                                       _random_fn=self._random_fn)

  def CalcRemainingTimeout(self):
    """Returns the remaining timeout.

    @rtype: float or None
    @return: Seconds left for this attempt (with a -/+ 5% variation), or
        None if there are no more timed attempts and a blocking acquire
        should be done

    """
    try:
      timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
    except IndexError:
      # No more timeouts, do blocking acquire
      return None

    # Get start time on first calculation
    if self._start_time is None:
      self._start_time = self._time_fn()

    # Calculate remaining time for this attempt
    remaining_timeout = self._start_time + timeout - self._time_fn()

    # This function can be called several times during one attempt. Once
    # the deadline of the attempt has passed the difference above becomes
    # negative, hence a lower boundary is enforced here.
    if remaining_timeout < self._MIN_REMAINING_TIMEOUT:
      remaining_timeout = self._MIN_REMAINING_TIMEOUT

    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
    # where two or more jobs are fighting for the same lock(s).
    variation_range = remaining_timeout * 0.1
    remaining_timeout += ((self._random_fn() * variation_range) -
                          (variation_range * 0.5))

    assert remaining_timeout >= 0.0, "Timeout must be positive"

    return remaining_timeout
TestLockTimeoutStrategy(unittest.TestCase): +class TestLockAttemptTimeoutStrategy(unittest.TestCase): def testConstants(self): - self.assert_(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0) - self.assert_(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0) + tpa = mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT + self.assert_(len(tpa) > 10) + self.assert_(sum(tpa) >= 150.0) def testSimple(self): - strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5) + strat = mcpu._LockAttemptTimeoutStrategy(_random_fn=lambda: 0.5, + _time_fn=lambda: 0.0) - self.assertEqual(strat._attempts, 0) + self.assertEqual(strat._attempt, 0) prev = None - for _ in range(strat._MAX_ATTEMPTS): + for _ in range(len(mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT)): timeout = strat.CalcRemainingTimeout() self.assert_(timeout is not None) self.assert_(timeout <= 10.0) self.assert_(prev is None or timeout >= prev) - strat.NextAttempt() + strat = strat.NextAttempt() prev = timeout