X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/79f87a76432e3dff9bd5b61841bf18617638c7b3..a619a1dd084ee43f5c39c330554d48da08ffc85c:/tools/burnin diff --git a/tools/burnin b/tools/burnin index 4110c48..91aae6b 100755 --- a/tools/burnin +++ b/tools/burnin @@ -1,20 +1,63 @@ #!/usr/bin/python # +# Copyright (C) 2006, 2007 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Burnin program + +""" + import sys import optparse +import time +import socket +import urllib +from itertools import izip, islice, cycle +from cStringIO import StringIO from ganeti import opcodes -from ganeti import mcpu -from ganeti import objects from ganeti import constants from ganeti import cli -from ganeti import logger from ganeti import errors from ganeti import utils +from ganeti import ssconf + +from ganeti.confd import client as confd_client + USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...") +MAX_RETRIES = 3 +LOG_HEADERS = { + 0: "- ", + 1: "* ", + 2: "" + } + +class InstanceDown(Exception): + """The checked instance was not up""" + + +class BurninFailure(Exception): + """Failure detected during burning""" + + def Usage(): """Shows program usage information and exits the program.""" @@ -23,193 +66,975 @@ def Usage(): sys.exit(2) -def Feedback(msg): +def Log(msg, *args, **kwargs): """Simple function that prints out its argument. """ - print msg + if args: + msg = msg % args + indent = kwargs.get('indent', 0) + sys.stdout.write("%*s%s%s\n" % (2*indent, "", + LOG_HEADERS.get(indent, " "), msg)) + sys.stdout.flush() + + +def Err(msg, exit_code=1): + """Simple error logging that prints to stderr. + + """ + sys.stderr.write(msg + "\n") + sys.stderr.flush() + sys.exit(exit_code) -def ParseOptions(): - """Parses the command line options. +class SimpleOpener(urllib.FancyURLopener): + """A simple url opener""" + # pylint: disable-msg=W0221 - In case of command line errors, it will show the usage and exit the - program. 
+ def prompt_user_passwd(self, host, realm, clear_cache=0): + """No-interaction version of prompt_user_passwd.""" + # we follow parent class' API + # pylint: disable-msg=W0613 + return None, None + + def http_error_default(self, url, fp, errcode, errmsg, headers): + """Custom error handling""" + # make sure sockets are not left in CLOSE_WAIT, this is similar + # but with a different exception to the BasicURLOpener class + _ = fp.read() # throw away data + fp.close() + raise InstanceDown("HTTP error returned: code %s, msg %s" % + (errcode, errmsg)) + + +OPTIONS = [ + cli.cli_option("-o", "--os", dest="os", default=None, + help="OS to use during burnin", + metavar="", + completion_suggest=cli.OPT_COMPL_ONE_OS), + cli.cli_option("--disk-size", dest="disk_size", + help="Disk size (determines disk count)", + default="128m", type="string", metavar="", + completion_suggest=("128M 512M 1G 4G 1G,256M" + " 4G,1G,1G 10G").split()), + cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth", + default="128m", type="string", metavar=""), + cli.cli_option("--mem-size", dest="mem_size", help="Memory size", + default=128, type="unit", metavar="", + completion_suggest=("128M 256M 512M 1G 4G 8G" + " 12G 16G").split()), + cli.DEBUG_OPT, + cli.VERBOSE_OPT, + cli.NOIPCHECK_OPT, + cli.NONAMECHECK_OPT, + cli.EARLY_RELEASE_OPT, + cli.cli_option("--no-replace1", dest="do_replace1", + help="Skip disk replacement with the same secondary", + action="store_false", default=True), + cli.cli_option("--no-replace2", dest="do_replace2", + help="Skip disk replacement with a different secondary", + action="store_false", default=True), + cli.cli_option("--no-failover", dest="do_failover", + help="Skip instance failovers", action="store_false", + default=True), + cli.cli_option("--no-migrate", dest="do_migrate", + help="Skip instance live migration", + action="store_false", default=True), + cli.cli_option("--no-move", dest="do_move", + help="Skip instance moves", action="store_false", + default=True), + cli.cli_option("--no-importexport", dest="do_importexport", + help="Skip instance export/import", action="store_false", + default=True), + cli.cli_option("--no-startstop", dest="do_startstop", + help="Skip instance stop/start", action="store_false", + default=True), + cli.cli_option("--no-reinstall", dest="do_reinstall", + help="Skip instance reinstall", action="store_false", + default=True), + cli.cli_option("--no-reboot", dest="do_reboot", + help="Skip instance reboot", action="store_false", + default=True), + cli.cli_option("--no-activate-disks", dest="do_activate_disks", + help="Skip disk activation/deactivation", + action="store_false", default=True), + cli.cli_option("--no-add-disks", dest="do_addremove_disks", + help="Skip disk addition/removal", + action="store_false", default=True), + cli.cli_option("--no-add-nics", dest="do_addremove_nics", + help="Skip NIC addition/removal", + action="store_false", default=True), + cli.cli_option("--no-nics", dest="nics", + help="No network interfaces", action="store_const", + const=[], default=[{}]), + cli.cli_option("--no-confd", dest="do_confd_tests", + help="Skip confd queries", + action="store_false", default=True), + cli.cli_option("--rename", dest="rename", default=None, + help=("Give one unused instance name which is taken" + " to start the renaming sequence"), + metavar=""), + cli.cli_option("-t", "--disk-template", dest="disk_template", + choices=list(constants.DISK_TEMPLATES), + default=constants.DT_DRBD8, + help="Disk template (diskless, file, plain or drbd) 
[drbd]"), + cli.cli_option("-n", "--nodes", dest="nodes", default="", + help=("Comma separated list of nodes to perform" + " the burnin on (defaults to all nodes)"), + completion_suggest=cli.OPT_COMPL_MANY_NODES), + cli.cli_option("-I", "--iallocator", dest="iallocator", + default=None, type="string", + help=("Perform the allocation using an iallocator" + " instead of fixed node spread (node restrictions no" + " longer apply, therefore -n/--nodes must not be" + " used"), + completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR), + cli.cli_option("-p", "--parallel", default=False, action="store_true", + dest="parallel", + help=("Enable parallelization of some operations in" + " order to speed burnin or to test granular locking")), + cli.cli_option("--net-timeout", default=15, type="int", + dest="net_timeout", + help=("The instance check network timeout in seconds" + " (defaults to 15 seconds)"), + completion_suggest="15 60 300 900".split()), + cli.cli_option("-C", "--http-check", default=False, action="store_true", + dest="http_check", + help=("Enable checking of instance status via http," + " looking for /hostname.txt that should contain the" + " name of the instance")), + cli.cli_option("-K", "--keep-instances", default=False, + action="store_true", + dest="keep_instances", + help=("Leave instances on the cluster after burnin," + " for investigation in case of errors or simply" + " to use them")), + ] + +# Mainly used for bash completion +ARGUMENTS = [cli.ArgInstance(min=1)] + + +def _DoCheckInstances(fn): + """Decorator for checking instances. - Returns: - (options, args), as returned by OptionParser.parse_args """ + def wrapper(self, *args, **kwargs): + val = fn(self, *args, **kwargs) + for instance in self.instances: + self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212 + return val + + return wrapper + + +def _DoBatch(retry): + """Decorator for possible batch operations. - parser = optparse.OptionParser(usage="\n%s" % USAGE, - version="%%prog (ganeti) %s" % - constants.RELEASE_VERSION, - option_class=cli.CliOption) - - parser.add_option("-o", "--os", dest="os", default=None, - help="OS to use during burnin", - metavar="") - parser.add_option("--os-size", dest="os_size", help="Disk size", - default=4 * 1024, type="unit", metavar="") - parser.add_option("--swap-size", dest="swap_size", help="Swap size", - default=4 * 1024, type="unit", metavar="") - parser.add_option("-v", "--verbose", - action="store_true", dest="verbose", default=False, - help="print command execution messages to stdout") - parser.add_option("--no-replace1", dest="do_replace1", - help="Do disk replacement with the same secondary", - action="store_false", default=True) - parser.add_option("--no-replace2", dest="do_replace2", - help="Do disk replacement with a different secondary", - action="store_false", default=True) - parser.add_option("--no-failover", dest="do_failover", - help="Do instance failovers", action="store_false", - default=True) - parser.add_option("-t", "--disk-template", dest="disk_template", - choices=("remote_raid1", "drbd8"), default="remote_raid1", - help="Template type for network mirroring (remote_raid1" - " or drbd8) [remote_raid1]") - - options, args = parser.parse_args() - if len(args) < 1 or options.os is None: - Usage() - - return options, args - - -def BurninCluster(opts, args): - """Test a cluster intensively. - - This will create instances and then start/stop/failover them. - It is safe for existing instances but could impact performance. 
+ Must come after the _DoCheckInstances decorator (if any). + + @param retry: whether this is a retryable batch, will be + passed to StartBatch """ + def wrap(fn): + def batched(self, *args, **kwargs): + self.StartBatch(retry) + val = fn(self, *args, **kwargs) + self.CommitQueue() + return val + return batched + + return wrap + + +class Burner(object): + """Burner class.""" + + def __init__(self): + """Constructor.""" + utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True) + self.url_opener = SimpleOpener() + self._feed_buf = StringIO() + self.nodes = [] + self.instances = [] + self.to_rem = [] + self.queued_ops = [] + self.opts = None + self.queue_retry = False + self.disk_count = self.disk_growth = self.disk_size = None + self.hvp = self.bep = None + self.ParseOptions() + self.cl = cli.GetClient() + self.ss = ssconf.SimpleStore() + self.GetState() + + def ClearFeedbackBuf(self): + """Clear the feedback buffer.""" + self._feed_buf.truncate(0) + + def GetFeedbackBuf(self): + """Return the contents of the buffer.""" + return self._feed_buf.getvalue() + + def Feedback(self, msg): + """Acumulate feedback in our buffer.""" + formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2]) + self._feed_buf.write(formatted_msg + "\n") + if self.opts.verbose: + Log(formatted_msg, indent=3) + + def MaybeRetry(self, retry_count, msg, fn, *args): + """Possibly retry a given function execution. - logger.SetupLogging(debug=True, program="ganeti/burnin") - proc = mcpu.Processor(feedback=Feedback) - result = proc.ExecOpCode(opcodes.OpQueryNodes(output_fields=["name"], - names=[])) - nodelist = [data[0] for data in result] - - Feedback("- Testing global parameters") - - result = proc.ExecOpCode(opcodes.OpDiagnoseOS()) - - if not result: - Feedback("Can't get the OS list") - return 1 - - # filter non-valid OS-es - oses = {} - for node_name in result: - oses[node_name] = [obj for obj in result[node_name] - if isinstance(obj, objects.OS)] - - fnode = oses.keys()[0] - os_set = set([os_inst.name for os_inst in oses[fnode]]) - del oses[fnode] - for node in oses: - os_set &= set([os_inst.name for os_inst in oses[node]]) - - if opts.os not in os_set: - Feedback("OS '%s' not found" % opts.os) - return 1 - - to_remove = [] - if opts.disk_template == "remote_raid1": - disk_template = constants.DT_REMOTE_RAID1 - elif opts.disk_template == "drbd8": - disk_template = constants.DT_DRBD8 - else: - Feedback("Unknown disk template '%s'" % opts.disk_template) - return 1 - try: - idx = 0 - for instance_name in args: - next_idx = idx + 1 - if next_idx >= len(nodelist): - next_idx = 0 - pnode = nodelist[idx] - snode = nodelist[next_idx] - if len(nodelist) > 1: - tplate = disk_template + @type retry_count: int + @param retry_count: retry counter: + - 0: non-retryable action + - 1: last retry for a retryable action + - MAX_RETRIES: original try for a retryable action + @type msg: str + @param msg: the kind of the operation + @type fn: callable + @param fn: the function to be called + + """ + try: + val = fn(*args) + if retry_count > 0 and retry_count < MAX_RETRIES: + Log("Idempotent %s succeeded after %d retries", + msg, MAX_RETRIES - retry_count) + return val + except Exception, err: # pylint: disable-msg=W0703 + if retry_count == 0: + Log("Non-idempotent %s failed, aborting", msg) + raise + elif retry_count == 1: + Log("Idempotent %s repeated failure, aborting", msg) + raise else: - tplate = constants.DT_PLAIN + Log("Idempotent %s failed, retry #%d/%d: %s", + msg, MAX_RETRIES - retry_count + 1, 
MAX_RETRIES, err) + self.MaybeRetry(retry_count - 1, msg, fn, *args) + + def _SetDebug(self, ops): + """Set the debug value on the given opcodes""" + for op in ops: + op.debug_level = self.opts.debug + + def _ExecOp(self, *ops): + """Execute one or more opcodes and manage the exec buffer. + + @result: if only opcode has been passed, we return its result; + otherwise we return the list of results + + """ + job_id = cli.SendJob(ops, cl=self.cl) + results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback) + if len(ops) == 1: + return results[0] + else: + return results + + def ExecOp(self, retry, *ops): + """Execute one or more opcodes and manage the exec buffer. + + @result: if only opcode has been passed, we return its result; + otherwise we return the list of results + + """ + if retry: + rval = MAX_RETRIES + else: + rval = 0 + self._SetDebug(ops) + return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops) + + def ExecOrQueue(self, name, *ops): + """Execute an opcode and manage the exec buffer.""" + if self.opts.parallel: + self._SetDebug(ops) + self.queued_ops.append((ops, name)) + else: + return self.ExecOp(self.queue_retry, *ops) + + def StartBatch(self, retry): + """Start a new batch of jobs. + + @param retry: whether this is a retryable batch + + """ + self.queued_ops = [] + self.queue_retry = retry + + def CommitQueue(self): + """Execute all submitted opcodes in case of parallel burnin""" + if not self.opts.parallel: + return + + if self.queue_retry: + rval = MAX_RETRIES + else: + rval = 0 + + try: + results = self.MaybeRetry(rval, "jobset", self.ExecJobSet, + self.queued_ops) + finally: + self.queued_ops = [] + return results + + def ExecJobSet(self, jobs): + """Execute a set of jobs and return once all are done. + + The method will return the list of results, if all jobs are + successful. Otherwise, OpExecError will be raised from within + cli.py. + + """ + self.ClearFeedbackBuf() + jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback) + for ops, name in jobs: + jex.QueueJob(name, *ops) # pylint: disable-msg=W0142 + try: + results = jex.GetResults() + except Exception, err: # pylint: disable-msg=W0703 + Log("Jobs failed: %s", err) + raise BurninFailure() + + if utils.any(results, lambda x: not x[0]): + raise BurninFailure() + + return [i[1] for i in results] + + def ParseOptions(self): + """Parses the command line options. + + In case of command line errors, it will show the usage and exit the + program. 
+ + """ + parser = optparse.OptionParser(usage="\n%s" % USAGE, + version=("%%prog (ganeti) %s" % + constants.RELEASE_VERSION), + option_list=OPTIONS) + + options, args = parser.parse_args() + if len(args) < 1 or options.os is None: + Usage() + + supported_disk_templates = (constants.DT_DISKLESS, + constants.DT_FILE, + constants.DT_PLAIN, + constants.DT_DRBD8) + if options.disk_template not in supported_disk_templates: + Err("Unknown disk template '%s'" % options.disk_template) + + if options.disk_template == constants.DT_DISKLESS: + disk_size = disk_growth = [] + options.do_addremove_disks = False + else: + disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")] + disk_growth = [utils.ParseUnit(v) + for v in options.disk_growth.split(",")] + if len(disk_growth) != len(disk_size): + Err("Wrong disk sizes/growth combination") + if ((disk_size and options.disk_template == constants.DT_DISKLESS) or + (not disk_size and options.disk_template != constants.DT_DISKLESS)): + Err("Wrong disk count/disk template combination") + + self.disk_size = disk_size + self.disk_growth = disk_growth + self.disk_count = len(disk_size) + + if options.nodes and options.iallocator: + Err("Give either the nodes option or the iallocator option, not both") + + if options.http_check and not options.name_check: + Err("Can't enable HTTP checks without name checks") + + self.opts = options + self.instances = args + self.bep = { + constants.BE_MEMORY: options.mem_size, + constants.BE_VCPUS: 1, + } + self.hvp = {} + + socket.setdefaulttimeout(options.net_timeout) + + def GetState(self): + """Read the cluster state from the master daemon.""" + if self.opts.nodes: + names = self.opts.nodes.split(",") + else: + names = [] + try: + op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"], + names=names, use_locking=True) + result = self.ExecOp(True, op) + except errors.GenericError, err: + err_code, msg = cli.FormatError(err) + Err(msg, exit_code=err_code) + self.nodes = [data[0] for data in result if not (data[1] or data[2])] + + op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid", + "variants"], names=[]) + result = self.ExecOp(True, op_diagnose) + + if not result: + Err("Can't get the OS list") + + found = False + for (name, valid, variants) in result: + if valid and self.opts.os in cli.CalculateOSNames(name, variants): + found = True + break + + if not found: + Err("OS '%s' not found" % self.opts.os) - op = opcodes.OpCreateInstance(instance_name=instance_name, mem_size=128, - disk_size=opts.os_size, - swap_size=opts.swap_size, - disk_template=tplate, + cluster_info = self.cl.QueryClusterInfo() + self.cluster_info = cluster_info + if not self.cluster_info: + Err("Can't get cluster info") + + default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT] + self.cluster_default_nicparams = default_nic_params + + @_DoCheckInstances + @_DoBatch(False) + def BurnCreateInstances(self): + """Create the given instances. 
+ + """ + self.to_rem = [] + mytor = izip(cycle(self.nodes), + islice(cycle(self.nodes), 1, None), + self.instances) + + Log("Creating instances") + for pnode, snode, instance in mytor: + Log("instance %s", instance, indent=1) + if self.opts.iallocator: + pnode = snode = None + msg = "with iallocator %s" % self.opts.iallocator + elif self.opts.disk_template not in constants.DTS_NET_MIRROR: + snode = None + msg = "on %s" % pnode + else: + msg = "on %s, %s" % (pnode, snode) + + Log(msg, indent=2) + + op = opcodes.OpCreateInstance(instance_name=instance, + disks = [ {"size": size} + for size in self.disk_size], + disk_template=self.opts.disk_template, + nics=self.opts.nics, mode=constants.INSTANCE_CREATE, - os_type=opts.os, pnode=pnode, - snode=snode, vcpus=1, + os_type=self.opts.os, + pnode=pnode, + snode=snode, start=True, - ip_check=True, - wait_for_sync=True) - Feedback("- Add instance %s on node %s" % (instance_name, pnode)) - result = proc.ExecOpCode(op) - to_remove.append(instance_name) - idx = next_idx - - - if opts.do_replace1: - if len(nodelist) > 1: - # failover - for instance_name in args: - op = opcodes.OpReplaceDisks(instance_name=instance_name, - remote_node=None, - mode=constants.REPLACE_DISK_ALL, - disks=["sda", "sdb"]) - - Feedback("- Replace disks for instance %s" % (instance_name)) - result = proc.ExecOpCode(op) + ip_check=self.opts.ip_check, + name_check=self.opts.name_check, + wait_for_sync=True, + file_driver="loop", + file_storage_dir=None, + iallocator=self.opts.iallocator, + beparams=self.bep, + hvparams=self.hvp, + ) + + self.ExecOrQueue(instance, op) + self.to_rem.append(instance) + + @_DoBatch(False) + def BurnGrowDisks(self): + """Grow both the os and the swap disks by the requested amount, if any.""" + Log("Growing disks") + for instance in self.instances: + Log("instance %s", instance, indent=1) + for idx, growth in enumerate(self.disk_growth): + if growth > 0: + op = opcodes.OpGrowDisk(instance_name=instance, disk=idx, + amount=growth, wait_for_sync=True) + Log("increase disk/%s by %s MB", idx, growth, indent=2) + self.ExecOrQueue(instance, op) + + @_DoBatch(True) + def BurnReplaceDisks1D8(self): + """Replace disks on primary and secondary for drbd8.""" + Log("Replacing disks on the same nodes") + for instance in self.instances: + Log("instance %s", instance, indent=1) + ops = [] + for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI: + op = opcodes.OpReplaceDisks(instance_name=instance, + mode=mode, + disks=[i for i in range(self.disk_count)], + early_release=self.opts.early_release) + Log("run %s", mode, indent=2) + ops.append(op) + self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142 + + @_DoBatch(True) + def BurnReplaceDisks2(self): + """Replace secondary node.""" + Log("Changing the secondary node") + mode = constants.REPLACE_DISK_CHG + + mytor = izip(islice(cycle(self.nodes), 2, None), + self.instances) + for tnode, instance in mytor: + Log("instance %s", instance, indent=1) + if self.opts.iallocator: + tnode = None + msg = "with iallocator %s" % self.opts.iallocator else: - Feedback("- Can't run replace1, not enough nodes") + msg = tnode + op = opcodes.OpReplaceDisks(instance_name=instance, + mode=mode, + remote_node=tnode, + iallocator=self.opts.iallocator, + disks=[], + early_release=self.opts.early_release) + Log("run %s %s", mode, msg, indent=2) + self.ExecOrQueue(instance, op) + + @_DoCheckInstances + @_DoBatch(False) + def BurnFailover(self): + """Failover the instances.""" + Log("Failing over instances") + for instance in 
self.instances: + Log("instance %s", instance, indent=1) + op = opcodes.OpFailoverInstance(instance_name=instance, + ignore_consistency=False) + self.ExecOrQueue(instance, op) + + @_DoCheckInstances + @_DoBatch(False) + def BurnMove(self): + """Move the instances.""" + Log("Moving instances") + mytor = izip(islice(cycle(self.nodes), 1, None), + self.instances) + for tnode, instance in mytor: + Log("instance %s", instance, indent=1) + op = opcodes.OpMoveInstance(instance_name=instance, + target_node=tnode) + self.ExecOrQueue(instance, op) + + @_DoBatch(False) + def BurnMigrate(self): + """Migrate the instances.""" + Log("Migrating instances") + for instance in self.instances: + Log("instance %s", instance, indent=1) + op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + cleanup=False) + + op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + cleanup=True) + Log("migration and migration cleanup", indent=2) + self.ExecOrQueue(instance, op1, op2) + + @_DoCheckInstances + @_DoBatch(False) + def BurnImportExport(self): + """Export the instance, delete it, and import it back. + + """ + Log("Exporting and re-importing instances") + mytor = izip(cycle(self.nodes), + islice(cycle(self.nodes), 1, None), + islice(cycle(self.nodes), 2, None), + self.instances) - if opts.do_failover: - if len(nodelist) > 1: - # failover - for instance_name in args: - op = opcodes.OpFailoverInstance(instance_name=instance_name, - ignore_consistency=True) + for pnode, snode, enode, instance in mytor: + Log("instance %s", instance, indent=1) + # read the full name of the instance + nam_op = opcodes.OpQueryInstances(output_fields=["name"], + names=[instance], use_locking=True) + full_name = self.ExecOp(False, nam_op)[0][0] - Feedback("- Failover instance %s" % (instance_name)) - result = proc.ExecOpCode(op) + if self.opts.iallocator: + pnode = snode = None + import_log_msg = ("import from %s" + " with iallocator %s" % + (enode, self.opts.iallocator)) + elif self.opts.disk_template not in constants.DTS_NET_MIRROR: + snode = None + import_log_msg = ("import from %s to %s" % + (enode, pnode)) else: - Feedback("- Can't run failovers, not enough nodes") - - # stop / start - for instance_name in args: - op = opcodes.OpShutdownInstance(instance_name=instance_name) - Feedback("- Shutdown instance %s" % instance_name) - result = proc.ExecOpCode(op) - op = opcodes.OpStartupInstance(instance_name=instance_name, force=False) - Feedback("- Start instance %s" % instance_name) - result = proc.ExecOpCode(op) - - finally: - # remove - for instance_name in to_remove: - op = opcodes.OpRemoveInstance(instance_name=instance_name) - Feedback("- Remove instance %s" % instance_name) - result = proc.ExecOpCode(op) - - return 0 + import_log_msg = ("import from %s to %s, %s" % + (enode, pnode, snode)) + + exp_op = opcodes.OpExportInstance(instance_name=instance, + target_node=enode, + shutdown=True) + rem_op = opcodes.OpRemoveInstance(instance_name=instance, + ignore_failures=True) + imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name) + imp_op = opcodes.OpCreateInstance(instance_name=instance, + disks = [ {"size": size} + for size in self.disk_size], + disk_template=self.opts.disk_template, + nics=self.opts.nics, + mode=constants.INSTANCE_IMPORT, + src_node=enode, + src_path=imp_dir, + pnode=pnode, + snode=snode, + start=True, + ip_check=self.opts.ip_check, + name_check=self.opts.name_check, + wait_for_sync=True, + file_storage_dir=None, + file_driver="loop", + iallocator=self.opts.iallocator, + beparams=self.bep, 
+ hvparams=self.hvp, + ) + + erem_op = opcodes.OpRemoveExport(instance_name=instance) + + Log("export to node %s", enode, indent=2) + Log("remove instance", indent=2) + Log(import_log_msg, indent=2) + Log("remove export", indent=2) + self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op) + + @staticmethod + def StopInstanceOp(instance): + """Stop given instance.""" + return opcodes.OpShutdownInstance(instance_name=instance) + + @staticmethod + def StartInstanceOp(instance): + """Start given instance.""" + return opcodes.OpStartupInstance(instance_name=instance, force=False) + + @staticmethod + def RenameInstanceOp(instance, instance_new): + """Rename instance.""" + return opcodes.OpRenameInstance(instance_name=instance, + new_name=instance_new) + + @_DoCheckInstances + @_DoBatch(True) + def BurnStopStart(self): + """Stop/start the instances.""" + Log("Stopping and starting instances") + for instance in self.instances: + Log("instance %s", instance, indent=1) + op1 = self.StopInstanceOp(instance) + op2 = self.StartInstanceOp(instance) + self.ExecOrQueue(instance, op1, op2) + + @_DoBatch(False) + def BurnRemove(self): + """Remove the instances.""" + Log("Removing instances") + for instance in self.to_rem: + Log("instance %s", instance, indent=1) + op = opcodes.OpRemoveInstance(instance_name=instance, + ignore_failures=True) + self.ExecOrQueue(instance, op) + + def BurnRename(self): + """Rename the instances. + + Note that this function will not execute in parallel, since we + only have one target for rename. + + """ + Log("Renaming instances") + rename = self.opts.rename + for instance in self.instances: + Log("instance %s", instance, indent=1) + op_stop1 = self.StopInstanceOp(instance) + op_stop2 = self.StopInstanceOp(rename) + op_rename1 = self.RenameInstanceOp(instance, rename) + op_rename2 = self.RenameInstanceOp(rename, instance) + op_start1 = self.StartInstanceOp(rename) + op_start2 = self.StartInstanceOp(instance) + self.ExecOp(False, op_stop1, op_rename1, op_start1) + self._CheckInstanceAlive(rename) + self.ExecOp(False, op_stop2, op_rename2, op_start2) + self._CheckInstanceAlive(instance) + + @_DoCheckInstances + @_DoBatch(True) + def BurnReinstall(self): + """Reinstall the instances.""" + Log("Reinstalling instances") + for instance in self.instances: + Log("instance %s", instance, indent=1) + op1 = self.StopInstanceOp(instance) + op2 = opcodes.OpReinstallInstance(instance_name=instance) + Log("reinstall without passing the OS", indent=2) + op3 = opcodes.OpReinstallInstance(instance_name=instance, + os_type=self.opts.os) + Log("reinstall specifying the OS", indent=2) + op4 = self.StartInstanceOp(instance) + self.ExecOrQueue(instance, op1, op2, op3, op4) + + @_DoCheckInstances + @_DoBatch(True) + def BurnReboot(self): + """Reboot the instances.""" + Log("Rebooting instances") + for instance in self.instances: + Log("instance %s", instance, indent=1) + ops = [] + for reboot_type in constants.REBOOT_TYPES: + op = opcodes.OpRebootInstance(instance_name=instance, + reboot_type=reboot_type, + ignore_secondaries=False) + Log("reboot with type '%s'", reboot_type, indent=2) + ops.append(op) + self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142 + + @_DoCheckInstances + @_DoBatch(True) + def BurnActivateDisks(self): + """Activate and deactivate disks of the instances.""" + Log("Activating/deactivating disks") + for instance in self.instances: + Log("instance %s", instance, indent=1) + op_start = self.StartInstanceOp(instance) + op_act = 
opcodes.OpActivateInstanceDisks(instance_name=instance) + op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance) + op_stop = self.StopInstanceOp(instance) + Log("activate disks when online", indent=2) + Log("activate disks when offline", indent=2) + Log("deactivate disks (when offline)", indent=2) + self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start) + + @_DoCheckInstances + @_DoBatch(False) + def BurnAddRemoveDisks(self): + """Add and remove an extra disk for the instances.""" + Log("Adding and removing disks") + for instance in self.instances: + Log("instance %s", instance, indent=1) + op_add = opcodes.OpSetInstanceParams(\ + instance_name=instance, + disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})]) + op_rem = opcodes.OpSetInstanceParams(\ + instance_name=instance, disks=[(constants.DDM_REMOVE, {})]) + op_stop = self.StopInstanceOp(instance) + op_start = self.StartInstanceOp(instance) + Log("adding a disk", indent=2) + Log("removing last disk", indent=2) + self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start) + + @_DoBatch(False) + def BurnAddRemoveNICs(self): + """Add and remove an extra NIC for the instances.""" + Log("Adding and removing NICs") + for instance in self.instances: + Log("instance %s", instance, indent=1) + op_add = opcodes.OpSetInstanceParams(\ + instance_name=instance, nics=[(constants.DDM_ADD, {})]) + op_rem = opcodes.OpSetInstanceParams(\ + instance_name=instance, nics=[(constants.DDM_REMOVE, {})]) + Log("adding a NIC", indent=2) + Log("removing last NIC", indent=2) + self.ExecOrQueue(instance, op_add, op_rem) + + def ConfdCallback(self, reply): + """Callback for confd queries""" + if reply.type == confd_client.UPCALL_REPLY: + if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK: + Err("Query %s gave non-ok status %s: %s" % (reply.orig_request, + reply.server_reply.status, + reply.server_reply)) + if reply.orig_request.type == constants.CONFD_REQ_PING: + Log("Ping: OK", indent=1) + elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER: + if reply.server_reply.answer == self.cluster_info["master"]: + Log("Master: OK", indent=1) + else: + Err("Master: wrong: %s" % reply.server_reply.answer) + elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME: + if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER: + Log("Node role for master: OK", indent=1) + else: + Err("Node role for master: wrong: %s" % reply.server_reply.answer) + + def DoConfdRequestReply(self, req): + self.confd_counting_callback.RegisterQuery(req.rsalt) + self.confd_client.SendRequest(req, async=False) + while not self.confd_counting_callback.AllAnswered(): + if not self.confd_client.ReceiveReply(): + Err("Did not receive all expected confd replies") + break + + def BurnConfd(self): + """Run confd queries for our instances. 
+ + The following confd queries are tested: + - CONFD_REQ_PING: simple ping + - CONFD_REQ_CLUSTER_MASTER: cluster master + - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master + + """ + Log("Checking confd results") + + hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY) + mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS) + mc_list = utils.ReadFile(mc_file).splitlines() + filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback) + counting_callback = confd_client.ConfdCountingCallback(filter_callback) + self.confd_counting_callback = counting_callback + + self.confd_client = confd_client.ConfdClient(hmac_key, mc_list, + counting_callback) + + req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_CLUSTER_MASTER) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_NODE_ROLE_BYNAME, + query=self.cluster_info["master"]) + self.DoConfdRequestReply(req) + + def _CheckInstanceAlive(self, instance): + """Check if an instance is alive by doing http checks. + + This will try to retrieve the url on the instance /hostname.txt + and check that it contains the hostname of the instance. In case + we get ECONNREFUSED, we retry up to the net timeout seconds, for + any other error we abort. + + """ + if not self.opts.http_check: + return + end_time = time.time() + self.opts.net_timeout + url = None + while time.time() < end_time and url is None: + try: + url = self.url_opener.open("http://%s/hostname.txt" % instance) + except IOError: + # here we can have connection refused, no route to host, etc. + time.sleep(1) + if url is None: + raise InstanceDown(instance, "Cannot contact instance") + hostname = url.read().strip() + url.close() + if hostname != instance: + raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" % + (instance, hostname))) + + def BurninCluster(self): + """Test a cluster intensively. + + This will create instances and then start/stop/failover them. + It is safe for existing instances but could impact performance. 
+ + """ + + opts = self.opts + + Log("Testing global parameters") + + if (len(self.nodes) == 1 and + opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN, + constants.DT_FILE)): + Err("When one node is available/selected the disk template must" + " be 'diskless', 'file' or 'plain'") + + has_err = True + try: + self.BurnCreateInstances() + if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR: + self.BurnReplaceDisks1D8() + if (opts.do_replace2 and len(self.nodes) > 2 and + opts.disk_template in constants.DTS_NET_MIRROR) : + self.BurnReplaceDisks2() + + if (opts.disk_template != constants.DT_DISKLESS and + utils.any(self.disk_growth, lambda n: n > 0)): + self.BurnGrowDisks() + + if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR: + self.BurnFailover() + + if opts.do_migrate and opts.disk_template == constants.DT_DRBD8: + self.BurnMigrate() + + if (opts.do_move and len(self.nodes) > 1 and + opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]): + self.BurnMove() + + if (opts.do_importexport and + opts.disk_template not in (constants.DT_DISKLESS, + constants.DT_FILE)): + self.BurnImportExport() + + if opts.do_reinstall: + self.BurnReinstall() + + if opts.do_reboot: + self.BurnReboot() + + if opts.do_addremove_disks: + self.BurnAddRemoveDisks() + + default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE] + # Don't add/remove nics in routed mode, as we would need an ip to add + # them with + if opts.do_addremove_nics: + if default_nic_mode == constants.NIC_MODE_BRIDGED: + self.BurnAddRemoveNICs() + else: + Log("Skipping nic add/remove as the cluster is not in bridged mode") + + if opts.do_activate_disks: + self.BurnActivateDisks() + + if opts.rename: + self.BurnRename() + + if opts.do_confd_tests: + self.BurnConfd() + + if opts.do_startstop: + self.BurnStopStart() + + has_err = False + finally: + if has_err: + Log("Error detected: opcode buffer follows:\n\n") + Log(self.GetFeedbackBuf()) + Log("\n\n") + if not self.opts.keep_instances: + try: + self.BurnRemove() + except Exception, err: # pylint: disable-msg=W0703 + if has_err: # already detected errors, so errors in removal + # are quite expected + Log("Note: error detected during instance remove: %s", err) + else: # non-expected error + raise + + return 0 + def main(): """Main function""" - opts, args = ParseOptions() - try: - utils.Lock('cmd', max_retries=15, debug=True) - except errors.LockError, err: - logger.ToStderr(str(err)) - return 1 - try: - retval = BurninCluster(opts, args) - finally: - utils.Unlock('cmd') - utils.LockCleanup() - return retval + burner = Burner() + return burner.BurninCluster() + if __name__ == "__main__": main()
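The retry logic introduced with MaybeRetry above follows a simple counting convention: a retry_count of 0 marks a non-retryable action, MAX_RETRIES marks the original attempt of a retryable one, and the counter is decremented towards 1, which is the last allowed retry. A minimal standalone sketch of that convention (the maybe_retry helper below is hypothetical and not part of burnin):

MAX_RETRIES = 3

def maybe_retry(retry_count, msg, fn, *args):
    try:
        return fn(*args)
    except Exception, err:
        if retry_count <= 1:
            # non-retryable (0) or the last allowed retry (1): give up
            raise
        print "%s failed (%s), retry #%d/%d" % (
            msg, err, MAX_RETRIES - retry_count + 1, MAX_RETRIES)
        # propagate the result of a successful retry back to the caller
        return maybe_retry(retry_count - 1, msg, fn, *args)

A retryable call starts the countdown at MAX_RETRIES, for example maybe_retry(MAX_RETRIES, "opcode", fn), while a non-retryable one passes 0 and simply re-raises on the first failure.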
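The HTTP liveness test described in _CheckInstanceAlive assumes every instance serves a /hostname.txt file containing its own name, and it keeps retrying connection errors until the --net-timeout deadline passes. A self-contained sketch of the same idea, using only the standard library and a hypothetical check_instance_alive function (burnin itself goes through its SimpleOpener URL opener instead):

import time
import urllib

def check_instance_alive(instance, net_timeout=15):
    end_time = time.time() + net_timeout
    url = None
    while time.time() < end_time and url is None:
        try:
            url = urllib.urlopen("http://%s/hostname.txt" % instance)
        except IOError:
            # connection refused, no route to host, etc.: retry until timeout
            time.sleep(1)
    if url is None:
        raise RuntimeError("Cannot contact instance %s" % instance)
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
        raise RuntimeError("Hostname mismatch: expected %s, got %s" %
                           (instance, hostname))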