X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/eb61f8d36dc075ccde8ee131c98c1a356af77f2d..ea9c753d1c5e376f14b30198f68124369cc2a09f:/tools/burnin

diff --git a/tools/burnin b/tools/burnin
index eae59f5..c5d1612 100755
--- a/tools/burnin
+++ b/tools/burnin
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -23,31 +23,43 @@
 
 """
 
-import os
 import sys
 import optparse
 import time
 import socket
-import urllib2
-import errno
+import urllib
 
 from itertools import izip, islice, cycle
 from cStringIO import StringIO
 
 from ganeti import opcodes
-from ganeti import mcpu
 from ganeti import constants
 from ganeti import cli
 from ganeti import errors
 from ganeti import utils
+from ganeti import hypervisor
+from ganeti import compat
+
+from ganeti.confd import client as confd_client
 
 
 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
 
+MAX_RETRIES = 3
+
+LOG_HEADERS = {
+  0: "- ",
+  1: "* ",
+  2: ""
+  }
+
 
 class InstanceDown(Exception):
   """The checked instance was not up"""
 
 
+class BurninFailure(Exception):
+  """Failure detected during burning"""
+
+
 def Usage():
   """Shows program usage information and exits the program."""
 
@@ -56,27 +68,213 @@ def Usage():
   sys.exit(2)
 
 
-def Log(msg):
+def Log(msg, *args, **kwargs):
   """Simple function that prints out its argument.
 
   """
-  print msg
+  if args:
+    msg = msg % args
+  indent = kwargs.get("indent", 0)
+  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
+                                  LOG_HEADERS.get(indent, " "), msg))
   sys.stdout.flush()
 
 
+def Err(msg, exit_code=1):
+  """Simple error logging that prints to stderr.
+
+  """
+  sys.stderr.write(msg + "\n")
+  sys.stderr.flush()
+  sys.exit(exit_code)
+
+
+class SimpleOpener(urllib.FancyURLopener):
+  """A simple url opener"""
+  # pylint: disable=W0221
+
+  def prompt_user_passwd(self, host, realm, clear_cache=0):
+    """No-interaction version of prompt_user_passwd."""
+    # we follow parent class' API
+    # pylint: disable=W0613
+    return None, None
+
+  def http_error_default(self, url, fp, errcode, errmsg, headers):
+    """Custom error handling"""
+    # make sure sockets are not left in CLOSE_WAIT, this is similar
+    # but with a different exception to the BasicURLOpener class
+    _ = fp.read() # throw away data
+    fp.close()
+    raise InstanceDown("HTTP error returned: code %s, msg %s" %
+                       (errcode, errmsg))
+
+
+OPTIONS = [
+  cli.cli_option("-o", "--os", dest="os", default=None,
+                 help="OS to use during burnin",
+                 metavar="",
+                 completion_suggest=cli.OPT_COMPL_ONE_OS),
+  cli.HYPERVISOR_OPT,
+  cli.OSPARAMS_OPT,
+  cli.cli_option("--disk-size", dest="disk_size",
+                 help="Disk size (determines disk count)",
+                 default="128m", type="string", metavar="",
+                 completion_suggest=("128M 512M 1G 4G 1G,256M"
+                                     " 4G,1G,1G 10G").split()),
+  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
+                 default="128m", type="string", metavar=""),
+  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
+                 default=128, type="unit", metavar="",
+                 completion_suggest=("128M 256M 512M 1G 4G 8G"
+                                     " 12G 16G").split()),
+  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
+                 default=3, type="unit", metavar="",
+                 completion_suggest=("1 2 3 4").split()),
+  cli.DEBUG_OPT,
+  cli.VERBOSE_OPT,
+  cli.NOIPCHECK_OPT,
+  cli.NONAMECHECK_OPT,
+  cli.EARLY_RELEASE_OPT,
+  cli.cli_option("--no-replace1", dest="do_replace1",
+                 help="Skip disk replacement with the same secondary",
+                 action="store_false", default=True),
+  cli.cli_option("--no-replace2", dest="do_replace2",
+                 help="Skip disk replacement with a different secondary",
+                 action="store_false", default=True),
+  cli.cli_option("--no-failover", dest="do_failover",
+                 help="Skip instance failovers", action="store_false",
+                 default=True),
+  cli.cli_option("--no-migrate", dest="do_migrate",
+                 help="Skip instance live migration",
+                 action="store_false", default=True),
+  cli.cli_option("--no-move", dest="do_move",
+                 help="Skip instance moves", action="store_false",
+                 default=True),
+  cli.cli_option("--no-importexport", dest="do_importexport",
+                 help="Skip instance export/import", action="store_false",
+                 default=True),
+  cli.cli_option("--no-startstop", dest="do_startstop",
+                 help="Skip instance stop/start", action="store_false",
+                 default=True),
+  cli.cli_option("--no-reinstall", dest="do_reinstall",
+                 help="Skip instance reinstall", action="store_false",
+                 default=True),
+  cli.cli_option("--no-reboot", dest="do_reboot",
+                 help="Skip instance reboot", action="store_false",
+                 default=True),
+  cli.cli_option("--reboot-types", dest="reboot_types",
+                 help="Specify the reboot types", default=None),
+  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
+                 help="Skip disk activation/deactivation",
+                 action="store_false", default=True),
+  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
+                 help="Skip disk addition/removal",
+                 action="store_false", default=True),
+  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
+                 help="Skip NIC addition/removal",
+                 action="store_false", default=True),
+  cli.cli_option("--no-nics", dest="nics",
+                 help="No network interfaces", action="store_const",
+                 const=[],
default=[{}]), + cli.cli_option("--no-confd", dest="do_confd_tests", + help="Skip confd queries", + action="store_false", default=True), + cli.cli_option("--rename", dest="rename", default=None, + help=("Give one unused instance name which is taken" + " to start the renaming sequence"), + metavar=""), + cli.cli_option("-t", "--disk-template", dest="disk_template", + choices=list(constants.DISK_TEMPLATES), + default=constants.DT_DRBD8, + help="Disk template (diskless, file, plain, sharedfile" + " or drbd) [drbd]"), + cli.cli_option("-n", "--nodes", dest="nodes", default="", + help=("Comma separated list of nodes to perform" + " the burnin on (defaults to all nodes)"), + completion_suggest=cli.OPT_COMPL_MANY_NODES), + cli.cli_option("-I", "--iallocator", dest="iallocator", + default=None, type="string", + help=("Perform the allocation using an iallocator" + " instead of fixed node spread (node restrictions no" + " longer apply, therefore -n/--nodes must not be" + " used"), + completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR), + cli.cli_option("-p", "--parallel", default=False, action="store_true", + dest="parallel", + help=("Enable parallelization of some operations in" + " order to speed burnin or to test granular locking")), + cli.cli_option("--net-timeout", default=15, type="int", + dest="net_timeout", + help=("The instance check network timeout in seconds" + " (defaults to 15 seconds)"), + completion_suggest="15 60 300 900".split()), + cli.cli_option("-C", "--http-check", default=False, action="store_true", + dest="http_check", + help=("Enable checking of instance status via http," + " looking for /hostname.txt that should contain the" + " name of the instance")), + cli.cli_option("-K", "--keep-instances", default=False, + action="store_true", + dest="keep_instances", + help=("Leave instances on the cluster after burnin," + " for investigation in case of errors or simply" + " to use them")), + ] + +# Mainly used for bash completion +ARGUMENTS = [cli.ArgInstance(min=1)] + + +def _DoCheckInstances(fn): + """Decorator for checking instances. + + """ + def wrapper(self, *args, **kwargs): + val = fn(self, *args, **kwargs) + for instance in self.instances: + self._CheckInstanceAlive(instance) # pylint: disable=W0212 + return val + + return wrapper + + +def _DoBatch(retry): + """Decorator for possible batch operations. + + Must come after the _DoCheckInstances decorator (if any). 
+ + @param retry: whether this is a retryable batch, will be + passed to StartBatch + + """ + def wrap(fn): + def batched(self, *args, **kwargs): + self.StartBatch(retry) + val = fn(self, *args, **kwargs) + self.CommitQueue() + return val + return batched + + return wrap + + class Burner(object): """Burner class.""" def __init__(self): """Constructor.""" - utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True) + self.url_opener = SimpleOpener() self._feed_buf = StringIO() self.nodes = [] self.instances = [] self.to_rem = [] + self.queued_ops = [] self.opts = None - self.cl = cli.GetClient() + self.queue_retry = False + self.disk_count = self.disk_growth = self.disk_size = None + self.hvp = self.bep = None self.ParseOptions() + self.cl = cli.GetClient() self.GetState() def ClearFeedbackBuf(self): @@ -89,33 +287,144 @@ class Burner(object): def Feedback(self, msg): """Acumulate feedback in our buffer.""" - self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])), - msg[2])) + formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2]) + self._feed_buf.write(formatted_msg + "\n") if self.opts.verbose: - Log(msg) + Log(formatted_msg, indent=3) + + def MaybeRetry(self, retry_count, msg, fn, *args): + """Possibly retry a given function execution. + + @type retry_count: int + @param retry_count: retry counter: + - 0: non-retryable action + - 1: last retry for a retryable action + - MAX_RETRIES: original try for a retryable action + @type msg: str + @param msg: the kind of the operation + @type fn: callable + @param fn: the function to be called + + """ + try: + val = fn(*args) + if retry_count > 0 and retry_count < MAX_RETRIES: + Log("Idempotent %s succeeded after %d retries", + msg, MAX_RETRIES - retry_count) + return val + except Exception, err: # pylint: disable=W0703 + if retry_count == 0: + Log("Non-idempotent %s failed, aborting", msg) + raise + elif retry_count == 1: + Log("Idempotent %s repeated failure, aborting", msg) + raise + else: + Log("Idempotent %s failed, retry #%d/%d: %s", + msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err) + self.MaybeRetry(retry_count - 1, msg, fn, *args) + + def _ExecOp(self, *ops): + """Execute one or more opcodes and manage the exec buffer. + + @return: if only opcode has been passed, we return its result; + otherwise we return the list of results + + """ + job_id = cli.SendJob(ops, cl=self.cl) + results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback) + if len(ops) == 1: + return results[0] + else: + return results + + def ExecOp(self, retry, *ops): + """Execute one or more opcodes and manage the exec buffer. + + @return: if only opcode has been passed, we return its result; + otherwise we return the list of results - def ExecOp(self, op): + """ + if retry: + rval = MAX_RETRIES + else: + rval = 0 + cli.SetGenericOpcodeOpts(ops, self.opts) + return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops) + + def ExecOrQueue(self, name, ops, post_process=None): """Execute an opcode and manage the exec buffer.""" - self.ClearFeedbackBuf() - return cli.SubmitOpCode(op, feedback_fn=self.Feedback, cl=self.cl) + if self.opts.parallel: + cli.SetGenericOpcodeOpts(ops, self.opts) + self.queued_ops.append((ops, name, post_process)) + else: + val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142 + if post_process is not None: + post_process() + return val + + def StartBatch(self, retry): + """Start a new batch of jobs. 
+ + @param retry: whether this is a retryable batch + + """ + self.queued_ops = [] + self.queue_retry = retry + + def CommitQueue(self): + """Execute all submitted opcodes in case of parallel burnin""" + if not self.opts.parallel or not self.queued_ops: + return + + if self.queue_retry: + rval = MAX_RETRIES + else: + rval = 0 + + try: + results = self.MaybeRetry(rval, "jobset", self.ExecJobSet, + self.queued_ops) + finally: + self.queued_ops = [] + return results def ExecJobSet(self, jobs): """Execute a set of jobs and return once all are done. The method will return the list of results, if all jobs are - successfull. Otherwise, OpExecError will be raised from within + successful. Otherwise, OpExecError will be raised from within cli.py. """ self.ClearFeedbackBuf() - job_ids = [cli.SendJob(job, cl=self.cl) for job in jobs] - Log("- Submitted job IDs %s" % ", ".join(job_ids)) - results = [] - for jid in job_ids: - Log("- Waiting for job %s" % jid) - results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)) + jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback) + for ops, name, _ in jobs: + jex.QueueJob(name, *ops) # pylint: disable=W0142 + try: + results = jex.GetResults() + except Exception, err: # pylint: disable=W0703 + Log("Jobs failed: %s", err) + raise BurninFailure() + + fail = False + val = [] + for (_, name, post_process), (success, result) in zip(jobs, results): + if success: + if post_process: + try: + post_process() + except Exception, err: # pylint: disable=W0703 + Log("Post process call for job %s failed: %s", name, err) + fail = True + val.append(result) + else: + fail = True - return results + if fail: + raise BurninFailure() + + return val def ParseOptions(self): """Parses the command line options. @@ -124,89 +433,10 @@ class Burner(object): program. 
""" - parser = optparse.OptionParser(usage="\n%s" % USAGE, - version="%%prog (ganeti) %s" % - constants.RELEASE_VERSION, - option_class=cli.CliOption) - - parser.add_option("-o", "--os", dest="os", default=None, - help="OS to use during burnin", - metavar="") - parser.add_option("--disk-size", dest="disk_size", - help="Disk size (determines disk count)", - default="128m", type="string", metavar="") - parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth", - default="128m", type="string", metavar="") - parser.add_option("--mem-size", dest="mem_size", help="Memory size", - default=128, type="unit", metavar="") - parser.add_option("-v", "--verbose", - action="store_true", dest="verbose", default=False, - help="print command execution messages to stdout") - parser.add_option("--no-replace1", dest="do_replace1", - help="Skip disk replacement with the same secondary", - action="store_false", default=True) - parser.add_option("--no-replace2", dest="do_replace2", - help="Skip disk replacement with a different secondary", - action="store_false", default=True) - parser.add_option("--no-failover", dest="do_failover", - help="Skip instance failovers", action="store_false", - default=True) - parser.add_option("--no-importexport", dest="do_importexport", - help="Skip instance export/import", action="store_false", - default=True) - parser.add_option("--no-startstop", dest="do_startstop", - help="Skip instance stop/start", action="store_false", - default=True) - parser.add_option("--no-reinstall", dest="do_reinstall", - help="Skip instance reinstall", action="store_false", - default=True) - parser.add_option("--no-reboot", dest="do_reboot", - help="Skip instance reboot", action="store_false", - default=True) - parser.add_option("--no-activate-disks", dest="do_activate_disks", - help="Skip disk activation/deactivation", - action="store_false", default=True) - parser.add_option("--no-add-disks", dest="do_addremove_disks", - help="Skip disk addition/removal", - action="store_false", default=True) - parser.add_option("--no-add-nics", dest="do_addremove_nics", - help="Skip NIC addition/removal", - action="store_false", default=True) - parser.add_option("--no-nics", dest="nics", - help="No network interfaces", action="store_const", - const=[], default=[{}]) - parser.add_option("--rename", dest="rename", default=None, - help="Give one unused instance name which is taken" - " to start the renaming sequence", - metavar="") - parser.add_option("-t", "--disk-template", dest="disk_template", - choices=("diskless", "file", "plain", "drbd"), - default="drbd", - help="Disk template (diskless, file, plain or drbd)" - " [drbd]") - parser.add_option("-n", "--nodes", dest="nodes", default="", - help="Comma separated list of nodes to perform" - " the burnin on (defaults to all nodes)") - parser.add_option("--iallocator", dest="iallocator", - default=None, type="string", - help="Perform the allocation using an iallocator" - " instead of fixed node spread (node restrictions no" - " longer apply, therefore -n/--nodes must not be used") - parser.add_option("-p", "--parallel", default=False, action="store_true", - dest="parallel", - help="Enable parallelization of some operations in" - " order to speed burnin or to test granular locking") - parser.add_option("--net-timeout", default=15, type="int", - dest="net_timeout", - help="The instance check network timeout in seconds" - " (defaults to 15 seconds)") - parser.add_option("-C", "--http-check", default=False, action="store_true", - dest="http_check", - 
help="Enable checking of instance status via http," - " looking for /hostname.txt that should contain the" - " name of the instance") - + version=("%%prog (ganeti) %s" % + constants.RELEASE_VERSION), + option_list=OPTIONS) options, args = parser.parse_args() if len(args) < 1 or options.os is None: @@ -214,11 +444,11 @@ class Burner(object): supported_disk_templates = (constants.DT_DISKLESS, constants.DT_FILE, + constants.DT_SHARED_FILE, constants.DT_PLAIN, constants.DT_DRBD8) if options.disk_template not in supported_disk_templates: - Log("Unknown disk template '%s'" % options.disk_template) - sys.exit(1) + Err("Unknown disk template '%s'" % options.disk_template) if options.disk_template == constants.DT_DISKLESS: disk_size = disk_growth = [] @@ -228,61 +458,90 @@ class Burner(object): disk_growth = [utils.ParseUnit(v) for v in options.disk_growth.split(",")] if len(disk_growth) != len(disk_size): - Log("Wrong disk sizes/growth combination") - sys.exit(1) + Err("Wrong disk sizes/growth combination") if ((disk_size and options.disk_template == constants.DT_DISKLESS) or (not disk_size and options.disk_template != constants.DT_DISKLESS)): - Log("Wrong disk count/disk template combination") - sys.exit(1) + Err("Wrong disk count/disk template combination") self.disk_size = disk_size self.disk_growth = disk_growth self.disk_count = len(disk_size) if options.nodes and options.iallocator: - Log("Give either the nodes option or the iallocator option, not both") - sys.exit(1) + Err("Give either the nodes option or the iallocator option, not both") + + if options.http_check and not options.name_check: + Err("Can't enable HTTP checks without name checks") self.opts = options self.instances = args self.bep = { constants.BE_MEMORY: options.mem_size, - constants.BE_VCPUS: 1, + constants.BE_VCPUS: options.vcpu_count, } + + self.hypervisor = None self.hvp = {} + if options.hypervisor: + self.hypervisor, self.hvp = options.hypervisor + + if options.reboot_types is None: + options.reboot_types = constants.REBOOT_TYPES + else: + options.reboot_types = options.reboot_types.split(",") + rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES) + if rt_diff: + Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff)) socket.setdefaulttimeout(options.net_timeout) def GetState(self): - """Read the cluster state from the config.""" + """Read the cluster state from the master daemon.""" if self.opts.nodes: names = self.opts.nodes.split(",") else: names = [] try: - op = opcodes.OpQueryNodes(output_fields=["name", "offline"], names=names) - result = self.ExecOp(op) + op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"], + names=names, use_locking=True) + result = self.ExecOp(True, op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) - Log(msg) - sys.exit(err_code) - self.nodes = [data[0] for data in result if not data[1]] + Err(msg, exit_code=err_code) + self.nodes = [data[0] for data in result if not (data[1] or data[2])] - result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"], - names=[])) + op_diagnose = opcodes.OpOsDiagnose(output_fields=["name", + "variants", + "hidden"], + names=[]) + result = self.ExecOp(True, op_diagnose) if not result: - Log("Can't get the OS list") - sys.exit(1) - - # filter non-valid OS-es - os_set = [val[0] for val in result if val[1]] - - if self.opts.os not in os_set: - Log("OS '%s' not found" % self.opts.os) - sys.exit(1) - - def CreateInstances(self): + Err("Can't get the OS list") + + found = False + for 
(name, variants, _) in result: + if self.opts.os in cli.CalculateOSNames(name, variants): + found = True + break + + if not found: + Err("OS '%s' not found" % self.opts.os) + + cluster_info = self.cl.QueryClusterInfo() + self.cluster_info = cluster_info + if not self.cluster_info: + Err("Can't get cluster info") + + default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT] + self.cluster_default_nicparams = default_nic_params + if self.hypervisor is None: + self.hypervisor = self.cluster_info["default_hypervisor"] + self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor) + + @_DoCheckInstances + @_DoBatch(False) + def BurnCreateInstances(self): """Create the given instances. """ @@ -290,22 +549,24 @@ class Burner(object): mytor = izip(cycle(self.nodes), islice(cycle(self.nodes), 1, None), self.instances) - jobset = [] + Log("Creating instances") for pnode, snode, instance in mytor: + Log("instance %s", instance, indent=1) if self.opts.iallocator: pnode = snode = None - Log("- Add instance %s (iallocator: %s)" % - (instance, self.opts.iallocator)) - elif self.opts.disk_template not in constants.DTS_NET_MIRROR: + msg = "with iallocator %s" % self.opts.iallocator + elif self.opts.disk_template not in constants.DTS_INT_MIRROR: snode = None - Log("- Add instance %s on node %s" % (instance, pnode)) + msg = "on %s" % pnode else: - Log("- Add instance %s on nodes %s/%s" % (instance, pnode, snode)) + msg = "on %s, %s" % (pnode, snode) + + Log(msg, indent=2) - op = opcodes.OpCreateInstance(instance_name=instance, - disks = [ {"size": size} - for size in self.disk_size], + op = opcodes.OpInstanceCreate(instance_name=instance, + disks=[{"size": size} + for size in self.disk_size], disk_template=self.opts.disk_template, nics=self.opts.nics, mode=constants.INSTANCE_CREATE, @@ -313,115 +574,154 @@ class Burner(object): pnode=pnode, snode=snode, start=True, - ip_check=True, + ip_check=self.opts.ip_check, + name_check=self.opts.name_check, wait_for_sync=True, file_driver="loop", file_storage_dir=None, iallocator=self.opts.iallocator, beparams=self.bep, hvparams=self.hvp, + hypervisor=self.hypervisor, + osparams=self.opts.osparams, ) + remove_instance = lambda name: lambda: self.to_rem.append(name) + self.ExecOrQueue(instance, [op], post_process=remove_instance(instance)) - if self.opts.parallel: - jobset.append([op]) - # FIXME: here we should not append to to_rem uncoditionally, - # but only when the job is successful - self.to_rem.append(instance) - else: - self.ExecOp(op) - self.to_rem.append(instance) - if self.opts.parallel: - self.ExecJobSet(jobset) - - for instance in self.instances: - self._CheckInstanceAlive(instance) - - def GrowDisks(self): + @_DoBatch(False) + def BurnGrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" + Log("Growing disks") for instance in self.instances: + Log("instance %s", instance, indent=1) for idx, growth in enumerate(self.disk_growth): if growth > 0: - op = opcodes.OpGrowDisk(instance_name=instance, disk=idx, - amount=growth, wait_for_sync=True) - Log("- Increase %s's disk/%s by %s MB" % (instance, idx, growth)) - self.ExecOp(op) + op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx, + amount=growth, wait_for_sync=True) + Log("increase disk/%s by %s MB", idx, growth, indent=2) + self.ExecOrQueue(instance, [op]) - def ReplaceDisks1D8(self): + @_DoBatch(True) + def BurnReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" + Log("Replacing disks on the same nodes") + 
early_release = self.opts.early_release for instance in self.instances: + Log("instance %s", instance, indent=1) + ops = [] for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI: - op = opcodes.OpReplaceDisks(instance_name=instance, - mode=mode, - disks=[i for i in range(self.disk_count)]) - Log("- Replace disks (%s) for instance %s" % (mode, instance)) - self.ExecOp(op) - - def ReplaceDisks2(self): + op = opcodes.OpInstanceReplaceDisks(instance_name=instance, + mode=mode, + disks=list(range(self.disk_count)), + early_release=early_release) + Log("run %s", mode, indent=2) + ops.append(op) + self.ExecOrQueue(instance, ops) + + @_DoBatch(True) + def BurnReplaceDisks2(self): """Replace secondary node.""" + Log("Changing the secondary node") mode = constants.REPLACE_DISK_CHG mytor = izip(islice(cycle(self.nodes), 2, None), self.instances) for tnode, instance in mytor: + Log("instance %s", instance, indent=1) if self.opts.iallocator: tnode = None - op = opcodes.OpReplaceDisks(instance_name=instance, - mode=mode, - remote_node=tnode, - iallocator=self.opts.iallocator, - disks=[i for i in range(self.disk_count)]) - Log("- Replace secondary (%s) for instance %s" % (mode, instance)) - self.ExecOp(op) - - def Failover(self): + msg = "with iallocator %s" % self.opts.iallocator + else: + msg = tnode + op = opcodes.OpInstanceReplaceDisks(instance_name=instance, + mode=mode, + remote_node=tnode, + iallocator=self.opts.iallocator, + disks=[], + early_release=self.opts.early_release) + Log("run %s %s", mode, msg, indent=2) + self.ExecOrQueue(instance, [op]) + + @_DoCheckInstances + @_DoBatch(False) + def BurnFailover(self): """Failover the instances.""" - + Log("Failing over instances") for instance in self.instances: - op = opcodes.OpFailoverInstance(instance_name=instance, + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceFailover(instance_name=instance, ignore_consistency=False) - - Log("- Failover instance %s" % (instance)) - self.ExecOp(op) + self.ExecOrQueue(instance, [op]) + + @_DoCheckInstances + @_DoBatch(False) + def BurnMove(self): + """Move the instances.""" + Log("Moving instances") + mytor = izip(islice(cycle(self.nodes), 1, None), + self.instances) + for tnode, instance in mytor: + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceMove(instance_name=instance, + target_node=tnode) + self.ExecOrQueue(instance, [op]) + + @_DoBatch(False) + def BurnMigrate(self): + """Migrate the instances.""" + Log("Migrating instances") for instance in self.instances: - self._CheckInstanceAlive(instance) - - def ImportExport(self): + Log("instance %s", instance, indent=1) + op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None, + cleanup=False) + + op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None, + cleanup=True) + Log("migration and migration cleanup", indent=2) + self.ExecOrQueue(instance, [op1, op2]) + + @_DoCheckInstances + @_DoBatch(False) + def BurnImportExport(self): """Export the instance, delete it, and import it back. 
""" - + Log("Exporting and re-importing instances") mytor = izip(cycle(self.nodes), islice(cycle(self.nodes), 1, None), islice(cycle(self.nodes), 2, None), self.instances) for pnode, snode, enode, instance in mytor: + Log("instance %s", instance, indent=1) + # read the full name of the instance + nam_op = opcodes.OpInstanceQuery(output_fields=["name"], + names=[instance], use_locking=True) + full_name = self.ExecOp(False, nam_op)[0][0] if self.opts.iallocator: pnode = snode = None - import_log_msg = ("- Import instance %s from node %s" - " (iallocator: %s)" % - (instance, enode, self.opts.iallocator)) - elif self.opts.disk_template not in constants.DTS_NET_MIRROR: + import_log_msg = ("import from %s" + " with iallocator %s" % + (enode, self.opts.iallocator)) + elif self.opts.disk_template not in constants.DTS_INT_MIRROR: snode = None - import_log_msg = ("- Import instance %s from node %s to node %s" % - (instance, enode, pnode)) + import_log_msg = ("import from %s to %s" % + (enode, pnode)) else: - import_log_msg = ("- Import instance %s from node %s to nodes %s/%s" % - (instance, enode, pnode, snode)) - - exp_op = opcodes.OpExportInstance(instance_name=instance, - target_node=enode, - shutdown=True) - rem_op = opcodes.OpRemoveInstance(instance_name=instance, + import_log_msg = ("import from %s to %s, %s" % + (enode, pnode, snode)) + + exp_op = opcodes.OpBackupExport(instance_name=instance, + target_node=enode, + mode=constants.EXPORT_MODE_LOCAL, + shutdown=True) + rem_op = opcodes.OpInstanceRemove(instance_name=instance, ignore_failures=True) - nam_op = opcodes.OpQueryInstances(output_fields=["name"], - names=[instance]) - full_name = self.ExecOp(nam_op)[0][0] - imp_dir = os.path.join(constants.EXPORT_DIR, full_name) - imp_op = opcodes.OpCreateInstance(instance_name=instance, - disks = [ {"size": size} - for size in self.disk_size], + imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name) + imp_op = opcodes.OpInstanceCreate(instance_name=instance, + disks=[{"size": size} + for size in self.disk_size], disk_template=self.opts.disk_template, nics=self.opts.nics, mode=constants.INSTANCE_IMPORT, @@ -430,153 +730,220 @@ class Burner(object): pnode=pnode, snode=snode, start=True, - ip_check=True, + ip_check=self.opts.ip_check, + name_check=self.opts.name_check, wait_for_sync=True, file_storage_dir=None, file_driver="loop", iallocator=self.opts.iallocator, beparams=self.bep, hvparams=self.hvp, + osparams=self.opts.osparams, ) - erem_op = opcodes.OpRemoveExport(instance_name=instance) - - Log("- Export instance %s to node %s" % (instance, enode)) - self.ExecOp(exp_op) - Log("- Remove instance %s" % (instance)) - self.ExecOp(rem_op) - self.to_rem.remove(instance) - Log(import_log_msg) - self.ExecOp(imp_op) - Log("- Remove export of instance %s" % (instance)) - self.ExecOp(erem_op) + erem_op = opcodes.OpBackupRemove(instance_name=instance) - self.to_rem.append(instance) + Log("export to node %s", enode, indent=2) + Log("remove instance", indent=2) + Log(import_log_msg, indent=2) + Log("remove export", indent=2) + self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op]) - for instance in self.instances: - self._CheckInstanceAlive(instance) - - def StopInstance(self, instance): + @staticmethod + def StopInstanceOp(instance): """Stop given instance.""" - op = opcodes.OpShutdownInstance(instance_name=instance) - Log("- Shutdown instance %s" % instance) - self.ExecOp(op) + return opcodes.OpInstanceShutdown(instance_name=instance) - def StartInstance(self, instance): + @staticmethod + def 
StartInstanceOp(instance): """Start given instance.""" - op = opcodes.OpStartupInstance(instance_name=instance, force=False) - Log("- Start instance %s" % instance) - self.ExecOp(op) + return opcodes.OpInstanceStartup(instance_name=instance, force=False) - def RenameInstance(self, instance, instance_new): + @staticmethod + def RenameInstanceOp(instance, instance_new): """Rename instance.""" - op = opcodes.OpRenameInstance(instance_name=instance, - new_name=instance_new) - Log("- Rename instance %s to %s" % (instance, instance_new)) - self.ExecOp(op) + return opcodes.OpInstanceRename(instance_name=instance, + new_name=instance_new) - def StopStart(self): + @_DoCheckInstances + @_DoBatch(True) + def BurnStopStart(self): """Stop/start the instances.""" + Log("Stopping and starting instances") for instance in self.instances: - self.StopInstance(instance) - self.StartInstance(instance) + Log("instance %s", instance, indent=1) + op1 = self.StopInstanceOp(instance) + op2 = self.StartInstanceOp(instance) + self.ExecOrQueue(instance, [op1, op2]) - for instance in self.instances: - self._CheckInstanceAlive(instance) - - def Remove(self): + @_DoBatch(False) + def BurnRemove(self): """Remove the instances.""" + Log("Removing instances") for instance in self.to_rem: - op = opcodes.OpRemoveInstance(instance_name=instance, + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceRemove(instance_name=instance, ignore_failures=True) - Log("- Remove instance %s" % instance) - self.ExecOp(op) + self.ExecOrQueue(instance, [op]) + + def BurnRename(self): + """Rename the instances. - def Rename(self): - """Rename the instances.""" + Note that this function will not execute in parallel, since we + only have one target for rename. + + """ + Log("Renaming instances") rename = self.opts.rename for instance in self.instances: - self.StopInstance(instance) - self.RenameInstance(instance, rename) - self.StartInstance(rename) + Log("instance %s", instance, indent=1) + op_stop1 = self.StopInstanceOp(instance) + op_stop2 = self.StopInstanceOp(rename) + op_rename1 = self.RenameInstanceOp(instance, rename) + op_rename2 = self.RenameInstanceOp(rename, instance) + op_start1 = self.StartInstanceOp(rename) + op_start2 = self.StartInstanceOp(instance) + self.ExecOp(False, op_stop1, op_rename1, op_start1) self._CheckInstanceAlive(rename) - self.StopInstance(rename) - self.RenameInstance(rename, instance) - self.StartInstance(instance) - - for instance in self.instances: + self.ExecOp(False, op_stop2, op_rename2, op_start2) self._CheckInstanceAlive(instance) - def Reinstall(self): + @_DoCheckInstances + @_DoBatch(True) + def BurnReinstall(self): """Reinstall the instances.""" + Log("Reinstalling instances") for instance in self.instances: - self.StopInstance(instance) - op = opcodes.OpReinstallInstance(instance_name=instance) - Log("- Reinstall instance %s without passing the OS" % (instance,)) - self.ExecOp(op) - op = opcodes.OpReinstallInstance(instance_name=instance, - os_type=self.opts.os) - Log("- Reinstall instance %s specifying the OS" % (instance,)) - self.ExecOp(op) - self.StartInstance(instance) - for instance in self.instances: - self._CheckInstanceAlive(instance) - - def Reboot(self): - """Reinstall the instances.""" + Log("instance %s", instance, indent=1) + op1 = self.StopInstanceOp(instance) + op2 = opcodes.OpInstanceReinstall(instance_name=instance) + Log("reinstall without passing the OS", indent=2) + op3 = opcodes.OpInstanceReinstall(instance_name=instance, + os_type=self.opts.os) + Log("reinstall 
specifying the OS", indent=2) + op4 = self.StartInstanceOp(instance) + self.ExecOrQueue(instance, [op1, op2, op3, op4]) + + @_DoCheckInstances + @_DoBatch(True) + def BurnReboot(self): + """Reboot the instances.""" + Log("Rebooting instances") for instance in self.instances: - for reboot_type in constants.REBOOT_TYPES: - op = opcodes.OpRebootInstance(instance_name=instance, + Log("instance %s", instance, indent=1) + ops = [] + for reboot_type in self.opts.reboot_types: + op = opcodes.OpInstanceReboot(instance_name=instance, reboot_type=reboot_type, ignore_secondaries=False) - Log("- Reboot instance %s with type '%s'" % (instance, reboot_type)) - self.ExecOp(op) - self._CheckInstanceAlive(instance) + Log("reboot with type '%s'", reboot_type, indent=2) + ops.append(op) + self.ExecOrQueue(instance, ops) - def ActivateDisks(self): + @_DoCheckInstances + @_DoBatch(True) + def BurnActivateDisks(self): """Activate and deactivate disks of the instances.""" + Log("Activating/deactivating disks") for instance in self.instances: - op_act = opcodes.OpActivateInstanceDisks(instance_name=instance) - op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance) - Log("- Activate disks of online instance %s" % (instance,)) - self.ExecOp(op_act) - self.StopInstance(instance) - Log("- Activate disks of offline instance %s" % (instance,)) - self.ExecOp(op_act) - Log("- Deactivate disks of offline instance %s" % (instance,)) - self.ExecOp(op_deact) - self.StartInstance(instance) - for instance in self.instances: - self._CheckInstanceAlive(instance) - - def AddRemoveDisks(self): + Log("instance %s", instance, indent=1) + op_start = self.StartInstanceOp(instance) + op_act = opcodes.OpInstanceActivateDisks(instance_name=instance) + op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance) + op_stop = self.StopInstanceOp(instance) + Log("activate disks when online", indent=2) + Log("activate disks when offline", indent=2) + Log("deactivate disks (when offline)", indent=2) + self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start]) + + @_DoCheckInstances + @_DoBatch(False) + def BurnAddRemoveDisks(self): """Add and remove an extra disk for the instances.""" + Log("Adding and removing disks") for instance in self.instances: - op_add = opcodes.OpSetInstanceParams(\ + Log("instance %s", instance, indent=1) + op_add = opcodes.OpInstanceSetParams(\ instance_name=instance, disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})]) - op_rem = opcodes.OpSetInstanceParams(\ + op_rem = opcodes.OpInstanceSetParams(\ instance_name=instance, disks=[(constants.DDM_REMOVE, {})]) - Log("- Adding a disk to instance %s" % (instance,)) - self.ExecOp(op_add) - self.StopInstance(instance) - Log("- Removing the last disk of instance %s" % (instance,)) - self.ExecOp(op_rem) - self.StartInstance(instance) - for instance in self.instances: - self._CheckInstanceAlive(instance) - - def AddRemoveNICs(self): + op_stop = self.StopInstanceOp(instance) + op_start = self.StartInstanceOp(instance) + Log("adding a disk", indent=2) + Log("removing last disk", indent=2) + self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start]) + + @_DoBatch(False) + def BurnAddRemoveNICs(self): """Add and remove an extra NIC for the instances.""" + Log("Adding and removing NICs") for instance in self.instances: - op_add = opcodes.OpSetInstanceParams(\ + Log("instance %s", instance, indent=1) + op_add = opcodes.OpInstanceSetParams(\ instance_name=instance, nics=[(constants.DDM_ADD, {})]) - op_rem = opcodes.OpSetInstanceParams(\ 
+ op_rem = opcodes.OpInstanceSetParams(\ instance_name=instance, nics=[(constants.DDM_REMOVE, {})]) - Log("- Adding a NIC to instance %s" % (instance,)) - self.ExecOp(op_add) - Log("- Removing the last NIC of instance %s" % (instance,)) - self.ExecOp(op_rem) + Log("adding a NIC", indent=2) + Log("removing last NIC", indent=2) + self.ExecOrQueue(instance, [op_add, op_rem]) + + def ConfdCallback(self, reply): + """Callback for confd queries""" + if reply.type == confd_client.UPCALL_REPLY: + if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK: + Err("Query %s gave non-ok status %s: %s" % (reply.orig_request, + reply.server_reply.status, + reply.server_reply)) + if reply.orig_request.type == constants.CONFD_REQ_PING: + Log("Ping: OK", indent=1) + elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER: + if reply.server_reply.answer == self.cluster_info["master"]: + Log("Master: OK", indent=1) + else: + Err("Master: wrong: %s" % reply.server_reply.answer) + elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME: + if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER: + Log("Node role for master: OK", indent=1) + else: + Err("Node role for master: wrong: %s" % reply.server_reply.answer) + + def DoConfdRequestReply(self, req): + self.confd_counting_callback.RegisterQuery(req.rsalt) + self.confd_client.SendRequest(req, async=False) + while not self.confd_counting_callback.AllAnswered(): + if not self.confd_client.ReceiveReply(): + Err("Did not receive all expected confd replies") + break + + def BurnConfd(self): + """Run confd queries for our instances. + + The following confd queries are tested: + - CONFD_REQ_PING: simple ping + - CONFD_REQ_CLUSTER_MASTER: cluster master + - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master + + """ + Log("Checking confd results") + + filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback) + counting_callback = confd_client.ConfdCountingCallback(filter_callback) + self.confd_counting_callback = counting_callback + + self.confd_client = confd_client.GetConfdClient(counting_callback) + + req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_CLUSTER_MASTER) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_NODE_ROLE_BYNAME, + query=self.cluster_info["master"]) + self.DoConfdRequestReply(req) def _CheckInstanceAlive(self, instance): """Check if an instance is alive by doing http checks. @@ -589,18 +956,18 @@ class Burner(object): """ if not self.opts.http_check: return - try: - for retries in range(self.opts.net_timeout): - try: - url = urllib2.urlopen("http://%s/hostname.txt" % instance) - except urllib2.URLError, err: - if err.args[0][0] == errno.ECONNREFUSED: - time.sleep(1) - continue - raise - except urllib2.URLError, err: - raise InstanceDown(instance, str(err)) + end_time = time.time() + self.opts.net_timeout + url = None + while time.time() < end_time and url is None: + try: + url = self.url_opener.open("http://%s/hostname.txt" % instance) + except IOError: + # here we can have connection refused, no route to host, etc. 
+ time.sleep(1) + if url is None: + raise InstanceDown(instance, "Cannot contact instance") hostname = url.read().strip() + url.close() if hostname != instance: raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" % (instance, hostname))) @@ -615,55 +982,80 @@ class Burner(object): opts = self.opts - Log("- Testing global parameters") + Log("Testing global parameters") if (len(self.nodes) == 1 and opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN, - constants.DT_FILE)): - Log("When one node is available/selected the disk template must" + constants.DT_FILE, + constants.DT_SHARED_FILE)): + Err("When one node is available/selected the disk template must" " be 'diskless', 'file' or 'plain'") - sys.exit(1) has_err = True try: - self.CreateInstances() - if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR: - self.ReplaceDisks1D8() + self.BurnCreateInstances() + if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR: + self.BurnReplaceDisks1D8() if (opts.do_replace2 and len(self.nodes) > 2 and - opts.disk_template in constants.DTS_NET_MIRROR) : - self.ReplaceDisks2() - - if opts.disk_template != constants.DT_DISKLESS: - self.GrowDisks() - - if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR: - self.Failover() + opts.disk_template in constants.DTS_INT_MIRROR): + self.BurnReplaceDisks2() + + if (opts.disk_template in constants.DTS_GROWABLE and + compat.any(n > 0 for n in self.disk_growth)): + self.BurnGrowDisks() + + if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED: + self.BurnFailover() + + if opts.do_migrate: + if opts.disk_template not in constants.DTS_MIRRORED: + Log("Skipping migration (disk template %s does not support it)", + opts.disk_template) + elif not self.hv_class.CAN_MIGRATE: + Log("Skipping migration (hypervisor %s does not support it)", + self.hypervisor) + else: + self.BurnMigrate() + + if (opts.do_move and len(self.nodes) > 1 and + opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]): + self.BurnMove() if (opts.do_importexport and opts.disk_template not in (constants.DT_DISKLESS, + constants.DT_SHARED_FILE, constants.DT_FILE)): - self.ImportExport() + self.BurnImportExport() if opts.do_reinstall: - self.Reinstall() + self.BurnReinstall() if opts.do_reboot: - self.Reboot() + self.BurnReboot() if opts.do_addremove_disks: - self.AddRemoveDisks() + self.BurnAddRemoveDisks() + default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE] + # Don't add/remove nics in routed mode, as we would need an ip to add + # them with if opts.do_addremove_nics: - self.AddRemoveNICs() + if default_nic_mode == constants.NIC_MODE_BRIDGED: + self.BurnAddRemoveNICs() + else: + Log("Skipping nic add/remove as the cluster is not in bridged mode") if opts.do_activate_disks: - self.ActivateDisks() + self.BurnActivateDisks() if opts.rename: - self.Rename() + self.BurnRename() + + if opts.do_confd_tests: + self.BurnConfd() if opts.do_startstop: - self.StopStart() + self.BurnStopStart() has_err = False finally: @@ -671,16 +1063,27 @@ class Burner(object): Log("Error detected: opcode buffer follows:\n\n") Log(self.GetFeedbackBuf()) Log("\n\n") - self.Remove() + if not self.opts.keep_instances: + try: + self.BurnRemove() + except Exception, err: # pylint: disable=W0703 + if has_err: # already detected errors, so errors in removal + # are quite expected + Log("Note: error detected during instance remove: %s", err) + else: # non-expected error + raise - return 0 + return 
constants.EXIT_SUCCESS
 
 
 def main():
-  """Main function"""
+  """Main function.
+
+  """
+  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
+                     debug=False, stderr_logging=True)
 
-  burner = Burner()
-  return burner.BurninCluster()
+  return Burner().BurninCluster()
 
 
 if __name__ == "__main__":
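
The patch above reworks burnin around queued opcode batches (ExecOrQueue, StartBatch and CommitQueue, driven by the _DoCheckInstances and _DoBatch decorators) with bounded retries via MaybeRetry. The stand-alone sketch below is not part of the patch: names such as FakeBurner and BurnSomething are invented for illustration, and the retry accounting is simplified. It only shows how the decorator, the queue and the retry helper compose.

# Stand-alone sketch (hypothetical names, not from the patch) of the
# queue/retry pattern introduced above.

MAX_RETRIES = 3


def MaybeRetry(retry_count, msg, fn, *args):
  """Run fn(*args); retry a failing, idempotent action up to retry_count times."""
  try:
    return fn(*args)
  except Exception as err:
    if retry_count <= 1:
      raise
    print("%s failed (%s), retrying" % (msg, err))
    return MaybeRetry(retry_count - 1, msg, fn, *args)


def _DoBatch(retry):
  """Decorator: start a batch, run the method, then commit the queued ops."""
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched
  return wrap


class FakeBurner(object):
  """Illustrative stand-in for the Burner class."""

  def __init__(self):
    self.queued_ops = []
    self.queue_retry = False

  def StartBatch(self, retry):
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    # Retry the whole job set only if the batch was marked retryable.
    count = MAX_RETRIES if self.queue_retry else 1
    try:
      MaybeRetry(count, "jobset", lambda: [op() for op in self.queued_ops])
    finally:
      self.queued_ops = []

  @_DoBatch(True)
  def BurnSomething(self):
    # A real Burn* method would append opcodes; a no-op stands in here.
    self.queued_ops.append(lambda: None)


# Running it queues one no-op "opcode" and commits the batch with retries.
FakeBurner().BurnSomething()

Collecting the opcodes first and committing them in one place is what lets -p/--parallel submit a whole batch as independent jobs while the serial path keeps executing them immediately.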