Commit 407339d0 authored by Michael Hanselmann

mcpu: Implement lock timeouts

The timeout is always between ~0.1 and ~10.0 seconds. A small
variation of ±5% is added to prevent different jobs from
fighting each other. After 10 attempts to acquire the locks with
a timeout, a blocking acquire is made.

Lock status reporting will be improved in a separate patch.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
parent 6b95b76d
......@@ -272,6 +272,7 @@ dist_TESTS = \
test/ganeti.hooks_unittest.py \
test/ganeti.http_unittest.py \
test/ganeti.locking_unittest.py \
test/ganeti.mcpu_unittest.py \
test/ganeti.objects_unittest.py \
test/ganeti.rapi.resources_unittest.py \
test/ganeti.serializer_unittest.py \
......
......@@ -29,6 +29,8 @@ are two kinds of classes defined:
"""
import logging
import random
import time
from ganeti import opcodes
from ganeti import constants
......@@ -39,6 +41,92 @@ from ganeti import locking
from ganeti import utils
class _LockAcquireTimeout(Exception):
"""Internal exception to report timeouts on acquiring locks.
"""
class _LockTimeoutStrategy(object):
"""Class with lock acquire timeout strategy.
"""
__slots__ = [
"_attempts",
"_random_fn",
"_start_time",
]
_MAX_ATTEMPTS = 10
"""How many retries before going into blocking mode"""
_ATTEMPT_FACTOR = 1.75
"""Factor between attempts"""
def __init__(self, _random_fn=None):
"""Initializes this class.
@param _random_fn: Random number generator for unittests
"""
object.__init__(self)
self._start_time = None
self._attempts = 0
if _random_fn is None:
self._random_fn = random.random
else:
self._random_fn = _random_fn
def NextAttempt(self):
"""Advances to the next attempt.
"""
assert self._attempts >= 0
self._attempts += 1
def CalcRemainingTimeout(self):
"""Returns the remaining timeout.
"""
assert self._attempts >= 0
if self._attempts == self._MAX_ATTEMPTS:
# Only blocking acquires after 10 retries
return None
if self._attempts > self._MAX_ATTEMPTS:
raise RuntimeError("Blocking acquire ran into timeout")
# Get start time on first calculation
if self._start_time is None:
self._start_time = time.time()
# Calculate remaining time for this attempt
timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
time.time())
if timeout > 10.0:
# Cap timeout at 10 seconds. This gives other jobs a chance to run
# even if we're still trying to get our locks, before finally moving
# to a blocking acquire.
timeout = 10.0
elif timeout < 0.1:
# Lower boundary
timeout = 0.1
# Add a small variation (-/+ 5%) to timeouts. This helps in situations
# where two or more jobs are fighting for the same lock(s).
variation_range = timeout * 0.1
timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
assert timeout >= 0.0, "Timeout must be positive"
return timeout
class OpExecCbBase:
"""Base class for OpCode execution callbacks.
......@@ -206,7 +294,7 @@ class Processor(object):
return result
def _LockAndExecLU(self, lu, level):
def _LockAndExecLU(self, lu, level, calc_timeout):
"""Execute a Logical Unit, with the needed locks.
This is a recursive function that starts locking the given level, and
......@@ -221,45 +309,62 @@ class Processor(object):
self._cbs.NotifyStart()
result = self._ExecLU(lu)
elif adding_locks and acquiring_locks:
# We could both acquire and add locks at the same level, but for now we
# don't need this, so we'll avoid the complicated code needed.
raise NotImplementedError(
"Can't declare locks to acquire when adding others")
raise NotImplementedError("Can't declare locks to acquire when adding"
" others")
elif adding_locks or acquiring_locks:
lu.DeclareLocks(level)
share = lu.share_locks[level]
if acquiring_locks:
needed_locks = lu.needed_locks[level]
self._ReportLocks(level, needed_locks, share, False)
lu.acquired_locks[level] = self.context.glm.acquire(level,
needed_locks,
shared=share)
self._ReportLocks(level, needed_locks, share, True)
else: # adding_locks
add_locks = lu.add_locks[level]
lu.remove_locks[level] = add_locks
try:
self.context.glm.add(level, add_locks, acquired=1, shared=share)
except errors.LockError:
raise errors.OpPrereqError(
"Couldn't add locks (%s), probably because of a race condition"
" with another job, who added them first" % add_locks)
try:
assert adding_locks ^ acquiring_locks, \
"Locks must be either added or acquired"
if acquiring_locks:
# Acquiring locks
needed_locks = lu.needed_locks[level]
self._ReportLocks(level, needed_locks, share, False)
acquired = self.context.glm.acquire(level,
needed_locks,
shared=share,
timeout=calc_timeout())
# TODO: Report timeout
self._ReportLocks(level, needed_locks, share, True)
if acquired is None:
raise _LockAcquireTimeout()
lu.acquired_locks[level] = acquired
else:
# Adding locks
add_locks = lu.add_locks[level]
lu.remove_locks[level] = add_locks
try:
self.context.glm.add(level, add_locks, acquired=1, shared=share)
except errors.LockError:
raise errors.OpPrereqError(
"Couldn't add locks (%s), probably because of a race condition"
" with another job, who added them first" % add_locks)
lu.acquired_locks[level] = add_locks
try:
if adding_locks:
lu.acquired_locks[level] = add_locks
result = self._LockAndExecLU(lu, level + 1)
result = self._LockAndExecLU(lu, level + 1, calc_timeout)
finally:
if level in lu.remove_locks:
self.context.glm.remove(level, lu.remove_locks[level])
finally:
if self.context.glm.is_owned(level):
self.context.glm.release(level)
else:
result = self._LockAndExecLU(lu, level + 1)
result = self._LockAndExecLU(lu, level + 1, calc_timeout)
return result
......@@ -282,29 +387,47 @@ class Processor(object):
if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode")
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
# shared fashion otherwise (to prevent concurrent run with an exclusive
# LU).
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
not lu_class.REQ_BGL, False)
try:
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
shared=not lu_class.REQ_BGL)
finally:
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
not lu_class.REQ_BGL, True)
try:
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
finally:
self.context.glm.release(locking.LEVEL_CLUSTER)
timeout_strategy = _LockTimeoutStrategy()
calc_timeout = timeout_strategy.CalcRemainingTimeout
while True:
try:
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
not lu_class.REQ_BGL, False)
try:
# Acquire the Big Ganeti Lock exclusively if this LU requires it,
# and in a shared fashion otherwise (to prevent concurrent run with
# an exclusive LU).
acquired_bgl = self.context.glm.acquire(locking.LEVEL_CLUSTER,
[locking.BGL],
shared=not lu_class.REQ_BGL,
timeout=calc_timeout())
finally:
# TODO: Report timeout
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
not lu_class.REQ_BGL, True)
if acquired_bgl is None:
raise _LockAcquireTimeout()
try:
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
finally:
self.context.glm.release(locking.LEVEL_CLUSTER)
except _LockAcquireTimeout:
# Timeout while waiting for lock, try again
pass
timeout_strategy.NextAttempt()
finally:
self._cbs = None
return result
def _Feedback(self, *args):
"""Forward call to feedback callback function.
......
#!/usr/bin/python
#
# Copyright (C) 2009 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the mcpu module"""
import unittest
from ganeti import mcpu
class TestLockTimeoutStrategy(unittest.TestCase):
  """Tests for L{mcpu._LockTimeoutStrategy}.

  """
  def testConstants(self):
    """Checks that the strategy's class constants have usable values.

    """
    self.assertTrue(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0)
    self.assertTrue(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0)

  def testSimple(self):
    """Walks through all attempts and checks the returned timeouts.

    """
    # A random function returning 0.5 makes the variation cancel out
    strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5)

    self.assertEqual(strat._attempts, 0)

    prev = None
    for _ in range(strat._MAX_ATTEMPTS):
      timeout = strat.CalcRemainingTimeout()
      self.assertTrue(timeout is not None)

      # Timeouts are capped and must not shrink between attempts
      self.assertTrue(timeout <= 10.0)
      self.assertTrue(prev is None or timeout >= prev)

      strat.NextAttempt()
      prev = timeout

    # After the last attempt with a timeout, a blocking acquire is requested
    self.assertTrue(strat.CalcRemainingTimeout() is None)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment