diff --git a/Makefile.am b/Makefile.am index 283c104125da306f8f90e7261abae8b079384b8c..6ee3b49239105afbb7b4ca6228264bca7ec390f9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -324,7 +324,8 @@ qa_scripts = \ qa/qa_tags.py \ qa/qa_utils.py -dist_sbin_SCRIPTS = +dist_sbin_SCRIPTS = \ + tools/ganeti-listrunner nodist_sbin_SCRIPTS = \ $(PYTHON_BOOTSTRAP) \ @@ -397,6 +398,7 @@ man_MANS = \ man/ganeti.7 \ man/ganeti-cleaner.8 \ man/ganeti-confd.8 \ + man/ganeti-listrunner.8 \ man/ganeti-masterd.8 \ man/ganeti-noded.8 \ man/ganeti-os-interface.7 \ diff --git a/man/ganeti-listrunner.rst b/man/ganeti-listrunner.rst new file mode 100644 index 0000000000000000000000000000000000000000..ce8086aa18cc74efef82499fc6a978140c17d717 --- /dev/null +++ b/man/ganeti-listrunner.rst @@ -0,0 +1,98 @@ +ganeti-listrunner(8) Ganeti | Version @GANETI_VERSION@ +====================================================== + +NAME +---- + +ganeti-listrunner - Run commands in parallel over multiple machines + + +SYNOPSIS +-------- + +**ganeti-listrunner** ``-l`` *logdir* +{``-x`` *executable* | ``-c`` *shell-cmd*} +{``-f`` *hostfile* | ``-h`` *hostlist*} +[``-a`` *aux-file*] +[``-b`` *batch-size*] +[``-u`` *username*] +[``-A``] + + +DESCRIPTION +----------- + +**ganeti-listrunner** is a tool to run commands in parallel over multiple +machines. It differs from ``dsh`` or other tools in that it asks for the +password once (if not using ``ssh-agent``) and then reuses the password to +connect to all machines, thus being easily usable even when public key +authentication or Kerberos authentication is not available. + +It can run either a command or a script (which gets uploaded first and deleted +after execution) on a list of hosts provided either via a file (one host per +line) or as a comma-separated list on the commandline. The outβ put (stdout and +stderr are merged) of the remote execution is written to a logfile. One logfile +per host is written. + + +OPTIONS +------- + +The options that can be passed to the program are as follows: + +``-l`` *logdir* + The directory under which the logfiles files should be written. + +``-x`` *executable* + The executable to copy and run on the target hosts. + +``-c`` *shell-cmd* + The shell command to run on the remote hosts. + +``-f`` *hostfile* + The file with the target hosts, one hostname per line. + +``-h`` *hostlist* + Comma-separated list of target hosts. + +``-a`` *aux-file* + A file to copy to the target hosts. Can be given multiple times, in which case + all files will be copied to the temporary directory. The executable or the + shell command will be run from the (temporary) directory where these files + have been copied. + +``-b`` *batch-size* + The host list will be split into batches of batch-size which will be processed + in parallel. The default if 15, and should be increased if faster processing + is needed. + +``-u`` *username* + Username to connect as instead of the default root username. + +``-A`` + Use an existing ssh-agent instead of password authentication. + + +EXIT STATUS +----------- + +The exist status of the command will be zero, unless it was aborted in some way +(e.g. ^C). + + +EXAMPLE +------- + +Run a command on a list of hosts:: + + listrunner -l logdir -c "uname -a" -h host1,host2,host3 + +Upload a script, some auxiliary files and run the script:: + + listrunner -l logdir -x runme.sh -a seed.dat -a golden.dat -h host1,host2,host3 + + +SEE ALSO +-------- + +dsh(1), cssh(1) diff --git a/tools/ganeti-listrunner b/tools/ganeti-listrunner new file mode 100644 index 0000000000000000000000000000000000000000..72b4266b200ecb770b4c680abecff24b6e3438aa --- /dev/null +++ b/tools/ganeti-listrunner @@ -0,0 +1,553 @@ +#!/usr/bin/python +# + +# Copyright (C) 2006, 2007, 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +"""Run an executable on a list of hosts. + +Script to serially run an executable on a list of hosts via ssh +with password auth as root. If the provided log dir does not yet +exist, it will try to create it. + +Implementation: + - the main process spawns up to batch_size children, which: + - connects to the remote host via ssh as root + - uploads the executable with a random name to /tmp via sftp + - chmod 500s it + - via ssh: chdirs into the upload directory and runs the script + - deletes it + - writes status messages and all output to one logfile per host + - the main process gathers then the status of the children and + reports the success/failure ratio + - entire script can be aborted with Ctrl-C + +Security considerations: + - the root password for the remote hosts is stored in memory for the + runtime of the script + - the executable to be run on the remote host is handled the following way: + - try to create a random directory with permissions 700 on the + remote host, abort furter processing on this host if this failes + - upload the executable with to a random filename in that directory + - set executable permissions to 500 + - run the executable + - delete the execuable and the directory on the remote host + +""" + +# pylint: disable-msg=C0103 +# C0103: Invalid name ganeti-listrunner + +import errno +import getopt +import getpass +import logging +import os +import random +import select +import socket +import sys +import time +import traceback + +import paramiko + + +REMOTE_PATH_BASE = "/tmp/listrunner" + + +def LogDirUseable(logdir): + """Ensure log file directory is available and usable.""" + testfile = "%s/test-%s-%s.deleteme" % (logdir, random.random(), + random.random()) + try: + os.mkdir(logdir) + except OSError, err: + if err.errno != errno.EEXIST: + raise + try: + logtest = open(testfile, "aw") + logtest.writelines("log file writeability test\n") + logtest.close() + os.unlink(testfile) + return True + except (OSError, IOError): + return False + + +def ShowHelp(executable): + """Print short usage information.""" + print ("usage: %s -l logdir [-c|-x] value [-b batch_size]" + " [-f hostfile|-h hosts] [-u username]" + " [-p password_file]" % executable) + print """ -l logdir to write logfiles to + -x executable to run on remote host(s) + -c shell command to run on remote host(s) + -f hostlist file (one host per line) + -a optional auxiliary file to upload (can be given multiple times) + -b batch-size, how many hosts to process in parallel [15] + -h comma-separated list of hosts or single hostname + -u username used to connect [root] + -p password used to authenticate""" + + +def GetTimeStamp(timestamp=None): + """Return ISO8601 timestamp. + + Returns ISO8601 timestamp, optionally expects a time.localtime() tuple + in timestamp, but will use the current time if this argument is not + supplied. + """ + if timestamp is None: + timestamp = time.localtime() + + isotime = time.strftime("%Y-%m-%dT%H:%M:%S", timestamp) + return isotime + + +def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None): + """Simple ping implementation using TCP connect(2). + + Try to do a TCP connect(2) from an optional source IP to the + specified target IP and the specified target port. If the optional + parameter live_port_needed is set to true, requires the remote end + to accept the connection. The timeout is specified in seconds and + defaults to 10 seconds. If the source optional argument is not + passed, the source address selection is left to the kernel, + otherwise we try to connect using the passed address (failures to + bind other than EADDRNOTAVAIL will be ignored). + + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + success = False + + if source is not None: + try: + sock.bind((source, 0)) + except socket.error, (errcode): + if errcode == errno.EADDRNOTAVAIL: + success = False + + sock.settimeout(timeout) + + try: + sock.connect((target, port)) + sock.close() + success = True + except socket.timeout: + success = False + except socket.error, (errcode): + success = (not live_port_needed) and (errcode == errno.ECONNREFUSED) + + return success + + +def GetHosts(hostsfile): + """Return list of hosts from hostfile. + + Reads the hostslist file and returns a list of hosts. + Expects the hostslist file to contain one hostname per line. + + """ + try: + datafile = open(hostsfile, "r") + except IOError, msg: + print "Failed to open hosts file %s: %s" % (hostsfile, msg) + sys.exit(2) + + hosts = datafile.readlines() + datafile.close() + + return hosts + + +def WriteLog(message, logfile): + """Writes message, terminated by newline, to logfile.""" + try: + logfile = open(logfile, "aw") + except IOError, msg: + print "failed to open log file %s: %s" % (logfile, msg) + print "log message was: %s" % message + sys.exit(1) # no being able to log is critical + try: + timestamp = GetTimeStamp() + logfile.writelines("%s %s\n" % (timestamp, message)) + logfile.close() + except IOError, msg: + print "failed to write to logfile %s: %s" % (logfile, msg) + print "log message was: %s" % message + sys.exit(1) # no being able to log is critical + + +def GetAgentKeys(): + """Tries to get a list of ssh keys from an agent.""" + try: + agent = paramiko.Agent() + return list(agent.get_keys()) + except paramiko.SSHException: + return [] + + +def SetupSshConnection(host, username, password, keys, logfile): + """Setup the ssh connection used for all later steps. + + This function sets up the ssh connection that will be used both + for upload and remote command execution. + + On success, it will return paramiko.Transport object with an + already logged in session. On failure, False will be returned. + + """ + # check if target is willing to talk to us at all + if not PingByTcp(host, 22, live_port_needed=True): + WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile) + print " - ERROR: host not reachable on 22/tcp" + return False + + all_kwargs = [{"pkey": k} for k in keys] + all_desc = ["key %d" % d for d in range(len(keys))] + if password is not None: + all_kwargs.append({"password": password}) + all_desc.append("password") + + # deal with logging out of paramiko.transport + handler = None + + for desc, kwargs in zip(all_desc, all_kwargs): + try: + transport = paramiko.Transport((host, 22)) + + # only try to setup the logging handler once + if not handler: + handler = logging.StreamHandler() + handler.setLevel(logging.ERROR) + log = logging.getLogger(transport.get_log_channel()) + log.addHandler(handler) + + transport.connect(username=username, **kwargs) # pylint: disable-msg=W0142 + WriteLog("ssh connection established using %s" % desc, logfile) + # strange ... when establishing the session and the immediately + # setting up the channels for sftp & shell from that, it sometimes + # fails, but waiting 1 second after session setup makes it always work + # time.sleep(1) + # FIXME apparently needfull to give sshd some time + return transport + except (socket.gaierror, socket.error, paramiko.SSHException): + continue + + methods = ", ".join(all_desc) + WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile) + WriteLog("aborted", logfile) + print " - ERROR: connection setup failed (tried %s)" % methods + + return False + + +def UploadFiles(connection, executable, filelist, logfile): + """Uploads the specified files via sftp. + + Uploads the specified files to a random, freshly created directory with + a temporary name under /tmp. All uploaded files are chmod 0400 after upload + with the exception of executable, with is chmod 500. + + Upon success, returns the absolute path to the remote upload directory, + but will return False upon failure. + """ + remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE, + random.random(), random.random()) + + try: + sftp = paramiko.SFTPClient.from_transport(connection) + sftp.mkdir(remote_dir, mode=0700) + for item in filelist: + remote_file = "%s/%s" % (remote_dir, item.split("/").pop()) + WriteLog("uploading %s to remote %s" % (item, remote_file), logfile) + sftp.put(item, remote_file) + if item == executable: + sftp.chmod(remote_file, 0500) + else: + sftp.chmod(remote_file, 0400) + sftp.close() + except IOError, err: + WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile) + return False + + return remote_dir + + +def CleanupRemoteDir(connection, upload_dir, filelist, logfile): + """Cleanes out and removes the remote work directory.""" + try: + sftp = paramiko.SFTPClient.from_transport(connection) + for item in filelist: + fullpath = "%s/%s" % (upload_dir, item.split("/").pop()) + WriteLog("removing remote %s" % fullpath, logfile) + sftp.remove(fullpath) + sftp.rmdir(upload_dir) + sftp.close() + except IOError, err: + WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile) + return False + + return True + + +def RunRemoteCommand(connection, command, logfile): + """Execute the command via ssh on the remote host.""" + session = connection.open_session() + session.setblocking(0) + + # the following dance is needed because paramiko changed APIs: + # from returning True/False for success to always returning None + # and throwing an exception in case of problems. + # And I want to support both the old and the new API. + result = True # being optimistic here, I know + message = None + try: + if session.exec_command("%s 2>&1" % command) is False: + result = False + except paramiko.SSHException, message: + result = False + + if not result: + WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile) + return False + + ### Read when data is available + output = "" + while select.select([session], [], []): + data = session.recv(1024) + if not data: + break + output += data + select.select([], [], [], .1) + + WriteLog("SUCCESS: command output follows", logfile) + for line in output.split("\n"): + WriteLog("output = %s" %line, logfile) + WriteLog("command execution completed", logfile) + session.close() + + return True + + +def HostWorker(logdir, username, password, keys, hostname, + executable, command, filelist): + """Per-host worker. + + This function does not return - it's the main code of the childs, + which exit at the end of this function. The exit code 0 or 1 will be + interpreted by the parent. + + Args: + logdir: the directory where the logfiles must be created + username: SSH username + password: SSH password + keys: SSH keys + hostname: the hostname to connect to + executable: the executable to upload, if not None + command: the command to run + filelist: auxiliary files to upload + + """ + # in the child/worker process + logfile = "%s/%s.log" % (logdir, hostname) + print "%s - starting" % hostname + result = 0 # optimism, I know + try: + connection = SetupSshConnection(hostname, username, + password, keys, logfile) + if connection is not False: + if executable is not None: + print " %s: uploading files" % hostname + upload_dir = UploadFiles(connection, executable, + filelist, logfile) + command = "cd %s && ./%s" % (upload_dir, + executable.split("/").pop()) + print " %s: executing remote command" % hostname + cmd_result = RunRemoteCommand(connection, command, logfile) + if cmd_result is True: + print " %s: remote command execution successful" % hostname + else: + print (" %s: remote command execution failed," + " check log for details" % hostname) + result = 1 + if executable is not None: + print " %s: cleaning up remote work dir" % hostname + cln_result = CleanupRemoteDir(connection, upload_dir, + filelist, logfile) + if cln_result is False: + print (" %s: remote work dir cleanup failed, check" + " log for details" % hostname) + result = 1 + connection.close() + else: + print " %s: connection setup failed, skipping" % hostname + result = 1 + except KeyboardInterrupt: + print " %s: received KeyboardInterrupt, aborting" % hostname + WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile) + result = 1 + except Exception, err: + result = 1 + trace = traceback.format_exc() + msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace) + WriteLog(msg, logfile) + print " %s: %s" % (hostname, msg) + # and exit with exit code 0 or 1, so the parent can compute statistics + sys.exit(result) + + +def LaunchWorker(child_pids, logdir, username, password, keys, hostname, + executable, command, filelist): + """Launch the per-host worker. + + Arguments are the same as for HostWorker, except for child_pids, + which is a dictionary holding the pid-to-hostname mapping. + + """ + hostname = hostname.rstrip("\n") + pid = os.fork() + if pid > 0: + # controller just record the pids + child_pids[pid] = hostname + else: + HostWorker(logdir, username, password, keys, hostname, + executable, command, filelist) + + +def main(): + """main.""" + try: + optlist, _ = getopt.getopt(sys.argv[1:], "l:x:h:f:a:c:b:u:p:A") + except getopt.GetoptError, err: + print str(err) + ShowHelp(sys.argv[0]) + sys.exit(2) + + logdir = executable = hostfile = hostlist = command = None + use_agent = False + auxfiles = [] + username = "root" + password = None + batch_size = 15 + for option in optlist: + if option[0] == "-l": + logdir = option[1] + if option[0] == "-x": + executable = option[1] + if option[0] == "-f": + hostfile = option[1] + if option[0] == "-h": + hostlist = option[1] + if option[0] == "-a": + auxfiles.append(option[1]) + if option[0] == "-c": + command = option[1] + if option[0] == "-b": + batch_size = int(option[1]) + if option[0] == "-u": + username = option[1] + if option[0] == "-p": + password = option[1] + if option[0] == "-A": + use_agent = True + + if not (logdir and (executable or command) and (hostfile or hostlist)): + print "error: missing required commandline argument(s)" + ShowHelp(sys.argv[0]) + sys.exit(3) + + if executable and command: + print "error: can run either a command or an executable, not both" + ShowHelp(sys.argv[0]) + sys.exit(3) + + if hostlist and hostfile: + print "error: specify either -f or -h arguments, not both" + ShowHelp(sys.argv[0]) + sys.exit(3) + + ### Unbuffered sys.stdout + sys.stdout = os.fdopen(1, "w", 0) + + if LogDirUseable(logdir) is False: + print "ERROR: cannot create logfiles in dir %s, aborting" % logdir + sys.exit(1) + + keys = [] + if use_agent: + keys = GetAgentKeys() + elif password: + try: + fh = file(password) + pwvalue = fh.readline().strip() + fh.close() + except IOError, e: + print "error: can not read in from password file %s: %s" % (password, e) + sys.exit(1) + password = pwvalue + else: + password = getpass.getpass("%s's password for all nodes: " % username) + + if hostfile: + hosts = GetHosts(hostfile) + else: + if "," in hostlist: + hostlist = hostlist.rstrip(",") # commandline robustness + hosts = hostlist.split(",") + else: + hosts = [hostlist] + + successes = failures = 0 + + filelist = auxfiles[:] + filelist.append(executable) + + # initial batch + batch = hosts[:batch_size] + hosts = hosts[batch_size:] + child_pids = {} + for hostname in batch: + LaunchWorker(child_pids, logdir, username, password, keys, hostname, + executable, command, filelist) + + while child_pids: + pid, status = os.wait() + hostname = child_pids.pop(pid, "<unknown host>") + print " %s: done (in parent)" % hostname + if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0: + successes += 1 + else: + failures += 1 + if hosts: + LaunchWorker(child_pids, logdir, username, password, keys, + hosts.pop(0), executable, command, filelist) + + print + print "All done, %s successful and %s failed hosts" % (successes, failures) + + sys.exit(0) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print "Received KeyboardInterrupt, aborting" + sys.exit(1)