diff --git a/.gitignore b/.gitignore index 0b4512b15222d80c33a3428562c7119127340b2e..cff682ba557a02694151859e9e1ec81357b4be2c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ # global ignores *.py[co] +*.swp # / /Makefile @@ -26,6 +27,9 @@ /*.tar.bz2 /*.tar.gz +# daemons +/daemons/ganeti-cleaner + # devel /devel/clean-cluster /devel/upload diff --git a/Makefile.am b/Makefile.am index 2a2466242f79b3b743b01638d2ee4c425bac6e47..062f8272953fbf1afc7ee547155a5d2875bd6c9c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -26,6 +26,7 @@ DIRS = \ devel \ doc \ doc/examples \ + doc/examples/hooks \ lib \ lib/http \ lib/hypervisor \ @@ -44,6 +45,7 @@ MAINTAINERCLEANFILES = \ CLEANFILES = \ autotools/replace_vars.sed \ + daemons/ganeti-cleaner \ devel/upload \ doc/rapi-resources.gen \ doc/examples/bash_completion \ @@ -138,6 +140,9 @@ dist_sbin_SCRIPTS = \ scripts/gnt-node \ scripts/gnt-os +nodist_sbin_SCRIPTS = \ + daemons/ganeti-cleaner + dist_tools_SCRIPTS = \ tools/burnin \ tools/cfgshell \ @@ -148,7 +153,9 @@ EXTRA_DIST = \ $(MAINTAINERCLEANFILES) \ NEWS \ DEVNOTES \ + pylintrc \ autotools/docbook-wrapper \ + daemons/ganeti-cleaner.in \ devel/upload.in \ $(docrst) \ $(docdot) \ @@ -157,6 +164,7 @@ EXTRA_DIST = \ doc/examples/ganeti.initd.in \ doc/examples/ganeti.cron.in \ doc/examples/dumb-allocator \ + doc/examples/hooks/ethers \ doc/locking.txt \ test/testutils.py \ test/mocks.py \ @@ -241,6 +249,11 @@ doc/examples/%: doc/examples/%.in stamp-directories \ $(REPLACE_VARS_SED) sed -f $(REPLACE_VARS_SED) < $< > $@ +daemons/ganeti-cleaner: daemons/ganeti-cleaner.in stamp-directories \ + $(REPLACE_VARS_SED) + sed -f $(REPLACE_VARS_SED) < $< > $@ + chmod +x $@ + doc/%.html: doc/%.rst @test -n "$(RST2HTML)" || { echo 'rst2html' not found during configure; exit 1; } $(RST2HTML) $< $@ diff --git a/daemons/ganeti-cleaner.in b/daemons/ganeti-cleaner.in new file mode 100755 index 0000000000000000000000000000000000000000..1815e6d1e1015f593cfe1130eaa4edf44a214cf7 --- /dev/null +++ b/daemons/ganeti-cleaner.in @@ -0,0 +1,39 @@ +#!/bin/bash +# + +# Copyright (C) 2009 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. 
+
+set -e
+
+DATA_DIR=@LOCALSTATEDIR@/lib/ganeti
+QUEUE_ARCHIVE_DIR=$DATA_DIR/queue/archive
+
+# Define how many days archived jobs should be left alone
+REMOVE_AFTER=21
+
+# Exit if machine is not part of a cluster
+[[ -e $DATA_DIR/ssconf_master_node ]] || exit 0
+
+# Exit if queue archive directory doesn't exist
+[[ -d $QUEUE_ARCHIVE_DIR ]] || exit 0
+
+# Remove old jobs
+find $QUEUE_ARCHIVE_DIR -mindepth 2 -type f -mtime +$REMOVE_AFTER -print0 | \
+xargs -r0 rm -f
+
+exit 0
diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index ca864e9940c4b8d12a4caa61f9c843670d0bf169..d81b9444e449a2b5a1898ed29b42eac0759b548f 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -28,13 +28,10 @@ inheritance from parent classes requires it.
 
 import os
-import errno
 import sys
 import SocketServer
 import time
 import collections
-import Queue
-import random
 import signal
 import logging
@@ -195,6 +192,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
 
   def send_message(self, msg):
     #print "sending", msg
+    # TODO: sendall is not guaranteed to send everything
     self.request.sendall(msg + self.EOM)
@@ -213,6 +211,13 @@ class ClientOps:
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
       return queue.SubmitJob(ops)
 
+    if method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Received multiple jobs")
+      jobs = []
+      for ops in args:
+        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+      return queue.SubmitManyJobs(jobs)
+
     elif method == luxi.REQ_CANCEL_JOB:
       job_id = args
       logging.info("Received job cancel request for %s", job_id)
@@ -465,7 +470,6 @@ def main():
   """Main function"""
 
   options, args = ParseOptions()
-  utils.debug = options.debug
   utils.no_fork = True
 
   if options.fork:
@@ -516,7 +520,7 @@ def main():
     rpc.Init()
     try:
       # activate ip
-      master_node = ssconf.SimpleConfigReader().GetMasterNode()
+      master_node = ssconf.SimpleStore().GetMasterNode()
       if not rpc.RpcRunner.call_node_start_master(master_node, False, False):
         logging.error("Can't activate master IP address")
diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded
index 36e8e7053fcbb24ccac5f301defd0fef90771e46..cc60f87cb8be2f5ab8f4590b33179250416c0b23 100755
--- a/daemons/ganeti-noded
+++ b/daemons/ganeti-noded
@@ -26,9 +26,7 @@
 
 import os
 import sys
-import traceback
 import SocketServer
-import errno
 import logging
 import signal
@@ -754,7 +752,6 @@ def main():
   global queue_lock
 
   options, args = ParseOptions()
-  utils.debug = options.debug
 
   if options.fork:
     utils.CloseFDs()
@@ -762,13 +759,9 @@ def main():
   for fname in (constants.SSL_CERT_FILE,):
     if not os.path.isfile(fname):
       print "config %s not there, will not run."
% fname - sys.exit(5) + sys.exit(constants.EXIT_NOTCLUSTER) - try: - port = utils.GetNodeDaemonPort() - except errors.ConfigurationError, err: - print "Cluster configuration incomplete: '%s'" % str(err) - sys.exit(5) + port = utils.GetNodeDaemonPort() dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] dirs.append((constants.LOG_OS_DIR, 0750)) diff --git a/daemons/ganeti-rapi b/daemons/ganeti-rapi index 7b9710a9bf46930beeda8342d10f32a263c96939..2288f17f4d20e2b858d5eedf4f676dbc65507c00 100755 --- a/daemons/ganeti-rapi +++ b/daemons/ganeti-rapi @@ -206,17 +206,20 @@ def ParseOptions(): parser.add_option("-f", "--foreground", dest="fork", help="Don't detach from the current terminal", default=True, action="store_false") + parser.add_option("-b", "--bind", dest="bind_address", + help="Bind address", + default="", metavar="ADDRESS") options, args = parser.parse_args() if len(args) != 0: print >> sys.stderr, "Usage: %s [-d] [-p port]" % sys.argv[0] - sys.exit(1) + sys.exit(constants.EXIT_FAILURE) if options.ssl and not (options.ssl_cert and options.ssl_key): print >> sys.stderr, ("For secure mode please provide " - "--ssl-key and --ssl-cert arguments") - sys.exit(1) + "--ssl-key and --ssl-cert arguments") + sys.exit(constants.EXIT_FAILURE) return options, args @@ -237,7 +240,7 @@ def main(): ssl_cert_path=options.ssl_cert) except Exception, err: sys.stderr.write("Can't load the SSL certificate/key: %s\n" % (err,)) - sys.exit(1) + sys.exit(constants.EXIT_FAILURE) else: ssl_params = None @@ -252,7 +255,7 @@ def main(): utils.WritePidFile(constants.RAPI_PID) try: mainloop = daemon.Mainloop() - server = RemoteApiHttpServer(mainloop, "", options.port, + server = RemoteApiHttpServer(mainloop, options.bind_address, options.port, ssl_params=ssl_params, ssl_verify_peer=False, request_executor_class= JsonErrorRequestExecutor) diff --git a/doc/examples/ganeti.cron.in b/doc/examples/ganeti.cron.in index 155411a5ce1b9fe69c230ae0c28cbb26257e05da..6d7d9b052c0c19c48899565d377fbf499ebcd6aa 100644 --- a/doc/examples/ganeti.cron.in +++ b/doc/examples/ganeti.cron.in @@ -1,3 +1,7 @@ PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin -# restart failed instances + +# Restart failed instances (every 5 minutes) */5 * * * * root [ -x @SBINDIR@/ganeti-watcher ] && @SBINDIR@/ganeti-watcher + +# Clean job archive (at 01:45 AM) +45 1 * * * root [ -x @SBINDIR@/ganeti-cleaner ] && @SBINDIR@/ganeti-cleaner diff --git a/doc/examples/ganeti.initd.in b/doc/examples/ganeti.initd.in index b346530d9001ea025c02f84ec074e063785f7feb..d77595e0d85b06313f862259fe910dffced849da 100644 --- a/doc/examples/ganeti.initd.in +++ b/doc/examples/ganeti.initd.in @@ -3,12 +3,12 @@ # based on skeleton from Debian GNU/Linux ### BEGIN INIT INFO # Provides: ganeti -# Required-Start: $syslog $remote_fs xend -# Required-Stop: $syslog $remote_fs xend +# Required-Start: $syslog $remote_fs +# Required-Stop: $syslog $remote_fs # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 -# Short-Description: Ganeti Xen Cluster Manager -# Description: Ganeti Xen Cluster Manager +# Short-Description: Ganeti Cluster Manager +# Description: Ganeti Cluster Manager ### END INIT INFO PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin @@ -18,24 +18,18 @@ GANETIRUNDIR="@LOCALSTATEDIR@/run/ganeti" GANETI_DEFAULTS_FILE="@SYSCONFDIR@/default/ganeti" -NODED_NAME="ganeti-noded" -NODED="@PREFIX@/sbin/${NODED_NAME}" -NODED_PID="${GANETIRUNDIR}/${NODED_NAME}.pid" +NODED="ganeti-noded" NODED_ARGS="" -MASTERD_NAME="ganeti-masterd" 
-MASTERD="@PREFIX@/sbin/${MASTERD_NAME}" -MASTERD_PID="${GANETIRUNDIR}/${MASTERD_NAME}.pid" +MASTERD="ganeti-masterd" MASTERD_ARGS="" -RAPI_NAME="ganeti-rapi" -RAPI="@PREFIX@/sbin/${RAPI_NAME}" -RAPI_PID="${GANETIRUNDIR}/${RAPI_NAME}.pid" +RAPI="ganeti-rapi" RAPI_ARGS="" SCRIPTNAME="@SYSCONFDIR@/init.d/ganeti" -test -f $NODED || exit 0 +test -f "@PREFIX@/sbin/$NODED" || exit 0 . /lib/lsb/init-functions @@ -71,47 +65,66 @@ check_exitcode() { } start_action() { - # called as start_action daemon pidfile + # called as start_action daemon-name local daemon="$1"; shift - local pidfile="$1"; shift log_action_begin_msg "$daemon" - start-stop-daemon --start --quiet --exec "$daemon" --pidfile "$pidfile" \ + start-stop-daemon --start --quiet \ + --pidfile "${GANETIRUNDIR}/${daemon}.pid" \ + --startas "@PREFIX@/sbin/$daemon" \ + --oknodo \ -- "$@" check_exitcode $? } stop_action() { - # called as stop_action daemon pidfile - log_action_begin_msg "$1" + # called as stop_action daemon-name + local daemon="$1" + log_action_begin_msg "$daemon" start-stop-daemon --stop --quiet --oknodo \ - --retry 30 --pidfile "$2" + --retry 30 --pidfile "${GANETIRUNDIR}/${daemon}.pid" check_exitcode $? } +maybe_do() { + requested="$1"; shift + action="$1"; shift + target="$1" + if [ -z "$requested" -o "$requested" = "$target" ]; then + $action "$@" + fi +} + +if [ -n "$2" -a \ + "$2" != "$NODED" -a \ + "$2" != "$MASTERD" -a \ + "$2" != "$RAPI" ]; then + log_failure_msg "Unknown daemon '$2' requested" + exit 1 +fi case "$1" in start) - log_daemon_msg "Starting $DESC" "$NAME" + log_daemon_msg "Starting $DESC" "$2" check_config - start_action $NODED $NODED_PID $NODED_ARGS - start_action $MASTERD $MASTERD_PID $MASTERD_ARGS - start_action $RAPI $RAPI_PID $RAPI_ARGS + maybe_do "$2" start_action $NODED $NODED_ARGS + maybe_do "$2" start_action $MASTERD $MASTERD_ARGS + maybe_do "$2" start_action $RAPI $RAPI_ARGS ;; stop) - log_daemon_msg "Stopping $DESC" "$NAME" - stop_action $RAPI $RAPI_PID - stop_action $MASTERD $MASTERD_PID - stop_action $NODED $NODED_PID + log_daemon_msg "Stopping $DESC" "$2" + maybe_do "$2" stop_action $RAPI + maybe_do "$2" stop_action $MASTERD + maybe_do "$2" stop_action $NODED ;; restart|force-reload) - log_daemon_msg "Reloading $DESC" - stop_action $RAPI $RAPI_PID - stop_action $MASTERD $MASTERD_PID - stop_action $NODED $NODED_PID + log_daemon_msg "Reloading $DESC" "$2" + maybe_do "$2" stop_action $RAPI + maybe_do "$2" stop_action $MASTERD + maybe_do "$2" stop_action $NODED check_config - start_action $NODED $NODED_PID - start_action $MASTERD $MASTERD_PID - start_action $RAPI $RAPI_PID + maybe_do "$2" start_action $NODED $NODED_ARGS + maybe_do "$2" start_action $MASTERD $MASTERD_ARGS + maybe_do "$2" start_action $RAPI $RAPI_ARGS ;; *) log_success_msg "Usage: $SCRIPTNAME {start|stop|force-reload|restart}" diff --git a/doc/examples/hooks/ethers b/doc/examples/hooks/ethers new file mode 100755 index 0000000000000000000000000000000000000000..387ed7e6fc14625d87a84053db05097ee3b016eb --- /dev/null +++ b/doc/examples/hooks/ethers @@ -0,0 +1,88 @@ +#!/bin/bash + +# Copyright (C) 2009 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# This is an example ganeti hook that writes the instance MAC addresses in the
+# node's /etc/ethers file. It will pick up the first nic connected to the
+# TARGET_BRIDGE bridge, and write it down with the syntax "MAC INSTANCE_NAME".
+
+# The hook will also send a HUP signal to the daemon whose PID is in
+# DAEMON_PID_FILE, so that it can load the new /etc/ethers file and use it.
+# This has been tested in conjunction with dnsmasq's dhcp implementation.
+
+# It will also remove any other occurrences of the same instance in the
+# aforementioned file. This hook supports the "instance-add",
+# "instance-modify", "instance-remove", and "instance-mirror-replace" ganeti
+# post hook paths. To install it, add a symlink from those hooks' directories
+# to where this file is installed (with a mode which permits execution).
+
+# TARGET_BRIDGE: We'll only add the first nic which gets connected to this
+# bridge to /etc/ethers.
+TARGET_BRIDGE="br0"
+DAEMON_PID_FILE="/var/run/dnsmasq.pid"
+
+# In order to handle concurrent execution of this hook, we use the $LOCKFILE.
+# LOCKFILE_CREATE and LOCKFILE_REMOVE are the path names for the lockfile-progs
+# programs which we use as helpers.
+LOCKFILE="/var/lock/ganeti_ethers"
+LOCKFILE_CREATE="/usr/bin/lockfile-create"
+LOCKFILE_REMOVE="/usr/bin/lockfile-remove"
+
+hooks_path=$GANETI_HOOKS_PATH
+[ -n "$hooks_path" ] || exit 1
+instance=$GANETI_INSTANCE_NAME
+[ -n "$instance" ] || exit 1
+nic_count=$GANETI_INSTANCE_NIC_COUNT
+
+acquire_lockfile() {
+  $LOCKFILE_CREATE $LOCKFILE || exit 1
+  trap "$LOCKFILE_REMOVE $LOCKFILE" EXIT
+}
+
+update_ethers_from_new() {
+  chmod 644 /etc/ethers.new
+  mv /etc/ethers.new /etc/ethers
+  [ -f "$DAEMON_PID_FILE" ] && kill -HUP $(< $DAEMON_PID_FILE)
+}
+
+if [ "$hooks_path" = "instance-add" -o \
+     "$hooks_path" = "instance-modify" -o \
+     "$hooks_path" = "instance-mirror-replace" ]
+then
+  for i in $(seq 0 $((nic_count - 1)) ); do
+    bridge_var="GANETI_INSTANCE_NIC${i}_BRIDGE"
+    bridge=${!bridge_var}
+    if [ -n "$bridge" -a "$bridge" = "$TARGET_BRIDGE" ]; then
+      mac_var="GANETI_INSTANCE_NIC${i}_MAC"
+      mac=${!mac_var}
+      acquire_lockfile
+      cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/;
+      END {print \"$mac\t$instance\"}" > /etc/ethers.new
+      update_ethers_from_new
+      break
+    fi
+  done
+fi
+if [ "$hooks_path" = "instance-remove" -o \
+       \( "$hooks_path" = "instance-modify" -a "$nic_count" -eq 0 \) ]; then
+  acquire_lockfile
+  cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/" \
+    > /etc/ethers.new
+  update_ethers_from_new
+fi
+
diff --git a/doc/hooks.rst b/doc/hooks.rst
index 7dbe7d5e92ad0bafb1f1a74bbd5cd3dbeadbd3a7..b2f05ce5b00f9bf68dd219749e9813d4734b970e 100644
--- a/doc/hooks.rst
+++ b/doc/hooks.rst
@@ -104,7 +104,7 @@ The scripts will be run as follows:
   be left
 
 
-All informations about the cluster is passed using environment
+All information about the cluster is passed using environment
 variables. Different operations will have sligthly different
 environments, but most of the variables are common.
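The ethers hook above and the hooks.rst text it accompanies rely on the same convention: a hook receives all of its input through GANETI_* environment variables and can be written in any language. A minimal sketch of an equivalent hook skeleton in Python (illustration only, not part of this patch; the environment variable names are the ones the ethers hook uses, the output format is arbitrary):

#!/usr/bin/python
#
# Minimal Ganeti post-hook sketch (illustrative): dump the NIC data that
# hooks such as the ethers example consume. Not part of this patch.
import os
import sys


def main():
  hooks_path = os.environ.get("GANETI_HOOKS_PATH")
  instance = os.environ.get("GANETI_INSTANCE_NAME")
  if not hooks_path or not instance:
    sys.exit(1)
  nic_count = int(os.environ.get("GANETI_INSTANCE_NIC_COUNT", "0"))
  for i in range(nic_count):
    mac = os.environ.get("GANETI_INSTANCE_NIC%d_MAC" % i)
    bridge = os.environ.get("GANETI_INSTANCE_NIC%d_BRIDGE" % i)
    print "%s: nic%d mac=%s bridge=%s" % (instance, i, mac, bridge)


if __name__ == "__main__":
  main()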
diff --git a/doc/iallocator.rst b/doc/iallocator.rst
index f4b8bfcf81f7d64c4790ecf935f8ceeb7e93bfeb..408b908cfc67f6af90c1e4c5aec268771c58b7c6 100644
--- a/doc/iallocator.rst
+++ b/doc/iallocator.rst
@@ -176,7 +176,7 @@ instances
 
 nodes
   dictionary with the data for the nodes in the cluster, indexed by
-  the node name; the dict contains:
+  the node name; the dict contains [*]_ :
 
   total_disk
     the total disk size of this node (mebibytes)
@@ -225,15 +225,19 @@ nodes
   or ``offline`` flags set. More details about these of node status
   flags is available in the manpage *ganeti(7)*.
 
+.. [*] Note that no run-time data is present for offline or drained nodes;
+   this means the tags total_memory, reserved_memory, free_memory, total_disk,
+   free_disk, total_cpus, i_pri_memory and i_pri_up_memory will be absent
 
-Respone message
-~~~~~~~~~~~~~~~
+
+Response message
+~~~~~~~~~~~~~~~~
 
 The response message is much more simple than the input one. It is
 also a dict having three keys:
 
 success
-  a boolean value denoting if the allocation was successfull or not
+  a boolean value denoting if the allocation was successful or not
 
 info
   a string with information from the scripts; if the allocation fails,
diff --git a/lib/backend.py b/lib/backend.py
index 2ef35fb0b736e1213e0ece86b03da0938cb543dc..f4e308e651ad505818c89543ee98c202863ba5e1 100644
--- a/lib/backend.py
+++ b/lib/backend.py
@@ -19,7 +19,12 @@
 # 02110-1301, USA.
 
 
-"""Functions used by the node daemon"""
+"""Functions used by the node daemon
+
+@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
+    the L{UploadFile} function
+
+"""
 
 
 import os
@@ -115,6 +120,23 @@ def _CleanDirectory(path, exclude=None):
       utils.RemoveFile(full_name)
 
 
+def _BuildUploadFileList():
+  """Build the list of allowed upload files.
+
+  This is abstracted so that it's built only once at module import time.
+
+  """
+  return frozenset([
+    constants.CLUSTER_CONF_FILE,
+    constants.ETC_HOSTS,
+    constants.SSH_KNOWN_HOSTS_FILE,
+    constants.VNC_PASSWORD_FILE,
+    ])
+
+
+_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
+
+
 def JobQueuePurge():
   """Removes job queue files and archived jobs.
 
@@ -141,7 +163,7 @@ def GetMasterInfo():
     master_netdev = cfg.GetMasterNetdev()
     master_ip = cfg.GetMasterIP()
     master_node = cfg.GetMasterNode()
-  except errors.ConfigurationError, err:
+  except errors.ConfigurationError:
     logging.exception("Cluster configuration incomplete")
     return (None, None, None)
   return (master_netdev, master_ip, master_node)
@@ -320,7 +342,7 @@ def LeaveCluster():
 
 
 def GetNodeInfo(vgname, hypervisor_type):
-  """Gives back a hash with different informations about the node.
+  """Gives back a hash with different information about the node.
 
   @type vgname: C{string}
   @param vgname: the name of the volume group to ask for disk space information
@@ -585,7 +607,7 @@ def GetInstanceList(hypervisor_list):
     try:
       names = hypervisor.GetHypervisor(hname).ListInstances()
       results.extend(names)
-    except errors.HypervisorError, err:
+    except errors.HypervisorError:
       logging.exception("Error enumerating instances for hypevisor %s", hname)
       raise
 
@@ -593,7 +615,7 @@ def GetInstanceList(hypervisor_list):
 
 
 def GetInstanceInfo(instance, hname):
-  """Gives back the informations about an instance as a dictionary.
+  """Gives back the information about an instance as a dictionary.
 
   @type instance: string
   @param instance: the instance name
@@ -758,7 +780,7 @@ def RunRenameInstance(instance, old_name):
 
 
 def _GetVGInfo(vg_name):
-  """Get informations about the volume group.
+  """Get information about the volume group.
@type vg_name: str @param vg_name: the volume group which we query @@ -930,7 +952,7 @@ def InstanceShutdown(instance): # test every 10secs for 2min time.sleep(1) - for dummy in range(11): + for _ in range(11): if instance.name not in GetInstanceList([hv_name]): break time.sleep(10) @@ -1044,7 +1066,7 @@ def AcceptInstance(instance, info, target): msg = "Failed to accept instance" logging.exception(msg) return (False, '%s: %s' % (msg, err)) - return (True, "Accept successfull") + return (True, "Accept successful") def FinalizeMigration(instance, info, success): @@ -1092,7 +1114,7 @@ def MigrateInstance(instance, target, live): msg = "Failed to migrate instance" logging.exception(msg) return (False, "%s: %s" % (msg, err)) - return (True, "Migration successfull") + return (True, "Migration successful") def BlockdevCreate(disk, size, owner, on_primary, info): @@ -1285,7 +1307,7 @@ def BlockdevAssemble(disk, owner, as_primary): def BlockdevShutdown(disk): """Shut down a block device. - First, if the device is assembled (Attach() is successfull), then + First, if the device is assembled (Attach() is successful), then the device is shutdown. Then the children of the device are shutdown. @@ -1403,7 +1425,7 @@ def BlockdevGetmirrorstatus(disks): def _RecursiveFindBD(disk): """Check if a device is activated. - If so, return informations about the real device. + If so, return information about the real device. @type disk: L{objects.Disk} @param disk: the disk object we need to find @@ -1423,7 +1445,7 @@ def _RecursiveFindBD(disk): def BlockdevFind(disk): """Check if a device is activated. - If it is, return informations about the real device. + If it is, return information about the real device. @type disk: L{objects.Disk} @param disk: the disk to find @@ -1498,14 +1520,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): file_name) return False - allowed_files = [ - constants.CLUSTER_CONF_FILE, - constants.ETC_HOSTS, - constants.SSH_KNOWN_HOSTS_FILE, - constants.VNC_PASSWORD_FILE, - ] - - if file_name not in allowed_files: + if file_name not in _ALLOWED_UPLOAD_FILES: logging.error("Filename passed to UploadFile not in allowed" " upload targets: '%s'", file_name) return False @@ -2062,7 +2077,7 @@ def BlockdevRename(devlist): # but we don't have the owner here - maybe parse from existing # cache? 
for now, we only lose lvm data when we rename, which # is less critical than DRBD or MD - except errors.BlockDeviceError, err: + except errors.BlockDeviceError: logging.exception("Can't rename device '%s' to '%s'", dev, unique_id) result = False return result @@ -2132,7 +2147,7 @@ def RemoveFileStorageDir(file_storage_dir): @param file_storage_dir: the directory we should cleanup @rtype: tuple (success,) @return: tuple of one element, C{success}, denoting - whether the operation was successfull + whether the operation was successful """ file_storage_dir = _TransformFileStorageDir(file_storage_dir) @@ -2147,7 +2162,7 @@ def RemoveFileStorageDir(file_storage_dir): # deletes dir only if empty, otherwise we want to return False try: os.rmdir(file_storage_dir) - except OSError, err: + except OSError: logging.exception("Cannot remove file storage directory '%s'", file_storage_dir) result = False, @@ -2176,7 +2191,7 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): if os.path.isdir(old_file_storage_dir): try: os.rename(old_file_storage_dir, new_file_storage_dir) - except OSError, err: + except OSError: logging.exception("Cannot rename '%s' to '%s'", old_file_storage_dir, new_file_storage_dir) result = False, @@ -2582,7 +2597,7 @@ class HooksRunner(object): dir_name = "%s/%s" % (self._BASE_DIR, subdir) try: dir_contents = utils.ListVisibleFiles(dir_name) - except OSError, err: + except OSError: # FIXME: must log output in case of failures return rr @@ -2705,7 +2720,7 @@ class DevCacheManager(object): fdata = "%s %s %s\n" % (str(owner), state, iv_name) try: utils.WriteFile(fpath, data=fdata) - except EnvironmentError, err: + except EnvironmentError: logging.exception("Can't update bdev cache for %s", dev_path) @classmethod @@ -2727,5 +2742,5 @@ class DevCacheManager(object): fpath = cls._ConvertPath(dev_path) try: utils.RemoveFile(fpath) - except EnvironmentError, err: + except EnvironmentError: logging.exception("Can't update bdev cache for %s", dev_path) diff --git a/lib/bdev.py b/lib/bdev.py index 5c2c8bf2fa374b773134e429e4c606aef28d2762..131fbcf0181ea3fc7ce741cc9b534a202643d148 100644 --- a/lib/bdev.py +++ b/lib/bdev.py @@ -161,7 +161,7 @@ class BlockDev(object): """Remove this device. This makes sense only for some of the device types: LV and file - storeage. Also note that if the device can't attach, the removal + storage. Also note that if the device can't attach, the removal can't be completed. """ @@ -486,7 +486,7 @@ class LogicalVolume(BlockDev): def Assemble(self): """Assemble the device. - We alway run `lvchange -ay` on the LV to ensure it's active before + We always run `lvchange -ay` on the LV to ensure it's active before use, as there were cases when xenvg was not active after boot (also possibly after disk issues). @@ -1310,14 +1310,14 @@ class DRBD8(BaseDRBD): If sync_percent is None, it means all is ok - If estimated_time is None, it means we can't esimate + If estimated_time is None, it means we can't estimate the time needed, otherwise it's the time left in seconds. We set the is_degraded parameter to True on two conditions: network not connected or local disk missing. - We compute the ldisk parameter based on wheter we have a local + We compute the ldisk parameter based on whether we have a local disk or not. 
@rtype: tuple @@ -1387,14 +1387,14 @@ class DRBD8(BaseDRBD): ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor) timeout_limit = time.time() + self._NET_RECONFIG_TIMEOUT - sleep_time = 0.100 # we start the retry time at 100 miliseconds + sleep_time = 0.100 # we start the retry time at 100 milliseconds while time.time() < timeout_limit: status = self.GetProcStatus() if status.is_standalone: break # retry the disconnect, it seems possible that due to a # well-time disconnect on the peer, my disconnect command might - # be ingored and forgotten + # be ignored and forgotten ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor) or \ ever_disconnected time.sleep(sleep_time) @@ -1700,7 +1700,7 @@ class FileStorage(BlockDev): def Shutdown(self): """Shutdown the device. - This is a no-op for the file type, as we don't deacivate + This is a no-op for the file type, as we don't deactivate the file on shutdown. """ diff --git a/lib/bootstrap.py b/lib/bootstrap.py index 3f96561af6ee2d9b30928403b67a66cbb7b29a92..4fce0d31a3221f14e8cc200b3909f1b59f5ad130 100644 --- a/lib/bootstrap.py +++ b/lib/bootstrap.py @@ -79,24 +79,27 @@ def _GenerateSelfSignedSslCert(file_name, validity=(365 * 5)): """ (fd, tmp_file_name) = tempfile.mkstemp(dir=os.path.dirname(file_name)) try: - # Set permissions before writing key - os.chmod(tmp_file_name, 0600) - - result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024", - "-days", str(validity), "-nodes", "-x509", - "-keyout", tmp_file_name, "-out", tmp_file_name, - "-batch"]) - if result.failed: - raise errors.OpExecError("Could not generate SSL certificate, command" - " %s had exitcode %s and error message %s" % - (result.cmd, result.exit_code, result.output)) - - # Make read-only - os.chmod(tmp_file_name, 0400) - - os.rename(tmp_file_name, file_name) + try: + # Set permissions before writing key + os.chmod(tmp_file_name, 0600) + + result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024", + "-days", str(validity), "-nodes", "-x509", + "-keyout", tmp_file_name, "-out", tmp_file_name, + "-batch"]) + if result.failed: + raise errors.OpExecError("Could not generate SSL certificate, command" + " %s had exitcode %s and error message %s" % + (result.cmd, result.exit_code, result.output)) + + # Make read-only + os.chmod(tmp_file_name, 0400) + + os.rename(tmp_file_name, file_name) + finally: + utils.RemoveFile(tmp_file_name) finally: - utils.RemoveFile(tmp_file_name) + os.close(fd) def _InitGanetiServerSetup(): @@ -123,7 +126,8 @@ def _InitGanetiServerSetup(): def InitCluster(cluster_name, mac_prefix, def_bridge, master_netdev, file_storage_dir, candidate_pool_size, secondary_ip=None, vg_name=None, beparams=None, hvparams=None, - enabled_hypervisors=None, default_hypervisor=None): + enabled_hypervisors=None, default_hypervisor=None, + modify_etc_hosts=True): """Initialise the cluster. 
@type candidate_pool_size: int @@ -134,6 +138,14 @@ def InitCluster(cluster_name, mac_prefix, def_bridge, if config.ConfigWriter.IsCluster(): raise errors.OpPrereqError("Cluster is already initialised") + if not enabled_hypervisors: + raise errors.OpPrereqError("Enabled hypervisors list must contain at" + " least one member") + invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES + if invalid_hvs: + raise errors.OpPrereqError("Enabled hypervisors contains invalid" + " entries: %s" % invalid_hvs) + hostname = utils.HostInfo() if hostname.ip.startswith("127."): @@ -225,7 +237,9 @@ def InitCluster(cluster_name, mac_prefix, def_bridge, f.close() sshkey = sshline.split(" ")[1] - utils.AddHostToEtcHosts(hostname.name) + if modify_etc_hosts: + utils.AddHostToEtcHosts(hostname.name) + _InitSSHSetup() # init of cluster config file @@ -247,6 +261,7 @@ def InitCluster(cluster_name, mac_prefix, def_bridge, beparams={constants.BEGR_DEFAULT: beparams}, hvparams=hvparams, candidate_pool_size=candidate_pool_size, + modify_etc_hosts=modify_etc_hosts, ) master_node_config = objects.Node(name=hostname.name, primary_ip=hostname.ip, @@ -483,7 +498,7 @@ def GatherMasterVotes(node_list): @type node_list: list @param node_list: the list of nodes to query for master info; the current - node wil be removed if it is in the list + node will be removed if it is in the list @rtype: list @return: list of (node, votes) diff --git a/lib/cli.py b/lib/cli.py index d351f2f77fffa6f3ce7fbe37a9342894f88dc29d..8ba71988e872584be0111f661ab5b1733badfbab 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -41,6 +41,7 @@ from ganeti import rpc from optparse import (OptionParser, make_option, TitledHelpFormatter, Option, OptionValueError) + __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain", "SubmitOpCode", "GetClient", "cli_option", "ikv_option", "keyval_option", @@ -55,6 +56,7 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain", ] + def _ExtractTagsObject(opts, args): """Extract the tag type object. @@ -120,7 +122,7 @@ def ListTags(opts, args): result = list(result) result.sort() for tag in result: - print tag + ToStdout(tag) def AddTags(opts, args): @@ -320,7 +322,7 @@ keyval_option = KeyValOption def _ParseArgs(argv, commands, aliases): """Parser for the command line arguments. - This function parses the arguements and returns the function which + This function parses the arguments and returns the function which must be executed together with its (modified) arguments. @param argv: the command line @@ -335,7 +337,7 @@ def _ParseArgs(argv, commands, aliases): binary = argv[0].split("/")[-1] if len(argv) > 1 and argv[1] == "--version": - print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION) + ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION) # Quit right away. That way we don't have to care about this special # argument. optparse.py does it the same. sys.exit(0) @@ -345,22 +347,27 @@ def _ParseArgs(argv, commands, aliases): # let's do a nice thing sortedcmds = commands.keys() sortedcmds.sort() - print ("Usage: %(bin)s {command} [options...] [argument...]" - "\n%(bin)s <command> --help to see details, or" - " man %(bin)s\n" % {"bin": binary}) + + ToStdout("Usage: %s {command} [options...] [argument...]", binary) + ToStdout("%s <command> --help to see details, or man %s", binary, binary) + ToStdout("") + # compute the max line length for cmd + usage mlen = max([len(" %s" % cmd) for cmd in commands]) mlen = min(60, mlen) # should not get here... 
+ # and format a nice command list - print "Commands:" + ToStdout("Commands:") for cmd in sortedcmds: cmdstr = " %s" % (cmd,) help_text = commands[cmd][4] - help_lines = textwrap.wrap(help_text, 79-3-mlen) - print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0)) + help_lines = textwrap.wrap(help_text, 79 - 3 - mlen) + ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0)) for line in help_lines: - print "%-*s %s" % (mlen, "", line) - print + ToStdout("%-*s %s", mlen, "", line) + + ToStdout("") + return None, None, None # get command, unalias it, and look it up in commands @@ -385,15 +392,13 @@ def _ParseArgs(argv, commands, aliases): options, args = parser.parse_args() if nargs is None: if len(args) != 0: - print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd) + ToStderr("Error: Command %s expects no arguments", cmd) return None, None, None elif nargs < 0 and len(args) != -nargs: - print >> sys.stderr, ("Error: Command %s expects %d argument(s)" % - (cmd, -nargs)) + ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs) return None, None, None elif nargs >= 0 and len(args) < nargs: - print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" % - (cmd, nargs)) + ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs) return None, None, None return func, options, args @@ -438,10 +443,10 @@ def AskUser(text, choices=None): choices = [('y', True, 'Perform the operation'), ('n', False, 'Do not perform the operation')] if not choices or not isinstance(choices, list): - raise errors.ProgrammerError("Invalid choiches argument to AskUser") + raise errors.ProgrammerError("Invalid choices argument to AskUser") for entry in choices: if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?': - raise errors.ProgrammerError("Invalid choiches element to AskUser") + raise errors.ProgrammerError("Invalid choices element to AskUser") answer = choices[-1][1] new_text = [] @@ -539,7 +544,7 @@ def PollJob(job_id, cl=None, feedback_fn=None): feedback_fn(log_entry[1:]) else: encoded = utils.SafeEncode(message) - print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded) + ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded) prev_logmsg_serial = max(prev_logmsg_serial, serial) # TODO: Handle canceled and archived jobs @@ -735,8 +740,6 @@ def GenericMain(commands, override=None, aliases=None): utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug, stderr_logging=True, program=binary) - utils.debug = options.debug - if old_cmdline: logging.info("run with arguments '%s'", old_cmdline) else: @@ -747,7 +750,7 @@ def GenericMain(commands, override=None, aliases=None): except (errors.GenericError, luxi.ProtocolError, JobSubmittedException), err: result, err_msg = FormatError(err) - logging.exception("Error durring command processing") + logging.exception("Error during command processing") ToStderr(err_msg) return result @@ -994,15 +997,23 @@ class JobExecutor(object): cl = GetClient() self.cl = cl self.verbose = verbose + self.jobs = [] def QueueJob(self, name, *ops): - """Submit a job for execution. + """Record a job for later submit. @type name: string @param name: a description of the job, will be used in WaitJobSet """ - job_id = SendJob(ops, cl=self.cl) - self.queue.append((job_id, name)) + self.queue.append((name, ops)) + + def SubmitPending(self): + """Submit all pending jobs. 
+ + """ + results = self.cl.SubmitManyJobs([row[1] for row in self.queue]) + for ((status, data), (name, _)) in zip(results, self.queue): + self.jobs.append((status, data, name)) def GetResults(self): """Wait for and return the results of all jobs. @@ -1013,10 +1024,18 @@ class JobExecutor(object): there will be the error message """ + if not self.jobs: + self.SubmitPending() results = [] if self.verbose: - ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue)) - for jid, name in self.queue: + ok_jobs = [row[1] for row in self.jobs if row[0]] + if ok_jobs: + ToStdout("Submitted jobs %s", ", ".join(ok_jobs)) + for submit_status, jid, name in self.jobs: + if not submit_status: + ToStderr("Failed to submit job for %s: %s", name, jid) + results.append((False, jid)) + continue if self.verbose: ToStdout("Waiting for job %s for %s...", jid, name) try: @@ -1041,5 +1060,10 @@ class JobExecutor(object): if wait: return self.GetResults() else: - for jid, name in self.queue: - ToStdout("%s: %s", jid, name) + if not self.jobs: + self.SubmitPending() + for status, result, name in self.jobs: + if status: + ToStdout("%s: %s", result, name) + else: + ToStderr("Failure for %s: %s", name, result) diff --git a/lib/cmdlib.py b/lib/cmdlib.py index a9b3bac26a4e0894de21241e40afa1a491183fd8..bc46ba4f16bc8f4b82f96fe85098b3e2578df1ac 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -26,12 +26,10 @@ import os import os.path import time -import tempfile import re import platform import logging import copy -import random from ganeti import ssh from ganeti import utils @@ -40,7 +38,6 @@ from ganeti import hypervisor from ganeti import locking from ganeti import constants from ganeti import objects -from ganeti import opcodes from ganeti import serializer from ganeti import ssconf @@ -68,7 +65,7 @@ class LogicalUnit(object): def __init__(self, processor, op, context, rpc): """Constructor for LogicalUnit. - This needs to be overriden in derived classes in order to check op + This needs to be overridden in derived classes in order to check op validity. """ @@ -116,7 +113,7 @@ class LogicalUnit(object): CheckPrereq, doing these separate is better because: - ExpandNames is left as as purely a lock-related function - - CheckPrereq is run after we have aquired locks (and possible + - CheckPrereq is run after we have acquired locks (and possible waited for them) The function is allowed to change the self.op attribute so that @@ -454,7 +451,7 @@ def _CheckNodeNotDrained(lu, node): def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, memory, vcpus, nics, disk_template, disks, - bep, hvp, hypervisor): + bep, hvp, hypervisor_name): """Builds instance related env variables for hooks This builds the hook environment from individual variables. 
@@ -477,15 +474,15 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @param nics: list of tuples (ip, bridge, mac) representing the NICs the instance has @type disk_template: string - @param disk_template: the distk template of the instance + @param disk_template: the disk template of the instance @type disks: list @param disks: the list of (size, mode) pairs @type bep: dict @param bep: the backend parameters for the instance @type hvp: dict @param hvp: the hypervisor parameters for the instance - @type hypervisor: string - @param hypervisor: the hypervisor for the instance + @type hypervisor_name: string + @param hypervisor_name: the hypervisor for the instance @rtype: dict @return: the hook environment for this instance @@ -504,7 +501,7 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, "INSTANCE_MEMORY": memory, "INSTANCE_VCPUS": vcpus, "INSTANCE_DISK_TEMPLATE": disk_template, - "INSTANCE_HYPERVISOR": hypervisor, + "INSTANCE_HYPERVISOR": hypervisor_name, } if nics: @@ -568,7 +565,7 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): 'disks': [(disk.size, disk.mode) for disk in instance.disks], 'bep': bep, 'hvp': hvp, - 'hypervisor': instance.hypervisor, + 'hypervisor_name': instance.hypervisor, } if override: args.update(override) @@ -592,10 +589,10 @@ def _AdjustCandidatePool(lu): def _CheckInstanceBridgesExist(lu, instance): - """Check that the brigdes needed by an instance exist. + """Check that the bridges needed by an instance exist. """ - # check bridges existance + # check bridges existence brlist = [nic.bridge for nic in instance.nics] result = lu.rpc.call_bridges_exist(instance.primary_node, brlist) result.Raise() @@ -616,7 +613,7 @@ class LUDestroyCluster(NoHooksLU): This checks whether the cluster is empty. - Any errors are signalled by raising errors.OpPrereqError. + Any errors are signaled by raising errors.OpPrereqError. """ master = self.cfg.GetMasterNode() @@ -669,7 +666,7 @@ class LUVerifyCluster(LogicalUnit): Test list: - compares ganeti version - - checks vg existance and size > 20G + - checks vg existence and size > 20G - checks config file checksum - checks ssh to other nodes @@ -908,7 +905,7 @@ class LUVerifyCluster(LogicalUnit): if bep[constants.BE_AUTO_BALANCE]: needed_mem += bep[constants.BE_MEMORY] if nodeinfo['mfree'] < needed_mem: - feedback_fn(" - ERROR: not enough memory on node %s to accomodate" + feedback_fn(" - ERROR: not enough memory on node %s to accommodate" " failovers should node %s fail" % (node, prinode)) bad = True return bad @@ -927,7 +924,7 @@ class LUVerifyCluster(LogicalUnit): def BuildHooksEnv(self): """Build hooks env. - Cluster-Verify hooks just rone in the post phase and their failure makes + Cluster-Verify hooks just ran in the post phase and their failure makes the output be logged in the verify output and the verification to fail. """ @@ -1194,7 +1191,7 @@ class LUVerifyCluster(LogicalUnit): return not bad def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): - """Analize the post-hooks' result + """Analyze the post-hooks' result This method analyses the hook result, handles it, and sends some nicely-formatted feedback back to the user. 
@@ -1293,7 +1290,6 @@ class LUVerifyDisks(NoHooksLU): node_lvs = self.rpc.call_volume_list(nodes, vg_name) - to_act = set() for node in nodes: # node_volume lvs = node_lvs[node] @@ -1536,7 +1532,7 @@ def _RecursiveCheckIfLVMBased(disk): @type disk: L{objects.Disk} @param disk: the disk to check - @rtype: booleean + @rtype: boolean @return: boolean indicating whether a LD_LV dev_type was found or not """ @@ -1642,6 +1638,13 @@ class LUSetClusterParams(LogicalUnit): if self.op.enabled_hypervisors is not None: self.hv_list = self.op.enabled_hypervisors + if not self.hv_list: + raise errors.OpPrereqError("Enabled hypervisors list must contain at" + " least one member") + invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES + if invalid_hvs: + raise errors.OpPrereqError("Enabled hypervisors contains invalid" + " entries: %s" % invalid_hvs) else: self.hv_list = cluster.enabled_hypervisors @@ -1937,7 +1940,7 @@ class LURemoveNode(LogicalUnit): - it does not have primary or secondary instances - it's not the master - Any errors are signalled by raising errors.OpPrereqError. + Any errors are signaled by raising errors.OpPrereqError. """ node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name)) @@ -2258,7 +2261,7 @@ class LUAddNode(LogicalUnit): - it is resolvable - its parameters (single/dual homed) matches the cluster - Any errors are signalled by raising errors.OpPrereqError. + Any errors are signaled by raising errors.OpPrereqError. """ node_name = self.op.node_name @@ -2312,7 +2315,7 @@ class LUAddNode(LogicalUnit): raise errors.OpPrereqError("The master has a private ip but the" " new node doesn't have one") - # checks reachablity + # checks reachability if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("Node not reachable by ping") @@ -2403,7 +2406,8 @@ class LUAddNode(LogicalUnit): " new node: %s" % msg) # Add node to our /etc/hosts, and add key to known_hosts - utils.AddHostToEtcHosts(new_node.name) + if self.cfg.GetClusterInfo().modify_etc_hosts: + utils.AddHostToEtcHosts(new_node.name) if new_node.secondary_ip != new_node.primary_ip: result = self.rpc.call_node_has_ip_address(new_node.name, @@ -2523,12 +2527,16 @@ class LUSetNodeParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) + if (self.op.master_candidate is not None or + self.op.drained is not None or + self.op.offline is not None): + # we can't change the master's node flags + if self.op.node_name == self.cfg.GetMasterNode(): + raise errors.OpPrereqError("The master role can be changed" + " only via masterfailover") + if ((self.op.master_candidate == False or self.op.offline == True or self.op.drained == True) and node.master_candidate): - # we will demote the node from master_candidate - if self.op.node_name == self.cfg.GetMasterNode(): - raise errors.OpPrereqError("The master node has to be a" - " master candidate, online and not drained") cp_size = self.cfg.GetClusterInfo().candidate_pool_size num_candidates, _ = self.cfg.GetMasterCandidateStats() if num_candidates <= cp_size: @@ -2635,14 +2643,15 @@ class LUQueryClusterInfo(NoHooksLU): "master": cluster.master_node, "default_hypervisor": cluster.default_hypervisor, "enabled_hypervisors": cluster.enabled_hypervisors, - "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor]) - for hypervisor in cluster.enabled_hypervisors]), + "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name]) + for hypervisor_name in cluster.enabled_hypervisors]), "beparams": cluster.beparams, 
"candidate_pool_size": cluster.candidate_pool_size, "default_bridge": cluster.default_bridge, "master_netdev": cluster.master_netdev, "volume_group_name": cluster.volume_group_name, "file_storage_dir": cluster.file_storage_dir, + "tags": list(cluster.GetTags()), } return result @@ -2814,7 +2823,7 @@ def _StartInstanceDisks(lu, instance, force): """Start the disks of an instance. """ - disks_ok, dummy = _AssembleInstanceDisks(lu, instance, + disks_ok, _ = _AssembleInstanceDisks(lu, instance, ignore_secondaries=force) if not disks_ok: _ShutdownInstanceDisks(lu, instance) @@ -3003,7 +3012,7 @@ class LUStartupInstance(LogicalUnit): _CheckNodeOnline(self, instance.primary_node) bep = self.cfg.GetClusterInfo().FillBE(instance) - # check bridges existance + # check bridges existence _CheckInstanceBridgesExist(self, instance) remote_info = self.rpc.call_instance_info(instance.primary_node, @@ -3081,7 +3090,7 @@ class LURebootInstance(LogicalUnit): _CheckNodeOnline(self, instance.primary_node) - # check bridges existance + # check bridges existence _CheckInstanceBridgesExist(self, instance) def Exec(self, feedback_fn): @@ -3752,7 +3761,7 @@ class LUFailoverInstance(LogicalUnit): self.LogInfo("Not checking memory on the secondary node as" " instance will not be started") - # check bridge existance + # check bridge existence brlist = [nic.bridge for nic in instance.nics] result = self.rpc.call_bridges_exist(target_node, brlist) result.Raise() @@ -3812,7 +3821,7 @@ class LUFailoverInstance(LogicalUnit): logging.info("Starting instance %s on node %s", instance.name, target_node) - disks_ok, dummy = _AssembleInstanceDisks(self, instance, + disks_ok, _ = _AssembleInstanceDisks(self, instance, ignore_secondaries=True) if not disks_ok: _ShutdownInstanceDisks(self, instance) @@ -3890,7 +3899,7 @@ class LUMigrateInstance(LogicalUnit): instance.name, i_be[constants.BE_MEMORY], instance.hypervisor) - # check bridge existance + # check bridge existence brlist = [nic.bridge for nic in instance.nics] result = self.rpc.call_bridges_exist(target_node, brlist) if result.failed or not result.data: @@ -4326,7 +4335,7 @@ def _GenerateDiskTemplate(lu, template_name, if len(secondary_nodes) != 0: raise errors.ProgrammerError("Wrong template configuration") - names = _GenerateUniqueNames(lu, [".disk%d" % i + names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) for i in range(disk_count)]) for idx, disk in enumerate(disk_info): disk_index = idx + base_index @@ -4343,7 +4352,7 @@ def _GenerateDiskTemplate(lu, template_name, [primary_node, remote_node] * len(disk_info), instance_name) names = [] - for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i + for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) for i in range(disk_count)]): names.append(lv_prefix + "_data") names.append(lv_prefix + "_meta") @@ -4611,6 +4620,12 @@ class LUCreateInstance(LogicalUnit): if not utils.IsValidMac(mac.lower()): raise errors.OpPrereqError("Invalid MAC address specified: %s" % mac) + else: + # or validate/reserve the current one + if self.cfg.IsMacInUse(mac): + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac) + # bridge verification bridge = nic.get("bridge", None) if bridge is None: @@ -4749,7 +4764,7 @@ class LUCreateInstance(LogicalUnit): disks=[(d["size"], d["mode"]) for d in self.disks], bep=self.be_full, hvp=self.hv_full, - hypervisor=self.op.hypervisor, + hypervisor_name=self.op.hypervisor, )) nl = ([self.cfg.GetMasterNode(), self.op.pnode] + @@ -5568,7 +5583,6 @@ class 
LUReplaceDisks(LogicalUnit): logging.debug("Allocated minors %s" % (minors,)) self.proc.LogStep(4, steps_total, "changing drbd configuration") for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)): - size = dev.size info("activating a new drbd on %s for disk/%d" % (new_node, idx)) # create new devices on new_node; note that we create two IDs: # one without port, so the drbd will be activated without @@ -6103,7 +6117,7 @@ class LUSetInstanceParams(LogicalUnit): This only checks the instance list against the existing names. """ - force = self.force = self.op.force + self.force = self.op.force # checking the new params on the primary/secondary nodes @@ -6426,7 +6440,7 @@ class LUExportInstance(LogicalUnit): # remove it from its current node. In the future we could fix this by: # - making a tasklet to search (share-lock all), then create the new one, # then one to remove, after - # - removing the removal operation altoghether + # - removing the removal operation altogether self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET def DeclareLocks(self, level): @@ -6932,7 +6946,7 @@ class IAllocator(object): "master_candidate": ninfo.master_candidate, } - if not ninfo.offline: + if not (ninfo.offline or ninfo.drained): nresult.Raise() if not isinstance(nresult.data, dict): raise errors.OpExecError("Can't get data for node %s" % nname) @@ -7089,7 +7103,6 @@ class IAllocator(object): """ if call_fn is None: call_fn = self.lu.rpc.call_iallocator_runner - data = self.in_text result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text) result.Raise() diff --git a/lib/config.py b/lib/config.py index 4075156522989f8ff128631db96f6032a4631677..58871d511557b051c0ca5283619b02fdd4ac7b64 100644 --- a/lib/config.py +++ b/lib/config.py @@ -273,6 +273,20 @@ class ConfigWriter: data = self._config_data seen_lids = [] seen_pids = [] + + # global cluster checks + if not data.cluster.enabled_hypervisors: + result.append("enabled hypervisors list doesn't have any entries") + invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES + if invalid_hvs: + result.append("enabled hypervisors contains invalid entries: %s" % + invalid_hvs) + + if data.cluster.master_node not in data.nodes: + result.append("cluster has invalid primary node '%s'" % + data.cluster.master_node) + + # per-instance checks for instance_name in data.instances: instance = data.instances[instance_name] if instance.primary_node not in data.nodes: @@ -474,8 +488,8 @@ class ConfigWriter: def _AppendUsedPorts(instance_name, disk, used): duplicates = [] if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5: - nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5] - for node, port in ((nodeA, minorA), (nodeB, minorB)): + node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5] + for node, port in ((node_a, minor_a), (node_b, minor_b)): assert node in used, ("Node '%s' of instance '%s' not found" " in node list" % (node, instance_name)) if port in used[node]: @@ -796,7 +810,7 @@ class ConfigWriter: self._config_data.instances.keys()) def _UnlockedGetInstanceInfo(self, instance_name): - """Returns informations about an instance. + """Returns information about an instance. This function is for internal use, when the config lock is already held. @@ -808,9 +822,9 @@ class ConfigWriter: @locking.ssynchronized(_config_lock, shared=1) def GetInstanceInfo(self, instance_name): - """Returns informations about an instance. + """Returns information about an instance. 
- It takes the information from the configuration file. Other informations of + It takes the information from the configuration file. Other information of an instance are taken from the live systems. @param instance_name: name of the instance, e.g. @@ -1150,32 +1164,6 @@ class ConfigWriter: constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION, } - @locking.ssynchronized(_config_lock) - def InitConfig(self, version, cluster_config, master_node_config): - """Create the initial cluster configuration. - - It will contain the current node, which will also be the master - node, and no instances. - - @type version: int - @param version: Configuration version - @type cluster_config: objects.Cluster - @param cluster_config: Cluster configuration - @type master_node_config: objects.Node - @param master_node_config: Master node configuration - - """ - nodes = { - master_node_config.name: master_node_config, - } - - self._config_data = objects.ConfigData(version=version, - cluster=cluster_config, - nodes=nodes, - instances={}, - serial_no=1) - self._WriteConfig() - @locking.ssynchronized(_config_lock, shared=1) def GetVGName(self): """Return the volume group name. @@ -1208,7 +1196,7 @@ class ConfigWriter: @locking.ssynchronized(_config_lock, shared=1) def GetClusterInfo(self): - """Returns informations about the cluster + """Returns information about the cluster @rtype: L{objects.Cluster} @return: the cluster object diff --git a/lib/constants.py b/lib/constants.py index 3949f56b7634841e0cd498a7c1dffad91c86510b..e5dafb174fa2f3a1b2141d8412f9de510a745c74 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -224,6 +224,7 @@ DDM_REMOVE = 'remove' # common exit codes EXIT_SUCCESS = 0 EXIT_FAILURE = 1 +EXIT_NOTCLUSTER = 5 EXIT_NOTMASTER = 11 EXIT_NODESETUP_ERROR = 12 EXIT_CONFIRMATION = 13 # need user confirmation @@ -307,6 +308,7 @@ HV_INITRD_PATH = "initrd_path" HV_ROOT_PATH = "root_path" HV_SERIAL_CONSOLE = "serial_console" HV_USB_MOUSE = "usb_mouse" +HV_DEVICE_MODEL = "device_model" HVS_PARAMETER_TYPES = { HV_BOOT_ORDER: VTYPE_STRING, @@ -325,6 +327,7 @@ HVS_PARAMETER_TYPES = { HV_ROOT_PATH: VTYPE_STRING, HV_SERIAL_CONSOLE: VTYPE_BOOL, HV_USB_MOUSE: VTYPE_STRING, + HV_DEVICE_MODEL: VTYPE_STRING, } HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys()) @@ -453,20 +456,24 @@ JOB_STATUS_CANCELED = "canceled" JOB_STATUS_SUCCESS = "success" JOB_STATUS_ERROR = "error" +# OpCode status +# not yet finalized OP_STATUS_QUEUED = "queued" OP_STATUS_WAITLOCK = "waiting" OP_STATUS_CANCELING = "canceling" OP_STATUS_RUNNING = "running" +# finalized OP_STATUS_CANCELED = "canceled" OP_STATUS_SUCCESS = "success" OP_STATUS_ERROR = "error" +OPS_FINALIZED = frozenset([OP_STATUS_CANCELED, + OP_STATUS_SUCCESS, + OP_STATUS_ERROR]) # Execution log types ELOG_MESSAGE = "message" ELOG_PROGRESS = "progress" -# Temporary RAPI constants until we have cluster parameters -RAPI_ENABLE = True RAPI_PORT = 5080 # max dynamic devices @@ -505,6 +512,8 @@ HVC_DEFAULTS = { HV_VNC_BIND_ADDRESS: '0.0.0.0', HV_ACPI: True, HV_PAE: True, + HV_KERNEL_PATH: "/usr/lib/xen/boot/hvmloader", + HV_DEVICE_MODEL: "/usr/lib/xen/bin/qemu-dm", }, HT_KVM: { HV_KERNEL_PATH: "/boot/vmlinuz-2.6-kvmU", diff --git a/lib/daemon.py b/lib/daemon.py index 26f9cdfe742d61e4313027a9ac6acc3ecbcdc112..5115837e2a343dc0ab272dd502134ab0cccc98d4 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -215,7 +215,7 @@ class Mainloop(object): """ for owner in self._signal_wait: - owner.OnSignal(signal.SIGCHLD) + owner.OnSignal(signum) def RegisterIO(self, owner, fd, condition): 
"""Registers a receiver for I/O notifications diff --git a/lib/http/__init__.py b/lib/http/__init__.py index 008cf9cb6712592dd08a718fe66a256df6cf29db..c98fa586dd10323f265ec11cd01f0b5e884f93e7 100644 --- a/lib/http/__init__.py +++ b/lib/http/__init__.py @@ -367,15 +367,12 @@ def SocketOperation(sock, op, arg1, timeout): # TODO: event_poll/event_check/override if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE): event_poll = select.POLLOUT - event_check = select.POLLOUT elif op == SOCKOP_RECV: event_poll = select.POLLIN - event_check = select.POLLIN | select.POLLPRI elif op == SOCKOP_SHUTDOWN: event_poll = None - event_check = None # The timeout is only used when OpenSSL requests polling for a condition. # It is not advisable to have no timeout for shutdown. @@ -744,7 +741,7 @@ class HttpMessageWriter(object): def HasMessageBody(self): """Checks whether the HTTP message contains a body. - Can be overriden by subclasses. + Can be overridden by subclasses. """ return bool(self._msg.body) @@ -937,7 +934,7 @@ class HttpMessageReader(object): def ParseStartLine(self, start_line): """Parses the start line of a message. - Must be overriden by subclass. + Must be overridden by subclass. @type start_line: string @param start_line: Start line string diff --git a/lib/http/auth.py b/lib/http/auth.py index 8a8d7201e604bcc99ea8e9cce57f844affa2498d..670b897b3a8d1fc6253a158f398cbd14999ead7c 100644 --- a/lib/http/auth.py +++ b/lib/http/auth.py @@ -23,12 +23,10 @@ """ import logging -import time import re import base64 import binascii -from ganeti import constants from ganeti import utils from ganeti import http @@ -80,7 +78,7 @@ class HttpServerRequestAuthentication(object): def GetAuthRealm(self, req): """Returns the authentication realm for a request. - MAY be overriden by a subclass, which then can return different realms for + MAY be overridden by a subclass, which then can return different realms for different paths. Returning "None" means no authentication is needed for a request. @@ -195,7 +193,7 @@ class HttpServerRequestAuthentication(object): def Authenticate(self, req, user, password): """Checks the password for a user. - This function MUST be overriden by a subclass. + This function MUST be overridden by a subclass. """ raise NotImplementedError() diff --git a/lib/http/client.py b/lib/http/client.py index 776fadeae37545f8659f1bb6cc896feddc840e90..717581f6f4295dfdab325a6bae15780d3793c4cc 100644 --- a/lib/http/client.py +++ b/lib/http/client.py @@ -22,23 +22,13 @@ """ -import BaseHTTPServer -import cgi -import logging -import OpenSSL import os import select import socket -import sys -import time -import signal import errno import threading -from ganeti import constants -from ganeti import serializer from ganeti import workerpool -from ganeti import utils from ganeti import http diff --git a/lib/http/server.py b/lib/http/server.py index b74eb3674121dc64958c3a81916971fde2aaad19..0afdcd00d3da52e9d25541f250a8d04a20b7c28b 100644 --- a/lib/http/server.py +++ b/lib/http/server.py @@ -31,9 +31,6 @@ import socket import time import signal -from ganeti import constants -from ganeti import serializer -from ganeti import utils from ganeti import http @@ -498,7 +495,7 @@ class HttpServer(http.HttpBase): # As soon as too many children run, we'll not respond to new # requests. The real solution would be to add a timeout for children # and killing them after some time. 
- pid, status = os.waitpid(0, 0) + pid, _ = os.waitpid(0, 0) except os.error: pid = None if pid and pid in self._children: @@ -536,14 +533,14 @@ class HttpServer(http.HttpBase): def PreHandleRequest(self, req): """Called before handling a request. - Can be overriden by a subclass. + Can be overridden by a subclass. """ def HandleRequest(self, req): """Handles a request. - Must be overriden by subclass. + Must be overridden by subclass. """ raise NotImplementedError() diff --git a/lib/hypervisor/hv_fake.py b/lib/hypervisor/hv_fake.py index ccac8427730d419579e141910fba4f204f95b449..73ed60ee9f223a2d6889b0f1255c302b7cfdb83a 100644 --- a/lib/hypervisor/hv_fake.py +++ b/lib/hypervisor/hv_fake.py @@ -25,7 +25,6 @@ import os import os.path -import re from ganeti import utils from ganeti import constants diff --git a/lib/hypervisor/hv_xen.py b/lib/hypervisor/hv_xen.py index 4579f4db8b5e6677ed08a80bfc6aceb647697f32..8a29d6b704b6ade68e64a05baa2625516794e292 100644 --- a/lib/hypervisor/hv_xen.py +++ b/lib/hypervisor/hv_xen.py @@ -89,7 +89,7 @@ class XenHypervisor(hv_base.BaseHypervisor): @return: list of (name, id, memory, vcpus, state, time spent) """ - for dummy in range(5): + for _ in range(5): result = utils.RunCmd(["xm", "list"]) if not result.failed: break @@ -518,6 +518,8 @@ class XenHvmHypervisor(XenHypervisor): constants.HV_NIC_TYPE, constants.HV_PAE, constants.HV_VNC_BIND_ADDRESS, + constants.HV_KERNEL_PATH, + constants.HV_DEVICE_MODEL, ] @classmethod @@ -559,6 +561,19 @@ class XenHvmHypervisor(XenHypervisor): " be an absolute path or None, not %s" % iso_path) + if not hvparams[constants.HV_KERNEL_PATH]: + raise errors.HypervisorError("Need a kernel for the instance") + + if not os.path.isabs(hvparams[constants.HV_KERNEL_PATH]): + raise errors.HypervisorError("The kernel path must be an absolute path") + + if not hvparams[constants.HV_DEVICE_MODEL]: + raise errors.HypervisorError("Need a device model for the instance") + + if not os.path.isabs(hvparams[constants.HV_DEVICE_MODEL]): + raise errors.HypervisorError("The device model must be an absolute path") + + def ValidateParameters(self, hvparams): """Check the given parameters for validity. @@ -580,6 +595,16 @@ class XenHvmHypervisor(XenHypervisor): " an existing regular file, not %s" % iso_path) + kernel_path = hvparams[constants.HV_KERNEL_PATH] + if not os.path.isfile(kernel_path): + raise errors.HypervisorError("Instance kernel '%s' not found or" + " not a file" % kernel_path) + + device_model = hvparams[constants.HV_DEVICE_MODEL] + if not os.path.isfile(device_model): + raise errors.HypervisorError("Device model '%s' not found or" + " not a file" % device_model) + @classmethod def _WriteConfigFile(cls, instance, block_devices): """Create a Xen 3.1 HVM config file. 
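
Note on the parameter checks added above: hypervisor parameter validation in Ganeti is split in two phases. The CheckParameterSyntax-style checks can only verify the shape of a value (set, absolute path), since they run where the file may not exist; the ValidateParameters-style checks then verify the file on the node that will actually run the instance. A condensed sketch of the pattern, with helper names invented for illustration (not part of the patch):

import os.path

def check_path_param(hvparams, key, what):
  # syntax phase: the value must be set and be an absolute path;
  # the file itself cannot be checked here
  value = hvparams.get(key)
  if not value:
    raise ValueError("Need a %s for the instance" % what)
  if not os.path.isabs(value):
    raise ValueError("The %s must be an absolute path" % what)

def validate_path_param(hvparams, key, what):
  # validation phase: the path must exist as a regular file on the
  # node where the instance will run
  value = hvparams[key]
  if not os.path.isfile(value):
    raise ValueError("%s '%s' not found or not a file" % (what, value))

check_path_param({"kernel_path": "/usr/lib/xen/boot/hvmloader"},
                 "kernel_path", "kernel")  # passes the syntax phase
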
@@ -589,25 +614,25 @@ class XenHvmHypervisor(XenHypervisor):
     config = StringIO()
     config.write("# this is autogenerated by Ganeti, please do not edit\n#\n")
-    config.write("kernel = '/usr/lib/xen/boot/hvmloader'\n")
+
+    # kernel handling
+    kpath = hvp[constants.HV_KERNEL_PATH]
+    config.write("kernel = '%s'\n" % kpath)
+
     config.write("builder = 'hvm'\n")
     config.write("memory = %d\n" % instance.beparams[constants.BE_MEMORY])
     config.write("vcpus = %d\n" % instance.beparams[constants.BE_VCPUS])
     config.write("name = '%s'\n" % instance.name)
-    if instance.hvparams[constants.HV_PAE]:
+    if hvp[constants.HV_PAE]:
       config.write("pae = 1\n")
     else:
       config.write("pae = 0\n")
-    if instance.hvparams[constants.HV_ACPI]:
+    if hvp[constants.HV_ACPI]:
       config.write("acpi = 1\n")
     else:
       config.write("acpi = 0\n")
     config.write("apic = 1\n")
-    arch = os.uname()[4]
-    if '64' in arch:
-      config.write("device_model = '/usr/lib64/xen/bin/qemu-dm'\n")
-    else:
-      config.write("device_model = '/usr/lib/xen/bin/qemu-dm'\n")
+    config.write("device_model = '%s'\n" % hvp[constants.HV_DEVICE_MODEL])
     config.write("boot = '%s'\n" % hvp[constants.HV_BOOT_ORDER])
     config.write("sdl = 0\n")
     config.write("usb = 1\n")
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 3364a93bb180028e0519bd3f1b221f785ae746e1..9219caedecee7ca141ecdf5b78140ad394aebf9a 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -69,7 +69,7 @@ def TimeStampNow():
 
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -80,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -152,6 +156,11 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -286,7 +295,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial
        higher than this value
     @rtype: list
@@ -304,6 +313,26 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
+
+    This is a utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalized are not changed.
+
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
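
Note on MarkUnfinishedOps above: opcodes in a job execute strictly in order, so any finalized opcodes must form a prefix of job.ops; the assertion documents exactly that invariant. A self-contained model of the semantics (the Op class below is a stand-in for _QueuedOpCode, not part of the patch):

FINALIZED = frozenset(["canceled", "success", "error"])

class Op(object):
  def __init__(self, status):
    self.status = status
    self.result = None

def mark_unfinished(ops, status, result):
  not_marked = True
  for op in ops:
    if op.status in FINALIZED:
      # finalized opcodes may only appear before all non-finalized ones
      assert not_marked, "Finalized opcodes found after non-finalized ones"
      continue
    op.status = status
    op.result = result
    not_marked = False

ops = [Op("success"), Op("running"), Op("queued")]
mark_unfinished(ops, "error", "Unclean master daemon shutdown")
assert [op.status for op in ops] == ["success", "error", "error"]
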
@@ -353,6 +382,15 @@ class _JobQueueWorker(workerpool.BaseWorker):
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
           op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
@@ -446,7 +484,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -469,7 +507,7 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class JobQueue(object):
-  """Quue used to manaage the jobs.
+  """Queue used to manage the jobs.
 
   @cvar _RE_JOB_FILE: regex matching the valid job file names
 
@@ -575,9 +613,8 @@ class JobQueue(object):
                              constants.JOB_STATUS_CANCELING):
           logging.warning("Unfinished job %s found: %s", job.id, job)
           try:
-            for op in job.ops:
-              op.status = constants.OP_STATUS_ERROR
-              op.result = "Unclean master daemon shutdown"
+            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                  "Unclean master daemon shutdown")
           finally:
             self.UpdateJobUnlocked(job)
 
@@ -651,7 +688,7 @@ class JobQueue(object):
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fail.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
@@ -759,26 +796,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
     @rtype: str
     @return: a string representing the job identifier.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial + 1, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -934,7 +976,7 @@ class JobQueue(object):
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: whether to set or unset the drain flag
 
     """
     if drain_flag:
@@ -943,14 +985,15 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param job_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
@rtype: job ID @@ -959,7 +1002,7 @@ class JobQueue(object): """ if self._IsQueueMarkedDrain(): - raise errors.JobQueueDrainError() + raise errors.JobQueueDrainError("Job queue is drained, refusing job") # Check job queue size size = len(self._ListJobFiles()) @@ -972,8 +1015,6 @@ class JobQueue(object): if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: raise errors.JobQueueFull() - # Get job identifier - job_id = self._NewSerialUnlocked() job = _QueuedJob(self, job_id, ops) # Write to disk @@ -987,6 +1028,39 @@ class JobQueue(object): return job.id + @utils.LockedMethod + @_RequireOpenQueue + def SubmitJob(self, ops): + """Create and store a new job. + + @see: L{_SubmitJobUnlocked} + + """ + job_id = self._NewSerialsUnlocked(1)[0] + return self._SubmitJobUnlocked(job_id, ops) + + @utils.LockedMethod + @_RequireOpenQueue + def SubmitManyJobs(self, jobs): + """Create and store multiple jobs. + + @see: L{_SubmitJobUnlocked} + + """ + results = [] + all_job_ids = self._NewSerialsUnlocked(len(jobs)) + for job_id, ops in zip(all_job_ids, jobs): + try: + data = self._SubmitJobUnlocked(job_id, ops) + status = True + except errors.GenericError, err: + data = str(err) + status = False + results.append((status, data)) + + return results + + @_RequireOpenQueue def UpdateJobUnlocked(self, job): """Update a job's on disk storage. @@ -1034,6 +1108,10 @@ class JobQueue(object): """ logging.debug("Waiting for changes in job %s", job_id) + + job_info = None + log_entries = None + end_time = time.time() + timeout while True: delta_time = end_time - time.time() @@ -1075,7 +1153,10 @@ class JobQueue(object): logging.debug("Job %s changed", job_id) - return (job_info, log_entries) + if job_info is None and log_entries is None: + return None + else: + return (job_info, log_entries) @utils.LockedMethod @_RequireOpenQueue @@ -1099,8 +1180,8 @@ class JobQueue(object): if job_status not in (constants.JOB_STATUS_QUEUED, constants.JOB_STATUS_WAITLOCK): - logging.debug("Job %s is no longer in the queue", job.id) - return (False, "Job %s is no longer in the queue" % job.id) + logging.debug("Job %s is no longer waiting in the queue", job.id) + return (False, "Job %s is no longer waiting in the queue" % job.id) if job_status == constants.JOB_STATUS_QUEUED: self.CancelJobUnlocked(job) @@ -1109,8 +1190,7 @@ class JobQueue(object): elif job_status == constants.JOB_STATUS_WAITLOCK: # The worker will notice the new status and cancel the job try: - for op in job.ops: - op.status = constants.OP_STATUS_CANCELING + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) finally: self.UpdateJobUnlocked(job) return (True, "Job %s will be canceled" % job.id) @@ -1121,9 +1201,8 @@ class JobQueue(object): """ try: - for op in job.ops: - op.status = constants.OP_STATUS_CANCELED - op.result = "Job canceled by request" + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") finally: self.UpdateJobUnlocked(job) diff --git a/lib/jstore.py b/lib/jstore.py index 4d9189e39eee121bd861a713a20116f2387ba904..5c5996807e7881d8f1ead4a4bb551a6a653f6188 100644 --- a/lib/jstore.py +++ b/lib/jstore.py @@ -22,9 +22,7 @@ """Module implementing the job queue handling.""" import os -import logging import errno -import re from ganeti import constants from ganeti import errors diff --git a/lib/locking.py b/lib/locking.py index 647e14f4c030b393dc3addfe144bfa645c5e8627..d24abdf4dc72d08be7b0e927b645ca64c1ed34c0 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -297,7 +297,7 @@ class SharedLock: # Whenever we want to acquire a full 
LockSet we pass None as the value -# to acquire. Hide this behing this nicely named constant. +# to acquire. Hide this behind this nicely named constant. ALL_SET = None @@ -498,7 +498,7 @@ class LockSet: # Of course something is going to be really wrong, after this. if lock._is_owned(): lock.release() - raise + raise except: # If something went wrong and we had the set-lock let's release it... @@ -689,7 +689,7 @@ BGL = 'BGL' class GanetiLockManager: """The Ganeti Locking Library - The purpouse of this small library is to manage locking for ganeti clusters + The purpose of this small library is to manage locking for ganeti clusters in a central place, while at the same time doing dynamic checks against possible deadlocks. It will also make it easier to transition to a different lock type should we migrate away from python threads. @@ -774,7 +774,7 @@ class GanetiLockManager: """Acquire a set of resource locks, at the same level. @param level: the level at which the locks shall be acquired; - it must be a memmber of LEVELS. + it must be a member of LEVELS. @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) @param shared: whether to acquire in shared mode; by default @@ -809,7 +809,7 @@ class GanetiLockManager: mode, before releasing them. @param level: the level at which the locks shall be released; - it must be a memmber of LEVELS + it must be a member of LEVELS @param names: the names of the locks which shall be released (defaults to all the locks acquired at that level) @@ -827,7 +827,7 @@ class GanetiLockManager: """Add locks at the specified level. @param level: the level at which the locks shall be added; - it must be a memmber of LEVELS_MOD. + it must be a member of LEVELS_MOD. @param names: names of the locks to acquire @param acquired: whether to acquire the newly added locks @param shared: whether the acquisition will be shared diff --git a/lib/luxi.py b/lib/luxi.py index 308de9f618512964be37e09ed5d54cce4daea8be..11ea61d29d830724401f0f189bf95abbdc1654dd 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -45,6 +45,7 @@ KEY_SUCCESS = "success" KEY_RESULT = "result" REQ_SUBMIT_JOB = "SubmitJob" +REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" REQ_CANCEL_JOB = "CancelJob" REQ_ARCHIVE_JOB = "ArchiveJob" @@ -186,12 +187,13 @@ class Transport: raise EncodingError("Message terminator found in payload") self._CheckSocket() try: + # TODO: sendall is not guaranteed to send everything self.socket.sendall(msg + self.eom) except socket.timeout, err: raise TimeoutError("Sending timeout: %s" % str(err)) def Recv(self): - """Try to receive a messae from the socket. + """Try to receive a message from the socket. In case we already have messages queued, we just return from the queue. 
Otherwise, we try to read data with a _rwtimeout network @@ -204,10 +206,16 @@ class Transport: while not self._msgs: if time.time() > etime: raise TimeoutError("Extended receive timeout") - try: - data = self.socket.recv(4096) - except socket.timeout, err: - raise TimeoutError("Receive timeout: %s" % str(err)) + while True: + try: + data = self.socket.recv(4096) + except socket.error, err: + if err.args and err.args[0] == errno.EAGAIN: + continue + raise + except socket.timeout, err: + raise TimeoutError("Receive timeout: %s" % str(err)) + break if not data: raise ConnectionClosedError("Connection closed while reading") new_msgs = (self._buffer + data).split(self.eom) @@ -277,7 +285,7 @@ class Client(object): old_transp = self.transport self.transport = None old_transp.Close() - except Exception, err: + except Exception: pass def CallMethod(self, method, args): @@ -335,6 +343,12 @@ class Client(object): ops_state = map(lambda op: op.__getstate__(), ops) return self.CallMethod(REQ_SUBMIT_JOB, ops_state) + def SubmitManyJobs(self, jobs): + jobs_state = [] + for ops in jobs: + jobs_state.append([op.__getstate__() for op in ops]) + return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state) + def CancelJob(self, job_id): return self.CallMethod(REQ_CANCEL_JOB, job_id) diff --git a/lib/mcpu.py b/lib/mcpu.py index 1e42d12cf93a0b84db373a37d8d79a239a795eeb..959a83785331c847569a3f2f0c083f838a76cdf9 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -159,7 +159,7 @@ class Processor(object): self.context.glm.add(level, add_locks, acquired=1, shared=share) except errors.LockError: raise errors.OpPrereqError( - "Coudn't add locks (%s), probably because of a race condition" + "Couldn't add locks (%s), probably because of a race condition" " with another job, who added them first" % add_locks) try: try: @@ -188,7 +188,7 @@ class Processor(object): @type run_notifier: callable (no arguments) or None @param run_notifier: this function (if callable) will be called when we are about to call the lu's Exec() method, that - is, after we have aquired all locks + is, after we have acquired all locks """ if not isinstance(op, opcodes.OpCode): @@ -362,4 +362,4 @@ class HooksMaster(object): phase = constants.HOOKS_PHASE_POST hpath = constants.HOOKS_NAME_CFGUPDATE nodes = [self.lu.cfg.GetMasterNode()] - results = self._RunWrapper(nodes, hpath, phase) + self._RunWrapper(nodes, hpath, phase) diff --git a/lib/objects.py b/lib/objects.py index 947678bec28f3ebb53229c6edf71c205e9f5994d..d5f446ed7c25ea0742f7e2a202d263f36549295a 100644 --- a/lib/objects.py +++ b/lib/objects.py @@ -58,6 +58,7 @@ class ConfigObject(object): def __init__(self, **kwargs): for k, v in kwargs.iteritems(): setattr(self, k, v) + self.UpgradeConfig() def __getattr__(self, name): if name not in self.__slots__: @@ -165,6 +166,15 @@ class ConfigObject(object): """Implement __repr__ for ConfigObjects.""" return repr(self.ToDict()) + def UpgradeConfig(self): + """Fill defaults for missing configuration values. + + This method will be called at object init time, and its implementation will + be object dependent. + + """ + pass + class TaggableObject(ConfigObject): """An generic class supporting tags. @@ -521,10 +531,10 @@ class Disk(ConfigObject): """Checks that this disk is correctly configured. 
""" - errors = [] + all_errors = [] if self.mode not in constants.DISK_ACCESS_SET: - errors.append("Disk access mode '%s' is invalid" % (self.mode, )) - return errors + all_errors.append("Disk access mode '%s' is invalid" % (self.mode, )) + return all_errors class Instance(TaggableObject): @@ -742,8 +752,30 @@ class Cluster(TaggableObject): "hvparams", "beparams", "candidate_pool_size", + "modify_etc_hosts", ] + def UpgradeConfig(self): + """Fill defaults for missing configuration values. + + """ + if self.hvparams is None: + self.hvparams = constants.HVC_DEFAULTS + else: + for hypervisor in self.hvparams: + self.hvparams[hypervisor] = self.FillDict( + constants.HVC_DEFAULTS[hypervisor], self.hvparams[hypervisor]) + + if self.beparams is None: + self.beparams = {constants.BEGR_DEFAULT: constants.BEC_DEFAULTS} + else: + for begroup in self.beparams: + self.beparams[begroup] = self.FillDict(constants.BEC_DEFAULTS, + self.beparams[begroup]) + + if self.modify_etc_hosts is None: + self.modify_etc_hosts = True + def ToDict(self): """Custom function for cluster. diff --git a/lib/opcodes.py b/lib/opcodes.py index 8fe24a9051811e181c7b062e89abb74c088a6c45..6a86477adfdc7163672e6952c47bc3433be5102a 100644 --- a/lib/opcodes.py +++ b/lib/opcodes.py @@ -142,14 +142,9 @@ class OpCode(BaseOpCode): raise ValueError("Invalid data to LoadOpcode, missing OP_ID") op_id = data["OP_ID"] op_class = None - for item in globals().values(): - if (isinstance(item, type) and - issubclass(item, cls) and - hasattr(item, "OP_ID") and - getattr(item, "OP_ID") == op_id): - op_class = item - break - if op_class is None: + if op_id in OP_MAPPING: + op_class = OP_MAPPING[op_id] + else: raise ValueError("Invalid data to LoadOpCode: OP_ID %s unsupported" % op_id) op = op_class() @@ -598,3 +593,7 @@ class OpTestAllocator(OpCode): "mem_size", "disks", "disk_template", "os", "tags", "nics", "vcpus", "hypervisor", ] + +OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values() + if (isinstance(v, type) and issubclass(v, OpCode) and + hasattr(v, "OP_ID"))]) diff --git a/lib/rapi/baserlib.py b/lib/rapi/baserlib.py index 7a025274c0964433c3e55c4b87c51d652dc6cb96..77a7e625d5c0f95e4b6076325c2a1db1cd18881f 100644 --- a/lib/rapi/baserlib.py +++ b/lib/rapi/baserlib.py @@ -25,8 +25,6 @@ import logging -import ganeti.cli - from ganeti import luxi from ganeti import rapi from ganeti import http @@ -247,7 +245,7 @@ class R_Generic(object): val = 0 try: val = int(val) - except (ValueError, TypeError), err: + except (ValueError, TypeError): raise http.HttpBadRequest("Invalid value for the" " '%s' parameter" % (name,)) return val diff --git a/lib/rapi/rlib2.py b/lib/rapi/rlib2.py index 45f649bea6611f3eda98395d9043a500095fdcd1..96671ec9020cae46946c43c033d7aa782aad07d6 100644 --- a/lib/rapi/rlib2.py +++ b/lib/rapi/rlib2.py @@ -46,6 +46,8 @@ N_FIELDS = ["name", "offline", "master_candidate", "drained", "mtotal", "mnode", "mfree", "pinst_cnt", "sinst_cnt", "tags", "ctotal", "cnodes", "csockets", + "pip", "sip", "serial_no", "role", + "pinst_list", "sinst_list", ] @@ -441,8 +443,7 @@ class R_2_instances_name_reboot(baserlib.R_Generic): instance_name = self.items[0] reboot_type = self.queryargs.get('type', [constants.INSTANCE_REBOOT_HARD])[0] - ignore_secondaries = bool(self.queryargs.get('ignore_secondaries', - [False])[0]) + ignore_secondaries = bool(self._checkIntVariable('ignore_secondaries')) op = opcodes.OpRebootInstance(instance_name=instance_name, reboot_type=reboot_type, ignore_secondaries=ignore_secondaries) @@ -467,7 +468,7 @@ class 
R_2_instances_name_startup(baserlib.R_Generic): """ instance_name = self.items[0] - force_startup = bool(self.queryargs.get('force', [False])[0]) + force_startup = bool(self._checkIntVariable('force')) op = opcodes.OpStartupInstance(instance_name=instance_name, force=force_startup) diff --git a/lib/rpc.py b/lib/rpc.py index 6392f50cb072f200eb32daafca8d807576b7c8be..e13885903c597c7ffdab6b252ad8efdec48a6750 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -31,7 +31,6 @@ # R0904: Too many public methods import os -import socket import logging import zlib import base64 @@ -83,7 +82,7 @@ class RpcResult(object): calls we can't raise an exception just because one one out of many failed, and therefore we use this class to encapsulate the result. - @ivar data: the data payload, for successfull results, or None + @ivar data: the data payload, for successful results, or None @type failed: boolean @ivar failed: whether the operation failed at RPC level (not application level on the remote node) @@ -161,7 +160,7 @@ class Client: list of nodes, will contact (in parallel) all nodes, and return a dict of results (key: node name, value: result). - One current bug is that generic failure is still signalled by + One current bug is that generic failure is still signaled by 'False' result, which is not good. This overloading of values can cause bugs. @@ -220,7 +219,7 @@ class Client: @return: List of RPC results """ - assert _http_manager, "RPC module not intialized" + assert _http_manager, "RPC module not initialized" _http_manager.ExecRequests(self.nc.values()) @@ -269,9 +268,9 @@ class RpcRunner(object): @type instance: L{objects.Instance} @param instance: an Instance object @type hvp: dict or None - @param hvp: a dictionary with overriden hypervisor parameters + @param hvp: a dictionary with overridden hypervisor parameters @type bep: dict or None - @param bep: a dictionary with overriden backend parameters + @param bep: a dictionary with overridden backend parameters @rtype: dict @return: the instance dict, with the hvparams filled with the cluster defaults @@ -290,7 +289,7 @@ class RpcRunner(object): def _ConnectList(self, client, node_list, call): """Helper for computing node addresses. - @type client: L{Client} + @type client: L{ganeti.rpc.Client} @param client: a C{Client} instance @type node_list: list @param node_list: the node list we should connect @@ -320,7 +319,7 @@ class RpcRunner(object): def _ConnectNode(self, client, node, call): """Helper for computing one node's address. - @type client: L{Client} + @type client: L{ganeti.rpc.Client} @param client: a C{Client} instance @type node: str @param node: the node we should connect diff --git a/lib/ssconf.py b/lib/ssconf.py index 19a95c97a90d0165a14eb102ee756e58f17ce44a..78ae9b0280886d4b149ded1eeac00898ba8132bc 100644 --- a/lib/ssconf.py +++ b/lib/ssconf.py @@ -94,12 +94,6 @@ class SimpleConfigWriter(SimpleConfigReader): """Simple class to write configuration file. """ - def SetMasterNode(self, node): - """Change master node. - - """ - self._config_data["cluster"]["master_node"] = node - def Save(self): """Writes configuration file. diff --git a/lib/ssh.py b/lib/ssh.py index 40df9996500dd1228399f37b89366bed03a27202..f0362b4b81933a3939d0f2975def532948be2ea2 100644 --- a/lib/ssh.py +++ b/lib/ssh.py @@ -201,7 +201,7 @@ class SshRunner: connected to). 
This is used to detect problems in ssh known_hosts files - (conflicting known hosts) and incosistencies between dns/hosts + (conflicting known hosts) and inconsistencies between dns/hosts entries and local machine names @param node: nodename of a host to check; can be short or diff --git a/lib/utils.py b/lib/utils.py index ac781fb621c6bbed469d5d69f839148d66c7b72d..df2d18027e83b7783e146cbbe58f7efa92317980 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -27,7 +27,6 @@ the command line scripts. """ -import sys import os import time import subprocess @@ -59,7 +58,6 @@ from ganeti import constants _locksheld = [] _re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$') -debug = False debug_locks = False #: when set to True, L{RunCmd} is disabled @@ -136,7 +134,7 @@ def RunCmd(cmd, env=None, output=None, cwd='/'): directory for the command; the default will be / @rtype: L{RunResult} @return: RunResult instance - @raise erors.ProgrammerError: if we call this when forks are disabled + @raise errors.ProgrammerError: if we call this when forks are disabled """ if no_fork: @@ -687,7 +685,7 @@ def TryConvert(fn, val): """ try: nv = fn(val) - except (ValueError, TypeError), err: + except (ValueError, TypeError): nv = val return nv @@ -701,7 +699,7 @@ def IsValidIP(ip): @type ip: str @param ip: the address to be checked @rtype: a regular expression match object - @return: a regular epression match object, or None if the + @return: a regular expression match object, or None if the address is not valid """ @@ -734,7 +732,7 @@ def BuildShellCmd(template, *args): This function will check all arguments in the args list so that they are valid shell parameters (i.e. they don't contain shell - metacharaters). If everything is ok, it will return the result of + metacharacters). If everything is ok, it will return the result of template % args. @type template: str @@ -1063,7 +1061,7 @@ def ShellQuoteArgs(args): @type args: list @param args: list of arguments to be quoted @rtype: str - @return: the quoted arguments concatenaned with spaces + @return: the quoted arguments concatenated with spaces """ return ' '.join([ShellQuote(i) for i in args]) @@ -1080,7 +1078,7 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None): @type port: int @param port: the port to connect to @type timeout: int - @param timeout: the timeout on the connection attemp + @param timeout: the timeout on the connection attempt @type live_port_needed: boolean @param live_port_needed: whether a closed port will cause the function to return failure, as if there was a timeout @@ -1097,7 +1095,7 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None): if source is not None: try: sock.bind((source, 0)) - except socket.error, (errcode, errstring): + except socket.error, (errcode, _): if errcode == errno.EADDRNOTAVAIL: success = False @@ -1122,7 +1120,7 @@ def OwnIpAddress(address): address. 
   @type address: string
-  @param address: the addres to check
+  @param address: the address to check
   @rtype: bool
   @return: True if we own the address
 
@@ -1218,7 +1216,7 @@ def ReadFile(file_name, size=None):
   @type size: None or int
   @param size: Read at most size bytes
   @rtype: str
-  @return: the (possibly partial) conent of the file
+  @return: the (possibly partial) content of the file
 
   """
   f = open(file_name, "r")
@@ -1360,14 +1358,14 @@ def FirstFree(seq, base=0):
 
 def all(seq, pred=bool):
   "Returns True if pred(x) is True for every element in the iterable"
-  for elem in itertools.ifilterfalse(pred, seq):
+  for _ in itertools.ifilterfalse(pred, seq):
     return False
   return True
 
 
 def any(seq, pred=bool):
   "Returns True if pred(x) is True for at least one element in the iterable"
-  for elem in itertools.ifilter(pred, seq):
+  for _ in itertools.ifilter(pred, seq):
     return True
   return False
 
@@ -1378,7 +1376,7 @@ def UniqueSequence(seq):
   Element order is preserved.
 
   @type seq: sequence
-  @param seq: the sequence with the source elementes
+  @param seq: the sequence with the source elements
   @rtype: list
   @return: list of unique elements from seq
 
@@ -1390,7 +1388,7 @@ def IsValidMac(mac):
   """Predicate to check if a MAC address is valid.
 
-  Checks wether the supplied MAC address is formally correct, only
+  Checks whether the supplied MAC address is formally correct, only
   accepts colon separated format.
 
   @type mac: str
@@ -1552,7 +1550,6 @@ def RemovePidFile(name):
   @param name: the daemon name used to derive the pidfile name
 
   """
-  pid = os.getpid()
   pidfilename = DaemonPidFileName(name)
   # TODO: we could check here that the file contains our pid
   try:
@@ -1831,7 +1828,7 @@ def SafeEncode(text):
 
   """
   if isinstance(text, unicode):
-    # onli if unicode; if str already, we handle it below
+    # only if unicode; if str already, we handle it below
    text = text.encode('ascii', 'backslashreplace')
   resu = ""
   for char in text:
diff --git a/man/gnt-debug.sgml b/man/gnt-debug.sgml
index 89fd9e4a1f3aecc63e36d2e9a7864f5c352781b3..a42a44175d39f255d3323998d2c6902f4fde6920 100644
--- a/man/gnt-debug.sgml
+++ b/man/gnt-debug.sgml
@@ -145,7 +145,10 @@
 
     <cmdsynopsis>
       <command>submit-job</command>
-
+      <arg choice="opt">--verbose</arg>
+      <arg choice="opt">--timing-stats</arg>
+      <arg choice="opt">--job-repeat <option>N</option></arg>
+      <arg choice="opt">--op-repeat <option>N</option></arg>
       <arg choice="req" rep="repeat">opcodes_file</arg>
     </cmdsynopsis>
 
@@ -156,6 +159,24 @@
       command line.
     </para>
 
+    <para>
+      The <option>verbose</option> option will show the job IDs of
+      the submitted jobs and the progress while waiting for the jobs;
+      the <option>timing-stats</option> option will show some
+      overall statistics with the total number of opcodes and jobs
+      submitted, and the time for each stage (submit, exec, total).
+    </para>
+
+    <para>
+      The <option>job-repeat</option> and <option>op-repeat</option>
+      options allow submitting multiple copies of the passed
+      arguments; the job repeat will cause N copies of each job
+      (input file) to be submitted (equivalent to passing the
+      arguments N times) while the op repeat will cause each job to
+      contain multiple copies of the opcodes (equivalent to each
+      file containing N copies of the opcodes).
+ </para> + </refsect2> </refsect1> diff --git a/man/gnt-instance.sgml b/man/gnt-instance.sgml index d17a4a6b29e7ef91939ce6bd2a51d9f49f088510..a703d68b6a21136284a21b52f7043328a7ca3374 100644 --- a/man/gnt-instance.sgml +++ b/man/gnt-instance.sgml @@ -1229,31 +1229,47 @@ instance5: 11225 <cmdsynopsis> <command>reinstall</command> <arg choice="opt">-o <replaceable>os-type</replaceable></arg> - <arg choice="opt">-f <replaceable>force</replaceable></arg> <arg>--select-os</arg> + <arg choice="opt">-f <replaceable>force</replaceable></arg> + <arg>--force-multiple</arg> + <sbr> + <group choice="opt"> + <arg>--instance</arg> + <arg>--node</arg> + <arg>--primary</arg> + <arg>--secondary</arg> + <arg>--all</arg> + </group> <arg>--submit</arg> - <arg choice="req"><replaceable>instance</replaceable></arg> + <arg choice="opt" rep="repeat"><replaceable>instance</replaceable></arg> </cmdsynopsis> <para> - Reinstalls the operating system on the given instance. The - instance must be stopped when running this command. If the + Reinstalls the operating system on the given instance(s). The + instance(s) must be stopped when running this command. If the <option>--os-type</option> is specified, the operating system is changed. </para> - <para> - Since reinstall is potentially dangerous command, the user - will be required to confirm this action, unless the - <option>-f</option> flag is passed. - </para> - <para> The <option>--select-os</option> option switches to an interactive OS reinstall. The user is prompted to select the OS template from the list of available OS templates. </para> + <para> + Since this is a potentially dangerous command, the user will + be required to confirm this action, unless the + <option>-f</option> flag is passed. When multiple instances + are selected (either by passing multiple arguments or by + using the <option>--node</option>, + <option>--primary</option>, <option>--secondary</option> or + <option>--all</option> options), the user must pass both the + <option>--force</option> and + <option>--force-multiple</option> options to skip the + interactive confirmation. + </para> + <para> The <option>--submit</option> option is used to send the job to the master daemon but not wait for its completion. The job diff --git a/man/gnt-job.sgml b/man/gnt-job.sgml index bf81ec7062e1c476caa9564b87008556633845e7..2fa73074745af410d5f254bb11e430a90b65e206 100644 --- a/man/gnt-job.sgml +++ b/man/gnt-job.sgml @@ -107,7 +107,7 @@ <refsect2> <title>INFO</title> <cmdsynopsis> - <command>cancel</command> + <command>info</command> <arg choice="req" rep="repeat"><replaceable>id</replaceable></arg> </cmdsynopsis> @@ -231,6 +231,20 @@ </para> </refsect2> + + <refsect2> + <title>WATCH</title> + <cmdsynopsis> + <command>watch</command> + <arg>id</arg> + </cmdsynopsis> + + <para> + This command follows the output of the job by the given + <replaceable>id</replaceable> and prints it. + </para> + </refsect2> + </refsect1> &footer; diff --git a/man/gnt-node.sgml b/man/gnt-node.sgml index a35042dbd926d06ed2f597adfc9e287a0fbd0d0b..278a6335d80504938c7b22241500427dc0707e53 100644 --- a/man/gnt-node.sgml +++ b/man/gnt-node.sgml @@ -91,7 +91,7 @@ discussion in <citerefentry> <refentrytitle>gnt-cluster</refentrytitle> <manvolnum>8</manvolnum> </citerefentry> for more - informations. + information. 
</para> <para> diff --git a/pylintrc b/pylintrc new file mode 100644 index 0000000000000000000000000000000000000000..61163e7f2a3f33490e68cde8fddb35f70d98a178 --- /dev/null +++ b/pylintrc @@ -0,0 +1,78 @@ +# Configuration file for pylint (http://www.logilab.org/project/pylint). See +# http://www.logilab.org/card/pylintfeatures for more detailed variable +# descriptions. + +[MASTER] +profile = no +ignore = +persistent = no +cache-size = 50000 +load-plugins = + +[REPORTS] +output-format = colorized +include-ids = no +files-output = no +reports = no +evaluation = 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) +comment = yes + +[BASIC] +required-attributes = +no-docstring-rgx = __.*__ +module-rgx = (([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ +const-rgx = ((_{0,2}[A-Z][A-Z0-9_]*)|(__.*__))$ +class-rgx = _?[A-Z][a-zA-Z0-9]+$ +function-rgx = (_?([A-Z]+[a-z0-9]+([A-Z]+[a-z0-9]*)*)|main)$ +method-rgx = (_{0,2}[A-Z]+[a-z0-9]+([A-Z]+[a-z0-9]*)*|__.*__)$ +attr-rgx = [a-z_][a-z0-9_]{1,30}$ +argument-rgx = [a-z_][a-z0-9_]*$ +variable-rgx = (_?([a-z_][a-z0-9_]*)|([A-Z0-9_]+))$ +inlinevar-rgx = [A-Za-z_][A-Za-z0-9_]*$ +good-names = i,j,k,_ +bad-names = foo,bar,baz,toto,tutu,tata +bad-functions = + +[TYPECHECK] +ignore-mixin-members = yes +zope = no +acquired-members = + +[VARIABLES] +init-import = no +dummy-variables-rgx = _ +additional-builtins = + +[CLASSES] +ignore-iface-methods = +defining-attr-methods = __init__,__new__,setUp + +[DESIGN] +max-args = 6 +max-locals = 15 +max-returns = 6 +max-branchs = 12 +max-statements = 50 +max-parents = 7 +max-attributes = 7 +min-public-methods = 2 +max-public-methods = 20 + +[IMPORTS] +deprecated-modules = regsub,string,TERMIOS,Bastion,rexec +import-graph = +ext-import-graph = +int-import-graph = + +[FORMAT] +max-line-length = 80 +max-module-lines = 1000 +indent-string = " " + +[MISCELLANEOUS] +notes = FIXME,XXX,TODO + +[SIMILARITIES] +min-similarity-lines = 4 +ignore-comments = yes +ignore-docstrings = yes diff --git a/qa/qa_rapi.py b/qa/qa_rapi.py index 7c668c5184436215900922150357364a6d2b50ff..76b96f10b74f2894dff97599006a837ebb4f131f 100644 --- a/qa/qa_rapi.py +++ b/qa/qa_rapi.py @@ -58,19 +58,7 @@ def Enabled(): """Return whether remote API tests should be run. """ - return constants.RAPI_ENABLE and qa_config.TestEnabled('rapi') - - -def PrintRemoteAPIWarning(): - """Print warning if remote API is not enabled. - - """ - if constants.RAPI_ENABLE or not qa_config.TestEnabled('rapi'): - return - msg = ("Remote API is not enabled in this Ganeti build. Please run" - " `configure [...] 
--enable-rapi'.") - print - print qa_utils.FormatWarning(msg) + return qa_config.TestEnabled('rapi') def _DoTests(uris): diff --git a/scripts/gnt-cluster b/scripts/gnt-cluster index b5fd16664c0a0f36ba6c453666e3421da82558d1..2d53d1d6a0d66a925ee6b084bfd397258cebb654 100755 --- a/scripts/gnt-cluster +++ b/scripts/gnt-cluster @@ -93,11 +93,6 @@ def InitCluster(opts, args): hvparams[hv][parameter] = constants.HVC_DEFAULTS[hv][parameter] utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES) - for hv in hvlist: - if hv not in constants.HYPER_TYPES: - ToStderr("invalid hypervisor: %s", hv) - return 1 - bootstrap.InitCluster(cluster_name=args[0], secondary_ip=opts.secondary_ip, vg_name=vg_name, @@ -110,6 +105,7 @@ def InitCluster(opts, args): hvparams=hvparams, beparams=beparams, candidate_pool_size=opts.candidate_pool_size, + modify_etc_hosts=opts.modify_etc_hosts, ) return 0 @@ -232,6 +228,13 @@ def ShowClusterConfig(opts, args): ToStdout("Architecture (this node): %s (%s)", result["architecture"][0], result["architecture"][1]) + if result["tags"]: + tags = ", ".join(utils.NiceSort(result["tags"])) + else: + tags = "(none)" + + ToStdout("Tags: %s", tags) + ToStdout("Default hypervisor: %s", result["default_hypervisor"]) ToStdout("Enabled hypervisors: %s", ", ".join(result["enabled_hypervisors"])) @@ -587,6 +590,10 @@ commands = { help="No support for lvm based instances" " (cluster-wide)", action="store_false", default=True,), + make_option("--no-etc-hosts", dest="modify_etc_hosts", + help="Don't modify /etc/hosts" + " (cluster-wide)", + action="store_false", default=True,), make_option("--enabled-hypervisors", dest="enabled_hypervisors", help="Comma-separated list of hypervisors", type="string", default=None), diff --git a/scripts/gnt-debug b/scripts/gnt-debug index d3bf05450c3c08cc6ad94de10d744dbc4d3273cd..df48e6034cdb3f317c92828f1bbe65ba89b4f89e 100755 --- a/scripts/gnt-debug +++ b/scripts/gnt-debug @@ -71,19 +71,37 @@ def GenericOpCodes(opts, args): """ cl = cli.GetClient() - job_data = [] - job_ids = [] - for fname in args: - op_data = simplejson.loads(open(fname).read()) - op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data] - job_data.append((fname, op_list)) - for fname, op_list in job_data: - jid = cli.SendJob(op_list, cl=cl) - ToStdout("File '%s', job id: %s", fname, jid) - job_ids.append(jid) - for jid in job_ids: - ToStdout("Waiting for job id %s", jid) - cli.PollJob(jid, cl=cl) + jex = cli.JobExecutor(cl=cl, verbose=opts.verbose) + + job_cnt = 0 + op_cnt = 0 + if opts.timing_stats: + ToStdout("Loading...") + for job_idx in range(opts.rep_job): + for fname in args: + op_data = simplejson.loads(open(fname).read()) + op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data] + op_list = op_list * opts.rep_op + jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list) + op_cnt += len(op_list) + job_cnt += 1 + + if opts.timing_stats: + t1 = time.time() + ToStdout("Submitting...") + jex.SubmitPending() + + if opts.timing_stats: + t2 = time.time() + ToStdout("Executing...") + jex.GetResults() + if opts.timing_stats: + t3 = time.time() + ToStdout("C:op %4d" % op_cnt) + ToStdout("C:job %4d" % job_cnt) + ToStdout("T:submit %4.4f" % (t2-t1)) + ToStdout("T:exec %4.4f" % (t3-t2)) + ToStdout("T:total %4.4f" % (t3-t1)) return 0 @@ -148,6 +166,20 @@ commands = { "[opts...] 
<duration>", "Executes a TestDelay OpCode"), 'submit-job': (GenericOpCodes, ARGS_ATLEAST(1), [DEBUG_OPT, + make_option("--op-repeat", type="int", default="1", + dest="rep_op", + help="Repeat the opcode sequence this number" + " of times"), + make_option("--job-repeat", type="int", default="1", + dest="rep_job", + help="Repeat the job this number" + " of times"), + make_option("-v", "--verbose", default=False, + action="store_true", + help="Make the operation more verbose"), + make_option("--timing-stats", default=False, + action="store_true", + help="Show timing stats"), ], "<op_list_file...>", "Submits jobs built from json files" " containing a list of serialized opcodes"), diff --git a/scripts/gnt-instance b/scripts/gnt-instance index e16fed7b6804626ed5a646a01109402f8cb795c3..056337700f6d61bc503d623bfca802e2baf98373 100755 --- a/scripts/gnt-instance +++ b/scripts/gnt-instance @@ -116,7 +116,7 @@ def _ExpandMultiNames(mode, names, client=None): return inames -def _ConfirmOperation(inames, text): +def _ConfirmOperation(inames, text, extra=""): """Ask the user to confirm an operation on a list of instances. This function is used to request confirmation for doing an operation @@ -133,8 +133,8 @@ def _ConfirmOperation(inames, text): """ count = len(inames) - msg = ("The %s will operate on %d instances.\n" - "Do you want to continue?" % (text, count)) + msg = ("The %s will operate on %d instances.\n%s" + "Do you want to continue?" % (text, count, extra)) affected = ("\nAffected instances:\n" + "\n".join([" %s" % name for name in inames])) @@ -160,7 +160,7 @@ def _EnsureInstancesExist(client, names): This function will raise an OpPrereqError in case they don't exist. Otherwise it will exit cleanly. - @type client: L{luxi.Client} + @type client: L{ganeti.luxi.Client} @param client: the client to use for the query @type names: list @param names: the list of instance names to query @@ -441,6 +441,8 @@ def BatchCreate(opts, args): ToStderr("Can't parse the instance definition file: %s" % str(err)) return 1 + jex = JobExecutor() + # Iterate over the instances and do: # * Populate the specs with default value # * Validate the instance specs @@ -486,7 +488,9 @@ def BatchCreate(opts, args): file_storage_dir=specs['file_storage_dir'], file_driver=specs['file_driver']) - ToStdout("%s: %s", name, cli.SendJob([op])) + jex.QueueJob(name, op) + # we never want to wait, just show the submitted job IDs + jex.WaitOrShow(False) return 0 @@ -502,8 +506,15 @@ def ReinstallInstance(opts, args): @return: the desired exit code """ - instance_name = args[0] + # first, compute the desired name list + if opts.multi_mode is None: + opts.multi_mode = _SHUTDOWN_INSTANCES + + inames = _ExpandMultiNames(opts.multi_mode, args) + if not inames: + raise errors.OpPrereqError("Selection filter does not match any instances") + # second, if requested, ask for an OS if opts.select_os is True: op = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[]) result = SubmitOpCode(op) @@ -525,23 +536,35 @@ def ReinstallInstance(opts, args): choices) if selected == 'exit': - ToStdout("User aborted reinstall, exiting") + ToStderr("User aborted reinstall, exiting") return 1 os_name = selected else: os_name = opts.os - if not opts.force: - usertext = ("This will reinstall the instance %s and remove" - " all data. 
Continue?") % instance_name - if not AskUser(usertext): + # third, get confirmation: multi-reinstall requires --force-multi + # *and* --force, single-reinstall just --force + multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1 + if multi_on: + warn_msg = "Note: this will remove *all* data for the below instances!\n" + if not ((opts.force_multi and opts.force) or + _ConfirmOperation(inames, "reinstall", extra=warn_msg)): return 1 + else: + if not opts.force: + usertext = ("This will reinstall the instance %s and remove" + " all data. Continue?") % inames[0] + if not AskUser(usertext): + return 1 + + jex = JobExecutor(verbose=multi_on) + for instance_name in inames: + op = opcodes.OpReinstallInstance(instance_name=instance_name, + os_type=os_name) + jex.QueueJob(instance_name, op) - op = opcodes.OpReinstallInstance(instance_name=instance_name, - os_type=os_name) - SubmitOrSend(op, opts) - + jex.WaitOrShow(not opts.submit_only) return 0 @@ -1374,8 +1397,11 @@ commands = { " The default field" " list is (in order): %s." % ", ".join(_LIST_DEF_FIELDS), ), - 'reinstall': (ReinstallInstance, ARGS_ONE, + 'reinstall': (ReinstallInstance, ARGS_ANY, [DEBUG_OPT, FORCE_OPT, os_opt, + m_force_multi, + m_node_opt, m_pri_node_opt, m_sec_node_opt, + m_clust_opt, m_inst_opt, make_option("--select-os", dest="select_os", action="store_true", default=False, help="Interactive OS reinstall, lists available" diff --git a/scripts/gnt-job b/scripts/gnt-job index 2da75a35b1da77b5ccf864de88ccfbc2aa0d95d3..1402583e6ae8ba80c3d628aaac0ed2cb7730a7b5 100755 --- a/scripts/gnt-job +++ b/scripts/gnt-job @@ -29,6 +29,7 @@ from ganeti.cli import * from ganeti import constants from ganeti import errors from ganeti import utils +from ganeti import cli #: default list of fields for L{ListJobs} @@ -312,6 +313,32 @@ def ShowJobs(opts, args): return 0 +def WatchJob(opts, args): + """Follow a job and print its output as it arrives. 
+ + @param opts: the command line options selected by the user + @type args: list + @param args: Contains the job ID + @rtype: int + @return: the desired exit code + + """ + job_id = args[0] + + msg = ("Output from job %s follows" % job_id) + ToStdout(msg) + ToStdout("-" * len(msg)) + + retcode = 0 + try: + cli.PollJob(job_id) + except errors.GenericError, err: + (retcode, job_result) = cli.FormatError(err) + ToStderr("Job %s failed: %s", job_id, job_result) + + return retcode + + commands = { 'list': (ListJobs, ARGS_ANY, [DEBUG_OPT, NOHDR_OPT, SEP_OPT, FIELDS_OPT], @@ -336,6 +363,9 @@ commands = { 'info': (ShowJobs, ARGS_ANY, [DEBUG_OPT], "<job-id> [<job-id> ...]", "Show detailed information about the specified jobs"), + 'watch': (WatchJob, ARGS_ONE, [DEBUG_OPT], + "<job-id>", + "Follows a job and prints its output as it arrives"), } diff --git a/scripts/gnt-node b/scripts/gnt-node index bb244d58690fa3afb7fc7546ca2bda0ea8309dd7..90ec77a1974b5f4b35c8135c4d0ca31544ea9b2e 100755 --- a/scripts/gnt-node +++ b/scripts/gnt-node @@ -189,7 +189,7 @@ def EvacuateNode(opts, args): cnt = [dst_node, iallocator].count(None) if cnt != 1: - raise errors.OpPrereqError("One and only one of the -n and -i" + raise errors.OpPrereqError("One and only one of the -n and -I" " options must be passed") selected_fields = ["name", "sinst_list"] @@ -541,7 +541,7 @@ commands = { choices=('yes', 'no'), default=None, help="Set the drained flag on the node"), ], - "<instance>", "Alters the parameters of an instance"), + "<node_name>", "Alters the parameters of a node"), 'remove': (RemoveNode, ARGS_ONE, [DEBUG_OPT], "<node_name>", "Removes a node from the cluster"), 'volumes': (ListVolumes, ARGS_ANY, diff --git a/test/ganeti.config_unittest.py b/test/ganeti.config_unittest.py index 24c9491ca435724f8613cc845e2a3615ac3aa448..37bcb2081768e308a7b3b3f9a946fd5b7797c00a 100755 --- a/test/ganeti.config_unittest.py +++ b/test/ganeti.config_unittest.py @@ -69,6 +69,7 @@ class TestConfigRunner(unittest.TestCase): default_bridge=constants.DEFAULT_BRIDGE, tcpudp_port_pool=set(), default_hypervisor=constants.HT_FAKE, + enabled_hypervisors=[constants.HT_FAKE], master_node=me.name, master_ip="127.0.0.1", master_netdev=constants.DEFAULT_BRIDGE, diff --git a/tools/burnin b/tools/burnin index d0ef8775385b58581ceee1e5bd59ddcde93f9280..b38680fc4b53ddcc809a98a7ee3253afd9c023d2 100755 --- a/tools/burnin +++ b/tools/burnin @@ -41,11 +41,16 @@ from ganeti import utils USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...") +MAX_RETRIES = 3 class InstanceDown(Exception): """The checked instance was not up""" +class BurninFailure(Exception): + """Failure detected during burning""" + + def Usage(): """Shows program usage information and exits the program.""" @@ -106,6 +111,9 @@ class Burner(object): self.to_rem = [] self.queued_ops = [] self.opts = None + self.queue_retry = False + self.disk_count = self.disk_growth = self.disk_size = None + self.hvp = self.bep = None self.ParseOptions() self.cl = cli.GetClient() self.GetState() @@ -125,7 +133,39 @@ class Burner(object): if self.opts.verbose: Log(msg, indent=3) - def ExecOp(self, *ops): + def MaybeRetry(self, retry_count, msg, fn, *args): + """Possibly retry a given function execution. 
+
+    @type retry_count: int
+    @param retry_count: retry counter:
+        - 0: non-retryable action
+        - 1: last retry for a retryable action
+        - MAX_RETRIES: original try for a retryable action
+    @type msg: str
+    @param msg: the kind of the operation
+    @type fn: callable
+    @param fn: the function to be called
+
+    """
+    try:
+      val = fn(*args)
+      if retry_count > 0 and retry_count < MAX_RETRIES:
+        Log("Idempotent %s succeeded after %d retries" %
+            (msg, MAX_RETRIES - retry_count))
+      return val
+    except Exception, err:
+      if retry_count == 0:
+        Log("Non-idempotent %s failed, aborting" % (msg, ))
+        raise
+      elif retry_count == 1:
+        Log("Idempotent %s repeated failure, aborting" % (msg, ))
+        raise
+      else:
+        Log("Idempotent %s failed, retry #%d/%d: %s" %
+            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
+        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
+
+  def _ExecOp(self, *ops):
     """Execute one or more opcodes and manage the exec buffer.
 
     @result: if only opcode has been passed, we return its result;
         otherwise we return the list of results
 
     """
@@ -139,20 +179,48 @@ class Burner(object):
     else:
       return results
 
+  def ExecOp(self, retry, *ops):
+    """Execute one or more opcodes and manage the exec buffer.
+
+    @result: if only one opcode has been passed, we return its result;
+        otherwise we return the list of results
+
+    """
+    if retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
+
   def ExecOrQueue(self, name, *ops):
     """Execute an opcode and manage the exec buffer."""
     if self.opts.parallel:
       self.queued_ops.append((ops, name))
     else:
-      return self.ExecOp(*ops)
+      return self.ExecOp(self.queue_retry, *ops)
+
+  def StartBatch(self, retry):
+    """Start a new batch of jobs.
+
+    @param retry: whether this is a retryable batch
+
+    """
+    self.queued_ops = []
+    self.queue_retry = retry
 
   def CommitQueue(self):
     """Execute all submitted opcodes in case of parallel burnin"""
     if not self.opts.parallel:
       return
 
+    if self.queue_retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+
     try:
-      results = self.ExecJobSet(self.queued_ops)
+      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
+                                self.queued_ops)
     finally:
       self.queued_ops = []
     return results
@@ -171,10 +239,45 @@ class Burner(object):
     results = []
     for jid, (_, iname) in zip(job_ids, jobs):
       Log("waiting for job %s for %s" % (jid, iname), indent=2)
-      results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
-
+      try:
+        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+      except Exception, err:
+        Log("Job for %s failed: %s" % (iname, err))
+    if len(results) != len(jobs):
+      raise BurninFailure()
     return results
 
+  def _DoCheckInstances(fn):
+    """Decorator for checking instances.
+
+    """
+    def wrapper(self, *args, **kwargs):
+      val = fn(self, *args, **kwargs)
+      for instance in self.instances:
+        self._CheckInstanceAlive(instance)
+      return val
+
+    return wrapper
+
+  def _DoBatch(retry):
+    """Decorator for possible batch operations.
+
+    Must come after the _DoCheckInstances decorator (if any).
+
+    @param retry: whether this is a retryable batch, will be
+        passed to StartBatch
+
+    """
+    def wrap(fn):
+      def batched(self, *args, **kwargs):
+        self.StartBatch(retry)
+        val = fn(self, *args, **kwargs)
+        self.CommitQueue()
+        return val
+      return batched
+
+    return wrap
+
   def ParseOptions(self):
     """Parses the command line options.
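
Note on the "must come after the _DoCheckInstances decorator" remark in _DoBatch above: this is ordinary decorator stacking. The decorator written first wraps the result of the one below it, so its code runs outermost, and the instance liveness checks therefore run only after the batch has been committed. A toy model of the ordering (all names invented):

def do_check(fn):          # plays the role of _DoCheckInstances
  def wrapper(self):
    fn(self)
    print("check instances")   # runs last
  return wrapper

def do_batch(fn):          # plays the role of _DoBatch(...)
  def wrapper(self):
    print("start batch")
    fn(self)
    print("commit batch")
  return wrapper

class Demo(object):
  @do_check    # outermost: checks happen after the batch completes
  @do_batch    # innermost: wraps the actual burnin step
  def step(self):
    print("queue opcodes")

Demo().step()
# prints: start batch / queue opcodes / commit batch / check instances
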
@@ -325,14 +428,14 @@ class Burner(object): try: op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"], names=names, use_locking=True) - result = self.ExecOp(op) + result = self.ExecOp(True, op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) Err(msg, exit_code=err_code) self.nodes = [data[0] for data in result if not (data[1] or data[2])] - result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"], - names=[])) + op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[]) + result = self.ExecOp(True, op_diagos) if not result: Err("Can't get the OS list") @@ -343,6 +446,8 @@ class Burner(object): if self.opts.os not in os_set: Err("OS '%s' not found" % self.opts.os) + @_DoCheckInstances + @_DoBatch(False) def BurnCreateInstances(self): """Create the given instances. @@ -388,11 +493,7 @@ class Burner(object): self.ExecOrQueue(instance, op) self.to_rem.append(instance) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoBatch(False) def BurnGrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" Log("Growing disks") @@ -404,8 +505,8 @@ class Burner(object): amount=growth, wait_for_sync=True) Log("increase disk/%s by %s MB" % (idx, growth), indent=2) self.ExecOrQueue(instance, op) - self.CommitQueue() + @_DoBatch(True) def BurnReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" Log("Replacing disks on the same nodes") @@ -419,8 +520,8 @@ class Burner(object): Log("run %s" % mode, indent=2) ops.append(op) self.ExecOrQueue(instance, *ops) - self.CommitQueue() + @_DoBatch(True) def BurnReplaceDisks2(self): """Replace secondary node.""" Log("Changing the secondary node") @@ -442,8 +543,9 @@ class Burner(object): disks=[i for i in range(self.disk_count)]) Log("run %s %s" % (mode, msg), indent=2) self.ExecOrQueue(instance, op) - self.CommitQueue() + @_DoCheckInstances + @_DoBatch(False) def BurnFailover(self): """Failover the instances.""" Log("Failing over instances") @@ -453,10 +555,8 @@ class Burner(object): ignore_consistency=False) self.ExecOrQueue(instance, op) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) + @_DoBatch(False) def BurnMigrate(self): """Migrate the instances.""" Log("Migrating instances") @@ -469,8 +569,9 @@ class Burner(object): cleanup=True) Log("migration and migration cleanup", indent=2) self.ExecOrQueue(instance, op1, op2) - self.CommitQueue() + @_DoCheckInstances + @_DoBatch(False) def BurnImportExport(self): """Export the instance, delete it, and import it back. 
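
Note on the retry convention used by the batched steps above: retryability is encoded in the counter handed to MaybeRetry, where 0 means a non-retryable action, MAX_RETRIES is the original attempt, and 1 is the last permitted retry, so a retryable action is attempted at most MAX_RETRIES times. A minimal stand-alone model of that contract (names invented, simplified from the code above):

MAX_RETRIES = 3

def maybe_retry(retry_count, fn):
  try:
    return fn()
  except Exception:
    # 0 = non-retryable action, 1 = last retry: give up either way
    if retry_count <= 1:
      raise
    return maybe_retry(retry_count - 1, fn)

attempts = []
def flaky():
  attempts.append(1)
  if len(attempts) < 3:
    raise RuntimeError("transient failure")
  return "ok"

assert maybe_retry(MAX_RETRIES, flaky) == "ok"  # succeeds on the third try
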
@@ -486,7 +587,7 @@ class Burner(object): # read the full name of the instance nam_op = opcodes.OpQueryInstances(output_fields=["name"], names=[instance], use_locking=True) - full_name = self.ExecOp(nam_op)[0][0] + full_name = self.ExecOp(False, nam_op)[0][0] if self.opts.iallocator: pnode = snode = None @@ -535,10 +636,6 @@ class Burner(object): Log("remove export", indent=2) self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) - def StopInstanceOp(self, instance): """Stop given instance.""" return opcodes.OpShutdownInstance(instance_name=instance) @@ -552,6 +649,8 @@ class Burner(object): return opcodes.OpRenameInstance(instance_name=instance, new_name=instance_new) + @_DoCheckInstances + @_DoBatch(True) def BurnStopStart(self): """Stop/start the instances.""" Log("Stopping and starting instances") @@ -561,11 +660,7 @@ class Burner(object): op2 = self.StartInstanceOp(instance) self.ExecOrQueue(instance, op1, op2) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoBatch(False) def BurnRemove(self): """Remove the instances.""" Log("Removing instances") @@ -575,8 +670,6 @@ class Burner(object): ignore_failures=True) self.ExecOrQueue(instance, op) - self.CommitQueue() - def BurnRename(self): """Rename the instances. @@ -594,11 +687,13 @@ class Burner(object): op_rename2 = self.RenameInstanceOp(rename, instance) op_start1 = self.StartInstanceOp(rename) op_start2 = self.StartInstanceOp(instance) - self.ExecOp(op_stop1, op_rename1, op_start1) + self.ExecOp(False, op_stop1, op_rename1, op_start1) self._CheckInstanceAlive(rename) - self.ExecOp(op_stop2, op_rename2, op_start2) + self.ExecOp(False, op_stop2, op_rename2, op_start2) self._CheckInstanceAlive(instance) + @_DoCheckInstances + @_DoBatch(True) def BurnReinstall(self): """Reinstall the instances.""" Log("Reinstalling instances") @@ -613,11 +708,8 @@ class Burner(object): op4 = self.StartInstanceOp(instance) self.ExecOrQueue(instance, op1, op2, op3, op4) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoCheckInstances + @_DoBatch(True) def BurnReboot(self): """Reboot the instances.""" Log("Rebooting instances") @@ -632,11 +724,8 @@ class Burner(object): ops.append(op) self.ExecOrQueue(instance, *ops) - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoCheckInstances + @_DoBatch(True) def BurnActivateDisks(self): """Activate and deactivate disks of the instances.""" Log("Activating/deactivating disks") @@ -650,10 +739,9 @@ class Burner(object): Log("activate disks when offline", indent=2) Log("deactivate disks (when offline)", indent=2) self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) + @_DoCheckInstances + @_DoBatch(False) def BurnAddRemoveDisks(self): """Add and remove an extra disk for the instances.""" Log("Adding and removing disks") @@ -669,10 +757,8 @@ class Burner(object): Log("adding a disk", indent=2) Log("removing last disk", indent=2) self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) + @_DoBatch(False) def BurnAddRemoveNICs(self): """Add and remove an extra NIC for the instances.""" Log("Adding and removing NICs") @@ -685,7 +771,6 @@ class Burner(object): Log("adding 
a NIC", indent=2) Log("removing last NIC", indent=2) self.ExecOrQueue(instance, op_add, op_rem) - self.CommitQueue() def _CheckInstanceAlive(self, instance): """Check if an instance is alive by doing http checks. @@ -784,7 +869,14 @@ class Burner(object): Log(self.GetFeedbackBuf()) Log("\n\n") if not self.opts.keep_instances: - self.BurnRemove() + try: + self.BurnRemove() + except Exception, err: + if has_err: # already detected errors, so errors in removal + # are quite expected + Log("Note: error detected during instance remove: %s" % str(err)) + else: # non-expected error + raise return 0 diff --git a/tools/lvmstrap b/tools/lvmstrap index 8af2f61e139694dc4e5110fd13b3b576625c96e7..a10296e1184542d9b0ae243dbe844c559a1f192d 100755 --- a/tools/lvmstrap +++ b/tools/lvmstrap @@ -46,6 +46,7 @@ import time from ganeti.utils import RunCmd from ganeti import constants +from ganeti import cli USAGE = ("\tlvmstrap diskinfo\n" "\tlvmstrap [--vgname=NAME] [--allow-removable]" @@ -267,7 +268,7 @@ def CheckSysDev(name, devnum): devnum: the device number, e.g. 0x803 (2051 in decimal) for sda3 Returns: - None; failure of the check is signalled by raising a + None; failure of the check is signaled by raising a SysconfigError exception """ @@ -449,7 +450,7 @@ def GetMountInfo(): def DevInfo(name, dev, mountinfo): - """Computes miscellaneous informations about a block device. + """Computes miscellaneous information about a block device. Args: name: the device name, e.g. sda @@ -478,7 +479,7 @@ def DevInfo(name, dev, mountinfo): def ShowDiskInfo(opts): """Shows a nicely formatted block device list for this system. - This function shows the user a table with the informations gathered + This function shows the user a table with the information gathered by the other functions defined, in order to help the user make a choice about which disks should be allocated to our volume group. @@ -487,8 +488,15 @@ def ShowDiskInfo(opts): dlist = GetDiskList(opts) print "------- Disk information -------" - print ("%5s %7s %4s %5s %-10s %s" % - ("Name", "Size[M]", "Used", "Mount", "LVM?", "Info")) + headers = { + "name": "Name", + "size": "Size[M]", + "used": "Used", + "mount": "Mount", + "lvm": "LVM?", + "info": "Info" + } + fields = ["name", "size", "used", "mount", "lvm", "info"] flatlist = [] # Flatten the [(disk, [partition,...]), ...] list @@ -501,6 +509,7 @@ def ShowDiskInfo(opts): for partname, partsize, partdev in parts: flatlist.append((partname, partsize, partdev, "")) + strlist = [] for name, size, dev, in_use in flatlist: mp, vgname, fileinfo = DevInfo(name, dev, mounts) if mp is None: @@ -515,8 +524,15 @@ def ShowDiskInfo(opts): if len(name) > 3: # Indent partitions name = " %s" % name - print ("%-5s %7.2f %-4s %-5s %-10s %s" % - (name, float(size) / 1024 / 1024, in_use, mp, lvminfo, fileinfo)) + + strlist.append([name, "%.2f" % (float(size) / 1024 / 1024), + in_use, mp, lvminfo, fileinfo]) + + data = cli.GenerateTable(headers, fields, None, + strlist, numfields=["size"]) + + for line in data: + print line def CheckReread(name):