diff --git a/Makefile.am b/Makefile.am
index 62625810ca4bc5c5e6a011e91e22ff86489aeac6..a4672de1e58fd10304654b59853b829ab3f2b9bd 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -233,6 +233,7 @@ dist_tools_SCRIPTS = \
 	tools/burnin \
 	tools/cfgshell \
 	tools/cfgupgrade \
+	tools/cluster-merge \
 	tools/lvmstrap
 
 pkglib_SCRIPTS = \
diff --git a/tools/cluster-merge b/tools/cluster-merge
new file mode 100644
index 0000000000000000000000000000000000000000..889d81644888f2be60dd7dc68b815182c7d8c22c
--- /dev/null
+++ b/tools/cluster-merge
@@ -0,0 +1,510 @@
+#!/usr/bin/python
+#
+
+# Copyright (C) 2010 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+"""Tool to merge two or more clusters together.
+
+The clusters have to run the same version of Ganeti!
+
+"""
+
+# pylint: disable-msg=C0103
+# C0103: Invalid name cluster-merge
+
+import logging
+import os
+import optparse
+import shutil
+import sys
+import tempfile
+
+from ganeti import cli
+from ganeti import config
+from ganeti import constants
+from ganeti import errors
+from ganeti import ssh
+from ganeti import utils
+
+
+PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
+                                  action="store", type="int",
+                                  dest="pause_period",
+                                  help=("Amount of time in seconds the"
+                                        " watcher should be suspended from"
+                                        " running"))
+
+
+def Flatten(unflatten_list):
+  """Flattens a list.
+
+  @param unflatten_list: A list that may contain nested lists
+  @return: A flattened list
+
+  """
+  flatten_list = []
+
+  for item in unflatten_list:
+    if isinstance(item, list):
+      flatten_list.extend(Flatten(item))
+    else:
+      flatten_list.append(item)
+  return flatten_list
+
+
+class MergerData(object):
+  """Container class to hold data used for the merger.
+
+  """
+  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
+    """Initialize the container.
+
+    @param cluster: The name of the cluster
+    @param key_path: Path to the ssh private key used for authentication
+    @param nodes: List of nodes in the merging cluster
+    @param instances: List of instances running on the merging cluster
+    @param config_path: Path to the merging cluster's config
+
+    """
+    self.cluster = cluster
+    self.key_path = key_path
+    self.config_path = config_path
+    self.instances = instances
+    self.nodes = nodes
+
+
+class Merger(object):
+  """Handles the merge.
+
+  """
+  def __init__(self, clusters, pause_period):
+    """Initialize the object with sane defaults and the information required.
+
+    @param clusters: The list of clusters to merge in
+    @param pause_period: The time the watcher shall be disabled for
+
+    """
+    self.merger_data = []
+    self.clusters = clusters
+    self.pause_period = pause_period
+    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
+    # QueryConfigValues returns a list of values, hence the unpacking
+    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
+    self.ssh_runner = ssh.SshRunner(self.cluster_name)
+
+  def Setup(self):
+    """Sets up our end so we can do the merger.
+
+    This method sets us up in preparation for the merger. It makes the
+    initial contact and gathers the information needed.
+
+    @raise errors.RemoteError: for errors in communication/grabbing
+
+    """
+    (remote_path, _, _) = ssh.GetUserFiles("root")
+
+    # Fetch each remote's private key
+    for cluster in self.clusters:
+      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
+                            ask_key=False)
+      if result.failed:
+        raise errors.RemoteError("There was an error while grabbing ssh private"
+                                 " key from %s. Fail reason: %s; output: %s" %
+                                 (cluster, result.fail_reason, result.output))
+
+      key_path = os.path.join(self.work_dir, cluster)
+      utils.WriteFile(key_path, mode=0600, data=result.stdout)
+
+      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
+                            private_key=key_path)
+      if result.failed:
+        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
+                                 " Fail reason: %s; output: %s" %
+                                 (cluster, result.fail_reason, result.output))
+      nodes = result.stdout.splitlines()
+
+      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
+                            private_key=key_path)
+      if result.failed:
+        raise errors.RemoteError("Unable to retrieve list of instances from"
+                                 " %s. Fail reason: %s; output: %s" %
+                                 (cluster, result.fail_reason, result.output))
+      instances = result.stdout.splitlines()
+
+      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
+
+  def _PrepareAuthorizedKeys(self):
+    """Prepare the authorized_keys on every merging node.
+
+    This method adds our public key to each remote's authorized_keys file
+    for further communication.
+
+    """
+    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
+    pub_key = utils.ReadFile(pub_key_file)
+
+    for data in self.merger_data:
+      for node in data.nodes:
+        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
+                                     (auth_keys, pub_key)),
+                              private_key=data.key_path)
+
+        if result.failed:
+          raise errors.RemoteError("Unable to add our public key to %s in %s."
+                                   " Fail reason: %s; output: %s" %
+                                   (node, data.cluster, result.fail_reason,
+                                    result.output))
+
+  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
+              strict_host_check=False, private_key=None, batch=True,
+              ask_key=False):
+    """Wraps SshRunner.Run with default parameters.
+
+    For explanation of parameters see L{ssh.SshRunner.Run}.
+
+    """
+    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
+                               use_cluster_key=use_cluster_key,
+                               strict_host_check=strict_host_check,
+                               private_key=private_key, batch=batch,
+                               ask_key=ask_key)
+
+  def _StopMergingInstances(self):
+    """Stop instances on the merging clusters.
+
+    """
+    for cluster in self.clusters:
+      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
+                                     " --force-multiple")
+
+      if result.failed:
+        raise errors.RemoteError("Unable to stop instances on %s."
+                                 " Fail reason: %s; output: %s" %
+                                 (cluster, result.fail_reason, result.output))
+
+  def _DisableWatcher(self):
+    """Disable the watcher on all merging clusters, including ourselves.
+
+    """
+    for cluster in ["localhost"] + self.clusters:
+      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
+                                     self.pause_period)
+
+      if result.failed:
+        raise errors.RemoteError("Unable to pause watcher on %s."
+                                 " Fail reason: %s; output: %s" %
+                                 (cluster, result.fail_reason, result.output))
+
+  # R0201: Method could be a function
+  def _EnableWatcher(self): # pylint: disable-msg=R0201
+    """Reenable the watcher (locally).
+
+    """
+    result = utils.RunCmd(["gnt-cluster", "watcher", "continue"])
+
+    if result.failed:
+      logging.warning("Unable to continue watcher. Fail reason: %s;"
+                      " output: %s" % (result.fail_reason,
+                                       result.output))
+
+  def _StopDaemons(self):
+    """Stop all daemons on the merging nodes.
+
+    """
+    # FIXME: Worth putting this into constants?
+    cmds = []
+    for daemon in (constants.RAPI, constants.MASTERD,
+                   constants.NODED, constants.CONFD):
+      cmds.append("%s stop %s" % (constants.DAEMON_UTIL, daemon))
+    for data in self.merger_data:
+      for node in data.nodes:
+        result = self._RunCmd(node, " && ".join(cmds))
+
+        if result.failed:
+          raise errors.RemoteError("Unable to stop daemons on %s."
+                                   " Fail reason: %s; output: %s." %
+                                   (node, result.fail_reason, result.output))
+
+  def _FetchRemoteConfig(self):
+    """Fetches and stores the remote cluster config from the master.
+
+    This step is needed before we can merge the config.
+
+    """
+    for data in self.merger_data:
+      result = self._RunCmd(data.cluster, "cat %s" %
+                                          constants.CLUSTER_CONF_FILE)
+
+      if result.failed:
+        raise errors.RemoteError("Unable to retrieve remote config on %s."
+                                 " Fail reason: %s; output: %s" %
+                                 (data.cluster, result.fail_reason,
+                                  result.output))
+
+      data.config_path = os.path.join(self.work_dir, "%s_config.data" %
+                                                     data.cluster)
+      utils.WriteFile(data.config_path, data=result.stdout)
+
+  # R0201: Method could be a function
+  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
+    """Kills the local master daemon.
+
+    @raise errors.CommandError: If unable to kill
+
+    """
+    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
+    if result.failed:
+      raise errors.CommandError("Unable to stop master daemons."
+                                " Fail reason: %s; output: %s" %
+                                (result.fail_reason, result.output))
+
+  def _MergeConfig(self):
+    """Merges all foreign config into our own config.
+
+    """
+    my_config = config.ConfigWriter(offline=True)
+    fake_ec_id = 0 # Needs to be unique over the whole config merge
+
+    for data in self.merger_data:
+      other_config = config.ConfigWriter(data.config_path)
+
+      for node in other_config.GetNodeList():
+        node_info = other_config.GetNodeInfo(node)
+        # Merged nodes join as plain nodes, not as master candidates
+        node_info.master_candidate = False
+        my_config.AddNode(node_info, str(fake_ec_id))
+        fake_ec_id += 1
+
+      for instance in other_config.GetInstanceList():
+        instance_info = other_config.GetInstanceInfo(instance)
+
+        # Update the DRBD port assignments
+        # This is a little bit hackish
+        for dsk in instance_info.disks:
+          if dsk.dev_type in constants.LDS_DRBD:
+            port = my_config.AllocatePort()
+
+            logical_id = list(dsk.logical_id)
+            logical_id[2] = port
+            dsk.logical_id = tuple(logical_id)
+
+            physical_id = list(dsk.physical_id)
+            physical_id[1] = physical_id[3] = port
+            dsk.physical_id = tuple(physical_id)
+
+        my_config.AddInstance(instance_info, str(fake_ec_id))
+        fake_ec_id += 1
+
+  # R0201: Method could be a function
+  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
+    """Starts the local master daemon.
+
+    @param no_vote: Should masterd be started without voting? Defaults to
+        False.
+    @raise errors.CommandError: If unable to start the daemon
+
+    """
+    env = {}
+    if no_vote:
+      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
+
+    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
+    if result.failed:
+      raise errors.CommandError("Couldn't start ganeti master."
+                                " Fail reason: %s; output: %s" %
+                                (result.fail_reason, result.output))
+
+  def _ReaddMergedNodesAndRedist(self):
+    """Readds all merging nodes and makes sure their config is up-to-date.
+
+    @raise errors.CommandError: If anything fails.
+
+    """
+    for data in self.merger_data:
+      for node in data.nodes:
+        result = utils.RunCmd(["gnt-node", "add", "--readd",
+                               "--no-ssh-key-check", node])
+        if result.failed:
+          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
+                                    " output: %s" % (node, result.fail_reason,
+                                                     result.output))
+
+    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
+    if result.failed:
+      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
+                                " output: %s" % (result.fail_reason,
+                                                 result.output))
+
+  # R0201: Method could be a function
+  def _StartupAllInstances(self): # pylint: disable-msg=R0201
+    """Starts up all instances (locally).
+
+    @raise errors.CommandError: If unable to start the instances
+
+    """
+    result = utils.RunCmd(["gnt-instance", "startup", "--all",
+                           "--force-multiple"])
+    if result.failed:
+      raise errors.CommandError("Unable to start all instances."
+                                " Fail reason: %s; output: %s" %
+                                (result.fail_reason, result.output))
+
+  # R0201: Method could be a function
+  def _VerifyCluster(self): # pylint: disable-msg=R0201
+    """Runs gnt-cluster verify to verify the health.
+
+    @raise errors.CommandError: If the cluster fails verification
+
+    """
+    result = utils.RunCmd(["gnt-cluster", "verify"])
+    if result.failed:
+      raise errors.CommandError("Verification of cluster failed."
+                                " Fail reason: %s; output: %s" %
+                                (result.fail_reason, result.output))
+
+  def Merge(self):
+    """Does the actual merge.
+
+    It runs all the steps in the right order and updates the user about steps
+    taken. It also keeps track of rollback steps to undo everything in case
+    of failure.
+
+    """
+    rbsteps = []
+    try:
+      logging.info("Pre cluster verification")
+      self._VerifyCluster()
+
+      logging.info("Prepare authorized_keys")
+      rbsteps.append("Remove our key from authorized_keys on nodes:"
+                     " %(nodes)s")
+      self._PrepareAuthorizedKeys()
+
+      rbsteps.append("Start all instances again on the merging"
+                     " clusters: %(clusters)s")
+      logging.info("Stopping merging instances (takes a while)")
+      self._StopMergingInstances()
+
+      logging.info("Disable watcher")
+      self._DisableWatcher()
+      logging.info("Stop daemons on merging nodes")
+      self._StopDaemons()
+      logging.info("Merging config")
+      self._FetchRemoteConfig()
+      self._KillMasterDaemon()
+
+      rbsteps.append("Restore %s from another master candidate" %
+                     constants.CLUSTER_CONF_FILE)
+      self._MergeConfig()
+      self._StartMasterDaemon(no_vote=True)
+
+      # Point of no return, delete rollback steps
+      del rbsteps[:]
+
+      logging.warning("We are at the point of no return. Merge cannot easily"
+                      " be undone after this point.")
+      logging.info("Readd nodes and redistribute config")
+      self._ReaddMergedNodesAndRedist()
+      self._KillMasterDaemon()
+      self._StartMasterDaemon()
+      logging.info("Starting instances again")
+      self._StartupAllInstances()
+      logging.info("Post cluster verification")
+      self._VerifyCluster()
+    except errors.GenericError, e:
+      logging.exception(e)
+
+      if rbsteps:
+        nodes = Flatten([data.nodes for data in self.merger_data])
+        info = {
+          "clusters": self.clusters,
+          "nodes": nodes,
+          }
+        logging.critical("In order to roll back do the following:")
+        for step in rbsteps:
+          logging.critical(" * %s" % (step % info))
+      else:
+        logging.critical("Nothing to rollback.")
+
+      # TODO: Keep track of steps done for a flawless resume?
+
+  def Cleanup(self):
+    """Clean up our environment.
+
+    This removes the fetched private keys and configs by deleting the
+    temporary directory.
+
+    """
+    shutil.rmtree(self.work_dir)
+
+
+def SetupLogging(options):
+  """Sets up the logging infrastructure.
+
+  @param options: Parsed command line options
+
+  """
+  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
+
+  stderr_handler = logging.StreamHandler()
+  stderr_handler.setFormatter(formatter)
+  if options.debug:
+    stderr_handler.setLevel(logging.NOTSET)
+  elif options.verbose:
+    stderr_handler.setLevel(logging.INFO)
+  else:
+    stderr_handler.setLevel(logging.ERROR)
+
+  root_logger = logging.getLogger("")
+  root_logger.setLevel(logging.NOTSET)
+  root_logger.addHandler(stderr_handler)
+
+
+def main():
+  """Main routine.
+
+  """
+  program = os.path.basename(sys.argv[0])
+
+  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
+                                        " [--watcher-pause-period SECONDS]"
+                                        " <cluster> <cluster...>"),
+                                 prog=program)
+  parser.add_option(cli.DEBUG_OPT)
+  parser.add_option(cli.VERBOSE_OPT)
+  parser.add_option(PAUSE_PERIOD_OPT)
+
+  (options, args) = parser.parse_args()
+
+  SetupLogging(options)
+
+  if not args:
+    parser.error("No clusters specified")
+
+  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
+  try:
+    try:
+      cluster_merger.Setup()
+      cluster_merger.Merge()
+    except errors.GenericError, e:
+      logging.exception(e)
+      return constants.EXIT_FAILURE
+  finally:
+    cluster_merger.Cleanup()
+
+  return constants.EXIT_SUCCESS
+
+
+if __name__ == "__main__":
+  sys.exit(main())
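
For reference, Flatten is what collapses the per-cluster node lists into one flat list for the rollback message at the end of Merge(). A quick doctest-style illustration of its behaviour (the node names are placeholders):

    >>> Flatten([["node1.cluster-b", "node2.cluster-b"], ["node3.cluster-c"]])
    ['node1.cluster-b', 'node2.cluster-b', 'node3.cluster-c']
    >>> Flatten([1, [2, [3, 4]], 5])
    [1, 2, 3, 4, 5]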
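_PrepareAuthorizedKeys builds its remote command with a quoted heredoc so the key material is appended verbatim, without shell expansion on the remote side. Rendered with placeholder values (the real auth_keys path comes from ssh.GetUserFiles("root")), the command sent to each node looks roughly like:

    cat >> /root/.ssh/authorized_keys << '!EOF.'
    ssh-rsa AAAA...key-material... root@surviving-master
    !EOF.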
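The DRBD port rewrite in _MergeConfig is the subtlest part of the merge: ports allocated by the foreign cluster may already be in use on ours, so every DRBD disk gets a fresh port from our port pool. Below is a standalone sketch of that logic; the tuple layouts (the shared TCP port at logical_id[2], and once per endpoint at physical_id[1] and physical_id[3]) are assumptions inferred from the indices the patch touches, not from a spec:

    def ReassignDrbdPort(logical_id, physical_id, new_port):
      """Return (logical_id, physical_id) updated with a fresh DRBD port."""
      # logical_id assumed: (node_a, node_b, port, minor_a, minor_b, secret)
      logical_id = list(logical_id)
      logical_id[2] = new_port
      # physical_id assumed to carry the port once per endpoint, at indices
      # 1 and 3; DRBD uses the same port on both sides of the connection.
      physical_id = list(physical_id)
      physical_id[1] = physical_id[3] = new_port
      return tuple(logical_id), tuple(physical_id)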
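Finally, main() takes one or more cluster names as positional arguments and deduplicates them with utils.UniqueSequence. A hypothetical invocation, with placeholder cluster names and pause period:

    ./tools/cluster-merge --watcher-pause-period 3600 \
        cluster-b.example.com cluster-c.example.com

Running it on the master node of the surviving cluster is implied by the code: it queries the local master via cli.GetClient() and stops and restarts the local master daemon mid-merge.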