#!/usr/bin/python # # Copyright (C) 2006, 2007 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. """Burnin program """ import os import sys import optparse import time import socket import urllib2 import errno from itertools import izip, islice, cycle from cStringIO import StringIO from ganeti import opcodes from ganeti import mcpu from ganeti import constants from ganeti import cli from ganeti import errors from ganeti import utils USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...") class InstanceDown(Exception): """The checked instance was not up""" def Usage(): """Shows program usage information and exits the program.""" print >> sys.stderr, "Usage:" print >> sys.stderr, USAGE sys.exit(2) def Log(msg): """Simple function that prints out its argument. """ print msg sys.stdout.flush() class Burner(object): """Burner class.""" def __init__(self): """Constructor.""" utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True) self._feed_buf = StringIO() self.nodes = [] self.instances = [] self.to_rem = [] self.opts = None self.cl = cli.GetClient() self.ParseOptions() self.GetState() def ClearFeedbackBuf(self): """Clear the feedback buffer.""" self._feed_buf.truncate(0) def GetFeedbackBuf(self): """Return the contents of the buffer.""" return self._feed_buf.getvalue() def Feedback(self, msg): """Acumulate feedback in our buffer.""" self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])), msg[2])) if self.opts.verbose: Log(msg) def ExecOp(self, op): """Execute an opcode and manage the exec buffer.""" self.ClearFeedbackBuf() return cli.SubmitOpCode(op, feedback_fn=self.Feedback, cl=self.cl) def ExecJobSet(self, jobs): """Execute a set of jobs and return once all are done. The method will return the list of results, if all jobs are successfull. Otherwise, OpExecError will be raised from within cli.py. """ self.ClearFeedbackBuf() job_ids = [cli.SendJob(job, cl=self.cl) for job in jobs] Log("- Submitted job IDs %s" % ", ".join(job_ids)) results = [] for jid in job_ids: Log("- Waiting for job %s" % jid) results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)) return results def ParseOptions(self): """Parses the command line options. In case of command line errors, it will show the usage and exit the program. """ parser = optparse.OptionParser(usage="\n%s" % USAGE, version="%%prog (ganeti) %s" % constants.RELEASE_VERSION, option_class=cli.CliOption) parser.add_option("-o", "--os", dest="os", default=None, help="OS to use during burnin", metavar="<OS>") parser.add_option("--disk-size", dest="disk_size", help="Disk size (determines disk count)", default="128m", type="string", metavar="<size,size,...>") parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth", default="128m", type="string", metavar="<size,size,...>") parser.add_option("--mem-size", dest="mem_size", help="Memory size", default=128, type="unit", metavar="<size>") parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False, help="print command execution messages to stdout") parser.add_option("--no-replace1", dest="do_replace1", help="Skip disk replacement with the same secondary", action="store_false", default=True) parser.add_option("--no-replace2", dest="do_replace2", help="Skip disk replacement with a different secondary", action="store_false", default=True) parser.add_option("--no-failover", dest="do_failover", help="Skip instance failovers", action="store_false", default=True) parser.add_option("--no-importexport", dest="do_importexport", help="Skip instance export/import", action="store_false", default=True) parser.add_option("--no-startstop", dest="do_startstop", help="Skip instance stop/start", action="store_false", default=True) parser.add_option("--no-reinstall", dest="do_reinstall", help="Skip instance reinstall", action="store_false", default=True) parser.add_option("--no-reboot", dest="do_reboot", help="Skip instance reboot", action="store_false", default=True) parser.add_option("--no-activate-disks", dest="do_activate_disks", help="Skip disk activation/deactivation", action="store_false", default=True) parser.add_option("--no-add-disks", dest="do_addremove_disks", help="Skip disk addition/removal", action="store_false", default=True) parser.add_option("--no-add-nics", dest="do_addremove_nics", help="Skip NIC addition/removal", action="store_false", default=True) parser.add_option("--no-nics", dest="nics", help="No network interfaces", action="store_const", const=[], default=[{}]) parser.add_option("--rename", dest="rename", default=None, help="Give one unused instance name which is taken" " to start the renaming sequence", metavar="<instance_name>") parser.add_option("-t", "--disk-template", dest="disk_template", choices=("diskless", "file", "plain", "drbd"), default="drbd", help="Disk template (diskless, file, plain or drbd)" " [drbd]") parser.add_option("-n", "--nodes", dest="nodes", default="", help="Comma separated list of nodes to perform" " the burnin on (defaults to all nodes)") parser.add_option("--iallocator", dest="iallocator", default=None, type="string", help="Perform the allocation using an iallocator" " instead of fixed node spread (node restrictions no" " longer apply, therefore -n/--nodes must not be used") parser.add_option("-p", "--parallel", default=False, action="store_true", dest="parallel", help="Enable parallelization of some operations in" " order to speed burnin or to test granular locking") parser.add_option("--net-timeout", default=15, type="int", dest="net_timeout", help="The instance check network timeout in seconds" " (defaults to 15 seconds)") parser.add_option("-C", "--http-check", default=False, action="store_true", dest="http_check", help="Enable checking of instance status via http," " looking for /hostname.txt that should contain the" " name of the instance") options, args = parser.parse_args() if len(args) < 1 or options.os is None: Usage() supported_disk_templates = (constants.DT_DISKLESS, constants.DT_FILE, constants.DT_PLAIN, constants.DT_DRBD8) if options.disk_template not in supported_disk_templates: Log("Unknown disk template '%s'" % options.disk_template) sys.exit(1) if options.disk_template == constants.DT_DISKLESS: disk_size = disk_growth = [] options.do_addremove_disks = False else: disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")] disk_growth = [utils.ParseUnit(v) for v in options.disk_growth.split(",")] if len(disk_growth) != len(disk_size): Log("Wrong disk sizes/growth combination") sys.exit(1) if ((disk_size and options.disk_template == constants.DT_DISKLESS) or (not disk_size and options.disk_template != constants.DT_DISKLESS)): Log("Wrong disk count/disk template combination") sys.exit(1) self.disk_size = disk_size self.disk_growth = disk_growth self.disk_count = len(disk_size) if options.nodes and options.iallocator: Log("Give either the nodes option or the iallocator option, not both") sys.exit(1) self.opts = options self.instances = args self.bep = { constants.BE_MEMORY: options.mem_size, constants.BE_VCPUS: 1, } self.hvp = {} socket.setdefaulttimeout(options.net_timeout) def GetState(self): """Read the cluster state from the config.""" if self.opts.nodes: names = self.opts.nodes.split(",") else: names = [] try: op = opcodes.OpQueryNodes(output_fields=["name", "offline"], names=names) result = self.ExecOp(op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) Log(msg) sys.exit(err_code) self.nodes = [data[0] for data in result if not data[1]] result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])) if not result: Log("Can't get the OS list") sys.exit(1) # filter non-valid OS-es os_set = [val[0] for val in result if val[1]] if self.opts.os not in os_set: Log("OS '%s' not found" % self.opts.os) sys.exit(1) def CreateInstances(self): """Create the given instances. """ self.to_rem = [] mytor = izip(cycle(self.nodes), islice(cycle(self.nodes), 1, None), self.instances) jobset = [] for pnode, snode, instance in mytor: if self.opts.iallocator: pnode = snode = None Log("- Add instance %s (iallocator: %s)" % (instance, self.opts.iallocator)) elif self.opts.disk_template not in constants.DTS_NET_MIRROR: snode = None Log("- Add instance %s on node %s" % (instance, pnode)) else: Log("- Add instance %s on nodes %s/%s" % (instance, pnode, snode)) op = opcodes.OpCreateInstance(instance_name=instance, disks = [ {"size": size} for size in self.disk_size], disk_template=self.opts.disk_template, nics=self.opts.nics, mode=constants.INSTANCE_CREATE, os_type=self.opts.os, pnode=pnode, snode=snode, start=True, ip_check=True, wait_for_sync=True, file_driver="loop", file_storage_dir=None, iallocator=self.opts.iallocator, beparams=self.bep, hvparams=self.hvp, ) if self.opts.parallel: jobset.append([op]) # FIXME: here we should not append to to_rem uncoditionally, # but only when the job is successful self.to_rem.append(instance) else: self.ExecOp(op) self.to_rem.append(instance) if self.opts.parallel: self.ExecJobSet(jobset) for instance in self.instances: self._CheckInstanceAlive(instance) def GrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" for instance in self.instances: for idx, growth in enumerate(self.disk_growth): if growth > 0: op = opcodes.OpGrowDisk(instance_name=instance, disk=idx, amount=growth, wait_for_sync=True) Log("- Increase %s's disk/%s by %s MB" % (instance, idx, growth)) self.ExecOp(op) def ReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" for instance in self.instances: for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI: op = opcodes.OpReplaceDisks(instance_name=instance, mode=mode, disks=[i for i in range(self.disk_count)]) Log("- Replace disks (%s) for instance %s" % (mode, instance)) self.ExecOp(op) def ReplaceDisks2(self): """Replace secondary node.""" mode = constants.REPLACE_DISK_CHG mytor = izip(islice(cycle(self.nodes), 2, None), self.instances) for tnode, instance in mytor: if self.opts.iallocator: tnode = None op = opcodes.OpReplaceDisks(instance_name=instance, mode=mode, remote_node=tnode, iallocator=self.opts.iallocator, disks=[i for i in range(self.disk_count)]) Log("- Replace secondary (%s) for instance %s" % (mode, instance)) self.ExecOp(op) def Failover(self): """Failover the instances.""" for instance in self.instances: op = opcodes.OpFailoverInstance(instance_name=instance, ignore_consistency=False) Log("- Failover instance %s" % (instance)) self.ExecOp(op) for instance in self.instances: self._CheckInstanceAlive(instance) def ImportExport(self): """Export the instance, delete it, and import it back. """ mytor = izip(cycle(self.nodes), islice(cycle(self.nodes), 1, None), islice(cycle(self.nodes), 2, None), self.instances) for pnode, snode, enode, instance in mytor: if self.opts.iallocator: pnode = snode = None import_log_msg = ("- Import instance %s from node %s" " (iallocator: %s)" % (instance, enode, self.opts.iallocator)) elif self.opts.disk_template not in constants.DTS_NET_MIRROR: snode = None import_log_msg = ("- Import instance %s from node %s to node %s" % (instance, enode, pnode)) else: import_log_msg = ("- Import instance %s from node %s to nodes %s/%s" % (instance, enode, pnode, snode)) exp_op = opcodes.OpExportInstance(instance_name=instance, target_node=enode, shutdown=True) rem_op = opcodes.OpRemoveInstance(instance_name=instance, ignore_failures=True) nam_op = opcodes.OpQueryInstances(output_fields=["name"], names=[instance]) full_name = self.ExecOp(nam_op)[0][0] imp_dir = os.path.join(constants.EXPORT_DIR, full_name) imp_op = opcodes.OpCreateInstance(instance_name=instance, disks = [ {"size": size} for size in self.disk_size], disk_template=self.opts.disk_template, nics=self.opts.nics, mode=constants.INSTANCE_IMPORT, src_node=enode, src_path=imp_dir, pnode=pnode, snode=snode, start=True, ip_check=True, wait_for_sync=True, file_storage_dir=None, file_driver="loop", iallocator=self.opts.iallocator, beparams=self.bep, hvparams=self.hvp, ) erem_op = opcodes.OpRemoveExport(instance_name=instance) Log("- Export instance %s to node %s" % (instance, enode)) self.ExecOp(exp_op) Log("- Remove instance %s" % (instance)) self.ExecOp(rem_op) self.to_rem.remove(instance) Log(import_log_msg) self.ExecOp(imp_op) Log("- Remove export of instance %s" % (instance)) self.ExecOp(erem_op) self.to_rem.append(instance) for instance in self.instances: self._CheckInstanceAlive(instance) def StopInstance(self, instance): """Stop given instance.""" op = opcodes.OpShutdownInstance(instance_name=instance) Log("- Shutdown instance %s" % instance) self.ExecOp(op) def StartInstance(self, instance): """Start given instance.""" op = opcodes.OpStartupInstance(instance_name=instance, force=False) Log("- Start instance %s" % instance) self.ExecOp(op) def RenameInstance(self, instance, instance_new): """Rename instance.""" op = opcodes.OpRenameInstance(instance_name=instance, new_name=instance_new) Log("- Rename instance %s to %s" % (instance, instance_new)) self.ExecOp(op) def StopStart(self): """Stop/start the instances.""" for instance in self.instances: self.StopInstance(instance) self.StartInstance(instance) for instance in self.instances: self._CheckInstanceAlive(instance) def Remove(self): """Remove the instances.""" for instance in self.to_rem: op = opcodes.OpRemoveInstance(instance_name=instance, ignore_failures=True) Log("- Remove instance %s" % instance) self.ExecOp(op) def Rename(self): """Rename the instances.""" rename = self.opts.rename for instance in self.instances: self.StopInstance(instance) self.RenameInstance(instance, rename) self.StartInstance(rename) self._CheckInstanceAlive(rename) self.StopInstance(rename) self.RenameInstance(rename, instance) self.StartInstance(instance) for instance in self.instances: self._CheckInstanceAlive(instance) def Reinstall(self): """Reinstall the instances.""" for instance in self.instances: self.StopInstance(instance) op = opcodes.OpReinstallInstance(instance_name=instance) Log("- Reinstall instance %s without passing the OS" % (instance,)) self.ExecOp(op) op = opcodes.OpReinstallInstance(instance_name=instance, os_type=self.opts.os) Log("- Reinstall instance %s specifying the OS" % (instance,)) self.ExecOp(op) self.StartInstance(instance) for instance in self.instances: self._CheckInstanceAlive(instance) def Reboot(self): """Reinstall the instances.""" for instance in self.instances: for reboot_type in constants.REBOOT_TYPES: op = opcodes.OpRebootInstance(instance_name=instance, reboot_type=reboot_type, ignore_secondaries=False) Log("- Reboot instance %s with type '%s'" % (instance, reboot_type)) self.ExecOp(op) self._CheckInstanceAlive(instance) def ActivateDisks(self): """Activate and deactivate disks of the instances.""" for instance in self.instances: op_act = opcodes.OpActivateInstanceDisks(instance_name=instance) op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance) Log("- Activate disks of online instance %s" % (instance,)) self.ExecOp(op_act) self.StopInstance(instance) Log("- Activate disks of offline instance %s" % (instance,)) self.ExecOp(op_act) Log("- Deactivate disks of offline instance %s" % (instance,)) self.ExecOp(op_deact) self.StartInstance(instance) for instance in self.instances: self._CheckInstanceAlive(instance) def AddRemoveDisks(self): """Add and remove an extra disk for the instances.""" for instance in self.instances: op_add = opcodes.OpSetInstanceParams(\ instance_name=instance, disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})]) op_rem = opcodes.OpSetInstanceParams(\ instance_name=instance, disks=[(constants.DDM_REMOVE, {})]) Log("- Adding a disk to instance %s" % (instance,)) self.ExecOp(op_add) self.StopInstance(instance) Log("- Removing the last disk of instance %s" % (instance,)) self.ExecOp(op_rem) self.StartInstance(instance) for instance in self.instances: self._CheckInstanceAlive(instance) def AddRemoveNICs(self): """Add and remove an extra NIC for the instances.""" for instance in self.instances: op_add = opcodes.OpSetInstanceParams(\ instance_name=instance, nics=[(constants.DDM_ADD, {})]) op_rem = opcodes.OpSetInstanceParams(\ instance_name=instance, nics=[(constants.DDM_REMOVE, {})]) Log("- Adding a NIC to instance %s" % (instance,)) self.ExecOp(op_add) Log("- Removing the last NIC of instance %s" % (instance,)) self.ExecOp(op_rem) def _CheckInstanceAlive(self, instance): """Check if an instance is alive by doing http checks. This will try to retrieve the url on the instance /hostname.txt and check that it contains the hostname of the instance. In case we get ECONNREFUSED, we retry up to the net timeout seconds, for any other error we abort. """ if not self.opts.http_check: return try: for retries in range(self.opts.net_timeout): try: url = urllib2.urlopen("http://%s/hostname.txt" % instance) except urllib2.URLError, err: if err.args[0][0] == errno.ECONNREFUSED: time.sleep(1) continue raise except urllib2.URLError, err: raise InstanceDown(instance, str(err)) hostname = url.read().strip() if hostname != instance: raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" % (instance, hostname))) def BurninCluster(self): """Test a cluster intensively. This will create instances and then start/stop/failover them. It is safe for existing instances but could impact performance. """ opts = self.opts Log("- Testing global parameters") if (len(self.nodes) == 1 and opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN, constants.DT_FILE)): Log("When one node is available/selected the disk template must" " be 'diskless', 'file' or 'plain'") sys.exit(1) has_err = True try: self.CreateInstances() if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR: self.ReplaceDisks1D8() if (opts.do_replace2 and len(self.nodes) > 2 and opts.disk_template in constants.DTS_NET_MIRROR) : self.ReplaceDisks2() if opts.disk_template != constants.DT_DISKLESS: self.GrowDisks() if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR: self.Failover() if (opts.do_importexport and opts.disk_template not in (constants.DT_DISKLESS, constants.DT_FILE)): self.ImportExport() if opts.do_reinstall: self.Reinstall() if opts.do_reboot: self.Reboot() if opts.do_addremove_disks: self.AddRemoveDisks() if opts.do_addremove_nics: self.AddRemoveNICs() if opts.do_activate_disks: self.ActivateDisks() if opts.do_startstop: self.StopStart() if opts.rename: self.Rename() has_err = False finally: if has_err: Log("Error detected: opcode buffer follows:\n\n") Log(self.GetFeedbackBuf()) Log("\n\n") self.Remove() return 0 def main(): """Main function""" burner = Burner() return burner.BurninCluster() if __name__ == "__main__": main()