diff --git a/.gitignore b/.gitignore index aac33b9075ce9126ab36fbd3f34a4edcf29dab3c..e4bd31eb1f2f984c14e9bc62a8e25da77b1efcab 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ # global ignores *.py[co] +*.swp # / /Makefile diff --git a/Makefile.am b/Makefile.am index 0732fea3b63eaaf27fa904e5e2ec7b5952867c99..875cd123708b6b88f8e3db5dcb75710b34c3b480 100644 --- a/Makefile.am +++ b/Makefile.am @@ -85,6 +85,7 @@ pkgpython_PYTHON = \ lib/serializer.py \ lib/ssconf.py \ lib/ssh.py \ + lib/storage.py \ lib/utils.py \ lib/workerpool.py @@ -112,6 +113,7 @@ http_PYTHON = \ docrst = \ doc/admin.rst \ doc/design-2.0.rst \ + doc/design-2.1.rst \ doc/glossary.rst \ doc/hooks.rst \ doc/iallocator.rst \ diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index bf7146067e89dcc81e468e07b7d48de636aa2838..ea51950b325b032c6b2fe9b1bc32167f8d351154 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -43,6 +43,7 @@ from optparse import OptionParser from ganeti import config from ganeti import constants +from ganeti import daemon from ganeti import mcpu from ganeti import opcodes from ganeti import jqueue @@ -383,35 +384,6 @@ class GanetiContext(object): self.glm.remove(locking.LEVEL_NODE, name) -def ParseOptions(): - """Parse the command line options. - - @return: (options, args) as from OptionParser.parse_args() - - """ - parser = OptionParser(description="Ganeti master daemon", - usage="%prog [-f] [-d]", - version="%%prog (ganeti) %s" % - constants.RELEASE_VERSION) - - parser.add_option("-f", "--foreground", dest="fork", - help="Don't detach from the current terminal", - default=True, action="store_false") - parser.add_option("-d", "--debug", dest="debug", - help="Enable some debug messages", - default=False, action="store_true") - parser.add_option("--no-voting", dest="no_voting", - help="Do not check that the nodes agree on this node" - " being the master and start the daemon unconditionally", - default=False, action="store_true") - parser.add_option("--yes-do-it", dest="yes_do_it", - help="Override interactive check for --no-voting", - default=False, action="store_true") - - options, args = parser.parse_args() - return options, args - - def CheckAgreement(): """Check the agreement on who is the master. @@ -468,17 +440,10 @@ def CheckAgreement(): return result +def CheckMASTERD(options, args): + """Initial checks whether to run or exit with a failure -def main(): - """Main function""" - - options, args = ParseOptions() - utils.debug = options.debug - utils.no_fork = True - - if options.fork: - utils.CloseFDs() - + """ rpc.Init() try: ssconf.CheckMaster(options.debug) @@ -496,31 +461,20 @@ def main(): elif not options.no_voting: if not CheckAgreement(): return - - dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE), - (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE), - ] - utils.EnsureDirs(dirs) - - # This is safe to do as the pid file guarantees against - # concurrent execution. - utils.RemoveFile(constants.MASTER_SOCKET) - - master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) finally: rpc.Shutdown() - # become a daemon - if options.fork: - utils.Daemonize(logfile=constants.LOG_MASTERDAEMON) - utils.WritePidFile(constants.MASTERD_PID) - try: - utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug, - stderr_logging=not options.fork, multithreaded=True) +def ExecMASTERD(options, args): + """Main MASTERD function, executed with the pidfile held. 
- logging.info("Ganeti master daemon startup") + """ + # This is safe to do as the pid file guarantees against + # concurrent execution. + utils.RemoveFile(constants.MASTER_SOCKET) + master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) + try: rpc.Init() try: # activate ip @@ -538,9 +492,27 @@ def main(): finally: rpc.Shutdown() finally: - utils.RemovePidFile(constants.MASTERD_PID) utils.RemoveFile(constants.MASTER_SOCKET) +def main(): + """Main function""" + parser = OptionParser(description="Ganeti master daemon", + usage="%prog [-f] [-d]", + version="%%prog (ganeti) %s" % + constants.RELEASE_VERSION) + parser.add_option("--no-voting", dest="no_voting", + help="Do not check that the nodes agree on this node" + " being the master and start the daemon unconditionally", + default=False, action="store_true") + parser.add_option("--yes-do-it", dest="yes_do_it", + help="Override interactive check for --no-voting", + default=False, action="store_true") + dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE), + (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE), + ] + daemon.GenericMain(constants.MASTERD, parser, dirs, + CheckMASTERD, ExecMASTERD) + if __name__ == "__main__": main() diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index 876321d4ba82ba9e09faf52a5693f3b69d9016da..db3c7aa88648bf23528553089d51ee9b57c99b27 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -42,6 +42,7 @@ from ganeti import jstore from ganeti import daemon from ganeti import http from ganeti import utils +from ganeti import storage import ganeti.http.server @@ -354,6 +355,16 @@ class NodeHttpServer(http.server.HttpServer): """ return backend.ListVolumeGroups() + # Storage -------------------------- + + @staticmethod + def perspective_storage_list(params): + """Get list of storage units. + + """ + (su_name, su_args, name, fields) = params + return storage.GetStorage(su_name, *su_args).List(name, fields) + # bridge -------------------------- @staticmethod @@ -732,86 +743,44 @@ class NodeHttpServer(http.server.HttpServer): return backend.ValidateHVParams(hvname, hvparams) -def ParseOptions(): - """Parse the command line options. - - @return: (options, args) as from OptionParser.parse_args() +def ExecNODED(options, args): + """Main NODED function, executed with the pidfile held. """ - parser = OptionParser(description="Ganeti node daemon", - usage="%prog [-f] [-d] [-b ADDRESS]", - version="%%prog (ganeti) %s" % - constants.RELEASE_VERSION) + global queue_lock - parser.add_option("-f", "--foreground", dest="fork", - help="Don't detach from the current terminal", - default=True, action="store_false") - parser.add_option("-d", "--debug", dest="debug", - help="Enable some debug messages", - default=False, action="store_true") - parser.add_option("-b", "--bind", dest="bind_address", - help="Bind address", - default="", metavar="ADDRESS") + # Read SSL certificate + if options.ssl: + ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key, + ssl_cert_path=options.ssl_cert) + else: + ssl_params = None - options, args = parser.parse_args() - return options, args + # Prepare job queue + queue_lock = jstore.InitAndVerifyQueue(must_lock=False) + + mainloop = daemon.Mainloop() + server = NodeHttpServer(mainloop, options.bind_address, options.port, + ssl_params=ssl_params, ssl_verify_peer=True) + server.Start() + try: + mainloop.Run() + finally: + server.Stop() def main(): """Main function for the node daemon. 
""" - global queue_lock - - options, args = ParseOptions() - utils.debug = options.debug - - if options.fork: - utils.CloseFDs() - - for fname in (constants.SSL_CERT_FILE,): - if not os.path.isfile(fname): - print "config %s not there, will not run." % fname - sys.exit(5) - - try: - port = utils.GetNodeDaemonPort() - except errors.ConfigurationError, err: - print "Cluster configuration incomplete: '%s'" % str(err) - sys.exit(5) - + parser = OptionParser(description="Ganeti node daemon", + usage="%prog [-f] [-d] [-p port] [-b ADDRESS]", + version="%%prog (ganeti) %s" % + constants.RELEASE_VERSION) dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] dirs.append((constants.LOG_OS_DIR, 0750)) dirs.append((constants.LOCK_DIR, 1777)) - utils.EnsureDirs(dirs) - - # become a daemon - if options.fork: - utils.Daemonize(logfile=constants.LOG_NODESERVER) - - utils.WritePidFile(constants.NODED_PID) - try: - utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug, - stderr_logging=not options.fork) - logging.info("ganeti node daemon startup") - - # Read SSL certificate - ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE, - ssl_cert_path=constants.SSL_CERT_FILE) - - # Prepare job queue - queue_lock = jstore.InitAndVerifyQueue(must_lock=False) - - mainloop = daemon.Mainloop() - server = NodeHttpServer(mainloop, options.bind_address, port, - ssl_params=ssl_params, ssl_verify_peer=True) - server.Start() - try: - mainloop.Run() - finally: - server.Stop() - finally: - utils.RemovePidFile(constants.NODED_PID) + daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNODED) if __name__ == '__main__': diff --git a/daemons/ganeti-rapi b/daemons/ganeti-rapi index 7b9710a9bf46930beeda8342d10f32a263c96939..3208b545e1b10c35861256070eafe03d66a660b3 100755 --- a/daemons/ganeti-rapi +++ b/daemons/ganeti-rapi @@ -177,92 +177,51 @@ class RemoteApiHttpServer(http.auth.HttpServerRequestAuthentication, return result -def ParseOptions(): - """Parse the command line options. - - @return: (options, args) as from OptionParser.parse_args() +def CheckRAPI(options, args): + """Initial checks whether to run or exit with a failure """ - parser = optparse.OptionParser(description="Ganeti Remote API", - usage="%prog [-d] [-p port]", - version="%%prog (ganeti) %s" % - constants.RAPI_VERSION) - parser.add_option("-d", "--debug", dest="debug", - help="Enable some debug messages", - default=False, action="store_true") - parser.add_option("-p", "--port", dest="port", - help="Port to run API (%s default)." 
% - constants.RAPI_PORT, - default=constants.RAPI_PORT, type="int") - parser.add_option("--no-ssl", dest="ssl", - help="Do not secure HTTP protocol with SSL", - default=True, action="store_false") - parser.add_option("-K", "--ssl-key", dest="ssl_key", - help="SSL key", - default=constants.RAPI_CERT_FILE, type="string") - parser.add_option("-C", "--ssl-cert", dest="ssl_cert", - help="SSL certificate", - default=constants.RAPI_CERT_FILE, type="string") - parser.add_option("-f", "--foreground", dest="fork", - help="Don't detach from the current terminal", - default=True, action="store_false") - - options, args = parser.parse_args() - if len(args) != 0: - print >> sys.stderr, "Usage: %s [-d] [-p port]" % sys.argv[0] - sys.exit(1) + print >> sys.stderr, "Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % \ + sys.argv[0] + sys.exit(constants.EXIT_FAILURE) - if options.ssl and not (options.ssl_cert and options.ssl_key): - print >> sys.stderr, ("For secure mode please provide " - "--ssl-key and --ssl-cert arguments") - sys.exit(1) + ssconf.CheckMaster(options.debug) - return options, args - -def main(): - """Main function. +def ExecRAPI(options, args): + """Main RAPI function, executed with the pidfile held. """ - options, args = ParseOptions() - - if options.fork: - utils.CloseFDs() - + # Read SSL certificate if options.ssl: - # Read SSL certificate - try: - ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key, - ssl_cert_path=options.ssl_cert) - except Exception, err: - sys.stderr.write("Can't load the SSL certificate/key: %s\n" % (err,)) - sys.exit(1) + ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key, + ssl_cert_path=options.ssl_cert) else: ssl_params = None - ssconf.CheckMaster(options.debug) + mainloop = daemon.Mainloop() + server = RemoteApiHttpServer(mainloop, options.bind_address, options.port, + ssl_params=ssl_params, ssl_verify_peer=False, + request_executor_class=JsonErrorRequestExecutor) + server.Start() + try: + mainloop.Run() + finally: + server.Stop() - if options.fork: - utils.Daemonize(logfile=constants.LOG_RAPISERVER) - utils.SetupLogging(constants.LOG_RAPISERVER, debug=options.debug, - stderr_logging=not options.fork) +def main(): + """Main function. 
- utils.WritePidFile(constants.RAPI_PID) - try: - mainloop = daemon.Mainloop() - server = RemoteApiHttpServer(mainloop, "", options.port, - ssl_params=ssl_params, ssl_verify_peer=False, - request_executor_class= - JsonErrorRequestExecutor) - server.Start() - try: - mainloop.Run() - finally: - server.Stop() - finally: - utils.RemovePidFile(constants.RAPI_PID) + """ + parser = optparse.OptionParser(description="Ganeti Remote API", + usage="%prog [-f] [-d] [-p port] [-b ADDRESS]", + version="%%prog (ganeti) %s" % constants.RAPI_VERSION) + + dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] + dirs.append((constants.LOG_OS_DIR, 0750)) + daemon.GenericMain(constants.RAPI, parser, dirs, CheckRAPI, ExecRAPI) if __name__ == '__main__': diff --git a/daemons/ganeti-watcher b/daemons/ganeti-watcher index 2749de63f77a0a2ae6c99ed0e3b13642a0e9ba5d..1a3ed1c00d191c97ecee6bb13efb4b1cc0bdd03f 100755 --- a/daemons/ganeti-watcher +++ b/daemons/ganeti-watcher @@ -478,9 +478,8 @@ def main(): update_file = False try: - # on master or not, try to start the node dameon (use _PID but is - # the same as daemon name) - EnsureDaemon(constants.NODED_PID) + # on master or not, try to start the node dameon + EnsureDaemon(constants.NODED) notepad = WatcherState() try: @@ -500,8 +499,8 @@ def main(): # else retry the connection client = cli.GetClient() - # we are on master now (use _PID but is the same as daemon name) - EnsureDaemon(constants.RAPI_PID) + # we are on master now + EnsureDaemon(constants.RAPI) try: watcher = Watcher(options, notepad) diff --git a/devel/review b/devel/review new file mode 100755 index 0000000000000000000000000000000000000000..febf80dfb53236ca659ed385bb081265c442d5ed --- /dev/null +++ b/devel/review @@ -0,0 +1,186 @@ +#!/bin/bash + +# Copyright (C) 2009 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +# To set user mappings, use this command: +# git config gnt-review.johndoe 'John Doe <johndoe@domain.tld>' + +set -e + +# Get absolute path to myself +me_plain="$0" +me=$(readlink -f "$me_plain") + +add_reviewed_by() { + local msgfile="$1" + + grep -q '^Reviewed-by: ' "$msgfile" && return + + perl -i -e ' + my $sob = 0; + while (<>) { + if ($sob == 0 and m/^Signed-off-by:/) { + $sob = 1; + + } elsif ($sob == 1 and not m/^Signed-off-by:/) { + print "Reviewed-by: \n"; + $sob = -1; + } + + print; + } + + if ($sob == 1) { + print "Reviewed-by: \n"; + } + ' "$msgfile" +} + +replace_users() { + local msgfile="$1" + + perl -i -e ' + sub map_username { + my ($name) = @_; + + return $name unless $name; + + my @cmd = ("git", "config", "--get", "gnt-review.$name"); + + open(my $fh, "-|", @cmd) or die "Command \"@cmd\" failed: $!"; + my $output = do { local $/ = undef; <$fh> }; + close($fh); + + if ($? 
== 0) { + chomp $output; + $output =~ s/\s+/ /; + return $output; + } + + return $name; + } + + while (<>) { + if (m/^Reviewed-by:(.*)$/) { + my @names = grep { + # Ignore empty entries + !/^$/ + } map { + # Normalize whitespace + $_ =~ s/(^\s+|\s+$)//g; + $_ =~ s/\s+/ /g; + + # Map names + $_ = map_username($_); + + $_; + } split(m/,/, $1); + + foreach (sort @names) { + print "Reviewed-by: $_\n"; + } + } else { + print; + } + } + ' "$msgfile" + + if ! grep -q '^Reviewed-by: ' "$msgfile" + then + echo 'Missing Reviewed-by: line' >&2 + sleep 1 + return 1 + fi + + return 0 +} + +run_editor() { + local filename="$1" + local editor=${EDITOR:-vi} + local args + + case "$(basename "$editor")" in + vi* | *vim) + # Start edit mode at Reviewed-by: line + args='+/^Reviewed-by: +nohlsearch +startinsert!' + ;; + *) + args= + ;; + esac + + $editor $args "$filename" +} + +commit_editor() { + local msgfile="$1" + + local tmpf=$(mktemp) + trap "rm -f $tmpf" EXIT + + cp "$msgfile" "$tmpf" + + while : + do + add_reviewed_by "$tmpf" + + run_editor "$tmpf" + + replace_users "$tmpf" && break + done + + cp "$tmpf" "$msgfile" +} + +copy_commit() { + local rev="$1" target_branch="$2" + + echo "Copying commit $rev ..." + + git cherry-pick -n "$rev" + GIT_EDITOR="$me --commit-editor \"\$@\"" git commit -c "$rev" -s +} + +main() { + local range="$1" target_branch="$2" + + if [[ -z "$target_branch" || "$range" != *..* ]] + then + echo "Usage: $me_plain <from..to> <target-branch>" >&2 + exit 1 + fi + + git checkout "$target_branch" + local old_head=$(git rev-parse HEAD) + + for rev in $(git rev-list --reverse "$range") + do + copy_commit "$rev" + done + + git log "$old_head..$target_branch" +} + +if [[ "$1" == --commit-editor ]] +then + shift + commit_editor "$@" +else + main "$@" +fi diff --git a/doc/design-2.1.rst b/doc/design-2.1.rst index a605b186f519c14d4abec613d876579725baa22f..4e47b44a20d937e029bfc01038b2554225ff58ea 100644 --- a/doc/design-2.1.rst +++ b/doc/design-2.1.rst @@ -37,9 +37,107 @@ As for 2.0 we divide the 2.1 design into three areas: Core changes ------------ +Storage units modelling +~~~~~~~~~~~~~~~~~~~~~~~ + +Currently, Ganeti has a good model of the block devices for instances +(e.g. LVM logical volumes, files, DRBD devices, etc.) but none of the +storage pools that are providing the space for these front-end +devices. For example, there are hardcoded inter-node RPC calls for +volume group listing, file storage creation/deletion, etc. + +The storage units framework will implement a generic handling for all +kinds of storage backends: + +- LVM physical volumes +- LVM volume groups +- File-based storage directories +- any other future storage method + +There will be a generic list of methods that each storage unit type +will provide, like: + +- list of storage units of this type +- check status of the storage unit + +Additionally, there will be specific methods for each method, for example: + +- enable/disable allocations on a specific PV +- file storage directory creation/deletion +- VG consistency fixing + +This will allow a much better modeling and unification of the various +RPC calls related to backend storage pool in the future. Ganeti 2.1 is +intended to add the basics of the framework, and not necessarilly move +all the curent VG/FileBased operations to it. + +Note that while we model both LVM PVs and LVM VGs, the framework will +**not** model any relationship between the different types. 
In other
+words, we model neither inheritance nor stacking, since this is
+too complex for our needs. While a ``vgreduce`` operation on a LVM VG
+could actually remove a PV from it, this will not be handled at the
+framework level, but at the individual operation level. The goal is that
+this is a lightweight framework for abstracting the different storage
+operations, and not for modelling the storage hierarchy.
+
 Feature changes
 ---------------
 
+Ganeti Confd
+~~~~~~~~~~~~
+
+Current State and shortcomings
++++++++++++++++++++++++++++++++
+In Ganeti 2.0 all nodes are equal, but some are more equal than others. In
+particular they are divided between "master", "master candidates" and "normal".
+(Moreover they can be offline or drained, but this is not important for the
+current discussion). In general, the whole configuration is only replicated to
+master candidates, and some partial information is spread to all nodes via
+ssconf.
+
+This change was done so that the most frequent Ganeti operations didn't need to
+contact all nodes, and so clusters could become bigger. If we want more
+information to be available on all nodes, we need to add more ssconf values,
+which counter-balances the change, or to talk to the master node, which
+is not designed to happen now, and requires its availability.
+
+Information such as the instance->primary_node mapping will be needed on all
+nodes, and we also want to make sure services external to the cluster can query
+this information as well. This information must be available at all times, so
+we can't query it through RAPI, which would be a single point of failure, as
+it's only available on the master.
+
+
+Proposed changes
++++++++++++++++++
+
+In order to allow fast and highly available read-only access to some
+configuration values, we'll create a new ganeti-confd daemon, which will run on
+master candidates. This daemon will talk via UDP, and authenticate messages
+using HMAC with a cluster-wide shared key.
+
+An interested client can query a value by making a request to a subset of the
+cluster master candidates. It will then wait to get a few responses, and use
+the one with the highest configuration serial number (which will always be
+included in the answer). If some candidates are stale, or we are in the middle
+of a configuration update, various master candidates may return different
+values, and this should make sure the most recent information is used.
+
+In order to prevent replay attacks, queries will contain the current unix
+timestamp according to the client, and the server will verify that its
+timestamp is in the same 5-minute range (this requires synchronized clocks,
+which is a good idea anyway). Queries will also contain a "salt" which they
+expect the answers to be sent with, and clients are supposed to accept only
+answers which contain the salt generated by them.
+
+The configuration daemon will be able to answer simple queries such as:
+- master candidates list
+- master node
+- offline nodes
+- instance list
+- instance primary nodes
+
+
 Redistribute Config
 ~~~~~~~~~~~~~~~~~~~
 
@@ -187,6 +285,65 @@ handle both cases. The default kvm vif script will be changed to do so. (Xen
 doesn't have a ganeti provided script, so nothing will be done for that
 hypervisor)
 
+
+Automated disk repairs infrastructure
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Replacing defective disks in an automated fashion is quite difficult with the
+current version of Ganeti. These changes will introduce additional
+functionality and interfaces to simplify automating disk replacements on a
+Ganeti node.
+
+Fix node volume group
++++++++++++++++++++++
+
+This is the most difficult addition, as it can lead to data loss if it's not
+properly safeguarded.
+
+The operation must be done only when all the other nodes that have instances in
+common with the target node are fine, i.e. this is the only node with problems,
+and also we have to double-check that all instances on this node have at least
+a good copy of the data.
+
+This might mean that we have to enhance the GetMirrorStatus calls, and
+introduce a smarter version that can tell us more about the status of an
+instance.
+
+Stop allocation on a given PV
++++++++++++++++++++++++++++++
+
+This is somewhat simple. First we need a "list PVs" opcode (and its associated
+logical unit) and then a set PV status opcode/LU. These in combination should
+allow both checking and changing the disk/PV status.
+
+Instance disk status
+++++++++++++++++++++
+
+This new opcode or opcode change must list the instance-disk-index and node
+combinations of the instance together with their status. This will allow
+determining what part of the instance is broken (if any).
+
+Repair instance
++++++++++++++++
+
+This new opcode/LU/RAPI call will run ``replace-disks -p`` as needed, in order
+to fix the instance status. It only affects primary instances; secondaries can
+just be moved away.
+
+Migrate node
+++++++++++++
+
+This new opcode/LU/RAPI call will take over the current ``gnt-node migrate``
+code and run migrate for all instances on the node.
+
+Evacuate node
++++++++++++++
+
+This new opcode/LU/RAPI call will take over the current ``gnt-node evacuate``
+code and run replace-secondary with an iallocator script for all instances on
+the node.
+
+
 External interface changes
 --------------------------
 
diff --git a/doc/examples/hooks/ethers b/doc/examples/hooks/ethers
new file mode 100755
index 0000000000000000000000000000000000000000..35a66b0f8179c5ff10b5910a4e79423245f0e206
--- /dev/null
+++ b/doc/examples/hooks/ethers
@@ -0,0 +1,85 @@
+#!/bin/bash
+
+# Copyright (C) 2009 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# This is an example ganeti hook that writes the instance mac addresses in the
+# node's /etc/ethers file. It will pick up the first nic connected to the
+# TARGET_BRIDGE bridge, and write it down with the syntax "MAC INSTANCE_NAME".
+
+# The hook will also send a HUP signal to the daemon whose PID is in
+# DAEMON_PID_FILE, so that it can load the new /etc/ethers file and use it.
+# This has been tested in conjunction with dnsmasq's dhcp implementation.
+
+# It will also remove any other occurrences for the same instance in the
+# aforementioned file.
This hook supports the "instance-add", "instance-modify" +# "instance-remove", and "instance-mirror-replace" ganeti post hook paths. To +# install it add a symlink from those hooks' directories to where this file is +# installed (with a mode which permits execution). + +# TARGET_BRIDGE: We'll only add the first nic which gets connected to this +# bridge to /etc/ethers. +TARGET_BRIDGE="br0" +DAEMON_PID_FILE="/var/run/dnsmasq.pid" +LOCKFILE="/var/lock/ganeti_ethers.lock" + +hooks_path=$GANETI_HOOKS_PATH +[ -n "$hooks_path" ] || exit 1 +instance=$GANETI_INSTANCE_NAME +[ -n "$instance" ] || exit 1 +nic_count=$GANETI_INSTANCE_NIC_COUNT + +acquire_lockfile() { + if ! ( set -o noclobber; echo "$$" > $LOCKFILE) 2> /dev/null; then + logger -s "Cannot acquire lockfile for ethers update" + exit 1 + fi + trap "rm -f $LOCKFILE" EXIT +} + +update_ethers_from_new() { + chmod 644 /etc/ethers.new + mv /etc/ethers.new /etc/ethers + [ -f "$DAEMON_PID_FILE" ] && kill -HUP $(< $DAEMON_PID_FILE) +} + +if [ "$hooks_path" = "instance-add" -o \ + "$hooks_path" = "instance-modify" -o \ + "$hooks_path" = "instance-mirror-replace" ] +then + for i in $(seq 0 $((nic_count - 1)) ); do + bridge_var="GANETI_INSTANCE_NIC${i}_BRIDGE" + bridge=${!bridge_var} + if [ -n "$bridge" -a "$bridge" = "$TARGET_BRIDGE" ]; then + mac_var="GANETI_INSTANCE_NIC${i}_MAC" + mac=${!mac_var} + acquire_lockfile + cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/; + END {print \"$mac\t$instance\"}" > /etc/ethers.new + update_ethers_from_new + break + fi + done +fi +if [ "$hooks_path" = "instance-remove" -o \ + \( "$hooks_path" = "instance-modify" -a "$nic_count" -eq 0 \) ]; then + acquire_lockfile + cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/" \ + > /etc/ethers.new + update_ethers_from_new +fi + diff --git a/doc/hooks.rst b/doc/hooks.rst index b2f05ce5b00f9bf68dd219749e9813d4734b970e..ecd6c9e3690381348ca9d3d738381eda2490c3a3 100644 --- a/doc/hooks.rst +++ b/doc/hooks.rst @@ -145,6 +145,16 @@ Changes a node's parameters. :pre-execution: master node, the target node :post-execution: master node, the target node +OP_NODE_EVACUATE +++++++++++++++++ + +Relocate secondary instances from a node. + +:directory: node-evacuate +:env. vars: NEW_SECONDARY, NODE_NAME +:pre-execution: master node, target node +:post-execution: master node, target node + Instance operations ~~~~~~~~~~~~~~~~~~~ diff --git a/doc/rapi.rst b/doc/rapi.rst index a913b3215871724a6293f77f9fd836ce1e94ac87..6ec6c6d470b0618fa2b076e2060f5274db94c8ea 100644 --- a/doc/rapi.rst +++ b/doc/rapi.rst @@ -464,6 +464,36 @@ Example:: ... ] +``/2/nodes/[node_name]/evacuate`` ++++++++++++++++++++++++++++++++++ + +Evacuates all secondary instances off a node. + +It supports the following commands: ``POST``. + +``POST`` +~~~~~~~~ + +To evacuate a node, either one of the ``iallocator`` or ``remote_node`` +parameters must be passed: + + evacuate?iallocator=[iallocator] + evacuate?remote_node=[nodeX.example.com] + +``/2/nodes/[node_name]/migrate`` ++++++++++++++++++++++++++++++++++ + +Migrates all primary instances from a node. + +It supports the following commands: ``POST``. + +``POST`` +~~~~~~~~ + +No parameters are required, but ``live`` can be set to a boolean value. 
+ + migrate?live=[0|1] + ``/2/nodes/[node_name]/role`` +++++++++++++++++++++++++++++ diff --git a/lib/backend.py b/lib/backend.py index e7c4787013fb7db348f3bd3b122788b26c07740a..966f601a7f9254f4c2d760d229d254c8f38ac46d 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -19,7 +19,12 @@ # 02110-1301, USA. -"""Functions used by the node daemon""" +"""Functions used by the node daemon + +@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in + the L{UploadFile} function + +""" import os @@ -145,6 +150,32 @@ def _CleanDirectory(path, exclude=None): utils.RemoveFile(full_name) +def _BuildUploadFileList(): + """Build the list of allowed upload files. + + This is abstracted so that it's built only once at module import time. + + """ + allowed_files = set([ + constants.CLUSTER_CONF_FILE, + constants.ETC_HOSTS, + constants.SSH_KNOWN_HOSTS_FILE, + constants.VNC_PASSWORD_FILE, + constants.RAPI_CERT_FILE, + constants.RAPI_USERS_FILE, + constants.HMAC_CLUSTER_KEY, + ]) + + for hv_name in constants.HYPER_TYPES: + hv_class = hypervisor.GetHypervisorClass(hv_name) + allowed_files.update(hv_class.GetAncillaryFiles()) + + return frozenset(allowed_files) + + +_ALLOWED_UPLOAD_FILES = _BuildUploadFileList() + + def JobQueuePurge(): """Removes job queue files and archived jobs. @@ -267,7 +298,7 @@ def StopMaster(stop_daemons): if stop_daemons: # stop/kill the rapi and the master daemon - for daemon in constants.RAPI_PID, constants.MASTERD_PID: + for daemon in constants.RAPI, constants.MASTERD: utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon))) @@ -445,7 +476,7 @@ def VerifyNode(what, cluster_name): tmp[my_name] = ("Can't find my own primary/secondary IP" " in the node list") else: - port = utils.GetNodeDaemonPort() + port = utils.GetDaemonPort(constants.NODED) for name, pip, sip in what[constants.NV_NODENETTEST]: fail = [] if not utils.TcpPing(pip, port, source=my_pip): @@ -1425,20 +1456,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): if not os.path.isabs(file_name): _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name) - allowed_files = set([ - constants.CLUSTER_CONF_FILE, - constants.ETC_HOSTS, - constants.SSH_KNOWN_HOSTS_FILE, - constants.VNC_PASSWORD_FILE, - constants.RAPI_CERT_FILE, - constants.RAPI_USERS_FILE, - ]) - - for hv_name in constants.HYPER_TYPES: - hv_class = hypervisor.GetHypervisor(hv_name) - allowed_files.update(hv_class.GetAncillaryFiles()) - - if file_name not in allowed_files: + if file_name not in _ALLOWED_UPLOAD_FILES: _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'", file_name) @@ -2262,7 +2280,7 @@ def DemoteFromMC(): master, myself = ssconf.GetMasterAndMyself() if master == myself: _Fail("ssconf status shows I'm the master node, will not demote") - pid_file = utils.DaemonPidFileName(constants.MASTERD_PID) + pid_file = utils.DaemonPidFileName(constants.MASTERD) if utils.IsProcessAlive(utils.ReadPidFile(pid_file)): _Fail("The master daemon is running, will not demote") try: diff --git a/lib/bootstrap.py b/lib/bootstrap.py index 2b34f4111bc71c3be7aa6847544cb7e258147a80..06fd29258f57497679cb934530ee3503ae092943 100644 --- a/lib/bootstrap.py +++ b/lib/bootstrap.py @@ -116,6 +116,11 @@ def _InitGanetiServerSetup(): if not os.path.exists(constants.RAPI_CERT_FILE): _GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE) + if not os.path.exists(constants.HMAC_CLUSTER_KEY): + utils.WriteFile(constants.HMAC_CLUSTER_KEY, + data=utils.GenerateSecret(), + mode=0400) + result = 
utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"]) if result.failed: diff --git a/lib/cli.py b/lib/cli.py index f9a9628e0e14a3a0961d92d00da924e618a1cd29..fd329695f742895b008f9a309ed2e5a7af0d78e9 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -766,8 +766,6 @@ def GenericMain(commands, override=None, aliases=None): utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug, stderr_logging=True, program=binary) - utils.debug = options.debug - if old_cmdline: logging.info("run with arguments '%s'", old_cmdline) else: @@ -1035,7 +1033,6 @@ class JobExecutor(object): """ self.queue.append((name, ops)) - def SubmitPending(self): """Submit all pending jobs. diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 7438316794a37e3b7cc9edc9647356c6f26f8039..c6101c53e814b541d6037f75000e54b72e00a5d9 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -47,8 +47,8 @@ class LogicalUnit(object): Subclasses must follow these rules: - implement ExpandNames - - implement CheckPrereq - - implement Exec + - implement CheckPrereq (except when tasklets are used) + - implement Exec (except when tasklets are used) - implement BuildHooksEnv - redefine HPATH and HTYPE - optionally redefine their run requirements: @@ -89,14 +89,19 @@ class LogicalUnit(object): # logging self.LogWarning = processor.LogWarning self.LogInfo = processor.LogInfo + self.LogStep = processor.LogStep # support for dry-run self.dry_run_result = None + # Tasklets + self.tasklets = None + for attr_name in self._OP_REQP: attr_val = getattr(op, attr_name, None) if attr_val is None: raise errors.OpPrereqError("Required parameter '%s' missing" % attr_name) + self.CheckArguments() def __GetSSH(self): @@ -148,6 +153,10 @@ class LogicalUnit(object): level you can modify self.share_locks, setting a true value (usually 1) for that level. By default locks are not shared. + This function can also define a list of tasklets, which then will be + executed in order instead of the usual LU-level CheckPrereq and Exec + functions, if those are not defined by the LU. + Examples:: # Acquire all nodes and one instance @@ -204,7 +213,13 @@ class LogicalUnit(object): their canonical form if it hasn't been done by ExpandNames before. """ - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Checking prerequisites for tasklet %s/%s", + idx + 1, len(self.tasklets)) + tl.CheckPrereq() + else: + raise NotImplementedError def Exec(self, feedback_fn): """Execute the LU. @@ -214,7 +229,12 @@ class LogicalUnit(object): code, or expected. """ - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets)) + tl.Exec(feedback_fn) + else: + raise NotImplementedError def BuildHooksEnv(self): """Build hooks environment for this LU. @@ -338,6 +358,52 @@ class NoHooksLU(LogicalUnit): HTYPE = None +class Tasklet: + """Tasklet base class. + + Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or + they can mix legacy code with tasklets. Locking needs to be done in the LU, + tasklets know nothing about locks. + + Subclasses must follow these rules: + - Implement CheckPrereq + - Implement Exec + + """ + def __init__(self, lu): + self.lu = lu + + # Shortcuts + self.cfg = lu.cfg + self.rpc = lu.rpc + + def CheckPrereq(self): + """Check prerequisites for this tasklets. + + This method should check whether the prerequisites for the execution of + this tasklet are fulfilled. 
It can do internode communication, but it + should be idempotent - no cluster or system changes are allowed. + + The method should raise errors.OpPrereqError in case something is not + fulfilled. Its return value is ignored. + + This method should also update all parameters to their canonical form if it + hasn't been done before. + + """ + raise NotImplementedError + + def Exec(self, feedback_fn): + """Execute the tasklet. + + This method should implement the actual work. It should raise + errors.OpExecError for failures that are somewhat dealt with in code, or + expected. + + """ + raise NotImplementedError + + def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -643,6 +709,32 @@ def _CheckInstanceBridgesExist(lu, instance, node=None): _CheckNicsBridgesExist(lu, instance.nics, node) +def _GetNodePrimaryInstances(cfg, node_name): + """Returns primary instances on a node. + + """ + instances = [] + + for (_, inst) in cfg.GetAllInstancesInfo().iteritems(): + if node_name == inst.primary_node: + instances.append(inst) + + return instances + + +def _GetNodeSecondaryInstances(cfg, node_name): + """Returns secondary instances on a node. + + """ + instances = [] + + for (_, inst) in cfg.GetAllInstancesInfo().iteritems(): + if node_name in inst.secondary_nodes: + instances.append(inst) + + return instances + + class LUDestroyCluster(NoHooksLU): """Logical unit for destroying the cluster. @@ -1641,6 +1733,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None): constants.SSH_KNOWN_HOSTS_FILE, constants.RAPI_CERT_FILE, constants.RAPI_USERS_FILE, + constants.HMAC_CLUSTER_KEY, ]) enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors @@ -2213,6 +2306,106 @@ class LUQueryNodeVolumes(NoHooksLU): return output +class LUQueryNodeStorage(NoHooksLU): + """Logical unit for getting information on storage units on node(s). + + """ + _OP_REQP = ["nodes", "storage_type", "output_fields"] + REQ_BGL = False + _FIELDS_STATIC = utils.FieldSet("node") + + def ExpandNames(self): + storage_type = self.op.storage_type + + if storage_type not in constants.VALID_STORAGE_FIELDS: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) + + dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type] + + _CheckOutputFields(static=self._FIELDS_STATIC, + dynamic=utils.FieldSet(*dynamic_fields), + selected=self.op.output_fields) + + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + + if self.op.nodes: + self.needed_locks[locking.LEVEL_NODE] = \ + _GetWantedNodes(self, self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the fields required are valid output fields. + + """ + self.op.name = getattr(self.op, "name", None) + + self.nodes = self.acquired_locks[locking.LEVEL_NODE] + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. 
+ + """ + # Special case for file storage + if self.op.storage_type == constants.ST_FILE: + st_args = [self.cfg.GetFileStorageDir()] + else: + st_args = [] + + # Always get name to sort by + if constants.SF_NAME in self.op.output_fields: + fields = self.op.output_fields[:] + else: + fields = [constants.SF_NAME] + self.op.output_fields + + # Never ask for node as it's only known to the LU + while "node" in fields: + fields.remove("node") + + field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) + name_idx = field_idx[constants.SF_NAME] + + data = self.rpc.call_storage_list(self.nodes, + self.op.storage_type, st_args, + self.op.name, fields) + + result = [] + + for node in utils.NiceSort(self.nodes): + nresult = data[node] + if nresult.offline: + continue + + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't get storage data from node %s: %s", node, msg) + continue + + rows = dict([(row[name_idx], row) for row in nresult.payload]) + + for name in utils.NiceSort(rows.keys()): + row = rows[name] + + out = [] + + for field in self.op.output_fields: + if field == "node": + val = node + elif field in field_idx: + val = row[field_idx[field]] + else: + raise errors.ParameterError(field) + + out.append(val) + + result.append(out) + + return result + + class LUAddNode(LogicalUnit): """Logical unit for adding node to the cluster. @@ -3835,9 +4028,14 @@ class LUMigrateInstance(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self._migrater = TLMigrateInstance(self, self.op.instance_name, + self.op.live, self.op.cleanup) + self.tasklets = [self._migrater] + def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() @@ -3848,12 +4046,80 @@ class LUMigrateInstance(LogicalUnit): This runs on master, primary and secondary nodes of the instance. """ - env = _BuildInstanceHookEnvByObject(self, self.instance) + instance = self._migrater.instance + env = _BuildInstanceHookEnvByObject(self, instance) env["MIGRATE_LIVE"] = self.op.live env["MIGRATE_CLEANUP"] = self.op.cleanup - nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) + nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) return env, nl, nl + +class LUMigrateNode(LogicalUnit): + """Migrate all instances from a node. + + """ + HPATH = "node-migrate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name", "live"] + REQ_BGL = False + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + # Create tasklets for migrating instances for all instances on this node + names = [] + tasklets = [] + + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name): + logging.debug("Migrating instance %s", inst.name) + names.append(inst.name) + + tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False)) + + self.tasklets = tasklets + + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = names + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. 
+ + """ + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + return (env, nl, nl) + + +class TLMigrateInstance(Tasklet): + def __init__(self, lu, instance_name, live, cleanup): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.live = live + self.cleanup = cleanup + def CheckPrereq(self): """Check prerequisites. @@ -3861,10 +4127,10 @@ class LUMigrateInstance(LogicalUnit): """ instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) + self.cfg.ExpandInstanceName(self.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.instance_name) if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Instance's disk layout is not" @@ -3886,7 +4152,7 @@ class LUMigrateInstance(LogicalUnit): # check bridge existance _CheckInstanceBridgesExist(self, instance, node=target_node) - if not self.op.cleanup: + if not self.cleanup: _CheckNodeNotDrained(self, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) @@ -4033,10 +4299,10 @@ class LUMigrateInstance(LogicalUnit): self._GoReconnect(False) self._WaitUntilSync() except errors.OpExecError, err: - self.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) + self.lu.LogWarning("Migration failed and I can't reconnect the" + " drives: error '%s'\n" + "Please look and recover the instance status" % + str(err)) def _AbortMigration(self): """Call the hypervisor code to abort a started migration. @@ -4116,7 +4382,7 @@ class LUMigrateInstance(LogicalUnit): time.sleep(10) result = self.rpc.call_instance_migrate(source_node, instance, self.nodes_ip[target_node], - self.op.live) + self.live) msg = result.fail_msg if msg: logging.error("Instance migration failed, trying to revert" @@ -4154,6 +4420,8 @@ class LUMigrateInstance(LogicalUnit): """Perform the migration. 
""" + feedback_fn("Migrating instance %s" % self.instance.name) + self.feedback_fn = feedback_fn self.source_node = self.instance.primary_node @@ -4163,7 +4431,8 @@ class LUMigrateInstance(LogicalUnit): self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, } - if self.op.cleanup: + + if self.cleanup: return self._ExecCleanup() else: return self._ExecMigration() @@ -4685,7 +4954,7 @@ class LUCreateInstance(LogicalUnit): """ nics = [n.ToDict() for n in self.nics] - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=constants.IALLOCATOR_MODE_ALLOC, name=self.op.instance_name, disk_template=self.op.disk_template, @@ -5120,43 +5389,40 @@ class LUReplaceDisks(LogicalUnit): if not hasattr(self.op, "iallocator"): self.op.iallocator = None - # check for valid parameter combination - cnt = [self.op.remote_node, self.op.iallocator].count(None) - if self.op.mode == constants.REPLACE_DISK_CHG: - if cnt == 2: - raise errors.OpPrereqError("When changing the secondary either an" - " iallocator script must be used or the" - " new node given") - elif cnt == 0: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") - else: # not replacing the secondary - if cnt != 2: - raise errors.OpPrereqError("The iallocator and new node options can" - " be used only when changing the" - " secondary node") + TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, + self.op.iallocator) def ExpandNames(self): self._ExpandAndLockInstance() if self.op.iallocator is not None: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + elif self.op.remote_node is not None: remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node) + self.op.remote_node = remote_node + # Warning: do not remove the locking of the new secondary here # unless DRBD8.AddChildren is changed to work in parallel; # currently it doesn't since parallel invocations of # FindUnusedMinor will conflict self.needed_locks[locking.LEVEL_NODE] = [remote_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + else: self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, + self.op.iallocator, self.op.remote_node, + self.op.disks) + + self.tasklets = [self.replacer] + def DeclareLocks(self, level): # If we're not already locking all nodes in the set we have to declare the # instance's primary/secondary nodes. @@ -5164,213 +5430,463 @@ class LUReplaceDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): self._LockInstancesNodes() - def _RunAllocator(self): - """Compute a new secondary node using an IAllocator. 
- - """ - ial = IAllocator(self, - mode=constants.IALLOCATOR_MODE_RELOC, - name=self.op.instance_name, - relocate_from=[self.sec_node]) - - ial.Run(self.op.iallocator) - - if not ial.success: - raise errors.OpPrereqError("Can't compute nodes using" - " iallocator '%s': %s" % (self.op.iallocator, - ial.info)) - if len(ial.nodes) != ial.required_nodes: - raise errors.OpPrereqError("iallocator '%s' returned invalid number" - " of nodes (%s), required %s" % - (len(ial.nodes), ial.required_nodes)) - self.op.remote_node = ial.nodes[0] - self.LogInfo("Selected new secondary for the instance: %s", - self.op.remote_node) - def BuildHooksEnv(self): """Build hooks env. This runs on the master, the primary and all the secondaries. """ + instance = self.replacer.instance env = { "MODE": self.op.mode, "NEW_SECONDARY": self.op.remote_node, - "OLD_SECONDARY": self.instance.secondary_nodes[0], + "OLD_SECONDARY": instance.secondary_nodes[0], } - env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + env.update(_BuildInstanceHookEnvByObject(self, instance)) nl = [ self.cfg.GetMasterNode(), - self.instance.primary_node, + instance.primary_node, ] if self.op.remote_node is not None: nl.append(self.op.remote_node) return env, nl, nl + +class LUEvacuateNode(LogicalUnit): + """Relocate the secondary instances from a node. + + """ + HPATH = "node-evacuate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name"] + REQ_BGL = False + + def CheckArguments(self): + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + if not hasattr(self.op, "iallocator"): + self.op.iallocator = None + + TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, + self.op.remote_node, + self.op.iallocator) + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + + self.needed_locks = {} + + # Declare node locks + if self.op.iallocator is not None: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + elif self.op.remote_node is not None: + remote_node = self.cfg.ExpandNodeName(self.op.remote_node) + if remote_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.remote_node) + + self.op.remote_node = remote_node + + # Warning: do not remove the locking of the new secondary here + # unless DRBD8.AddChildren is changed to work in parallel; + # currently it doesn't since parallel invocations of + # FindUnusedMinor will conflict + self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + else: + raise errors.OpPrereqError("Invalid parameters") + + # Create tasklets for replacing disks for all secondary instances on this + # node + names = [] + tasklets = [] + + for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name): + logging.debug("Replacing disks for instance %s", inst.name) + names.append(inst.name) + + replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG, + self.op.iallocator, self.op.remote_node, []) + tasklets.append(replacer) + + self.tasklets = tasklets + self.instance_names = names + + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names + + def DeclareLocks(self, level): + # If we're not already locking all nodes in the set we have to declare the + # instance's primary/secondary nodes. 
+ if (level == locking.LEVEL_NODE and + self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + if self.op.remote_node is not None: + env["NEW_SECONDARY"] = self.op.remote_node + nl.append(self.op.remote_node) + + return (env, nl, nl) + + +class TLReplaceDisks(Tasklet): + """Replaces disks for an instance. + + Note: Locking is not within the scope of this class. + + """ + def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, + disks): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.mode = mode + self.iallocator_name = iallocator_name + self.remote_node = remote_node + self.disks = disks + + # Runtime data + self.instance = None + self.new_node = None + self.target_node = None + self.other_node = None + self.remote_node_info = None + self.node_secondary_ip = None + + @staticmethod + def CheckArguments(mode, remote_node, iallocator): + """Helper function for users of this class. + + """ + # check for valid parameter combination + cnt = [remote_node, iallocator].count(None) + if mode == constants.REPLACE_DISK_CHG: + if cnt == 2: + raise errors.OpPrereqError("When changing the secondary either an" + " iallocator script must be used or the" + " new node given") + elif cnt == 0: + raise errors.OpPrereqError("Give either the iallocator or the new" + " secondary, not both") + else: # not replacing the secondary + if cnt != 2: + raise errors.OpPrereqError("The iallocator and new node options can" + " be used only when changing the" + " secondary node") + + @staticmethod + def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): + """Compute a new secondary node using an IAllocator. + + """ + ial = IAllocator(lu.cfg, lu.rpc, + mode=constants.IALLOCATOR_MODE_RELOC, + name=instance_name, + relocate_from=relocate_from) + + ial.Run(iallocator_name) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" + " %s" % (iallocator_name, ial.info)) + + if len(ial.nodes) != ial.required_nodes: + raise errors.OpPrereqError("iallocator '%s' returned invalid number" + " of nodes (%s), required %s" % + (len(ial.nodes), ial.required_nodes)) + + remote_node_name = ial.nodes[0] + + lu.LogInfo("Selected new secondary for instance '%s': %s", + instance_name, remote_node_name) + + return remote_node_name + def CheckPrereq(self): """Check prerequisites. This checks that the instance is in the cluster. 
""" - instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.instance_name - if instance.disk_template != constants.DT_DRBD8: + if self.instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" " instances") - if len(instance.secondary_nodes) != 1: + if len(self.instance.secondary_nodes) != 1: raise errors.OpPrereqError("The instance has a strange layout," " expected one secondary but found %d" % - len(instance.secondary_nodes)) + len(self.instance.secondary_nodes)) - self.sec_node = instance.secondary_nodes[0] + secondary_node = self.instance.secondary_nodes[0] - if self.op.iallocator is not None: - self._RunAllocator() + if self.iallocator_name is None: + remote_node = self.remote_node + else: + remote_node = self._RunAllocator(self.lu, self.iallocator_name, + self.instance.name, secondary_node) - remote_node = self.op.remote_node if remote_node is not None: self.remote_node_info = self.cfg.GetNodeInfo(remote_node) assert self.remote_node_info is not None, \ "Cannot retrieve locked node %s" % remote_node else: self.remote_node_info = None - if remote_node == instance.primary_node: + + if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" " the instance.") - elif remote_node == self.sec_node: + + if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" " secondary node of the instance.") - if self.op.mode == constants.REPLACE_DISK_PRI: - n1 = self.tgt_node = instance.primary_node - n2 = self.oth_node = self.sec_node - elif self.op.mode == constants.REPLACE_DISK_SEC: - n1 = self.tgt_node = self.sec_node - n2 = self.oth_node = instance.primary_node - elif self.op.mode == constants.REPLACE_DISK_CHG: - n1 = self.new_node = remote_node - n2 = self.oth_node = instance.primary_node - self.tgt_node = self.sec_node - _CheckNodeNotDrained(self, remote_node) - else: - raise errors.ProgrammerError("Unhandled disk replace mode") + if self.mode == constants.REPLACE_DISK_PRI: + self.target_node = self.instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] + + elif self.mode == constants.REPLACE_DISK_SEC: + self.target_node = secondary_node + self.other_node = self.instance.primary_node + check_nodes = [self.target_node, self.other_node] - _CheckNodeOnline(self, n1) - _CheckNodeOnline(self, n2) + elif self.mode == constants.REPLACE_DISK_CHG: + self.new_node = remote_node + self.other_node = self.instance.primary_node + self.target_node = secondary_node + check_nodes = [self.new_node, self.other_node] - if not self.op.disks: - self.op.disks = range(len(instance.disks)) + _CheckNodeNotDrained(self.lu, remote_node) - for disk_idx in self.op.disks: - instance.FindDisk(disk_idx) + else: + raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % + self.mode) - def _ExecD8DiskOnly(self, feedback_fn): - """Replace a disk on the primary or secondary for dbrd8. + for node in check_nodes: + _CheckNodeOnline(self.lu, node) - The algorithm for replace is quite complicated: + # If not specified all disks should be replaced + if not self.disks: + self.disks = range(len(self.instance.disks)) - 1. 
for each disk to be replaced: + # Check whether disks are valid + for disk_idx in self.disks: + self.instance.FindDisk(disk_idx) - 1. create new LVs on the target node with unique names - 1. detach old LVs from the drbd device - 1. rename old LVs to name_replaced.<time_t> - 1. rename new LVs to old LVs - 1. attach the new LVs (with the old names now) to the drbd device + # Get secondary node IP addresses + node_2nd_ip = {} - 1. wait for sync across all devices + for node_name in [self.target_node, self.other_node, self.new_node]: + if node_name is not None: + node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip - 1. for each modified disk: + self.node_secondary_ip = node_2nd_ip - 1. remove old LVs (which have the name name_replaces.<time_t>) + def Exec(self, feedback_fn): + """Execute disk replacement. - Failures are not very well handled. + This dispatches the disk replacement to the appropriate handler. """ - steps_total = 6 - warning, info = (self.proc.LogWarning, self.proc.LogInfo) - instance = self.instance - iv_names = {} + feedback_fn("Replacing disks for %s" % self.instance.name) + + activate_disks = (not self.instance.admin_up) + + # Activate the instance disks if we're replacing them on a down instance + if activate_disks: + _StartInstanceDisks(self.lu, self.instance, True) + + try: + if self.mode == constants.REPLACE_DISK_CHG: + return self._ExecDrbd8Secondary() + else: + return self._ExecDrbd8DiskOnly() + + finally: + # Deactivate the instance disks if we're replacing them on a down instance + if activate_disks: + _SafeShutdownInstanceDisks(self.lu, self.instance) + + def _CheckVolumeGroup(self, nodes): + self.lu.LogInfo("Checking volume groups") + vgname = self.cfg.GetVGName() - # start of work - cfg = self.cfg - tgt_node = self.tgt_node - oth_node = self.oth_node - # Step: check device activation - self.proc.LogStep(1, steps_total, "check device existence") - info("checking volume groups") - my_vg = cfg.GetVGName() - results = self.rpc.call_vg_list([oth_node, tgt_node]) + # Make sure volume group exists on all involved nodes + results = self.rpc.call_vg_list(nodes) if not results: raise errors.OpExecError("Can't list volume groups on the nodes") - for node in oth_node, tgt_node: + + for node in nodes: res = results[node] res.Raise("Error checking node %s" % node) - if my_vg not in res.payload: - raise errors.OpExecError("Volume group '%s' not found on %s" % - (my_vg, node)) - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + if vgname not in res.payload: + raise errors.OpExecError("Volume group '%s' not found on node %s" % + (vgname, node)) + + def _CheckDisksExistence(self, nodes): + # Check disk existence + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - for node in tgt_node, oth_node: - info("checking disk/%d on %s" % (idx, node)) - cfg.SetDiskID(dev, node) + + for node in nodes: + self.lu.LogInfo("Checking disk/%d on %s" % (idx, node)) + self.cfg.SetDiskID(dev, node) + result = self.rpc.call_blockdev_find(node, dev) + msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: + if msg or not result.payload: + if not msg: + msg = "disk not found" raise errors.OpExecError("Can't find disk/%d on node %s: %s" % (idx, node, msg)) - # Step: check other node consistency - self.proc.LogStep(2, steps_total, "check peer consistency") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + def _CheckDisksConsistency(self, node_name, on_primary, 
ldisk): + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - info("checking disk/%d consistency on %s" % (idx, oth_node)) - if not _CheckDiskConsistency(self, dev, oth_node, - oth_node==instance.primary_node): - raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe" - " to replace disks on this node (%s)" % - (oth_node, tgt_node)) - # Step: create new storage - self.proc.LogStep(3, steps_total, "allocate new storage") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + self.lu.LogInfo("Checking disk/%d consistency on node %s" % + (idx, node_name)) + + if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary, + ldisk=ldisk): + raise errors.OpExecError("Node %s has degraded storage, unsafe to" + " replace disks for instance %s" % + (node_name, self.instance.name)) + + def _CreateNewStorage(self, node_name): + vgname = self.cfg.GetVGName() + iv_names = {} + + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - size = dev.size - cfg.SetDiskID(dev, tgt_node) - lv_names = [".disk%d_%s" % (idx, suf) - for suf in ["data", "meta"]] - names = _GenerateUniqueNames(self, lv_names) - lv_data = objects.Disk(dev_type=constants.LD_LV, size=size, + + self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx)) + + self.cfg.SetDiskID(dev, node_name) + + lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] + names = _GenerateUniqueNames(self.lu, lv_names) + + lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, logical_id=(vgname, names[0])) lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, logical_id=(vgname, names[1])) + new_lvs = [lv_data, lv_meta] old_lvs = dev.children iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) - info("creating new local storage on %s for %s" % - (tgt_node, dev.iv_name)) + # we pass force_create=True to force the LVM creation for new_lv in new_lvs: - _CreateBlockDev(self, tgt_node, instance, new_lv, True, - _GetInstanceInfoText(instance), False) + _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True, + _GetInstanceInfoText(self.instance), False) + + return iv_names + + def _CheckDevices(self, node_name, iv_names): + for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): + self.cfg.SetDiskID(dev, node_name) + + result = self.rpc.call_blockdev_find(node_name, dev) + + msg = result.fail_msg + if msg or not result.payload: + if not msg: + msg = "disk not found" + raise errors.OpExecError("Can't find DRBD device %s: %s" % + (name, msg)) + + if result.payload[5]: + raise errors.OpExecError("DRBD device %s is degraded!" % name) + + def _RemoveOldStorage(self, node_name, iv_names): + for name, (dev, old_lvs, _) in iv_names.iteritems(): + self.lu.LogInfo("Remove logical volumes for %s" % name) + + for lv in old_lvs: + self.cfg.SetDiskID(lv, node_name) + + msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg + if msg: + self.lu.LogWarning("Can't remove old LV: %s" % msg, + hint="remove unused LVs manually") + + def _ExecDrbd8DiskOnly(self): + """Replace a disk on the primary or secondary for DRBD 8. + + The algorithm for replace is quite complicated: + + 1. for each disk to be replaced: + + 1. create new LVs on the target node with unique names + 1. detach old LVs from the drbd device + 1. rename old LVs to name_replaced.<time_t> + 1. rename new LVs to old LVs + 1. attach the new LVs (with the old names now) to the drbd device + + 1. wait for sync across all devices + + 1. 
for each modified disk: + + 1. remove old LVs (which have the name name_replaces.<time_t>) + + Failures are not very well handled. + + """ + steps_total = 6 + + # Step: check device activation + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.other_node, self.target_node]) + self._CheckVolumeGroup([self.target_node, self.other_node]) + + # Step: check other node consistency + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.other_node, + self.other_node == self.instance.primary_node, + False) + + # Step: create new storage + self.lu.LogStep(3, steps_total, "Allocate new storage") + iv_names = self._CreateNewStorage(self.target_node) # Step: for each lv, detach+rename*2+attach - self.proc.LogStep(4, steps_total, "change drbd configuration") + self.lu.LogStep(4, steps_total, "Changing drbd configuration") for dev, old_lvs, new_lvs in iv_names.itervalues(): - info("detaching %s drbd from local storage" % dev.iv_name) - result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs) + self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name) + + result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs) result.Raise("Can't detach drbd from local storage on node" - " %s for device %s" % (tgt_node, dev.iv_name)) + " %s for device %s" % (self.target_node, dev.iv_name)) #dev.children = [] #cfg.Update(instance) @@ -5384,81 +5900,66 @@ class LUReplaceDisks(LogicalUnit): temp_suffix = int(time.time()) ren_fn = lambda d, suff: (d.physical_id[0], d.physical_id[1] + "_replaced-%s" % suff) - # build the rename list based on what LVs exist on the node - rlist = [] + + # Build the rename list based on what LVs exist on the node + rename_old_to_new = [] for to_ren in old_lvs: - result = self.rpc.call_blockdev_find(tgt_node, to_ren) + result = self.rpc.call_blockdev_find(self.target_node, to_ren) if not result.fail_msg and result.payload: # device exists - rlist.append((to_ren, ren_fn(to_ren, temp_suffix))) + rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) - info("renaming the old LVs on the target node") - result = self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise("Can't rename old LVs on node %s" % tgt_node) - # now we rename the new LVs to the old LVs - info("renaming the new LVs on the target node") - rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] - result = self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise("Can't rename new LVs on node %s" % tgt_node) + self.lu.LogInfo("Renaming the old LVs on the target node") + result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new) + result.Raise("Can't rename old LVs on node %s" % self.target_node) + + # Now we rename the new LVs to the old LVs + self.lu.LogInfo("Renaming the new LVs on the target node") + rename_new_to_old = [(new, old.physical_id) + for old, new in zip(old_lvs, new_lvs)] + result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old) + result.Raise("Can't rename new LVs on node %s" % self.target_node) for old, new in zip(old_lvs, new_lvs): new.logical_id = old.logical_id - cfg.SetDiskID(new, tgt_node) + self.cfg.SetDiskID(new, self.target_node) for disk in old_lvs: disk.logical_id = ren_fn(disk, temp_suffix) - cfg.SetDiskID(disk, tgt_node) + self.cfg.SetDiskID(disk, self.target_node) - # now that the new lvs have the old name, we can add them to the device - info("adding new mirror component on %s" % tgt_node) - result = 
self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs) + # Now that the new lvs have the old name, we can add them to the device + self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) + result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs) msg = result.fail_msg if msg: for new_lv in new_lvs: - msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg + msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg if msg2: - warning("Can't rollback device %s: %s", dev, msg2, - hint="cleanup manually the unused logical volumes") + self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, + hint=("cleanup manually the unused logical" + "volumes")) raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) dev.children = new_lvs - cfg.Update(instance) - # Step: wait for sync + self.cfg.Update(self.instance) - # this can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its - # return value - self.proc.LogStep(5, steps_total, "sync devices") - _WaitForSync(self, instance, unlock=True) + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(5, steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance, unlock=True) - # so check manually all the devices - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): - cfg.SetDiskID(dev, instance.primary_node) - result = self.rpc.call_blockdev_find(instance.primary_node, dev) - msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find DRBD device %s: %s" % - (name, msg)) - if result.payload[5]: - raise errors.OpExecError("DRBD device %s is degraded!" % name) + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) # Step: remove old storage - self.proc.LogStep(6, steps_total, "removing old storage") - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): - info("remove logical volumes for %s" % name) - for lv in old_lvs: - cfg.SetDiskID(lv, tgt_node) - msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg - if msg: - warning("Can't remove old LV: %s" % msg, - hint="manually remove unused LVs") - continue + self.lu.LogStep(6, steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) - def _ExecD8Secondary(self, feedback_fn): - """Replace the secondary node for drbd8. + def _ExecDrbd8Secondary(self): + """Replace the secondary node for DRBD 8. 
The algorithm for replace is quite complicated: - for all disks of the instance: @@ -5477,86 +5978,49 @@ class LUReplaceDisks(LogicalUnit): """ steps_total = 6 - warning, info = (self.proc.LogWarning, self.proc.LogInfo) - instance = self.instance - iv_names = {} - # start of work - cfg = self.cfg - old_node = self.tgt_node - new_node = self.new_node - pri_node = instance.primary_node - nodes_ip = { - old_node: self.cfg.GetNodeInfo(old_node).secondary_ip, - new_node: self.cfg.GetNodeInfo(new_node).secondary_ip, - pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip, - } # Step: check device activation - self.proc.LogStep(1, steps_total, "check device existence") - info("checking volume groups") - my_vg = cfg.GetVGName() - results = self.rpc.call_vg_list([pri_node, new_node]) - for node in pri_node, new_node: - res = results[node] - res.Raise("Error checking node %s" % node) - if my_vg not in res.payload: - raise errors.OpExecError("Volume group '%s' not found on %s" % - (my_vg, node)) - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: - continue - info("checking disk/%d on %s" % (idx, pri_node)) - cfg.SetDiskID(dev, pri_node) - result = self.rpc.call_blockdev_find(pri_node, dev) - msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find disk/%d on node %s: %s" % - (idx, pri_node, msg)) + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.instance.primary_node]) + self._CheckVolumeGroup([self.instance.primary_node]) # Step: check other node consistency - self.proc.LogStep(2, steps_total, "check peer consistency") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: - continue - info("checking disk/%d consistency on %s" % (idx, pri_node)) - if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True): - raise errors.OpExecError("Primary node (%s) has degraded storage," - " unsafe to replace the secondary" % - pri_node) + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.instance.primary_node, True, True) # Step: create new storage - self.proc.LogStep(3, steps_total, "allocate new storage") - for idx, dev in enumerate(instance.disks): - info("adding new local storage on %s for disk/%d" % - (new_node, idx)) + self.lu.LogStep(3, steps_total, "Allocate new storage") + for idx, dev in enumerate(self.instance.disks): + self.lu.LogInfo("Adding new local storage on %s for disk/%d" % + (self.new_node, idx)) # we pass force_create=True to force LVM creation for new_lv in dev.children: - _CreateBlockDev(self, new_node, instance, new_lv, True, - _GetInstanceInfoText(instance), False) + _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True, + _GetInstanceInfoText(self.instance), False) # Step 4: dbrd minors and drbd setups changes # after this, we must manually remove the drbd minors on both the # error and the success paths - minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks], - instance.name) - logging.debug("Allocated minors %s" % (minors,)) - self.proc.LogStep(4, steps_total, "changing drbd configuration") - for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)): - info("activating a new drbd on %s for disk/%d" % (new_node, idx)) + self.lu.LogStep(4, steps_total, "Changing drbd configuration") + minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks], + self.instance.name) + logging.debug("Allocated minors %r" % 
(minors,)) + + iv_names = {} + for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): + self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx)) # create new devices on new_node; note that we create two IDs: # one without port, so the drbd will be activated without # networking information on the new node at this stage, and one # with network, for the latter activation in step 4 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id - if pri_node == o_node1: + if self.instance.primary_node == o_node1: p_minor = o_minor1 else: p_minor = o_minor2 - new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret) - new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret) + new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret) + new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret) iv_names[idx] = (dev, dev.children, new_net_id) logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, @@ -5566,106 +6030,68 @@ class LUReplaceDisks(LogicalUnit): children=dev.children, size=dev.size) try: - _CreateSingleBlockDev(self, new_node, instance, new_drbd, - _GetInstanceInfoText(instance), False) + _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd, + _GetInstanceInfoText(self.instance), False) except errors.GenericError: - self.cfg.ReleaseDRBDMinors(instance.name) + self.cfg.ReleaseDRBDMinors(self.instance.name) raise - for idx, dev in enumerate(instance.disks): - # we have new devices, shutdown the drbd on the old secondary - info("shutting down drbd for disk/%d on old node" % idx) - cfg.SetDiskID(dev, old_node) - msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg + # We have new devices, shutdown the drbd on the old secondary + for idx, dev in enumerate(self.instance.disks): + self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx) + self.cfg.SetDiskID(dev, self.target_node) + msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg if msg: - warning("Failed to shutdown drbd for disk/%d on old node: %s" % - (idx, msg), - hint="Please cleanup this device manually as soon as possible") + self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" + "node: %s" % (idx, msg), + hint=("Please cleanup this device manually as" + " soon as possible")) - info("detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip, - instance.disks)[pri_node] + self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") + result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip, + self.instance.disks)[self.instance.primary_node] msg = result.fail_msg if msg: # detaches didn't succeed (unlikely) - self.cfg.ReleaseDRBDMinors(instance.name) + self.cfg.ReleaseDRBDMinors(self.instance.name) raise errors.OpExecError("Can't detach the disks from the network on" " old node: %s" % (msg,)) # if we managed to detach at least one, we update all the disks of # the instance to point to the new secondary - info("updating instance configuration") + self.lu.LogInfo("Updating instance configuration") for dev, _, new_logical_id in iv_names.itervalues(): dev.logical_id = new_logical_id - cfg.SetDiskID(dev, pri_node) - cfg.Update(instance) + self.cfg.SetDiskID(dev, self.instance.primary_node) + + self.cfg.Update(self.instance) # and now perform the drbd attach - info("attaching primary 
drbds to new secondary (standalone => connected)") - result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip, - instance.disks, instance.name, + self.lu.LogInfo("Attaching primary drbds to new secondary" + " (standalone => connected)") + result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip, + self.instance.disks, self.instance.name, False) for to_node, to_result in result.items(): msg = to_result.fail_msg if msg: - warning("can't attach drbd disks on node %s: %s", to_node, msg, - hint="please do a gnt-instance info to see the" - " status of disks") - - # this can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its - # return value - self.proc.LogStep(5, steps_total, "sync devices") - _WaitForSync(self, instance, unlock=True) - - # so check manually all the devices - for idx, (dev, old_lvs, _) in iv_names.iteritems(): - cfg.SetDiskID(dev, pri_node) - result = self.rpc.call_blockdev_find(pri_node, dev) - msg = result.fail_msg - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find DRBD device disk/%d: %s" % - (idx, msg)) - if result.payload[5]: - raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx) + self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg, + hint=("please do a gnt-instance info to see the" + " status of disks")) - self.proc.LogStep(6, steps_total, "removing old storage") - for idx, (dev, old_lvs, _) in iv_names.iteritems(): - info("remove logical volumes for disk/%d" % idx) - for lv in old_lvs: - cfg.SetDiskID(lv, old_node) - msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg - if msg: - warning("Can't remove LV on old secondary: %s", msg, - hint="Cleanup stale volumes by hand") + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(5, steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance, unlock=True) - def Exec(self, feedback_fn): - """Execute disk replacement. + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) - This dispatches the disk replacement to the appropriate handler. - - """ - instance = self.instance - - # Activate the instance disks if we're replacing them on a down instance - if not instance.admin_up: - _StartInstanceDisks(self, instance, True) - - if self.op.mode == constants.REPLACE_DISK_CHG: - fn = self._ExecD8Secondary - else: - fn = self._ExecD8DiskOnly - - ret = fn(feedback_fn) - - # Deactivate the instance disks if we're replacing them on a down instance - if not instance.admin_up: - _SafeShutdownInstanceDisks(self, instance) - - return ret + # Step: remove old storage + self.lu.LogStep(6, steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) class LUGrowDisk(LogicalUnit): @@ -6895,8 +7321,9 @@ class IAllocator(object): "relocate_from", ] - def __init__(self, lu, mode, name, **kwargs): - self.lu = lu + def __init__(self, cfg, rpc, mode, name, **kwargs): + self.cfg = cfg + self.rpc = rpc # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy @@ -6934,7 +7361,7 @@ class IAllocator(object): This is the data that is independent of the actual operation. 
""" - cfg = self.lu.cfg + cfg = self.cfg cluster_info = cfg.GetClusterInfo() # cluster data data = { @@ -6956,10 +7383,11 @@ class IAllocator(object): elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(), - hypervisor_name) - node_iinfo = self.lu.rpc.call_all_instances_info(node_list, - cluster_info.enabled_hypervisors) + node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), + hypervisor_name) + node_iinfo = \ + self.rpc.call_all_instances_info(node_list, + cluster_info.enabled_hypervisors) for nname, nresult in node_data.items(): # first fill in static (config-based) values ninfo = cfg.GetNodeInfo(nname) @@ -7096,7 +7524,7 @@ class IAllocator(object): done. """ - instance = self.lu.cfg.GetInstanceInfo(self.name) + instance = self.cfg.GetInstanceInfo(self.name) if instance is None: raise errors.ProgrammerError("Unknown instance '%s' passed to" " IAllocator" % self.name) @@ -7138,9 +7566,9 @@ class IAllocator(object): """ if call_fn is None: - call_fn = self.lu.rpc.call_iallocator_runner + call_fn = self.rpc.call_iallocator_runner - result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text) + result = call_fn(self.cfg.GetMasterNode(), name, self.in_text) result.Raise("Failure while running the iallocator script") self.out_text = result.payload @@ -7244,7 +7672,7 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, mem_size=self.op.mem_size, @@ -7257,7 +7685,7 @@ class LUTestAllocator(NoHooksLU): hypervisor=self.op.hypervisor, ) else: - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, relocate_from=list(self.relocate_from), diff --git a/lib/constants.py b/lib/constants.py index 198b8972232f0cab88fef335f6053c42ab41e0cf..23dacdcc2a6ef5759ccf0b30d77dd6acc4f30e69 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -97,6 +97,7 @@ SSCONF_LOCK_FILE = LOCK_DIR + "/ganeti-ssconf.lock" CLUSTER_CONF_FILE = DATA_DIR + "/config.data" SSL_CERT_FILE = DATA_DIR + "/server.pem" RAPI_CERT_FILE = DATA_DIR + "/rapi.pem" +HMAC_CLUSTER_KEY = DATA_DIR + "/hmac.key" WATCHER_STATEFILE = DATA_DIR + "/watcher.data" INSTANCE_UPFILE = RUN_GANETI_DIR + "/instance-status" SSH_KNOWN_HOSTS_FILE = DATA_DIR + "/known_hosts" @@ -108,23 +109,41 @@ SYSCONFDIR = _autoconf.SYSCONFDIR MASTER_SOCKET = SOCKET_DIR + "/ganeti-master" -# PID files -MASTERD_PID = "ganeti-masterd" -NODED_PID = "ganeti-noded" -RAPI_PID = "ganeti-rapi" - NODE_INITD_SCRIPT = _autoconf.SYSCONFDIR + "/init.d/ganeti" -DEFAULT_NODED_PORT = 1811 + +NODED = "ganeti-noded" +RAPI = "ganeti-rapi" +MASTERD = "ganeti-masterd" + +MULTITHREADED_DAEMONS = frozenset([MASTERD]) + +DAEMONS_SSL = { + # daemon-name: (default-cert-path, default-key-path) + NODED: (SSL_CERT_FILE, SSL_CERT_FILE), + RAPI: (RAPI_CERT_FILE, RAPI_CERT_FILE), +} + +DAEMONS_PORTS = { + # daemon-name: ("proto", "default-port") + NODED: ("tcp", 1811), + RAPI: ("tcp", 5080), +} +DEFAULT_NODED_PORT = DAEMONS_PORTS[NODED][1] +DEFAULT_RAPI_PORT = DAEMONS_PORTS[RAPI][1] + FIRST_DRBD_PORT = 11000 LAST_DRBD_PORT = 14999 MASTER_SCRIPT = "ganeti-master" LOG_DIR = _autoconf.LOCALSTATEDIR + "/log/ganeti/" +DAEMONS_LOGFILES = { + # "daemon-name": "logfile" + NODED: LOG_DIR + "node-daemon.log", + RAPI: LOG_DIR + "rapi-daemon.log", + MASTERD: LOG_DIR + "master-daemon.log", +} 
 LOG_OS_DIR = LOG_DIR + "os"
-LOG_NODESERVER = LOG_DIR + "node-daemon.log"
 LOG_WATCHER = LOG_DIR + "watcher.log"
-LOG_MASTERDAEMON = LOG_DIR + "master-daemon.log"
-LOG_RAPISERVER = LOG_DIR + "rapi-daemon.log"
 LOG_COMMANDS = LOG_DIR + "commands.log"
 LOG_BURNIN = LOG_DIR + "burnin.log"
@@ -163,6 +182,25 @@ HKR_SKIP = 0
 HKR_FAIL = 1
 HKR_SUCCESS = 2
 
+# Storage types
+ST_FILE = "file"
+ST_LVM_PV = "lvm-pv"
+ST_LVM_VG = "lvm-vg"
+
+# Storage fields
+SF_NAME = "name"
+SF_SIZE = "size"
+SF_FREE = "free"
+SF_USED = "used"
+SF_ALLOCATABLE = "allocatable"
+
+# Available fields per storage type
+VALID_STORAGE_FIELDS = {
+  ST_FILE: frozenset([SF_NAME, SF_USED, SF_FREE]),
+  ST_LVM_PV: frozenset([SF_NAME, SF_SIZE, SF_USED, SF_FREE, SF_ALLOCATABLE]),
+  ST_LVM_VG: frozenset([SF_NAME, SF_SIZE]),
+  }
+
 # disk template types
 DT_DISKLESS = "diskless"
 DT_PLAIN = "plain"
@@ -227,6 +265,7 @@ DDM_REMOVE = 'remove'
 # common exit codes
 EXIT_SUCCESS = 0
 EXIT_FAILURE = 1
+EXIT_NOTCLUSTER = 5
 EXIT_NOTMASTER = 11
 EXIT_NODESETUP_ERROR = 12
 EXIT_CONFIRMATION = 13 # need user confirmation
@@ -471,22 +510,24 @@ JOB_STATUS_CANCELED = "canceled"
 JOB_STATUS_SUCCESS = "success"
 JOB_STATUS_ERROR = "error"
 
+# OpCode status
+# not yet finalized
 OP_STATUS_QUEUED = "queued"
 OP_STATUS_WAITLOCK = "waiting"
 OP_STATUS_CANCELING = "canceling"
 OP_STATUS_RUNNING = "running"
+# finalized
 OP_STATUS_CANCELED = "canceled"
 OP_STATUS_SUCCESS = "success"
 OP_STATUS_ERROR = "error"
+OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
+                           OP_STATUS_SUCCESS,
+                           OP_STATUS_ERROR])
 
 # Execution log types
 ELOG_MESSAGE = "message"
 ELOG_PROGRESS = "progress"
 
-# Temporary RAPI constants until we have cluster parameters
-RAPI_ENABLE = True
-RAPI_PORT = 5080
-
 # max dynamic devices
 MAX_NICS = 8
 MAX_DISKS = 16
diff --git a/lib/daemon.py b/lib/daemon.py
index 26f9cdfe742d61e4313027a9ac6acc3ecbcdc112..e26d4445eb26be8e876f72858f3195bb1257ace7 100644
--- a/lib/daemon.py
+++ b/lib/daemon.py
@@ -22,12 +22,16 @@
 """Module with helper classes and functions for daemons"""
 
 
+import os
 import select
 import signal
 import errno
 import time
+import sys
+import logging
 
 from ganeti import utils
+from ganeti import constants
 
 
 class Timer(object):
@@ -297,3 +301,86 @@ class Mainloop(object):
 
     """
     self._timer_remove.append(timer_id)
+
+
+def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
+  """Shared main function for daemons.
+
+  @type daemon_name: string
+  @param daemon_name: daemon name
+  @type optionparser: L{optparse.OptionParser}
+  @param optionparser: initialized optionparser with daemon-specific options
+    (common -f -d options will be handled by this module)
+  @type dirs: list of strings
+  @param dirs: list of directories that must exist for this daemon to work
+  @type check_fn: function which accepts (options, args)
+  @param check_fn: function that checks start conditions and exits if they're
+    not met
+  @type exec_fn: function which accepts (options, args)
+  @param exec_fn: function that's executed with the daemon's pid file held, and
+    runs the daemon itself.
+
+  """
+  optionparser.add_option("-f", "--foreground", dest="fork",
+                          help="Don't detach from the current terminal",
+                          default=True, action="store_false")
+  optionparser.add_option("-d", "--debug", dest="debug",
+                          help="Enable some debug messages",
+                          default=False, action="store_true")
+  if daemon_name in constants.DAEMONS_PORTS:
+    # for networked daemons we also allow choosing the bind port and address.
+ # by default we use the port provided by utils.GetDaemonPort, and bind to + # 0.0.0.0 (which is represented by and empty bind address. + port = utils.GetDaemonPort(daemon_name) + optionparser.add_option("-p", "--port", dest="port", + help="Network port (%s default)." % port, + default=port, type="int") + optionparser.add_option("-b", "--bind", dest="bind_address", + help="Bind address", + default="", metavar="ADDRESS") + + if daemon_name in constants.DAEMONS_SSL: + default_cert, default_key = constants.DAEMONS_SSL[daemon_name] + optionparser.add_option("--no-ssl", dest="ssl", + help="Do not secure HTTP protocol with SSL", + default=True, action="store_false") + optionparser.add_option("-K", "--ssl-key", dest="ssl_key", + help="SSL key", + default=default_key, type="string") + optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert", + help="SSL certificate", + default=default_cert, type="string") + + multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS + + options, args = optionparser.parse_args() + + if hasattr(options, 'ssl') and options.ssl: + if not (options.ssl_cert and options.ssl_key): + print >> sys.stderr, "Need key and certificate to use ssl" + sys.exit(constants.EXIT_FAILURE) + for fname in (options.ssl_cert, options.ssl_key): + if not os.path.isfile(fname): + print >> sys.stderr, "Need ssl file %s to run" % fname + sys.exit(constants.EXIT_FAILURE) + + if check_fn is not None: + check_fn(options, args) + + utils.EnsureDirs(dirs) + + if options.fork: + utils.CloseFDs() + utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name]) + + utils.WritePidFile(daemon_name) + try: + utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name], + debug=options.debug, + stderr_logging=not options.fork, + multithreaded=multithread) + logging.info("%s daemon startup" % daemon_name) + exec_fn(options, args) + finally: + utils.RemovePidFile(daemon_name) + diff --git a/lib/errors.py b/lib/errors.py index 9bc9a593ec33f54fbb537f752ea8cb969970c75c..cdcc1c2719728e4f771c69a51ee08cdf9a60ae59 100644 --- a/lib/errors.py +++ b/lib/errors.py @@ -195,11 +195,13 @@ class UnitParseError(GenericError): """ + class TypeEnforcementError(GenericError): """Unable to enforce data type. """ + class SshKeyError(GenericError): """Invalid SSH key. @@ -220,6 +222,12 @@ class CommandError(GenericError): """ +class StorageError(GenericError): + """Storage-related exception. + + """ + + class QuitGanetiException(Exception): """Signal that Ganeti that it must quit. diff --git a/lib/hypervisor/__init__.py b/lib/hypervisor/__init__.py index c2da1144d76fa01deeb20c1d5354ee34f7a48cee..f89621934c2b14c4d833c5d799699b93f29aa797 100644 --- a/lib/hypervisor/__init__.py +++ b/lib/hypervisor/__init__.py @@ -41,11 +41,11 @@ _HYPERVISOR_MAP = { } -def GetHypervisor(ht_kind): - """Return a Hypervisor instance. +def GetHypervisorClass(ht_kind): + """Return a Hypervisor class. - This function parses the cluster hypervisor configuration file and - instantiates a class based on the value of this file. + This function returns the hypervisor class corresponding to the + given hypervisor name. @type ht_kind: string @param ht_kind: The requested hypervisor type @@ -55,4 +55,19 @@ def GetHypervisor(ht_kind): raise errors.HypervisorError("Unknown hypervisor type '%s'" % ht_kind) cls = _HYPERVISOR_MAP[ht_kind] + return cls + + +def GetHypervisor(ht_kind): + """Return a Hypervisor instance. + + This is a wrapper over L{GetHypervisorClass} which returns an + instance of the class. 
+ + @type ht_kind: string + @param ht_kind: The requested hypervisor type + + """ + cls = GetHypervisorClass(ht_kind) + return cls() diff --git a/lib/jqueue.py b/lib/jqueue.py index 74139b7b5aac54bfc78d2eecec128531f4a3fd47..0eb769960fe59e99d7ecfa4328ca20d7c832d3ae 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -80,6 +80,10 @@ class _QueuedOpCode(object): @ivar stop_timestamp: timestamp for the end of the execution """ + __slots__ = ["input", "status", "result", "log", + "start_timestamp", "end_timestamp", + "__weakref__"] + def __init__(self, op): """Constructor for the _QuededOpCode. @@ -152,6 +156,11 @@ class _QueuedJob(object): @ivar change: a Condition variable we use for waiting for job changes """ + __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial", + "received_timestamp", "start_timestamp", "end_timestamp", + "change", + "__weakref__"] + def __init__(self, queue, job_id, ops): """Constructor for the _QueuedJob. @@ -304,6 +313,26 @@ class _QueuedJob(object): return entries + def MarkUnfinishedOps(self, status, result): + """Mark unfinished opcodes with a given status and result. + + This is an utility function for marking all running or waiting to + be run opcodes with a given status. Opcodes which are already + finalised are not changed. + + @param status: a given opcode status + @param result: the opcode result + + """ + not_marked = True + for op in self.ops: + if op.status in constants.OPS_FINALIZED: + assert not_marked, "Finalized opcodes found after non-finalized ones" + continue + op.status = status + op.result = result + not_marked = False + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. @@ -353,6 +382,15 @@ class _JobQueueWorker(workerpool.BaseWorker): count = len(job.ops) for idx, op in enumerate(job.ops): op_summary = op.input.Summary() + if op.status == constants.OP_STATUS_SUCCESS: + # this is a job that was partially completed before master + # daemon shutdown, so it can be expected that some opcodes + # are already completed successfully (if any did error + # out, then the whole job should have been aborted and not + # resubmitted for processing) + logging.info("Op %s/%s: opcode %s already processed, skipping", + idx + 1, count, op_summary) + continue try: logging.info("Op %s/%s: Starting opcode %s", idx + 1, count, op_summary) @@ -446,7 +484,7 @@ class _JobQueueWorker(workerpool.BaseWorker): queue.acquire() try: try: - job.run_op_idx = -1 + job.run_op_index = -1 job.end_timestamp = TimeStampNow() queue.UpdateJobUnlocked(job) finally: @@ -575,9 +613,8 @@ class JobQueue(object): constants.JOB_STATUS_CANCELING): logging.warning("Unfinished job %s found: %s", job.id, job) try: - for op in job.ops: - op.status = constants.OP_STATUS_ERROR - op.result = "Unclean master daemon shutdown" + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + "Unclean master daemon shutdown") finally: self.UpdateJobUnlocked(job) @@ -1070,6 +1107,10 @@ class JobQueue(object): """ logging.debug("Waiting for changes in job %s", job_id) + + job_info = None + log_entries = None + end_time = time.time() + timeout while True: delta_time = end_time - time.time() @@ -1111,7 +1152,10 @@ class JobQueue(object): logging.debug("Job %s changed", job_id) - return (job_info, log_entries) + if job_info is None and log_entries is None: + return None + else: + return (job_info, log_entries) @utils.LockedMethod @_RequireOpenQueue @@ -1135,8 +1179,8 @@ class JobQueue(object): if job_status not in (constants.JOB_STATUS_QUEUED, constants.JOB_STATUS_WAITLOCK): - 
logging.debug("Job %s is no longer in the queue", job.id) - return (False, "Job %s is no longer in the queue" % job.id) + logging.debug("Job %s is no longer waiting in the queue", job.id) + return (False, "Job %s is no longer waiting in the queue" % job.id) if job_status == constants.JOB_STATUS_QUEUED: self.CancelJobUnlocked(job) @@ -1145,8 +1189,7 @@ class JobQueue(object): elif job_status == constants.JOB_STATUS_WAITLOCK: # The worker will notice the new status and cancel the job try: - for op in job.ops: - op.status = constants.OP_STATUS_CANCELING + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) finally: self.UpdateJobUnlocked(job) return (True, "Job %s will be canceled" % job.id) @@ -1157,9 +1200,8 @@ class JobQueue(object): """ try: - for op in job.ops: - op.status = constants.OP_STATUS_CANCELED - op.result = "Job canceled by request" + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") finally: self.UpdateJobUnlocked(job) diff --git a/lib/mcpu.py b/lib/mcpu.py index 78faa37ddbf5e3344f68310efce853a8a4529308..55d235fb632118d2469b00af10d84fdd757d5214 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -54,9 +54,12 @@ class Processor(object): opcodes.OpAddNode: cmdlib.LUAddNode, opcodes.OpQueryNodes: cmdlib.LUQueryNodes, opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes, + opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage, opcodes.OpRemoveNode: cmdlib.LURemoveNode, opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams, opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode, + opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode, + opcodes.OpMigrateNode: cmdlib.LUMigrateNode, # instance lu opcodes.OpCreateInstance: cmdlib.LUCreateInstance, opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance, diff --git a/lib/opcodes.py b/lib/opcodes.py index e7d34a8bd6bb47a681b369977f829177ce3e13c1..2a2a5f8a4c632205db7726e313eec2ae80cf47da 100644 --- a/lib/opcodes.py +++ b/lib/opcodes.py @@ -328,6 +328,17 @@ class OpQueryNodeVolumes(OpCode): __slots__ = OpCode.__slots__ + ["nodes", "output_fields"] +class OpQueryNodeStorage(OpCode): + """Get information on storage for node(s).""" + OP_ID = "OP_NODE_QUERY_STORAGE" + __slots__ = OpCode.__slots__ + [ + "nodes", + "storage_type", + "name", + "output_fields", + ] + + class OpSetNodeParams(OpCode): """Change the parameters of a node.""" OP_ID = "OP_NODE_SET_PARAMS" @@ -350,6 +361,26 @@ class OpPowercycleNode(OpCode): "force", ] + +class OpEvacuateNode(OpCode): + """Relocate secondary instances from a node.""" + OP_ID = "OP_NODE_EVACUATE" + OP_DSC_FIELD = "node_name" + __slots__ = OpCode.__slots__ + [ + "node_name", "remote_node", "iallocator", + ] + + +class OpMigrateNode(OpCode): + """Migrate all instances from a node.""" + OP_ID = "OP_NODE_MIGRATE" + OP_DSC_FIELD = "node_name" + __slots__ = OpCode.__slots__ + [ + "node_name", + "live", + ] + + # instance opcodes class OpCreateInstance(OpCode): diff --git a/lib/rapi/baserlib.py b/lib/rapi/baserlib.py index c75dae18232291035c753c818633d2663f6178c4..1ed974bb03491fab34977209b1f3cd80eb73c482 100644 --- a/lib/rapi/baserlib.py +++ b/lib/rapi/baserlib.py @@ -233,16 +233,16 @@ class R_Generic(object): """ return self.sn - def _checkIntVariable(self, name): + def _checkIntVariable(self, name, default=0): """Return the parsed value of an int argument. 
""" - val = self.queryargs.get(name, 0) + val = self.queryargs.get(name, default) if isinstance(val, list): if val: val = val[0] else: - val = 0 + val = default try: val = int(val) except (ValueError, TypeError): diff --git a/lib/rapi/connector.py b/lib/rapi/connector.py index c3c12e6c77450eea2898d34b4c5031fb81001c12..83c1f2e19da7375619dac2c41163e873f60de2e0 100644 --- a/lib/rapi/connector.py +++ b/lib/rapi/connector.py @@ -155,6 +155,10 @@ CONNECTOR.update({ re.compile(r'^/2/nodes/([\w\._-]+)$'): rlib2.R_2_nodes_name, re.compile(r'^/2/nodes/([\w\._-]+)/tags$'): rlib2.R_2_nodes_name_tags, re.compile(r'^/2/nodes/([\w\._-]+)/role$'): rlib2.R_2_nodes_name_role, + re.compile(r'^/2/nodes/([\w\._-]+)/evacuate$'): + rlib2.R_2_nodes_name_evacuate, + re.compile(r'^/2/nodes/([\w\._-]+)/migrate$'): + rlib2.R_2_nodes_name_migrate, "/2/instances": rlib2.R_2_instances, re.compile(r'^/2/instances/([\w\._-]+)$'): rlib2.R_2_instances_name, re.compile(r'^/2/instances/([\w\._-]+)/tags$'): rlib2.R_2_instances_name_tags, diff --git a/lib/rapi/rlib2.py b/lib/rapi/rlib2.py index 9c8e2a09f582ad9e09d7656871a0b3abb1e5e58c..0da39c1058294afd6a11442278cf4f96923bf127 100644 --- a/lib/rapi/rlib2.py +++ b/lib/rapi/rlib2.py @@ -261,6 +261,41 @@ class R_2_nodes_name_role(baserlib.R_Generic): return baserlib.SubmitJob([op]) +class R_2_nodes_name_evacuate(baserlib.R_Generic): + """/2/nodes/[node_name]/evacuate resource. + + """ + def POST(self): + """Evacuate all secondary instances off a node. + + """ + node_name = self.items[0] + remote_node = self._checkStringVariable("remote_node", default=None) + iallocator = self._checkStringVariable("iallocator", default=None) + + op = opcodes.OpEvacuateNode(node_name=node_name, + remote_node=remote_node, + iallocator=iallocator) + + return baserlib.SubmitJob([op]) + + +class R_2_nodes_name_migrate(baserlib.R_Generic): + """/2/nodes/[node_name]/evacuate migrate. + + """ + def POST(self): + """Migrate all primary instances from a node. + + """ + node_name = self.items[0] + live = bool(self._checkIntVariable("live", default=1)) + + op = opcodes.OpMigrateNode(node_name=node_name, live=live) + + return baserlib.SubmitJob([op]) + + class R_2_instances(baserlib.R_Generic): """/2/instances resource. diff --git a/lib/rpc.py b/lib/rpc.py index b02cc1afb084bf93856c57b45c61965c0d4f5fdb..a46444bade5156f83903a9104e8eef8760a421f5 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -265,7 +265,7 @@ class RpcRunner(object): """ self._cfg = cfg - self.port = utils.GetNodeDaemonPort() + self.port = utils.GetDaemonPort(constants.NODED) def _InstDict(self, instance, hvp=None, bep=None): """Convert the given instance to a dict. @@ -301,7 +301,7 @@ class RpcRunner(object): def _ConnectList(self, client, node_list, call): """Helper for computing node addresses. - @type client: L{Client} + @type client: L{ganeti.rpc.Client} @param client: a C{Client} instance @type node_list: list @param node_list: the node list we should connect @@ -331,7 +331,7 @@ class RpcRunner(object): def _ConnectNode(self, client, node, call): """Helper for computing one node's address. 
- @type client: L{Client} + @type client: L{ganeti.rpc.Client} @param client: a C{Client} instance @type node: str @param node: the node we should connect @@ -366,7 +366,7 @@ class RpcRunner(object): """ body = serializer.DumpJson(args, indent=False) - c = Client(procedure, body, utils.GetNodeDaemonPort()) + c = Client(procedure, body, utils.GetDaemonPort(constants.NODED)) c.ConnectList(node_list, address_list=address_list) return c.GetResults() @@ -388,7 +388,7 @@ class RpcRunner(object): """ body = serializer.DumpJson(args, indent=False) - c = Client(procedure, body, utils.GetNodeDaemonPort()) + c = Client(procedure, body, utils.GetDaemonPort(constants.NODED)) c.ConnectNode(node) return c.GetResults()[node] @@ -432,6 +432,15 @@ class RpcRunner(object): """ return self._MultiNodeCall(node_list, "vg_list", []) + def call_storage_list(self, node_list, su_name, su_args, name, fields): + """Get list of storage units.. + + This is a multi-node call. + + """ + return self._MultiNodeCall(node_list, "storage_list", + [su_name, su_args, name, fields]) + def call_bridges_exist(self, node, bridges_list): """Checks if a node has all the bridges given. diff --git a/lib/storage.py b/lib/storage.py new file mode 100644 index 0000000000000000000000000000000000000000..de275bd69c9f0b81bcff80929f2981d2fb88c4ac --- /dev/null +++ b/lib/storage.py @@ -0,0 +1,344 @@ +# +# + +# Copyright (C) 2009 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Storage container abstraction. + +""" + + +import logging + +from ganeti import errors +from ganeti import constants +from ganeti import utils + + +def _ParseSize(value): + return int(round(float(value), 0)) + + +class _Base: + """Base class for storage abstraction. + + """ + def List(self, name, fields): + """Returns a list of all entities within the storage unit. + + @type name: string or None + @param name: Entity name or None for all + @type fields: list + @param fields: List with all requested result fields (order is preserved) + + """ + raise NotImplementedError() + + +class FileStorage(_Base): + """File storage unit. + + """ + def __init__(self, paths): + """Initializes this class. + + @type paths: list + @param paths: List of file storage paths + + """ + self._paths = paths + + def List(self, name, fields): + """Returns a list of all entities within the storage unit. + + See L{_Base.List}. + + """ + rows = [] + + if name is None: + paths = self._paths + else: + paths = [name] + + for path in paths: + rows.append(self._ListInner(path, fields)) + + return rows + + @staticmethod + def _ListInner(path, fields): + """Gathers requested information from directory. 
+
+    @type path: string
+    @param path: Path to directory
+    @type fields: list
+    @param fields: Requested fields
+
+    """
+    values = []
+
+    # Pre-calculate information in case it's requested more than once
+    if constants.SF_USED in fields:
+      dirsize = utils.CalculateDirectorySize(path)
+    else:
+      dirsize = None
+
+    if constants.SF_FREE in fields:
+      fsfree = utils.GetFreeFilesystemSpace(path)
+    else:
+      fsfree = None
+
+    # Make sure to update constants.VALID_STORAGE_FIELDS when changing fields.
+    for field_name in fields:
+      if field_name == constants.SF_NAME:
+        values.append(path)
+
+      elif field_name == constants.SF_USED:
+        values.append(dirsize)
+
+      elif field_name == constants.SF_FREE:
+        values.append(fsfree)
+
+      else:
+        raise errors.StorageError("Unknown field: %r" % field_name)
+
+    return values
+
+
+class _LvmBase(_Base):
+  """Base class for LVM storage containers.
+
+  """
+  LIST_SEP = "|"
+  LIST_COMMAND = None
+  LIST_FIELDS = None
+
+  def List(self, name, wanted_field_names):
+    """Returns a list of all entities within the storage unit.
+
+    See L{_Base.List}.
+
+    """
+    # Get needed LVM fields
+    lvm_fields = self._GetLvmFields(self.LIST_FIELDS, wanted_field_names)
+
+    # Build LVM command
+    cmd_args = self._BuildListCommand(self.LIST_COMMAND, self.LIST_SEP,
+                                      lvm_fields, name)
+
+    # Run LVM command
+    cmd_result = self._RunListCommand(cmd_args)
+
+    # Split and rearrange LVM command output
+    return self._BuildList(self._SplitList(cmd_result, self.LIST_SEP,
+                                           len(lvm_fields)),
+                           self.LIST_FIELDS,
+                           wanted_field_names,
+                           lvm_fields)
+
+  @staticmethod
+  def _GetLvmFields(fields_def, wanted_field_names):
+    """Returns unique list of fields wanted from LVM command.
+
+    @type fields_def: list
+    @param fields_def: Field definitions
+    @type wanted_field_names: list
+    @param wanted_field_names: List of requested fields
+
+    """
+    field_to_idx = dict([(field_name, idx)
+                         for (idx, (field_name, _, _)) in enumerate(fields_def)])
+
+    lvm_fields = []
+
+    for field_name in wanted_field_names:
+      try:
+        idx = field_to_idx[field_name]
+      except KeyError:
+        raise errors.StorageError("Unknown field: %r" % field_name)
+
+      (_, lvm_name, _) = fields_def[idx]
+
+      lvm_fields.append(lvm_name)
+
+    return utils.UniqueSequence(lvm_fields)
+
+  @classmethod
+  def _BuildList(cls, cmd_result, fields_def, wanted_field_names, lvm_fields):
+    """Builds the final result list.
+
+    @type cmd_result: iterable
+    @param cmd_result: Iterable of LVM command output (iterable of lists)
+    @type fields_def: list
+    @param fields_def: Field definitions
+    @type wanted_field_names: list
+    @param wanted_field_names: List of requested fields
+    @type lvm_fields: list
+    @param lvm_fields: LVM fields
+
+    """
+    lvm_name_to_idx = dict([(lvm_name, idx)
+                            for (idx, lvm_name) in enumerate(lvm_fields)])
+    field_to_idx = dict([(field_name, idx)
+                         for (idx, (field_name, _, _)) in enumerate(fields_def)])
+
+    data = []
+    for raw_data in cmd_result:
+      row = []
+
+      for field_name in wanted_field_names:
+        (_, lvm_name, convert_fn) = fields_def[field_to_idx[field_name]]
+
+        value = raw_data[lvm_name_to_idx[lvm_name]]
+
+        if convert_fn:
+          value = convert_fn(value)
+
+        row.append(value)
+
+      data.append(row)
+
+    return data
+
+  @staticmethod
+  def _BuildListCommand(cmd, sep, options, name):
+    """Builds LVM command line.
+ + @type cmd: string + @param cmd: Command name + @type sep: string + @param sep: Field separator character + @type options: list of strings + @param options: Wanted LVM fields + @type name: name or None + @param name: Name of requested entity + + """ + args = [cmd, + "--noheadings", "--units=m", "--nosuffix", + "--separator", sep, + "--options", ",".join(options)] + + if name is not None: + args.append(name) + + return args + + @staticmethod + def _RunListCommand(args): + """Run LVM command. + + """ + result = utils.RunCmd(args) + + if result.failed: + raise errors.StorageError("Failed to run %r, command output: %s" % + (args[0], result.output)) + + return result.stdout + + @staticmethod + def _SplitList(data, sep, fieldcount): + """Splits LVM command output into rows and fields. + + @type data: string + @param data: LVM command output + @type sep: string + @param sep: Field separator character + @type fieldcount: int + @param fieldcount: Expected number of fields + + """ + for line in data.splitlines(): + fields = line.strip().split(sep) + + if len(fields) != fieldcount: + continue + + yield fields + + +class LvmPvStorage(_LvmBase): + """LVM Physical Volume storage unit. + + """ + def _GetAllocatable(attr): + if attr: + return (attr[0] == "a") + else: + logging.warning("Invalid PV attribute: %r", attr) + return False + + LIST_COMMAND = "pvs" + + # Make sure to update constants.VALID_STORAGE_FIELDS when changing field + # definitions. + LIST_FIELDS = [ + (constants.SF_NAME, "pv_name", None), + (constants.SF_SIZE, "pv_size", _ParseSize), + (constants.SF_USED, "pv_used", _ParseSize), + (constants.SF_FREE, "pv_free", _ParseSize), + (constants.SF_ALLOCATABLE, "pv_attr", _GetAllocatable), + ] + + +class LvmVgStorage(_LvmBase): + """LVM Volume Group storage unit. + + """ + LIST_COMMAND = "vgs" + + # Make sure to update constants.VALID_STORAGE_FIELDS when changing field + # definitions. + LIST_FIELDS = [ + (constants.SF_NAME, "vg_name", None), + (constants.SF_SIZE, "vg_size", _ParseSize), + ] + + +# Lookup table for storage types +_STORAGE_TYPES = { + constants.ST_FILE: FileStorage, + constants.ST_LVM_PV: LvmPvStorage, + constants.ST_LVM_VG: LvmVgStorage, + } + + +def GetStorageClass(name): + """Returns the class for a storage type. + + @type name: string + @param name: Storage type + + """ + try: + return _STORAGE_TYPES[name] + except KeyError: + raise errors.StorageError("Unknown storage type: %r" % name) + + +def GetStorage(name, *args): + """Factory function for storage methods. + + @type name: string + @param name: Storage type + + """ + return GetStorageClass(name)(*args) diff --git a/lib/utils.py b/lib/utils.py index aace5eccea89b3cf5e1e8258736f6bc950c8d629..0af26132af8fd49b70cfdb871cba8e24fc96222b 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1679,20 +1679,26 @@ def MergeTime(timetuple): return float(seconds) + (float(microseconds) * 0.000001) -def GetNodeDaemonPort(): - """Get the node daemon port for this cluster. +def GetDaemonPort(daemon_name): + """Get the daemon port for this cluster. Note that this routine does not read a ganeti-specific file, but instead uses C{socket.getservbyname} to allow pre-customization of this parameter outside of Ganeti. 
+ @type daemon_name: string + @param daemon_name: daemon name (in constants.DAEMONS_PORTS) @rtype: int """ + if daemon_name not in constants.DAEMONS_PORTS: + raise errors.ProgrammerError("Unknown daemon: %s" % daemon_name) + + (proto, default_port) = constants.DAEMONS_PORTS[daemon_name] try: - port = socket.getservbyname("ganeti-noded", "tcp") + port = socket.getservbyname(daemon_name, proto) except socket.error: - port = constants.DEFAULT_NODED_PORT + port = default_port return port @@ -1841,6 +1847,51 @@ def CommaJoin(names): return ", ".join(["'%s'" % val for val in names]) +def BytesToMebibyte(value): + """Converts bytes to mebibytes. + + @type value: int + @param value: Value in bytes + @rtype: int + @return: Value in mebibytes + + """ + return int(round(value / (1024.0 * 1024.0), 0)) + + +def CalculateDirectorySize(path): + """Calculates the size of a directory recursively. + + @type path: string + @param path: Path to directory + @rtype: int + @return: Size in mebibytes + + """ + size = 0 + + for (curpath, _, files) in os.walk(path): + for file in files: + st = os.lstat(os.path.join(curpath, file)) + size += st.st_size + + return BytesToMebibyte(size) + + +def GetFreeFilesystemSpace(path): + """Returns the free space on a filesystem. + + @type path: string + @param path: Path on filesystem to be examined + @rtype: int + @return: Free space in mebibytes + + """ + st = os.statvfs(path) + + return BytesToMebibyte(st.f_bavail * st.f_frsize) + + def LockedMethod(fn): """Synchronized object access decorator. diff --git a/man/ganeti-rapi.sgml b/man/ganeti-rapi.sgml index 20a82850dd8230679cf12476ec4bfd4d79e42151..00ddfde05fbd2d7980019196ff55606889eb651f 100644 --- a/man/ganeti-rapi.sgml +++ b/man/ganeti-rapi.sgml @@ -40,7 +40,6 @@ <command>&dhpackage; </command> <arg>-d</arg> <arg>-f</arg> - <arg>-p <replaceable>PORT</replaceable></arg> <arg>--no-ssl</arg> <arg>-K <replaceable>SSL_KEY_FILE</replaceable></arg> <arg>-C <replaceable>SSL_CERT_FILE</replaceable></arg> @@ -65,8 +64,8 @@ </para> <para> - The daemon will listen by default on the port 5080, but this can - be changed via the <option>-p</option> option. + The daemon will listen to the "ganeti-rapi" tcp port, as listed in the + system services database, or to port 5080 by default. </para> <para> diff --git a/man/gnt-job.sgml b/man/gnt-job.sgml index bf81ec7062e1c476caa9564b87008556633845e7..e31d417894ae1c727a783c9f91ea04a852cecd1d 100644 --- a/man/gnt-job.sgml +++ b/man/gnt-job.sgml @@ -231,6 +231,20 @@ </para> </refsect2> + + <refsect2> + <title>WATCH</title> + <cmdsynopsis> + <command>watch</command> + <arg>id</arg> + </cmdsynopsis> + + <para> + This command follows the output of the job by the given + <replaceable>id</replaceable> and prints it. + </para> + </refsect2> + </refsect1> &footer; diff --git a/man/gnt-node.sgml b/man/gnt-node.sgml index ea02f1944c46e1501f3e690de9508d0da19bf555..d7d9f852c9cd253c4ede1df01fc77fd2eb9ae3eb 100644 --- a/man/gnt-node.sgml +++ b/man/gnt-node.sgml @@ -721,6 +721,94 @@ node1.example.com /dev/hdc1 xenvg instance1.example.com-sda_11001.data 256 inst </para> </refsect2> + <refsect2> + <title>VOLUMES</title> + + <cmdsynopsis> + <command>physical-volumes</command> + <arg>--no-headers</arg> + <arg>--human-readable</arg> + <arg>--separator=<replaceable>SEPARATOR</replaceable></arg> + <arg>--output=<replaceable>FIELDS</replaceable></arg> + <sbr> + <arg rep="repeat"><replaceable>node</replaceable></arg> + </cmdsynopsis> + + <para> + Lists all physical volumes and their details from the node(s) provided. 
+ </para> + + <para> + The <option>--no-headers</option> option will skip the initial header + line. The <option>--separator</option> option takes an argument which + denotes what will be used between the output fields. Both these options + are to help scripting. + </para> + + <para> + The units used to display the numeric values in the output varies, + depending on the options given. By default, the values will be + formatted in the most appropriate unit. If the + <option>--separator</option> option is given, then the values are shown + in mebibytes to allow parsing by scripts. In both cases, the + <option>--units</option> option can be used to enforce a given output + unit. + </para> + + <para> + The <option>-o</option> option takes a comma-separated list of + output fields. The available fields and their meaning are: + <variablelist> + <varlistentry> + <term>node</term> + <listitem> + <simpara>the node name on which the volume exists</simpara> + </listitem> + </varlistentry> + <varlistentry> + <term>name</term> + <listitem> + <simpara>the physical drive name</simpara> + </listitem> + </varlistentry> + <varlistentry> + <term>size</term> + <listitem> + <simpara>the physical drive size</simpara> + </listitem> + </varlistentry> + <varlistentry> + <term>used</term> + <listitem> + <simpara>used disk space</simpara> + </listitem> + </varlistentry> + <varlistentry> + <term>free</term> + <listitem> + <simpara>available disk space</simpara> + </listitem> + </varlistentry> + <varlistentry> + <term>allocatable</term> + <listitem> + <simpara>whether physical volume is allocatable</simpara> + </listitem> + </varlistentry> + </variablelist> + </para> + + <para> + Example: + <screen> +# gnt-node physical-volumes node5.example.com +Node Name Size Used Free +node5.example.com /dev/sda7 673.8G 0M 673.8G +node5.example.com /dev/sdb1 698.6G 1.3G 697.4G + </screen> + </para> + </refsect2> + <refsect2> <title>POWERCYCLE</title> diff --git a/qa/qa_rapi.py b/qa/qa_rapi.py index 7c668c5184436215900922150357364a6d2b50ff..1ab34f931d33879a05f7a9d7b3f126b1c6853132 100644 --- a/qa/qa_rapi.py +++ b/qa/qa_rapi.py @@ -58,25 +58,13 @@ def Enabled(): """Return whether remote API tests should be run. """ - return constants.RAPI_ENABLE and qa_config.TestEnabled('rapi') - - -def PrintRemoteAPIWarning(): - """Print warning if remote API is not enabled. - - """ - if constants.RAPI_ENABLE or not qa_config.TestEnabled('rapi'): - return - msg = ("Remote API is not enabled in this Ganeti build. Please run" - " `configure [...] 
--enable-rapi'.") - print - print qa_utils.FormatWarning(msg) + return qa_config.TestEnabled('rapi') def _DoTests(uris): master = qa_config.GetMasterNode() host = master["primary"] - port = qa_config.get("rapi-port", default=constants.RAPI_PORT) + port = qa_config.get("rapi-port", default=constants.DEFAULT_RAPI_PORT) for uri, verify in uris: assert uri.startswith("/") diff --git a/scripts/gnt-debug b/scripts/gnt-debug index d3bf05450c3c08cc6ad94de10d744dbc4d3273cd..3a12b2b7aaa9c1ec111a6d0f00c7ec081706243f 100755 --- a/scripts/gnt-debug +++ b/scripts/gnt-debug @@ -71,19 +71,14 @@ def GenericOpCodes(opts, args): """ cl = cli.GetClient() - job_data = [] - job_ids = [] + jex = cli.JobExecutor(cl=cl) + for fname in args: op_data = simplejson.loads(open(fname).read()) op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data] - job_data.append((fname, op_list)) - for fname, op_list in job_data: - jid = cli.SendJob(op_list, cl=cl) - ToStdout("File '%s', job id: %s", fname, jid) - job_ids.append(jid) - for jid in job_ids: - ToStdout("Waiting for job id %s", jid) - cli.PollJob(jid, cl=cl) + jex.QueueJob("file %s" % fname, *op_list) + + jex.GetResults() return 0 diff --git a/scripts/gnt-job b/scripts/gnt-job index 2da75a35b1da77b5ccf864de88ccfbc2aa0d95d3..1402583e6ae8ba80c3d628aaac0ed2cb7730a7b5 100755 --- a/scripts/gnt-job +++ b/scripts/gnt-job @@ -29,6 +29,7 @@ from ganeti.cli import * from ganeti import constants from ganeti import errors from ganeti import utils +from ganeti import cli #: default list of fields for L{ListJobs} @@ -312,6 +313,32 @@ def ShowJobs(opts, args): return 0 +def WatchJob(opts, args): + """Follow a job and print its output as it arrives. + + @param opts: the command line options selected by the user + @type args: list + @param args: Contains the job ID + @rtype: int + @return: the desired exit code + + """ + job_id = args[0] + + msg = ("Output from job %s follows" % job_id) + ToStdout(msg) + ToStdout("-" * len(msg)) + + retcode = 0 + try: + cli.PollJob(job_id) + except errors.GenericError, err: + (retcode, job_result) = cli.FormatError(err) + ToStderr("Job %s failed: %s", job_id, job_result) + + return retcode + + commands = { 'list': (ListJobs, ARGS_ANY, [DEBUG_OPT, NOHDR_OPT, SEP_OPT, FIELDS_OPT], @@ -336,6 +363,9 @@ commands = { 'info': (ShowJobs, ARGS_ANY, [DEBUG_OPT], "<job-id> [<job-id> ...]", "Show detailed information about the specified jobs"), + 'watch': (WatchJob, ARGS_ONE, [DEBUG_OPT], + "<job-id>", + "Follows a job and prints its output as it arrives"), } diff --git a/scripts/gnt-node b/scripts/gnt-node index c2b2f88bbd1450a6d3b6a6cd83db6ba3479ab000..6141e50272dd4b11869ead2df651d69c0d038fdd 100755 --- a/scripts/gnt-node +++ b/scripts/gnt-node @@ -224,17 +224,9 @@ def EvacuateNode(opts, args): src_node, txt_msg)): return constants.EXIT_CONFIRMATION - ops = [] - for iname in sinst: - op = opcodes.OpReplaceDisks(instance_name=iname, - remote_node=dst_node, - mode=constants.REPLACE_DISK_CHG, - iallocator=iallocator, - disks=[]) - ops.append(op) - - job_id = cli.SendJob(ops, cl=cl) - cli.PollJob(job_id, cl=cl) + op = opcodes.OpEvacuateNode(node_name=args[0], remote_node=dst_node, + iallocator=iallocator) + SubmitOpCode(op, cl=cl) def FailoverNode(opts, args): @@ -307,20 +299,8 @@ def MigrateNode(opts, args): (",".join("'%s'" % name for name in pinst))): return 2 - jex = JobExecutor(cl=cl) - for iname in pinst: - op = opcodes.OpMigrateInstance(instance_name=iname, live=opts.live, - cleanup=False) - jex.QueueJob(iname, op) - - results = jex.GetResults() - bad_cnt 
= len([row for row in results if not row[0]]) - if bad_cnt == 0: - ToStdout("All %d instance(s) migrated successfully.", len(results)) - else: - ToStdout("There were errors during the migration:\n" - "%d error(s) out of %d instance(s).", bad_cnt, len(results)) - return retcode + op = opcodes.OpMigrateNode(node_name=args[0], live=opts.live) + SubmitOpCode(op, cl=cl) def ShowNodeConfig(opts, args): @@ -445,6 +425,54 @@ def ListVolumes(opts, args): return 0 +def ListPhysicalVolumes(opts, args): + """List physical volumes on node(s). + + @param opts: the command line options selected by the user + @type args: list + @param args: should either be an empty list, in which case + we list data for all nodes, or contain a list of nodes + to display data only for those + @rtype: int + @return: the desired exit code + + """ + if opts.output is None: + selected_fields = ["node", constants.SF_NAME, constants.SF_SIZE, + constants.SF_USED, constants.SF_FREE] + else: + selected_fields = opts.output.split(",") + + op = opcodes.OpQueryNodeStorage(nodes=args, + storage_type=constants.ST_LVM_PV, + output_fields=selected_fields) + output = SubmitOpCode(op) + + if not opts.no_headers: + headers = { + "node": "Node", + constants.SF_NAME: "Name", + constants.SF_SIZE: "Size", + constants.SF_USED: "Used", + constants.SF_FREE: "Free", + constants.SF_ALLOCATABLE: "Allocatable", + } + else: + headers = None + + unitfields = [constants.SF_SIZE, constants.SF_USED, constants.SF_FREE] + numfields = [constants.SF_SIZE, constants.SF_USED, constants.SF_FREE] + + data = GenerateTable(separator=opts.separator, headers=headers, + fields=selected_fields, unitfields=unitfields, + numfields=numfields, data=output, units=opts.units) + + for line in data: + ToStdout(line) + + return 0 + + def SetNodeParams(opts, args): """Modifies a node. @@ -571,6 +599,11 @@ commands = { 'volumes': (ListVolumes, ARGS_ANY, [DEBUG_OPT, NOHDR_OPT, SEP_OPT, USEUNITS_OPT, FIELDS_OPT], "[<node_name>...]", "List logical volumes on node(s)"), + 'physical-volumes': (ListPhysicalVolumes, ARGS_ANY, + [DEBUG_OPT, NOHDR_OPT, SEP_OPT, USEUNITS_OPT, + FIELDS_OPT], + "[<node_name>...]", + "List physical volumes on node(s)"), 'list-tags': (ListTags, ARGS_ONE, [DEBUG_OPT], "<node_name>", "List the tags of the given node"), 'add-tags': (AddTags, ARGS_ATLEAST(1), [DEBUG_OPT, TAG_SRC_OPT], diff --git a/test/mocks.py b/test/mocks.py index 58e68c3c3c74d1dccaa364a1170ac383dffef1c4..4c0ae072c0a9c007b08c345452ad9fdc0956921c 100644 --- a/test/mocks.py +++ b/test/mocks.py @@ -61,6 +61,10 @@ class FakeProc: def LogInfo(self, msg, *args, **kwargs): pass + def LogStep(self, current, total, message): + pass + + class FakeContext: """Fake context object""" diff --git a/tools/burnin b/tools/burnin index d0ef8775385b58581ceee1e5bd59ddcde93f9280..b38680fc4b53ddcc809a98a7ee3253afd9c023d2 100755 --- a/tools/burnin +++ b/tools/burnin @@ -41,11 +41,16 @@ from ganeti import utils USAGE = ("\tburnin -o OS_NAME [options...] 
instance_name ...") +MAX_RETRIES = 3 class InstanceDown(Exception): """The checked instance was not up""" +class BurninFailure(Exception): + """Failure detected during burning""" + + def Usage(): """Shows program usage information and exits the program.""" @@ -106,6 +111,9 @@ class Burner(object): self.to_rem = [] self.queued_ops = [] self.opts = None + self.queue_retry = False + self.disk_count = self.disk_growth = self.disk_size = None + self.hvp = self.bep = None self.ParseOptions() self.cl = cli.GetClient() self.GetState() @@ -125,7 +133,39 @@ class Burner(object): if self.opts.verbose: Log(msg, indent=3) - def ExecOp(self, *ops): + def MaybeRetry(self, retry_count, msg, fn, *args): + """Possibly retry a given function execution. + + @type retry_count: int + @param retry_count: retry counter: + - 0: non-retryable action + - 1: last retry for a retryable action + - MAX_RETRIES: original try for a retryable action + @type msg: str + @param msg: the kind of the operation + @type fn: callable + @param fn: the function to be called + + """ + try: + val = fn(*args) + if retry_count > 0 and retry_count < MAX_RETRIES: + Log("Idempotent %s succeeded after %d retries" % + (msg, MAX_RETRIES - retry_count)) + return val + except Exception, err: + if retry_count == 0: + Log("Non-idempotent %s failed, aborting" % (msg, )) + raise + elif retry_count == 1: + Log("Idempotent %s repeated failure, aborting" % (msg, )) + raise + else: + Log("Idempotent %s failed, retry #%d/%d: %s" % + (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)) + self.MaybeRetry(retry_count - 1, msg, fn, *args) + + def _ExecOp(self, *ops): """Execute one or more opcodes and manage the exec buffer. @result: if only opcode has been passed, we return its result; @@ -139,20 +179,48 @@ class Burner(object): else: return results + def ExecOp(self, retry, *ops): + """Execute one or more opcodes and manage the exec buffer. + + @result: if only opcode has been passed, we return its result; + otherwise we return the list of results + + """ + if retry: + rval = MAX_RETRIES + else: + rval = 0 + return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops) + def ExecOrQueue(self, name, *ops): """Execute an opcode and manage the exec buffer.""" if self.opts.parallel: self.queued_ops.append((ops, name)) else: - return self.ExecOp(*ops) + return self.ExecOp(self.queue_retry, *ops) + + def StartBatch(self, retry): + """Start a new batch of jobs. + + @param retry: whether this is a retryable batch + + """ + self.queued_ops = [] + self.queue_retry = retry def CommitQueue(self): """Execute all submitted opcodes in case of parallel burnin""" if not self.opts.parallel: return + if self.queue_retry: + rval = MAX_RETRIES + else: + rval = 0 + try: - results = self.ExecJobSet(self.queued_ops) + results = self.MaybeRetry(rval, "jobset", self.ExecJobSet, + self.queued_ops) finally: self.queued_ops = [] return results @@ -171,10 +239,45 @@ class Burner(object): results = [] for jid, (_, iname) in zip(job_ids, jobs): Log("waiting for job %s for %s" % (jid, iname), indent=2) - results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)) - + try: + results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)) + except Exception, err: + Log("Job for %s failed: %s" % (iname, err)) + if len(results) != len(jobs): + raise BurninFailure() return results + def _DoCheckInstances(fn): + """Decorator for checking instances. 
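Stepping back to MaybeRetry above, a compact standalone sketch (not part of the
patch) of the countdown convention it uses: the counter starts at MAX_RETRIES
for a retryable action and 0 marks a non-retryable one. This version returns
the recursive result so a successful retry propagates its value:

    MAX_RETRIES = 3

    def maybe_retry(retry_count, fn, *args):
      # retry_count == 0: non-retryable; == 1: last attempt; == MAX_RETRIES: first try
      try:
        return fn(*args)
      except Exception:
        if retry_count <= 1:
          raise
        return maybe_retry(retry_count - 1, fn, *args)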
+ + """ + def wrapper(self, *args, **kwargs): + val = fn(self, *args, **kwargs) + for instance in self.instances: + self._CheckInstanceAlive(instance) + return val + + return wrapper + + def _DoBatch(retry): + """Decorator for possible batch operations. + + Must come after the _DoCheckInstances decorator (if any). + + @param retry: whether this is a retryable batch, will be + passed to StartBatch + + """ + def wrap(fn): + def batched(self, *args, **kwargs): + self.StartBatch(retry) + val = fn(self, *args, **kwargs) + self.CommitQueue() + return val + return batched + + return wrap + def ParseOptions(self): """Parses the command line options. @@ -325,14 +428,14 @@ class Burner(object): try: op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"], names=names, use_locking=True) - result = self.ExecOp(op) + result = self.ExecOp(True, op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) Err(msg, exit_code=err_code) self.nodes = [data[0] for data in result if not (data[1] or data[2])] - result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"], - names=[])) + op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[]) + result = self.ExecOp(True, op_diagos) if not result: Err("Can't get the OS list") @@ -343,6 +446,8 @@ class Burner(object): if self.opts.os not in os_set: Err("OS '%s' not found" % self.opts.os) + @_DoCheckInstances + @_DoBatch(False) def BurnCreateInstances(self): """Create the given instances. @@ -388,11 +493,7 @@ class Burner(object): self.ExecOrQueue(instance, op) self.to_rem.append(instance) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoBatch(False) def BurnGrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" Log("Growing disks") @@ -404,8 +505,8 @@ class Burner(object): amount=growth, wait_for_sync=True) Log("increase disk/%s by %s MB" % (idx, growth), indent=2) self.ExecOrQueue(instance, op) - self.CommitQueue() + @_DoBatch(True) def BurnReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" Log("Replacing disks on the same nodes") @@ -419,8 +520,8 @@ class Burner(object): Log("run %s" % mode, indent=2) ops.append(op) self.ExecOrQueue(instance, *ops) - self.CommitQueue() + @_DoBatch(True) def BurnReplaceDisks2(self): """Replace secondary node.""" Log("Changing the secondary node") @@ -442,8 +543,9 @@ class Burner(object): disks=[i for i in range(self.disk_count)]) Log("run %s %s" % (mode, msg), indent=2) self.ExecOrQueue(instance, op) - self.CommitQueue() + @_DoCheckInstances + @_DoBatch(False) def BurnFailover(self): """Failover the instances.""" Log("Failing over instances") @@ -453,10 +555,8 @@ class Burner(object): ignore_consistency=False) self.ExecOrQueue(instance, op) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) + @_DoBatch(False) def BurnMigrate(self): """Migrate the instances.""" Log("Migrating instances") @@ -469,8 +569,9 @@ class Burner(object): cleanup=True) Log("migration and migration cleanup", indent=2) self.ExecOrQueue(instance, op1, op2) - self.CommitQueue() + @_DoCheckInstances + @_DoBatch(False) def BurnImportExport(self): """Export the instance, delete it, and import it back. 
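A self-contained sketch (not part of the patch) of how the two decorators
compose when stacked the way the Burn* methods above use them; the class name
and printed messages are purely illustrative. Because _DoBatch is the inner
decorator, the instance checks from _DoCheckInstances only run once the whole
batch has been committed:

    def _do_check(fn):
      def wrapper(self, *args, **kwargs):
        val = fn(self, *args, **kwargs)
        for instance in self.instances:
          print "checking %s" % instance
        return val
      return wrapper

    def _do_batch(retry):
      def wrap(fn):
        def batched(self, *args, **kwargs):
          print "start batch (retry=%s)" % retry
          val = fn(self, *args, **kwargs)
          print "commit batch"
          return val
        return batched
      return wrap

    class MiniBurner(object):
      instances = ["inst1", "inst2"]

      @_do_check
      @_do_batch(False)
      def run(self):
        for instance in self.instances:
          print "queueing op for %s" % instance

    MiniBurner().run()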
@@ -486,7 +587,7 @@ class Burner(object): # read the full name of the instance nam_op = opcodes.OpQueryInstances(output_fields=["name"], names=[instance], use_locking=True) - full_name = self.ExecOp(nam_op)[0][0] + full_name = self.ExecOp(False, nam_op)[0][0] if self.opts.iallocator: pnode = snode = None @@ -535,10 +636,6 @@ class Burner(object): Log("remove export", indent=2) self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) - def StopInstanceOp(self, instance): """Stop given instance.""" return opcodes.OpShutdownInstance(instance_name=instance) @@ -552,6 +649,8 @@ class Burner(object): return opcodes.OpRenameInstance(instance_name=instance, new_name=instance_new) + @_DoCheckInstances + @_DoBatch(True) def BurnStopStart(self): """Stop/start the instances.""" Log("Stopping and starting instances") @@ -561,11 +660,7 @@ class Burner(object): op2 = self.StartInstanceOp(instance) self.ExecOrQueue(instance, op1, op2) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoBatch(False) def BurnRemove(self): """Remove the instances.""" Log("Removing instances") @@ -575,8 +670,6 @@ class Burner(object): ignore_failures=True) self.ExecOrQueue(instance, op) - self.CommitQueue() - def BurnRename(self): """Rename the instances. @@ -594,11 +687,13 @@ class Burner(object): op_rename2 = self.RenameInstanceOp(rename, instance) op_start1 = self.StartInstanceOp(rename) op_start2 = self.StartInstanceOp(instance) - self.ExecOp(op_stop1, op_rename1, op_start1) + self.ExecOp(False, op_stop1, op_rename1, op_start1) self._CheckInstanceAlive(rename) - self.ExecOp(op_stop2, op_rename2, op_start2) + self.ExecOp(False, op_stop2, op_rename2, op_start2) self._CheckInstanceAlive(instance) + @_DoCheckInstances + @_DoBatch(True) def BurnReinstall(self): """Reinstall the instances.""" Log("Reinstalling instances") @@ -613,11 +708,8 @@ class Burner(object): op4 = self.StartInstanceOp(instance) self.ExecOrQueue(instance, op1, op2, op3, op4) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoCheckInstances + @_DoBatch(True) def BurnReboot(self): """Reboot the instances.""" Log("Rebooting instances") @@ -632,11 +724,8 @@ class Burner(object): ops.append(op) self.ExecOrQueue(instance, *ops) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoCheckInstances + @_DoBatch(True) def BurnActivateDisks(self): """Activate and deactivate disks of the instances.""" Log("Activating/deactivating disks") @@ -650,10 +739,9 @@ class Burner(object): Log("activate disks when offline", indent=2) Log("deactivate disks (when offline)", indent=2) self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) + @_DoCheckInstances + @_DoBatch(False) def BurnAddRemoveDisks(self): """Add and remove an extra disk for the instances.""" Log("Adding and removing disks") @@ -669,10 +757,8 @@ class Burner(object): Log("adding a disk", indent=2) Log("removing last disk", indent=2) self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) + @_DoBatch(False) def BurnAddRemoveNICs(self): """Add and remove an extra NIC for the instances.""" Log("Adding and removing NICs") @@ -685,7 +771,6 @@ class Burner(object): Log("adding 
a NIC", indent=2) Log("removing last NIC", indent=2) self.ExecOrQueue(instance, op_add, op_rem) - self.CommitQueue() def _CheckInstanceAlive(self, instance): """Check if an instance is alive by doing http checks. @@ -784,7 +869,14 @@ class Burner(object): Log(self.GetFeedbackBuf()) Log("\n\n") if not self.opts.keep_instances: - self.BurnRemove() + try: + self.BurnRemove() + except Exception, err: + if has_err: # already detected errors, so errors in removal + # are quite expected + Log("Note: error detected during instance remove: %s" % str(err)) + else: # non-expected error + raise return 0 diff --git a/tools/lvmstrap b/tools/lvmstrap index ed92a12789f1feef1b7c79159d7d8e293a071c9d..73b4834e65e6ffe6c5b7570112fe66e7c0dd51bb 100755 --- a/tools/lvmstrap +++ b/tools/lvmstrap @@ -46,6 +46,7 @@ import time from ganeti.utils import RunCmd, ReadFile from ganeti import constants +from ganeti import cli USAGE = ("\tlvmstrap diskinfo\n" "\tlvmstrap [--vgname=NAME] [--allow-removable]" @@ -485,8 +486,15 @@ def ShowDiskInfo(opts): dlist = GetDiskList(opts) print "------- Disk information -------" - print ("%5s %7s %4s %5s %-10s %s" % - ("Name", "Size[M]", "Used", "Mount", "LVM?", "Info")) + headers = { + "name": "Name", + "size": "Size[M]", + "used": "Used", + "mount": "Mount", + "lvm": "LVM?", + "info": "Info" + } + fields = ["name", "size", "used", "mount", "lvm", "info"] flatlist = [] # Flatten the [(disk, [partition,...]), ...] list @@ -499,6 +507,7 @@ def ShowDiskInfo(opts): for partname, partsize, partdev in parts: flatlist.append((partname, partsize, partdev, "")) + strlist = [] for name, size, dev, in_use in flatlist: mp, vgname, fileinfo = DevInfo(name, dev, mounts) if mp is None: @@ -513,8 +522,15 @@ def ShowDiskInfo(opts): if len(name) > 3: # Indent partitions name = " %s" % name - print ("%-5s %7.2f %-4s %-5s %-10s %s" % - (name, float(size) / 1024 / 1024, in_use, mp, lvminfo, fileinfo)) + + strlist.append([name, "%.2f" % (float(size) / 1024 / 1024), + in_use, mp, lvminfo, fileinfo]) + + data = cli.GenerateTable(headers, fields, None, + strlist, numfields=["size"]) + + for line in data: + print line def CheckReread(name):
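As a closing illustration, a minimal sketch (not part of the patch) of the
cli.GenerateTable() call pattern that both the new physical-volumes listing and
the reworked lvmstrap output rely on; the headers, fields and rows below are
made up and simply mirror the positional call used in ShowDiskInfo above:

    from ganeti import cli

    headers = {"name": "Name", "size": "Size[M]"}
    fields = ["name", "size"]
    rows = [["/dev/sda7", "689971.20"],
            ["/dev/sdb1", "715366.40"]]

    for line in cli.GenerateTable(headers, fields, None, rows, numfields=["size"]):
      print line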