Commit e7ec2f27 authored by Klaus Aehlig

Remove Ganeti Lock Manager and Monitor

...from the jobs code, as this functionality is now
carried out by wconfd. Also remove dead code resulting
from this removal.
Signed-off-by: Klaus Aehlig <aehlig@google.com>
Reviewed-by: Petr Pudlak <pudlak@google.com>
parent 23b13756
......@@ -1643,7 +1643,6 @@ python_test_support = \
test/py/cmdlib/testsupport/config_mock.py \
test/py/cmdlib/testsupport/iallocator_mock.py \
test/py/cmdlib/testsupport/livelock_mock.py \
test/py/cmdlib/testsupport/lock_manager_mock.py \
test/py/cmdlib/testsupport/netutils_mock.py \
test/py/cmdlib/testsupport/processor_mock.py \
test/py/cmdlib/testsupport/rpc_runner_mock.py \
......
......@@ -1716,7 +1716,6 @@ class JobQueue(object):
# Job dependencies
self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
self._EnqueueJobs)
self.context.glm.AddToLockMonitor(self.depmgr)
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
......
......@@ -29,10 +29,8 @@ import os
import select
import threading
import errno
import weakref
import logging
import heapq
import itertools
import time
from ganeti import errors
......@@ -423,7 +421,6 @@ class SharedLock(object):
"""Construct a new SharedLock.
@param name: the name of the lock
@type monitor: L{LockMonitor}
@param monitor: Lock monitor with which to register
"""
......@@ -990,7 +987,6 @@ class LockSet(object):
@type members: list of strings
@param members: initial members of the set
@type monitor: L{LockMonitor}
@param monitor: Lock monitor with which to register member locks
"""
......@@ -1618,353 +1614,3 @@ BGL = "BGL"
#: Node allocation lock (a single lock at LEVEL_NODE_ALLOC, analogous to BGL)
NAL = "NAL"
class GanetiLockManager(object):
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  """
  # Singleton guard: at most one lock manager may exist per process; enforced
  # by the assertion in __init__
  _instance = None

  def __init__(self, node_uuids, nodegroups, instance_names, networks):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param node_uuids: list of node UUIDs
    @param nodegroups: list of nodegroup uuids
    @param instance_names: list of instance names
    @param networks: list of network uuids

    """
    assert self.__class__._instance is None, \
      "double GanetiLockManager instance"

    self.__class__._instance = self

    # Monitor with which every member lock set registers itself, so that
    # QueryLocks can report on all locks
    self._monitor = LockMonitor()

    # The keyring contains all the locks, at their level and in the correct
    # locking order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
      LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor),
      LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
      LEVEL_INSTANCE: LockSet(instance_names, "instance",
                              monitor=self._monitor),
      LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
      LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor),
      }

    # Sanity check: the name of every lock set must match its level's name
    assert compat.all(ls.name == LEVEL_NAMES[level]
                      for (level, ls) in self.__keyring.items()), \
      "Keyring name mismatch"

  def AddToLockMonitor(self, provider):
    """Registers a new lock with the monitor.

    See L{LockMonitor.RegisterLock}.

    """
    return self._monitor.RegisterLock(provider)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    return self.__keyring[level].is_owned()

  def list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    return self.__keyring[level].list_owned()

  def check_owned(self, level, names, shared=-1):
    """Check if locks at a certain level are owned in a specific mode.

    @see: L{LockSet.check_owned}

    """
    return self.__keyring[level].check_owned(names, shared=shared)

  def owning_all(self, level):
    """Checks whether current thread owns all locks at a certain level.

    @see: L{LockSet.owning_all}

    """
    return self.__keyring[level].owning_all()

  def _upper_owned(self, level):
    """Check that we don't own any lock at a level greater than the given one.

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))

  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0, priority=None,
              opportunistic=False):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type opportunistic: boolean
    @param opportunistic: Acquire locks opportunistically; use the return value
        to determine which locks were actually acquired

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at the same or upper levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
                                         priority=priority,
                                         opportunistic=opportunistic)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    return self.__keyring[level].downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # Releasing the BGL while still owning locks at an upper level would break
    # the locking order invariant, so it is forbidden
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
      "Cannot release the Big Ganeti Lock while holding something"
      " at upper levels (%r)" %
      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
                        for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self.is_owned(level) or not self._upper_owned(level), (
      "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")
    return self.__keyring[level].remove(names)
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  """
  # Each entry is (lock info tuple, info index, registration number)
  (item, idx, num) = entry
  name = item[0]
  return (utils.NiceSortKey(name), num, idx)
class LockMonitor(object):
  """Keeps track of locks and answers queries about them.

  Locks register themselves through L{RegisterLock}; only weak references
  are kept, so a lock vanishes from the monitor once it is garbage-collected.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    # Protects the registration dictionary
    self._lock = SharedLock("LockMonitor")

    # Monotonic counter used as a tie-breaker for stable sorting
    self._counter = itertools.count(0)

    # Registered locks; weak references keep the monitor from creating
    # reference cycles or delaying lock deletion
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
      requested information but, as it turns out, weak references to bound
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
      workarounds, but none of the ones I found works properly in combination
      with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # Duplicate *names* can occur here: if a lock is re-created under the same
    # name shortly after deletion, the previous entry may still linger in the
    # weakref dictionary.  The registration counter nevertheless guarantees a
    # stable sort order.
    self._locks[provider] = self._counter.next()

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    """
    # Snapshot the tracked providers while holding the internal lock, so the
    # list is consistent
    self._lock.acquire(shared=1)
    try:
      tracked = self._locks.items()
    finally:
      self._lock.release()

    result = []
    for (provider, num) in tracked:
      for (idx, info) in enumerate(provider.GetLockInfo(requested)):
        result.append((info, idx, num))
    return result

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Sort by name and incoming registration order (see L{_MonitorSortKey})
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
                      key=_MonitorSortKey)

    # Strip the sorting helpers, keeping only the lock information itself
    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    # _Query returns the (query object, context) pair GetQueryResponse expects
    return query.GetQueryResponse(*self._Query(fields))
......@@ -971,7 +971,7 @@ class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
"""RPC wrappers for job queue.
"""
def __init__(self, context, address_list):
def __init__(self, _context, address_list):
"""Initializes this class.
"""
......@@ -982,7 +982,7 @@ class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
resolver = _StaticResolver(address_list)
_RpcClientBase.__init__(self, resolver, _ENCODERS.get,
lock_monitor_cb=context.glm.AddToLockMonitor)
lock_monitor_cb=lambda _: None)
_generated_rpc.RpcClientJobQueue.__init__(self)
......@@ -1023,15 +1023,12 @@ class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
"""RPC wrappers for L{config}.
"""
def __init__(self, context, address_list, _req_process_fn=None,
def __init__(self, _context, address_list, _req_process_fn=None,
_getents=None):
"""Initializes this class.
"""
if context:
lock_monitor_cb = context.glm.AddToLockMonitor
else:
lock_monitor_cb = None
lock_monitor_cb = None
if address_list is None:
resolver = compat.partial(_SsconfResolver, True)
......
......@@ -43,7 +43,6 @@ from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
import ganeti.rpc.errors as rpcerr
from ganeti import utils
......@@ -343,10 +342,7 @@ class ClientOps(object):
result = self._Query(opcodes.OpQuery(what=what, fields=fields,
qfilter=qfilter))
elif what == constants.QR_LOCK:
if qfilter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered",
errors.ECODE_INVAL)
result = self.server.context.glm.QueryLocks(fields)
raise errors.OpPrereqError("Lock queries cannot be asked to jobs")
elif what == constants.QR_JOB:
result = queue.QueryJobs(fields, qfilter)
elif what in constants.QR_VIA_LUXI:
......@@ -504,15 +500,8 @@ class GanetiContext(object):
else:
self.livelock = livelock
# Locking manager
cfg = self.GetConfig(None)
self.glm = locking.GanetiLockManager(
cfg.GetNodeList(),
cfg.GetNodeGroupList(),
[inst.name for inst in cfg.GetAllInstancesInfo().values()],
cfg.GetNetworkList())
# Job queue
cfg = self.GetConfig(None)
logging.debug("Creating the job queue")
self.jobqueue = jqueue.JobQueue(self, cfg)
......@@ -532,8 +521,10 @@ class GanetiContext(object):
def GetConfig(self, ec_id):
return config.GetConfig(ec_id, self.livelock)
# pylint: disable=R0201
# method could be a function, but keep interface backwards compatible
def GetRpc(self, cfg):
return rpc.RpcRunner(cfg, self.glm.AddToLockMonitor)
return rpc.RpcRunner(cfg, lambda _: None)
def AddNode(self, cfg, node, ec_id):
"""Adds a node to the configuration.
......
......@@ -29,7 +29,6 @@ from cmdlib.testsupport.config_mock import ConfigMock
from cmdlib.testsupport.iallocator_mock import patchIAllocator
from cmdlib.testsupport.livelock_mock import LiveLockMock
from cmdlib.testsupport.utils_mock import patchUtils
from cmdlib.testsupport.lock_manager_mock import LockManagerMock
from cmdlib.testsupport.netutils_mock import patchNetutils, HostnameMock
from cmdlib.testsupport.processor_mock import ProcessorMock
from cmdlib.testsupport.rpc_runner_mock import CreateRpcRunnerMock, \
......@@ -46,7 +45,6 @@ __all__ = ["CmdlibTestCase",
"patchUtils",
"patchNetutils",
"patchSsh",
"LockManagerMock",
"ProcessorMock",
"RpcResultsBuilder",
"LiveLockMock",
......
#
#
# Copyright (C) 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Support for mocking the lock manager"""
from ganeti import locking
class LockManagerMock(locking.GanetiLockManager):
  """Mocked lock manager for tests.

  """
  def __init__(self):
    # reset singleton instance, there is a separate lock manager for every test
    # pylint: disable=W0212
    self.__class__._instance = None

    super(LockManagerMock, self).__init__([], [], [], [])

  def AddLocksFromConfig(self, cfg):
    """Create locks for all entities in the given configuration.

    @type cfg: ganeti.config.ConfigWriter
    @param cfg: configuration whose nodes, groups, instances and networks
        get a lock each

    """
    # Acquire the BGL *before* entering the try block: if the acquisition
    # itself fails, the finally clause must not release a lock we never held.
    self.acquire(locking.LEVEL_CLUSTER, locking.BGL)
    try:
      for node_uuid in cfg.GetNodeList():
        self.add(locking.LEVEL_NODE, node_uuid)
        self.add(locking.LEVEL_NODE_RES, node_uuid)
      for group_uuid in cfg.GetNodeGroupList():
        self.add(locking.LEVEL_NODEGROUP, group_uuid)
      for inst in cfg.GetAllInstancesInfo().values():
        self.add(locking.LEVEL_INSTANCE, inst.name)
      for net_uuid in cfg.GetNetworkList():
        self.add(locking.LEVEL_NETWORK, net_uuid)
    finally:
      self.release(locking.LEVEL_CLUSTER, locking.BGL)
......@@ -2010,680 +2010,5 @@ class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
self.assertTrue(ls_timeout_fn is None)
class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):
_ThreadedTestCase.setUp(self)
self.nodes = ["n1", "n2"]
self.nodegroups = ["g1", "g2"]
self.instances = ["i1", "i2", "i3"]
self.networks = ["net1", "net2", "net3"]
self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
self.instances, self.networks)
def tearDown(self):
# Don't try this at home...
locking.GanetiLockManager._instance = None
def testLockingConstants(self):
# The locking library internally cheats by assuming its constants have some
# relationships with each other. Check those hold true.
# This relationship is also used in the Processor to recursively acquire
# the right locks. Again, please don't break it.
for i in range(len(locking.LEVELS)):
self.assertEqual(i, locking.LEVELS[i])
def testDoubleGLFails(self):
self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])
def testLockNames(self):
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
set(self.nodegroups))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
set(self.networks))
def testInitAndResources(self):
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager([], [], [], [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
set(self.nodegroups))
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager([], [], self.instances, [])
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
set(self.instances))
locking.GanetiLockManager._instance = None
self.GL = locking.GanetiLockManager([], [], [], self.networks)
self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
set(self.networks))
def testAcquireRelease(self):
self.GL.acquire(locking