Commit e3200b18 authored by Michael Hanselmann

mcpu: Change lock attempt timeout calculation



With this patch all timeouts are pre-calculated. The interface of
the _LockTimeoutStrategy class is also changed a bit; NextAttempt
now returns a new instance.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
parent 69b99987
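
For reference, the timeout table introduced by this patch can be reproduced standalone. The sketch below mirrors _CalculateLockAttemptTimeouts() from the diff: per-attempt timeouts start at 1.0s, grow as (previous * 1.05) ** 1.25, are clamped to at most 10.0s, and the table ends once it adds up to at least 150s, after which acquires become blocking.

def CalculateLockAttemptTimeouts():
  # Standalone mirror of the helper added below, for illustration only
  result = [1.0]

  # Wait for a total of at least 150s before doing a blocking acquire
  while sum(result) < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25
    # Same boundaries as the patch: at most 10.0s, at least 0.1s
    timeout = max(0.1, min(timeout, 10.0))
    result.append(timeout)

  return result

timeouts = CalculateLockAttemptTimeouts()
# The properties the updated unittest checks: >10 entries, >=150s in total
assert len(timeouts) > 10
assert sum(timeouts) >= 150.0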
@@ -46,84 +46,99 @@ class _LockAcquireTimeout(Exception):
   """


-class _LockTimeoutStrategy(object):
+def _CalculateLockAttemptTimeouts():
+  """Calculate timeouts for lock attempts.
+
+  """
+  running_sum = 0
+  result = [1.0]
+
+  # Wait for a total of at least 150s before doing a blocking acquire
+  while sum(result) < 150.0:
+    timeout = (result[-1] * 1.05) ** 1.25
+
+    # Cap timeout at 10 seconds. This gives other jobs a chance to run
+    # even if we're still trying to get our locks, before finally moving
+    # to a blocking acquire.
+    if timeout > 10.0:
+      timeout = 10.0
+
+    elif timeout < 0.1:
+      # Lower boundary for safety
+      timeout = 0.1
+
+    result.append(timeout)
+
+  return result
+
+
+class _LockAttemptTimeoutStrategy(object):
   """Class with lock acquire timeout strategy.

   """
   __slots__ = [
-    "_attempts",
+    "_attempt",
     "_random_fn",
     "_start_time",
+    "_time_fn",
     ]

-  _MAX_ATTEMPTS = 10
-  """How many retries before going into blocking mode"""
-
-  _ATTEMPT_FACTOR = 1.75
-  """Factor between attempts"""
+  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

-  def __init__(self, _random_fn=None):
+  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
     """Initializes this class.

+    @type attempt: int
+    @param attempt: Current attempt number
+    @param _time_fn: Time function for unittests
     @param _random_fn: Random number generator for unittests

     """
     object.__init__(self)

-    self._start_time = None
-    self._attempts = 0
+    if attempt < 0:
+      raise ValueError("Attempt must be zero or positive")

-    if _random_fn is None:
-      self._random_fn = random.random
-    else:
-      self._random_fn = _random_fn
+    self._attempt = attempt
+    self._time_fn = _time_fn
+    self._random_fn = _random_fn
+
+    self._start_time = None

   def NextAttempt(self):
-    """Advances to the next attempt.
+    """Returns the strategy for the next attempt.

     """
-    assert self._attempts >= 0
-    self._attempts += 1
+    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
+                                       _time_fn=self._time_fn,
+                                       _random_fn=self._random_fn)

   def CalcRemainingTimeout(self):
     """Returns the remaining timeout.

     """
-    assert self._attempts >= 0
-
-    if self._attempts == self._MAX_ATTEMPTS:
-      # Only blocking acquires after 10 retries
+    try:
+      timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
+    except IndexError:
+      # No more timeouts, do blocking acquire
       return None

-    if self._attempts > self._MAX_ATTEMPTS:
-      raise RuntimeError("Blocking acquire ran into timeout")
-
     # Get start time on first calculation
     if self._start_time is None:
-      self._start_time = time.time()
+      self._start_time = self._time_fn()

     # Calculate remaining time for this attempt
-    timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
-               time.time())
-
-    if timeout > 10.0:
-      # Cap timeout at 10 seconds. This gives other jobs a chance to run
-      # even if we're still trying to get our locks, before finally moving
-      # to a blocking acquire.
-      timeout = 10.0
-    elif timeout < 0.1:
-      # Lower boundary
-      timeout = 0.1
+    remaining_timeout = self._start_time + timeout - self._time_fn()

     # Add a small variation (-/+ 5%) to timeouts. This helps in situations
     # where two or more jobs are fighting for the same lock(s).
-    variation_range = timeout * 0.1
-    timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
+    variation_range = remaining_timeout * 0.1
+    remaining_timeout += ((self._random_fn() * variation_range) -
+                          (variation_range * 0.5))

-    assert timeout >= 0.0, "Timeout must be positive"
+    assert remaining_timeout >= 0.0, "Timeout must be positive"

-    return timeout
+    return remaining_timeout


 class OpExecCbBase:
@@ -414,16 +429,17 @@ class Processor(object):
     if lu_class is None:
       raise errors.OpCodeUnknown("Unknown opcode")

-    timeout_strategy = _LockTimeoutStrategy()
-    calc_timeout = timeout_strategy.CalcRemainingTimeout
+    timeout_strategy = _LockAttemptTimeoutStrategy()

     while True:
       try:
+        acquire_timeout = timeout_strategy.CalcRemainingTimeout()
+
         # Acquire the Big Ganeti Lock exclusively if this LU requires it,
         # and in a shared fashion otherwise (to prevent concurrent run with
         # an exclusive LU.
         if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                              not lu_class.REQ_BGL, calc_timeout()) is None:
+                              not lu_class.REQ_BGL, acquire_timeout) is None:
           raise _LockAcquireTimeout()

         try:
@@ -431,7 +447,8 @@ class Processor(object):
           lu.ExpandNames()
           assert lu.needed_locks is not None, "needed_locks not set by LU"

-          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
+          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+                                     timeout_strategy.CalcRemainingTimeout)
         finally:
           self.context.glm.release(locking.LEVEL_CLUSTER)
@@ -439,7 +456,7 @@ class Processor(object):
         # Timeout while waiting for lock, try again
         pass

-      timeout_strategy.NextAttempt()
+      timeout_strategy = timeout_strategy.NextAttempt()

     finally:
       self._cbs = None
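
In the Processor hunks above, the strategy is now rebound on every iteration rather than mutated in place. Condensed into a sketch (AcquireWithRetries and acquire_fn are hypothetical stand-ins; the real code calls self._AcquireLocks as shown in the diff):

from ganeti import mcpu

def AcquireWithRetries(acquire_fn):
  # acquire_fn(timeout) returns None on timeout, anything else on success
  strategy = mcpu._LockAttemptTimeoutStrategy()

  while True:
    # None means the timeout table is exhausted: acquire blockingly
    timeout = strategy.CalcRemainingTimeout()

    result = acquire_fn(timeout)
    if result is not None:
      return result

    # NextAttempt() returns a fresh instance; rebind, don't mutate
    strategy = strategy.NextAttempt()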
@@ -27,25 +27,27 @@ import unittest

 from ganeti import mcpu


-class TestLockTimeoutStrategy(unittest.TestCase):
+class TestLockAttemptTimeoutStrategy(unittest.TestCase):
   def testConstants(self):
-    self.assert_(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0)
-    self.assert_(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0)
+    tpa = mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT
+    self.assert_(len(tpa) > 10)
+    self.assert_(sum(tpa) >= 150.0)

   def testSimple(self):
-    strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5)
+    strat = mcpu._LockAttemptTimeoutStrategy(_random_fn=lambda: 0.5,
+                                             _time_fn=lambda: 0.0)

-    self.assertEqual(strat._attempts, 0)
+    self.assertEqual(strat._attempt, 0)

     prev = None

-    for _ in range(strat._MAX_ATTEMPTS):
+    for _ in range(len(mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT)):
       timeout = strat.CalcRemainingTimeout()
       self.assert_(timeout is not None)

       self.assert_(timeout <= 10.0)
       self.assert_(prev is None or timeout >= prev)

-      strat.NextAttempt()
+      strat = strat.NextAttempt()

       prev = timeout
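
Because _time_fn and _random_fn are injectable, the whole timeout series can be inspected deterministically, as the updated testSimple does. A small sketch (assuming the ganeti package is importable, as in the unittest above); freezing time and jitter makes CalcRemainingTimeout() return the raw table values:

from ganeti import mcpu

strat = mcpu._LockAttemptTimeoutStrategy(_random_fn=lambda: 0.5,
                                         _time_fn=lambda: 0.0)

series = []
while True:
  timeout = strat.CalcRemainingTimeout()
  if timeout is None:
    break  # table exhausted; a real caller would now block
  series.append(timeout)
  strat = strat.NextAttempt()

# Non-decreasing, capped at 10.0s and totalling at least 150s
assert series == sorted(series)
assert max(series) <= 10.0
assert sum(series) >= 150.0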