Commit a4eae71f authored by Michael Hanselmann

Provide feedback from redistributing configuration

This is particularly useful for “gnt-cluster redist-conf”, but
also for all other cases where the configuration files are
rewritten on other nodes.

$ gnt-cluster redist-conf
… Copy of file /var/lib/ganeti/config.data to node … failed: Error while
executing backend function: [Errno 1] Operation not permitted
… Error while uploading ssconf files to node …: Error while executing backend
function: [Errno 1] Operation not permitted

$ gnt-node modify --offline yes --force node3.example.com
… - WARNING: Not enough master candidates (desired 10, new value will be 4)
… Copy of file /var/lib/ganeti/config.data to node node8.example.com failed:
Error while executing backend function: [Errno 1] Operation not permitted
Modified node node3.example.com
 - offline -> True
 - master_candidate -> auto-demotion due to offline
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent a10caf87
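
For context, the mechanism this commit threads through ConfigWriter is a
plain callback: helpers that may fail on remote nodes take an optional
callable and report each failure through it in addition to the log. Below is
a minimal, self-contained sketch of that pattern (distribute_config and
_copy_file_to_node are hypothetical stand-ins for illustration, not Ganeti
APIs; only the message format and node names are taken from this commit):

import logging

def _copy_file_to_node(node, path):
  # Hypothetical stand-in for the backend RPC upload; returns an error
  # message on failure, None on success.
  if node == "node8.example.com":
    return "[Errno 1] Operation not permitted"
  return None

def distribute_config(nodes, cfg_file, feedback_fn=None):
  # Same shape as ConfigWriter._DistributeConfig after this change: each
  # failure is logged and, when a callback is given, also reported to it.
  assert feedback_fn is None or callable(feedback_fn)
  bad = False
  for node in nodes:
    err = _copy_file_to_node(node, cfg_file)
    if err:
      msg = ("Copy of file %s to node %s failed: %s" %
             (cfg_file, node, err))
      logging.error(msg)
      if feedback_fn:
        feedback_fn(msg)
      bad = True
  return not bad

messages = []
distribute_config(["node3.example.com", "node8.example.com"],
                  "/var/lib/ganeti/config.data",
                  feedback_fn=messages.append)

Callers pick the channel: the bootstrap code below passes logging.error, the
logical units pass the job's feedback_fn, and the unittests pass None, which
is why feedback_fn stays optional in the internal helpers.
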
lib/bootstrap.py
@@ -289,7 +289,7 @@ def InitCluster(cluster_name, mac_prefix,
   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
   cfg = config.ConfigWriter()
   ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
-  cfg.Update(cfg.GetClusterInfo())
+  cfg.Update(cfg.GetClusterInfo(), logging.error)

   # start the master ip
   # TODO: Review rpc call from bootstrap
@@ -482,7 +482,7 @@ def MasterFailover(no_voting=False):
   cluster_info.master_node = new_master
   # this will also regenerate the ssconf files, since we updated the
   # cluster info
-  cfg.Update(cluster_info)
+  cfg.Update(cluster_info, logging.error)

   result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
   msg = result.fail_msg

lib/cmdlib.py
@@ -1730,10 +1730,10 @@ class LURepairDiskSizes(NoHooksLU):
                        " correcting: recorded %d, actual %d", idx,
                        instance.name, disk.size, size)
           disk.size = size
-          self.cfg.Update(instance)
+          self.cfg.Update(instance, feedback_fn)
           changed.append((instance.name, idx, size))
         if self._EnsureChildSizes(disk):
-          self.cfg.Update(instance)
+          self.cfg.Update(instance, feedback_fn)
           changed.append((instance.name, idx, disk.size))

     return changed
@@ -1794,7 +1794,7 @@ class LURenameCluster(LogicalUnit):
       cluster = self.cfg.GetClusterInfo()
       cluster.cluster_name = clustername
       cluster.master_ip = ip
-      self.cfg.Update(cluster)
+      self.cfg.Update(cluster, feedback_fn)

       # update the known hosts file
       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
@@ -1988,7 +1988,7 @@ class LUSetClusterParams(LogicalUnit):
       # we need to update the pool size here, otherwise the save will fail
       _AdjustCandidatePool(self, [])

-    self.cfg.Update(self.cluster)
+    self.cfg.Update(self.cluster, feedback_fn)


 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
@@ -2009,6 +2009,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
     dist_nodes.extend(additional_nodes)
   if myself.name in dist_nodes:
     dist_nodes.remove(myself.name)
+
   # 2. Gather files to distribute
   dist_files = set([constants.ETC_HOSTS,
                     constants.SSH_KNOWN_HOSTS_FILE,
@@ -2058,7 +2059,7 @@ class LURedistributeConfig(NoHooksLU):
     """Redistribute the configuration.

     """
-    self.cfg.Update(self.cfg.GetClusterInfo())
+    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
     _RedistributeAncillaryFiles(self)
@@ -2958,7 +2959,7 @@ class LUAddNode(LogicalUnit):
     _RedistributeAncillaryFiles(self)
     self.context.ReaddNode(new_node)
     # make sure we redistribute the config
-    self.cfg.Update(new_node)
+    self.cfg.Update(new_node, feedback_fn)
     # and make sure the new node will not have old files around
     if not new_node.master_candidate:
       result = self.rpc.call_node_demote_from_mc(new_node.name)
@@ -3113,7 +3114,7 @@ class LUSetNodeParams(LogicalUnit):
         result.append(("offline", "clear offline status due to drain"))

     # this will trigger configuration file update, if needed
-    self.cfg.Update(node)
+    self.cfg.Update(node, feedback_fn)
     # this will trigger job queue propagation or cleanup
     if changed_mc:
       self.context.ReaddNode(node)
@@ -3828,7 +3829,7 @@ class LUReinstallInstance(LogicalUnit):
     if self.op.os_type is not None:
       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
       inst.os = self.op.os_type
-      self.cfg.Update(inst)
+      self.cfg.Update(inst, feedback_fn)

     _StartInstanceDisks(self, inst, None)
     try:
@@ -4501,7 +4502,7 @@ class LUFailoverInstance(LogicalUnit):
     instance.primary_node = target_node

     # distribute new instance config to the other nodes
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)

     # Only start the instance if it's marked as up
     if instance.admin_up:
@@ -4725,7 +4726,7 @@ class LUMoveInstance(LogicalUnit):
                                (",".join(errs),))

     instance.primary_node = target_node
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)

     self.LogInfo("Removing the disks on the original node")
     _RemoveDisks(self, instance, target_node=source_node)
@@ -4963,7 +4964,7 @@ class TLMigrateInstance(Tasklet):
       self.feedback_fn("* instance running on secondary node (%s),"
                        " updating config" % target_node)
       instance.primary_node = target_node
-      self.cfg.Update(instance)
+      self.cfg.Update(instance, self.feedback_fn)
       demoted_node = source_node
     else:
       self.feedback_fn("* instance confirmed to be running on its"
@@ -5090,7 +5091,7 @@ class TLMigrateInstance(Tasklet):
     instance.primary_node = target_node

     # distribute new instance config to the other nodes
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, self.feedback_fn)

     result = self.rpc.call_finalize_migration(target_node,
                                               instance,
@@ -6040,7 +6041,7 @@ class LUCreateInstance(LogicalUnit):
     if self.op.start:
       iobj.admin_up = True
-      self.cfg.Update(iobj)
+      self.cfg.Update(iobj, feedback_fn)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
@@ -6490,9 +6491,11 @@ class TLReplaceDisks(Tasklet):
     try:
       # Should we replace the secondary node?
       if self.new_node is not None:
-        return self._ExecDrbd8Secondary()
+        fn = self._ExecDrbd8Secondary
       else:
-        return self._ExecDrbd8DiskOnly()
+        fn = self._ExecDrbd8DiskOnly
+
+      return fn(feedback_fn)

     finally:
       # Deactivate the instance disks if we're replacing them on a down instance
@@ -6608,7 +6611,7 @@ class TLReplaceDisks(Tasklet):
     self.lu.LogWarning("Can't remove old LV: %s" % msg,
                        hint="remove unused LVs manually")

-  def _ExecDrbd8DiskOnly(self):
+  def _ExecDrbd8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for DRBD 8.

     The algorithm for replace is quite complicated:
@@ -6716,7 +6719,7 @@ class TLReplaceDisks(Tasklet):
       dev.children = new_lvs

-      self.cfg.Update(self.instance)
+      self.cfg.Update(self.instance, feedback_fn)

       # Wait for sync
       # This can fail as the old devices are degraded and _WaitForSync
@@ -6731,7 +6734,7 @@ class TLReplaceDisks(Tasklet):
     self.lu.LogStep(6, steps_total, "Removing old storage")
     self._RemoveOldStorage(self.target_node, iv_names)

-  def _ExecDrbd8Secondary(self):
+  def _ExecDrbd8Secondary(self, feedback_fn):
     """Replace the secondary node for DRBD 8.

     The algorithm for replace is quite complicated:
@@ -6844,7 +6847,7 @@ class TLReplaceDisks(Tasklet):
       dev.logical_id = new_logical_id
       self.cfg.SetDiskID(dev, self.instance.primary_node)

-    self.cfg.Update(self.instance)
+    self.cfg.Update(self.instance, feedback_fn)

     # and now perform the drbd attach
     self.lu.LogInfo("Attaching primary drbds to new secondary"
@@ -7015,7 +7018,7 @@ class LUGrowDisk(LogicalUnit):
       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
       result.Raise("Grow request failed to node %s" % node)
     disk.RecordGrow(self.op.amount)
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance)
       if disk_abort:
@@ -7685,7 +7688,7 @@ class LUSetInstanceParams(LogicalUnit):
       for key, val in self.op.beparams.iteritems():
         result.append(("be/%s" % key, val))

-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)

     return result
@@ -8088,7 +8091,7 @@ class LUAddTags(TagsLU):
     except errors.TagError, err:
       raise errors.OpExecError("Error while setting tag: %s" % str(err))
     try:
-      self.cfg.Update(self.target)
+      self.cfg.Update(self.target, feedback_fn)
     except errors.ConfigurationError:
       raise errors.OpRetryError("There has been a modification to the"
                                 " config file and the operation has been"
@@ -8127,7 +8130,7 @@ class LUDelTags(TagsLU):
     for tag in self.op.tags:
       self.target.RemoveTag(tag)
     try:
-      self.cfg.Update(self.target)
+      self.cfg.Update(self.target, feedback_fn)
     except errors.ConfigurationError:
       raise errors.OpRetryError("There has been a modification to the"
                                 " config file and the operation has been"

lib/config.py
@@ -1145,7 +1145,7 @@ class ConfigWriter:
     if modified:
       self._WriteConfig()

-  def _DistributeConfig(self):
+  def _DistributeConfig(self, feedback_fn):
     """Distribute the configuration to the other nodes.

     Currently, this only copies the configuration file. In the future,
@@ -1154,6 +1154,7 @@ class ConfigWriter:
     """
     if self._offline:
       return True
+
     bad = False
     node_list = []
@@ -1180,13 +1181,20 @@ class ConfigWriter:
         msg = ("Copy of file %s to node %s failed: %s" %
                (self._cfg_file, to_node, msg))
         logging.error(msg)
+
+        if feedback_fn:
+          feedback_fn(msg)
+
         bad = True
     return not bad

-  def _WriteConfig(self, destination=None):
+  def _WriteConfig(self, destination=None, feedback_fn=None):
     """Write the configuration data to persistent storage.

     """
+    assert feedback_fn is None or callable(feedback_fn)
+
     # first, cleanup the _temporary_ids set, if an ID is now in the
     # other objects it should be discarded to prevent unbounded growth
     # of that structure
@@ -1206,7 +1214,7 @@ class ConfigWriter:
     self.write_count += 1

     # and redistribute the config file to master candidates
-    self._DistributeConfig()
+    self._DistributeConfig(feedback_fn)

     # Write ssconf files on all nodes (including locally)
     if self._last_cluster_serial < self._config_data.cluster.serial_no:
@@ -1214,11 +1222,17 @@ class ConfigWriter:
       result = rpc.RpcRunner.call_write_ssconf_files(
         self._UnlockedGetNodeList(),
         self._UnlockedGetSsconfValues())
+
       for nname, nresu in result.items():
         msg = nresu.fail_msg
         if msg:
-          logging.warning("Error while uploading ssconf files to"
-                          " node %s: %s", nname, msg)
+          errmsg = ("Error while uploading ssconf files to"
+                    " node %s: %s" % (nname, msg))
+          logging.warning(errmsg)
+
+          if feedback_fn:
+            feedback_fn(errmsg)
+
     self._last_cluster_serial = self._config_data.cluster.serial_no

   def _UnlockedGetSsconfValues(self):
@@ -1302,7 +1316,7 @@ class ConfigWriter:
     return self._config_data.cluster

   @locking.ssynchronized(_config_lock)
-  def Update(self, target):
+  def Update(self, target, feedback_fn):
     """Notify function to be called after updates.

     This function must be called when an object (as returned by
@@ -1314,6 +1328,7 @@ class ConfigWriter:
     @param target: an instance of either L{objects.Cluster},
         L{objects.Node} or L{objects.Instance} which is existing in
         the cluster
+    @param feedback_fn: Callable feedback function

     """
     if self._config_data is None:
@@ -1346,4 +1361,4 @@ class ConfigWriter:
       for nic in target.nics:
         self._temporary_macs.discard(nic.mac)

-    self._WriteConfig()
+    self._WriteConfig(feedback_fn=feedback_fn)

test/ganeti.config_unittest.py
@@ -108,15 +108,15 @@ class TestConfigRunner(unittest.TestCase):
     # construct a fake cluster object
     fake_cl = objects.Cluster()
     # fail if we didn't read the config
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)

     cl = cfg.GetClusterInfo()
     # first pass, must not fail
-    cfg.Update(cl)
+    cfg.Update(cl, None)
     # second pass, also must not fail (after the config has been written)
-    cfg.Update(cl)
+    cfg.Update(cl, None)
     # but the fake_cl update should still fail
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)

   def testUpdateNode(self):
     """Test updates on one node object"""
@@ -124,15 +124,17 @@ class TestConfigRunner(unittest.TestCase):
     # construct a fake node
     fake_node = objects.Node()
     # fail if we didn't read the config
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node,
+                          None)

     node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
     # first pass, must not fail
-    cfg.Update(node)
+    cfg.Update(node, None)
     # second pass, also must not fail (after the config has been written)
-    cfg.Update(node)
+    cfg.Update(node, None)
     # but the fake_node update should still fail
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node,
+                          None)

   def testUpdateInstance(self):
     """Test updates on one instance object"""
@@ -141,16 +143,18 @@ class TestConfigRunner(unittest.TestCase):
     inst = self._create_instance()
     fake_instance = objects.Instance()
     # fail if we didn't read the config
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance,
+                          None)

     cfg.AddInstance(inst)
     instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0])
     # first pass, must not fail
-    cfg.Update(instance)
+    cfg.Update(instance, None)
     # second pass, also must not fail (after the config has been written)
-    cfg.Update(instance)
+    cfg.Update(instance, None)
     # but the fake_instance update should still fail
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance,
+                          None)

   def testNICParameterSyntaxCheck(self):
     """Test the NIC's CheckParameterSyntax function"""