Commit f47b32a8 authored by Petr Pudlak's avatar Petr Pudlak
Browse files

Make configuration per job/thread

Previously there was one shared configuration object for all jobs,
threads and other tasks. This patch creates separate ConfigWrite
instances for distinct jobs/threads.

All exported methods of ConfigWriter are now wrapped in calls that
obtain the ConfigLock from WConfD, read the current configuration, and
optionally write it back to WConfD.

_OpenConfig is now called at each such request (instead of just once at
the creation time of ConfigWriter). A new method _CloseConfig is added
that performs the necessary cleanup (saving the configuration, releasing
the lock).

_UpgradeConfig needs to be called every time a configuration is received
from WConfd, to fix parts that aren't persisted by the Python code. This
requires that it doesn't use any methods that acquire locks, and it must
not save the configuration at the end (unless it's called just after
creating a ConfigWriter instance in "offline" mode).

The semantics of Update changes slightly. Before it just checked its
argument existed in the configuration. Now it also checks that the
its serial number is the same as in the master configuration state, to
avoid overwriting changes in other threads. This will require fixing all
calls to Update, in particular to avoid interspersing calls to Update
and other ConfigWriter methods. In the future, we should aim to
eliminate Update completely.

All LUs now carry their own instance of ConfigWriter, with their
corresponding job ID. Other cide that uses ConfigWriter identifies with
job ID 'None' and thread ID.
Signed-off-by: default avatarPetr Pudlak <>
Reviewed-by: default avatarKlaus Aehlig <>
parent 028f2db5
......@@ -880,7 +880,8 @@ def FinalizeClusterDestroy(master_uuid):
begun in cmdlib.LUDestroyOpcode.
cfg = config.ConfigWriter()
livelock = utils.livelock.LiveLock("bootstrap_destroy")
cfg = config.GetConfig(None, livelock)
modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
runner = rpc.BootstrapRunner()
......@@ -1002,7 +1003,8 @@ def MasterFailover(no_voting=False):
# instantiate a real config writer, as we now know we have the
# configuration data
cfg = config.ConfigWriter(accept_foreign=True)
livelock = utils.livelock.LiveLock("bootstrap_failover")
cfg = config.GetConfig(None, livelock, accept_foreign=True)
old_master_node = cfg.GetNodeInfoByName(old_master)
if old_master_node is None:
......@@ -103,7 +103,8 @@ class LogicalUnit(object):
HTYPE = None
REQ_BGL = True
def __init__(self, processor, op, context, rpc_runner, wconfdcontext, wconfd):
def __init__(self, processor, op, context, cfg,
rpc_runner, wconfdcontext, wconfd):
"""Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
......@@ -119,7 +120,7 @@ class LogicalUnit(object):
self.proc = processor
self.op = op
self.cfg = context.cfg
self.cfg = cfg
self.wconfdlocks = []
self.wconfdcontext = wconfdcontext
self.context = context
......@@ -43,7 +43,6 @@ import threading
import itertools
from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
import ganeti.wconfd as wc
......@@ -56,35 +55,6 @@ from ganeti import pathutils
from ganeti import network
_config_lock = locking.SharedLock("ConfigWriter")
def _ConfigSync(shared=0):
"""Configuration synchronization decorator.
def wrap(fn):
def sync_function(*args, **kwargs):
cw = args[0]
assert isinstance(cw, ConfigWriter), \
"cannot ssynchronize on non-class method: self not found"
return fn(*args, **kwargs)
if not shared:
except Exception, err:
logging.crititcal("Can't write the configuration: %s", str(err))
return sync_function
return wrap
# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def GetWConfdContext(ec_id, livelock):
"""Prepare a context for communication with WConfd.
......@@ -104,6 +74,46 @@ def GetWConfdContext(ec_id, livelock):
def GetConfig(ec_id, livelock, **kwargs):
"""A utility function for constructing instances of ConfigWriter.
It prepares a WConfd context and uses it to create a ConfigWriter instance.
@type ec_id: int, or None
@param ec_id: the job ID or None, if the caller isn't a job
@type livelock: L{ganeti.utils.livelock.LiveLock}
@param livelock: a livelock object holding the lockfile needed for WConfd
@type kwargs: dict
@param kwargs: Any additional arguments for the ConfigWriter constructor
@rtype: L{ConfigWriter}
@return: the ConfigWriter context
kwargs['wconfdcontext'] = GetWConfdContext(ec_id, livelock)
kwargs['wconfd'] = wc.Client()
return ConfigWriter(**kwargs)
def _ConfigSync(shared=0):
"""Configuration synchronization decorator.
def wrap(fn):
def sync_function(*args, **kwargs):
with args[0].GetConfigManager(shared):
logging.debug("ConfigWriter.%s(%s, %s)",
fn.__name__, str(args), str(kwargs))
result = fn(*args, **kwargs)
logging.debug("ConfigWriter.%s(...) returned '%s'",
fn.__name__, str(result))
return result
return sync_function
return wrap
# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def _ValidateConfig(data):
"""Verifies that a configuration objects looks valid.
......@@ -210,6 +220,37 @@ def _CheckInstanceDiskIvNames(disks):
return result
class ConfigManager(object):
"""Locks the configuration and exposes it to be read or modified.
def __init__(self, config_writer, shared=False):
assert isinstance(config_writer, ConfigWriter), \
"invalid argument: Not a ConfigWriter"
self._config_writer = config_writer
self._shared = shared
def __enter__(self):
self._config_writer._OpenConfig(self._shared) # pylint: disable=W0212
except Exception:
logging.debug("Opening configuration failed")
self._config_writer._CloseConfig(False) # pylint: disable=W0212
except Exception: # pylint: disable=W0703
logging.debug("Closing configuration failed as well")
def __exit__(self, exc_type, exc_value, traceback):
# save the configuration, if this was a write opreration that succeeded
if exc_type is not None:
logging.debug("Configuration operation failed,"
" the changes will not be saved")
# pylint: disable=W0212
self._config_writer._CloseConfig(not self._shared and exc_type is None)
return False
class ConfigWriter(object):
"""The interface to the cluster configuration.
......@@ -218,10 +259,10 @@ class ConfigWriter(object):
def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
accept_foreign=False, wconfdcontext=None, wconfd=None):
self.write_count = 0
self._lock = _config_lock
self._config_data = None
self._offline = offline
if cfg_file is None:
self._cfg_file = pathutils.CLUSTER_CONF_FILE
......@@ -243,12 +284,20 @@ class ConfigWriter(object):
# file than after it was modified
self._my_hostname = netutils.Hostname.GetSysName()
self._cfg_id = None
self._wconfd = None
self._wconfdcontext = wconfdcontext
self._wconfd = wconfd
self._accept_foreign = accept_foreign
self._lock_count = 0
def _ConfigData(self):
return self._config_data
def _SetConfigData(self, cfg):
self._config_data = cfg
def _GetWConfdContext(self):
return self._wconfdcontext
# this method needs to be static, so that we can call it on the class
def IsCluster():
......@@ -312,7 +361,9 @@ class ConfigWriter(object):
@return: A dict with the filled in disk params
return self._ConfigData().cluster.SimpleFillDP(group.diskparams)
data = self._ConfigData().cluster.SimpleFillDP(group.diskparams)
assert isinstance(data, dict), "Not a dictionary: " + str(data)
return data
def _UnlockedGetNetworkMACPrefix(self, net_uuid):
"""Return the network mac prefix if it exists or the cluster level default.
......@@ -1871,8 +1922,7 @@ class ConfigWriter(object):
def _UnlockedGetInstanceNames(self, inst_uuids):
return [self._UnlockedGetInstanceName(uuid) for uuid in inst_uuids]
def AddNode(self, node, ec_id):
def _UnlockedAddNode(self, node, ec_id):
"""Add a node to the configuration.
@type node: L{objects.Node}
......@@ -1886,9 +1936,20 @@ class ConfigWriter(object):
node.serial_no = 1
node.ctime = node.mtime = time.time()
assert node.uuid in self._ConfigData().nodegroups[].members
self._ConfigData().nodes[node.uuid] = node
self._ConfigData().cluster.serial_no += 1
def AddNode(self, node, ec_id):
"""Add a node to the configuration.
@type node: L{objects.Node}
@param node: a Node instance
self._UnlockedAddNode(node, ec_id)
def RemoveNode(self, node_uuid):
"""Remove a node from the configuration.
......@@ -2370,10 +2431,30 @@ class ConfigWriter(object):
self._AllNICs() +
def _OpenConfig(self, accept_foreign):
"""Read the config data from disk.
def GetConfigManager(self, shared=False):
"""Returns a ConfigManager, which is suitable to perform a synchronized
block of configuration operations.
WARNING: This blocks all other configuration operations, so anything that
runs inside the block should be very fast, preferably not using any IO.
return ConfigManager(self, shared)
def _AddLockCount(self, count):
self._lock_count += count
return self._lock_count
def _LockCount(self):
return self._lock_count
def _OpenConfig(self, shared):
"""Read the config data from WConfd or disk.
if self._LockCount() > 0:
raise errors.ConfigurationError("Configuration lock isn't reentrant")
# Read the configuration data. If offline, read the file directly.
# If online, call WConfd.
if self._offline:
......@@ -2382,56 +2463,90 @@ class ConfigWriter(object):
dict_data = serializer.Load(raw_data)
except Exception, err:
raise errors.ConfigurationError(err)
self._wconfd = wc.Client()
dict_data = self._wconfd.ReadConfig()
self._cfg_id = utils.GetFileID(path=self._cfg_file)
data = objects.ConfigData.FromDict(dict_data)
except Exception, err:
raise errors.ConfigurationError(err)
data = objects.ConfigData.FromDict(dict_data)
except Exception, err:
raise errors.ConfigurationError(err)
# Make sure the configuration has the right version
# Make sure the configuration has the right version
if (not hasattr(data, "cluster") or
not hasattr(data.cluster, "rsahostkeypub")):
raise errors.ConfigurationError("Incomplete configuration"
" (missing cluster.rsahostkeypub)")
if (not hasattr(data, "cluster") or
not hasattr(data.cluster, "rsahostkeypub")):
raise errors.ConfigurationError("Incomplete configuration"
" (missing cluster.rsahostkeypub)")
if not data.cluster.master_node in data.nodes:
msg = ("The configuration denotes node %s as master, but does not"
" contain information about this node" %
raise errors.ConfigurationError(msg)
if not data.cluster.master_node in data.nodes:
msg = ("The configuration denotes node %s as master, but does not"
" contain information about this node" %
raise errors.ConfigurationError(msg)
master_info = data.nodes[data.cluster.master_node]
if != self._my_hostname and not accept_foreign:
msg = ("The configuration denotes node %s as master, while my"
" hostname is %s; opening a foreign configuration is only"
" possible in accept_foreign mode" %
(, self._my_hostname))
raise errors.ConfigurationError(msg)
master_info = data.nodes[data.cluster.master_node]
if != self._my_hostname and not self._accept_foreign:
msg = ("The configuration denotes node %s as master, while my"
" hostname is %s; opening a foreign configuration is only"
" possible in accept_foreign mode" %
(, self._my_hostname))
raise errors.ConfigurationError(msg)
self._config_data = data
# Upgrade configuration if needed
# Upgrade configuration if needed
# poll until we acquire the lock
while True:
dict_data = \
self._wconfd.LockConfig(self._GetWConfdContext(), bool(shared))
logging.debug("Received '%s' from WConfd.LockConfig [shared=%s]",
str(dict_data), bool(shared))
if dict_data is not None:
self._cfg_id = utils.GetFileID(path=self._cfg_file)
except Exception, err:
raise errors.ConfigurationError(err)
# Transitional fix until ConfigWriter is completely rewritten into
# Haskell
def _CloseConfig(self, save):
"""Release resources relating the config data.
def _UpgradeConfig(self):
if save:
except Exception, err:
logging.critical("Can't write the configuration: %s", str(err))
if not self._offline:
except AttributeError:
# If the configuration hasn't been initialized yet, just ignore it.
logging.debug("Configuration in WConfd unlocked")
# TODO: To WConfd
def _UpgradeConfig(self, saveafter=False):
"""Run any upgrade steps.
This method performs both in-object upgrades and also update some data
elements that need uniqueness across the whole configuration or interact
with other objects.
@warning: this function will call L{_WriteConfig()}, but also
L{DropECReservations} so it needs to be called only from a
"safe" place (the constructor). If one wanted to call it with
the lock held, a DropECReservationUnlocked would need to be
created first, to avoid causing deadlock.
@warning: if 'saveafter' is 'True', this function will call
L{_WriteConfig()} so it needs to be called only from a
"safe" place.
# Keep a copy of the persistent part of _config_data to check for changes
......@@ -2451,7 +2566,7 @@ class ConfigWriter(object):
self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
for node in self._ConfigData().nodes.values():
if not = self.LookupNodeGroup(None) = self._UnlockedLookupNodeGroup(None)
# This is technically *not* an upgrade, but needs to be done both when
# nodegroups are being added, and upon normally loading the config,
# because the members list of a node group is discarded upon
......@@ -2459,11 +2574,9 @@ class ConfigWriter(object):
modified = (oldconf != self._ConfigData().ToDict())
if modified:
if modified and saveafter:
# This is ok even if it acquires the internal lock, as _UpgradeConfig is
# only called at config init time, without the lock held
config_errors = self._UnlockedVerifyConfig()
if config_errors:
......@@ -2515,7 +2628,8 @@ class ConfigWriter(object):
except errors.LockError:
raise errors.ConfigurationError("The configuration file has been"
" modified since the last write, cannot"
......@@ -2722,24 +2836,35 @@ class ConfigWriter(object):
if self._ConfigData() is None:
raise errors.ProgrammerError("Configuration file not read,"
" cannot save.")
def check_serial(target, current):
if current is None:
raise errors.ConfigurationError("Configuration object unknown")
elif current.serial_no != target.serial_no:
raise errors.ConfigurationError("Configuration object updated since"
" it has been read: %d != %d",
current.serial_no, target.serial_no)
def replace_in(target, tdict):
check_serial(target, tdict.get(target.uuid))
tdict[target.uuid] = target
update_serial = False
if isinstance(target, objects.Cluster):
test = target == self._ConfigData().cluster
check_serial(target, self._ConfigData().cluster)
self._ConfigData().cluster = target
elif isinstance(target, objects.Node):
test = target in self._ConfigData().nodes.values()
replace_in(target, self._ConfigData().nodes)
update_serial = True
elif isinstance(target, objects.Instance):
test = target in self._ConfigData().instances.values()
replace_in(target, self._ConfigData().instances)
elif isinstance(target, objects.NodeGroup):
test = target in self._ConfigData().nodegroups.values()
replace_in(target, self._ConfigData().nodegroups)
elif isinstance(target, objects.Network):
test = target in self._ConfigData().networks.values()
replace_in(target, self._ConfigData().networks)
raise errors.ProgrammerError("Invalid object type (%s) passed to"
" ConfigWriter.Update" % type(target))
if not test:
raise errors.ConfigurationError("Configuration updated since object"
" has been read or unknown object")
target.serial_no += 1
target.mtime = now = time.time()
......@@ -1656,7 +1656,7 @@ class JobQueue(object):
"""Queue used to manage the jobs.
def __init__(self, context):
def __init__(self, context, cfg):
"""Constructor for JobQueue.
The constructor will initialize the job queue object and then
......@@ -1697,7 +1697,7 @@ class JobQueue(object):
# Get initial list of nodes
self._nodes = dict((, n.primary_ip)
for n in self.context.cfg.GetAllNodesInfo().values()
for n in cfg.GetAllNodesInfo().values()
if n.master_candidate)
# Remove master node
......@@ -304,7 +304,8 @@ class Processor(object):
self.context = context
self._ec_id = ec_id
self._cbs = None
self.rpc = context.rpc
self.cfg = context.GetConfig(ec_id)
self.rpc = context.GetRpc(self.cfg)
self.hmclass = hooksmaster.HooksMaster
self._enable_locks = enable_locks
self.wconfd = wconfd # Indirection to allow testing
......@@ -348,12 +349,12 @@ class Processor(object):
if opportunistic:
expand_fns = {
locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
locking.LEVEL_INSTANCE: self.context.cfg.GetInstanceList,
locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
locking.LEVEL_NODEGROUP: self.context.cfg.GetNodeGroupList,
locking.LEVEL_NODE: self.context.cfg.GetNodeList,
locking.LEVEL_NODE_RES: self.context.cfg.GetNodeList,
locking.LEVEL_NETWORK: self.context.cfg.GetNetworkList,
locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
locking.LEVEL_NODE: self.cfg.GetNodeList,
locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
names = expand_fns[level]()
......@@ -406,7 +407,7 @@ class Processor(object):
"""Logical Unit execution sequence.
write_count = self.context.cfg.write_count
write_count = self.cfg.write_count
hm = self.BuildHooksManager(lu)
......@@ -434,7 +435,7 @@ class Processor(object):
self.Log, result)
# FIXME: This needs locks if not lu_class.REQ_BGL
if write_count != self.context.cfg.write_count:
if write_count != self.cfg.write_count:
return result
......@@ -607,8 +608,8 @@ class Processor(object):
" disabled" % op.OP_ID)
lu = lu_class(self, op, self.context, self.rpc, self._wconfdcontext,
lu = lu_class(self, op, self.context, self.cfg, self.rpc,
self._wconfdcontext, self.wconfd)
lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
assert lu.needed_locks is not None, "needed_locks not set by LU"
......@@ -618,7 +619,7 @@ class Processor(object):
if self._ec_id:
# Release BGL if owned
bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
......@@ -419,7 +419,9 @@ class ClientOps:
elif method == luxi.REQ_SET_WATCHER_PAUSE:
(until, ) = args
return _SetWatcherPause(context, until)
ec_id = None
return _SetWatcherPause(context, ec_id, until)
logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
......@@ -460,21 +462,16 @@ class GanetiContext(object):
# Create a livelock file
self.livelock = utils.livelock.LiveLock("masterd")
# Create global configuration object
self.cfg = config.ConfigWriter()
# Locking manager
cfg = self.GetConfig(None)
self.glm = locking.GanetiLockManager(
[ for inst in self.cfg.GetAllInstancesInfo().values()],
# RPC runner
self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
[ for inst in cfg.GetAllInstancesInfo().values()],
# Job queue
self.jobqueue = jqueue.JobQueue(self)
self.jobqueue = jqueue.JobQueue(self, cfg)
# setting this also locks the class against attribute modifications
self.__class__._instance = self
......@@ -489,6 +486,12 @@ class GanetiContext(object):
def GetWConfdContext(self, ec_id):
return config.GetWConfdContext(ec_id, self.livelock)
def GetConfig(self, ec_id):
return config.GetConfig(ec_id, self.livelock)
def GetRpc(self, cfg):
return rpc.RpcRunner(cfg, self.glm.AddToLockMonitor)
def AddNode(self, cfg, node, ec_id):
"""Adds a node to the configuration.
......@@ -517,7 +520,7 @@ class GanetiContext(object):
def _SetWatcherPause(context, until):
def _SetWatcherPause(context, ec_id, until):
"""Creates or removes the watcher pause file.
@type context: L{GanetiContext}
......@@ -526,7 +529,7 @@ def _SetWatcherPause(context, until):
@param until: Unix timestamp saying until when the watcher shouldn't run