diff --git a/lib/cli.py b/lib/cli.py index 55027890afa01451d53afc35f03b889658111fdf..558feacf6473abf7c93b869250103e40412ea6a8 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -36,6 +36,7 @@ from ganeti import opcodes from ganeti import luxi from ganeti import ssconf from ganeti import rpc +from ganeti import ssh from optparse import (OptionParser, TitledHelpFormatter, Option, OptionValueError) @@ -128,6 +129,7 @@ __all__ = [ "JobExecutor", "JobSubmittedException", "ParseTimespec", + "RunWhileClusterStopped", "SubmitOpCode", "SubmitOrSend", "UsesRPC", @@ -1549,6 +1551,127 @@ def GenericInstanceCreate(mode, opts, args): return 0 +class _RunWhileClusterStoppedHelper: + """Helper class for L{RunWhileClusterStopped} to simplify state management + + """ + def __init__(self, feedback_fn, cluster_name, master_node, online_nodes): + """Initializes this class. + + @type feedback_fn: callable + @param feedback_fn: Feedback function + @type cluster_name: string + @param cluster_name: Cluster name + @type master_node: string + @param master_node Master node name + @type online_nodes: list + @param online_nodes: List of names of online nodes + + """ + self.feedback_fn = feedback_fn + self.cluster_name = cluster_name + self.master_node = master_node + self.online_nodes = online_nodes + + self.ssh = ssh.SshRunner(self.cluster_name) + + self.nonmaster_nodes = [name for name in online_nodes + if name != master_node] + + assert self.master_node not in self.nonmaster_nodes + + def _RunCmd(self, node_name, cmd): + """Runs a command on the local or a remote machine. + + @type node_name: string + @param node_name: Machine name + @type cmd: list + @param cmd: Command + + """ + if node_name is None or node_name == self.master_node: + # No need to use SSH + result = utils.RunCmd(cmd) + else: + result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd)) + + if result.failed: + errmsg = ["Failed to run command %s" % result.cmd] + if node_name: + errmsg.append("on node %s" % node_name) + errmsg.append(": exitcode %s and error %s" % + (result.exit_code, result.output)) + raise errors.OpExecError(" ".join(errmsg)) + + def Call(self, fn, *args): + """Call function while all daemons are stopped. + + @type fn: callable + @param fn: Function to be called + + """ + # Pause watcher by acquiring an exclusive lock on watcher state file + self.feedback_fn("Blocking watcher") + watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE) + try: + # TODO: Currently, this just blocks. There's no timeout. + # TODO: Should it be a shared lock? + watcher_block.Exclusive(blocking=True) + + # Stop master daemons, so that no new jobs can come in and all running + # ones are finished + self.feedback_fn("Stopping master daemons") + self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"]) + try: + # Stop daemons on all nodes + for node_name in self.online_nodes: + self.feedback_fn("Stopping daemons on %s" % node_name) + self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"]) + + # All daemons are shut down now + try: + return fn(self, *args) + except Exception: + logging.exception("Caught exception") + raise + finally: + # Start cluster again, master node last + for node_name in self.nonmaster_nodes + [self.master_node]: + self.feedback_fn("Starting daemons on %s" % node_name) + self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"]) + finally: + # Resume watcher + watcher_block.Close() + + +def RunWhileClusterStopped(feedback_fn, fn, *args): + """Calls a function while all cluster daemons are stopped. + + @type feedback_fn: callable + @param feedback_fn: Feedback function + @type fn: callable + @param fn: Function to be called when daemons are stopped + + """ + feedback_fn("Gathering cluster information") + + # This ensures we're running on the master daemon + cl = GetClient() + + (cluster_name, master_node) = \ + cl.QueryConfigValues(["cluster_name", "master_node"]) + + online_nodes = GetOnlineNodes([], cl=cl) + + # Don't keep a reference to the client. The master daemon will go away. + del cl + + assert master_node in online_nodes + + return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node, + online_nodes).Call(fn, *args) + + def GenerateTable(headers, fields, separator, data, numfields=None, unitfields=None, units=None):