X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/1325da74059c5971c0ff0ef1cbd44892de98e9a7..ea9c753d1c5e376f14b30198f68124369cc2a09f:/tools/burnin

diff --git a/tools/burnin b/tools/burnin
index 0c2e94d..c5d1612 100755
--- a/tools/burnin
+++ b/tools/burnin
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -23,7 +23,6 @@

 """

-import os
 import sys
 import optparse
 import time
@@ -37,15 +36,30 @@ from ganeti import constants
 from ganeti import cli
 from ganeti import errors
 from ganeti import utils
+from ganeti import hypervisor
+from ganeti import compat
+
+from ganeti.confd import client as confd_client


 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

+MAX_RETRIES = 3
+
+LOG_HEADERS = {
+  0: "- ",
+  1: "* ",
+  2: ""
+  }
+

 class InstanceDown(Exception):
   """The checked instance was not up"""


+class BurninFailure(Exception):
+  """Failure detected during burning"""
+
+
 def Usage():
   """Shows program usage information and exits the program."""

@@ -54,19 +68,18 @@ def Usage():
   sys.exit(2)


-def Log(msg, indent=0):
+def Log(msg, *args, **kwargs):
   """Simple function that prints out its argument.

   """
-  headers = {
-    0: "- ",
-    1: "* ",
-    2: ""
-    }
-  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
-                                  headers.get(indent, "  "), msg))
+  if args:
+    msg = msg % args
+  indent = kwargs.get("indent", 0)
+  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
+                                  LOG_HEADERS.get(indent, "  "), msg))
   sys.stdout.flush()

+
 def Err(msg, exit_code=1):
   """Simple error logging that prints to stderr.

@@ -78,9 +91,12 @@ def Err(msg, exit_code=1):

 class SimpleOpener(urllib.FancyURLopener):
   """A simple url opener"""
+  # pylint: disable=W0221

-  def prompt_user_passwd(self, host, realm, clear_cache = 0):
+  def prompt_user_passwd(self, host, realm, clear_cache=0):
     """No-interaction version of prompt_user_passwd."""
+    # we follow parent class' API
+    # pylint: disable=W0613
     return None, None

   def http_error_default(self, url, fp, errcode, errmsg, headers):
@@ -93,12 +109,160 @@ class SimpleOpener(urllib.FancyURLopener):
                (errcode, errmsg))


+OPTIONS = [
+  cli.cli_option("-o", "--os", dest="os", default=None,
+                 help="OS to use during burnin",
+                 metavar="<OS>",
+                 completion_suggest=cli.OPT_COMPL_ONE_OS),
+  cli.HYPERVISOR_OPT,
+  cli.OSPARAMS_OPT,
+  cli.cli_option("--disk-size", dest="disk_size",
+                 help="Disk size (determines disk count)",
+                 default="128m", type="string", metavar="<size,size,...>",
+                 completion_suggest=("128M 512M 1G 4G 1G,256M"
+                                     " 4G,1G,1G 10G").split()),
+  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
+                 default="128m", type="string", metavar="<size,size,...>"),
+  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
+                 default=128, type="unit", metavar="<size>",
+                 completion_suggest=("128M 256M 512M 1G 4G 8G"
+                                     " 12G 16G").split()),
+  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
+                 default=3, type="unit", metavar="<count>",
+                 completion_suggest=("1 2 3 4").split()),
+  cli.DEBUG_OPT,
+  cli.VERBOSE_OPT,
+  cli.NOIPCHECK_OPT,
+  cli.NONAMECHECK_OPT,
+  cli.EARLY_RELEASE_OPT,
+  cli.cli_option("--no-replace1", dest="do_replace1",
+                 help="Skip disk replacement with the same secondary",
+                 action="store_false", default=True),
+  cli.cli_option("--no-replace2", dest="do_replace2",
+                 help="Skip disk replacement with a different secondary",
+                 action="store_false", default=True),
+  cli.cli_option("--no-failover", dest="do_failover",
+                 help="Skip instance failovers", action="store_false",
+                 default=True),
+  cli.cli_option("--no-migrate", dest="do_migrate",
+                 help="Skip instance live migration",
+                 action="store_false", default=True),
+  cli.cli_option("--no-move", dest="do_move",
+                 help="Skip instance moves", action="store_false",
+                 default=True),
+  cli.cli_option("--no-importexport", dest="do_importexport",
+                 help="Skip instance export/import", action="store_false",
+                 default=True),
+  cli.cli_option("--no-startstop", dest="do_startstop",
+                 help="Skip instance stop/start", action="store_false",
+                 default=True),
+  cli.cli_option("--no-reinstall", dest="do_reinstall",
+                 help="Skip instance reinstall", action="store_false",
+                 default=True),
+  cli.cli_option("--no-reboot", dest="do_reboot",
+                 help="Skip instance reboot", action="store_false",
+                 default=True),
+  cli.cli_option("--reboot-types", dest="reboot_types",
+                 help="Specify the reboot types", default=None),
+  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
+                 help="Skip disk activation/deactivation",
+                 action="store_false", default=True),
+  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
+                 help="Skip disk addition/removal",
+                 action="store_false", default=True),
+  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
+                 help="Skip NIC addition/removal",
+                 action="store_false", default=True),
+  cli.cli_option("--no-nics", dest="nics",
+                 help="No network interfaces", action="store_const",
+                 const=[], default=[{}]),
+  cli.cli_option("--no-confd", dest="do_confd_tests",
+                 help="Skip confd queries",
+                 action="store_false", default=True),
+  cli.cli_option("--rename", dest="rename", default=None,
+                 help=("Give one unused instance name which is taken"
+                       " to start the renaming sequence"),
+                 metavar="<instance_name>"),
+  cli.cli_option("-t", "--disk-template", dest="disk_template",
+                 choices=list(constants.DISK_TEMPLATES),
+                 default=constants.DT_DRBD8,
+                 help="Disk template (diskless, file, plain, sharedfile"
+                 " or drbd) [drbd]"),
+  cli.cli_option("-n", "--nodes", dest="nodes", default="",
+                 help=("Comma separated list of nodes to perform"
+                       " the burnin on (defaults to all nodes)"),
+                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
+  cli.cli_option("-I", "--iallocator", dest="iallocator",
+                 default=None, type="string",
+                 help=("Perform the allocation using an iallocator"
+                       " instead of fixed node spread (node restrictions no"
+                       " longer apply, therefore -n/--nodes must not be"
+                       " used"),
+                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
+  cli.cli_option("-p", "--parallel", default=False, action="store_true",
+                 dest="parallel",
+                 help=("Enable parallelization of some operations in"
+                       " order to speed burnin or to test granular locking")),
+  cli.cli_option("--net-timeout", default=15, type="int",
+                 dest="net_timeout",
+                 help=("The instance check network timeout in seconds"
+                       " (defaults to 15 seconds)"),
+                 completion_suggest="15 60 300 900".split()),
+  cli.cli_option("-C", "--http-check", default=False, action="store_true",
+                 dest="http_check",
+                 help=("Enable checking of instance status via http,"
+                       " looking for /hostname.txt that should contain the"
+                       " name of the instance")),
+  cli.cli_option("-K", "--keep-instances", default=False,
+                 action="store_true",
+                 dest="keep_instances",
+                 help=("Leave instances on the cluster after burnin,"
+                       " for investigation in case of errors or simply"
+                       " to use them")),
+  ]
+
+# Mainly used for bash completion
+ARGUMENTS = [cli.ArgInstance(min=1)]
+
+
+def _DoCheckInstances(fn):
+  """Decorator for checking instances.
+
+  """
+  def wrapper(self, *args, **kwargs):
+    val = fn(self, *args, **kwargs)
+    for instance in self.instances:
+      self._CheckInstanceAlive(instance) # pylint: disable=W0212
+    return val
+
+  return wrapper
+
+
+def _DoBatch(retry):
+  """Decorator for possible batch operations.
+
+  Must come after the _DoCheckInstances decorator (if any).
+
+  @param retry: whether this is a retryable batch, will be
+      passed to StartBatch
+
+  """
+  def wrap(fn):
+    def batched(self, *args, **kwargs):
+      self.StartBatch(retry)
+      val = fn(self, *args, **kwargs)
+      self.CommitQueue()
+      return val
+    return batched
+
+  return wrap
+
+
 class Burner(object):
   """Burner class."""

   def __init__(self):
     """Constructor."""
-    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
     self.url_opener = SimpleOpener()
     self._feed_buf = StringIO()
     self.nodes = []
@@ -106,6 +270,9 @@ class Burner(object):
     self.to_rem = []
     self.queued_ops = []
     self.opts = None
+    self.queue_retry = False
+    self.disk_count = self.disk_growth = self.disk_size = None
+    self.hvp = self.bep = None
     self.ParseOptions()
     self.cl = cli.GetClient()
     self.GetState()
@@ -120,15 +287,47 @@ class Burner(object):

   def Feedback(self, msg):
     """Acumulate feedback in our buffer."""
-    self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
-                                      msg[2]))
+    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
+    self._feed_buf.write(formatted_msg + "\n")
     if self.opts.verbose:
-      Log(msg, indent=3)
+      Log(formatted_msg, indent=3)
+
+  def MaybeRetry(self, retry_count, msg, fn, *args):
+    """Possibly retry a given function execution.
+
+    @type retry_count: int
+    @param retry_count: retry counter:
+        - 0: non-retryable action
+        - 1: last retry for a retryable action
+        - MAX_RETRIES: original try for a retryable action
+    @type msg: str
+    @param msg: the kind of the operation
+    @type fn: callable
+    @param fn: the function to be called
+
+    """
+    try:
+      val = fn(*args)
+      if retry_count > 0 and retry_count < MAX_RETRIES:
+        Log("Idempotent %s succeeded after %d retries",
+            msg, MAX_RETRIES - retry_count)
+      return val
+    except Exception, err: # pylint: disable=W0703
+      if retry_count == 0:
+        Log("Non-idempotent %s failed, aborting", msg)
+        raise
+      elif retry_count == 1:
+        Log("Idempotent %s repeated failure, aborting", msg)
+        raise
+      else:
+        Log("Idempotent %s failed, retry #%d/%d: %s",
+            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
+        self.MaybeRetry(retry_count - 1, msg, fn, *args)

-  def ExecOp(self, *ops):
+  def _ExecOp(self, *ops):
     """Execute one or more opcodes and manage the exec buffer.

-    @result: if only opcode has been passed, we return its result;
+    @return: if only opcode has been passed, we return its result;
         otherwise we return the list of results

     """
@@ -139,20 +338,53 @@ class Burner(object):
     else:
       return results

-  def ExecOrQueue(self, name, *ops):
+  def ExecOp(self, retry, *ops):
+    """Execute one or more opcodes and manage the exec buffer.
+
+    @return: if only opcode has been passed, we return its result;
+        otherwise we return the list of results
+
+    """
+    if retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+    cli.SetGenericOpcodeOpts(ops, self.opts)
+    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
+
+  def ExecOrQueue(self, name, ops, post_process=None):
     """Execute an opcode and manage the exec buffer."""
     if self.opts.parallel:
-      self.queued_ops.append((ops, name))
+      cli.SetGenericOpcodeOpts(ops, self.opts)
+      self.queued_ops.append((ops, name, post_process))
     else:
-      return self.ExecOp(*ops)
+      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
+      if post_process is not None:
+        post_process()
+      return val
+
+  def StartBatch(self, retry):
+    """Start a new batch of jobs.
+
+    @param retry: whether this is a retryable batch
+
+    """
+    self.queued_ops = []
+    self.queue_retry = retry

   def CommitQueue(self):
     """Execute all submitted opcodes in case of parallel burnin"""
-    if not self.opts.parallel:
+    if not self.opts.parallel or not self.queued_ops:
       return

+    if self.queue_retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+
     try:
-      results = self.ExecJobSet(self.queued_ops)
+      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
+                                self.queued_ops)
     finally:
       self.queued_ops = []
     return results
@@ -166,14 +398,33 @@ class Burner(object):

     """
     self.ClearFeedbackBuf()
-    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
-    Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
-    results = []
-    for jid, (_, iname) in zip(job_ids, jobs):
-      Log("waiting for job %s for %s" % (jid, iname), indent=2)
-      results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
+    for ops, name, _ in jobs:
+      jex.QueueJob(name, *ops) # pylint: disable=W0142
+    try:
+      results = jex.GetResults()
+    except Exception, err: # pylint: disable=W0703
+      Log("Jobs failed: %s", err)
+      raise BurninFailure()
+
+    fail = False
+    val = []
+    for (_, name, post_process), (success, result) in zip(jobs, results):
+      if success:
+        if post_process:
+          try:
+            post_process()
+          except Exception, err: # pylint: disable=W0703
+            Log("Post process call for job %s failed: %s", name, err)
+            fail = True
+        val.append(result)
+      else:
+        fail = True

-    return results
+    if fail:
+      raise BurninFailure()
+
+    return val

   def ParseOptions(self):
     """Parses the command line options.
@@ -182,98 +433,10 @@ class Burner(object):

     In case of command line errors, it will show the usage and exit the
     program.
""" - parser = optparse.OptionParser(usage="\n%s" % USAGE, - version="%%prog (ganeti) %s" % - constants.RELEASE_VERSION, - option_class=cli.CliOption) - - parser.add_option("-o", "--os", dest="os", default=None, - help="OS to use during burnin", - metavar="") - parser.add_option("--disk-size", dest="disk_size", - help="Disk size (determines disk count)", - default="128m", type="string", metavar="") - parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth", - default="128m", type="string", metavar="") - parser.add_option("--mem-size", dest="mem_size", help="Memory size", - default=128, type="unit", metavar="") - parser.add_option("-v", "--verbose", - action="store_true", dest="verbose", default=False, - help="print command execution messages to stdout") - parser.add_option("--no-replace1", dest="do_replace1", - help="Skip disk replacement with the same secondary", - action="store_false", default=True) - parser.add_option("--no-replace2", dest="do_replace2", - help="Skip disk replacement with a different secondary", - action="store_false", default=True) - parser.add_option("--no-failover", dest="do_failover", - help="Skip instance failovers", action="store_false", - default=True) - parser.add_option("--no-migrate", dest="do_migrate", - help="Skip instance live migration", - action="store_false", default=True) - parser.add_option("--no-importexport", dest="do_importexport", - help="Skip instance export/import", action="store_false", - default=True) - parser.add_option("--no-startstop", dest="do_startstop", - help="Skip instance stop/start", action="store_false", - default=True) - parser.add_option("--no-reinstall", dest="do_reinstall", - help="Skip instance reinstall", action="store_false", - default=True) - parser.add_option("--no-reboot", dest="do_reboot", - help="Skip instance reboot", action="store_false", - default=True) - parser.add_option("--no-activate-disks", dest="do_activate_disks", - help="Skip disk activation/deactivation", - action="store_false", default=True) - parser.add_option("--no-add-disks", dest="do_addremove_disks", - help="Skip disk addition/removal", - action="store_false", default=True) - parser.add_option("--no-add-nics", dest="do_addremove_nics", - help="Skip NIC addition/removal", - action="store_false", default=True) - parser.add_option("--no-nics", dest="nics", - help="No network interfaces", action="store_const", - const=[], default=[{}]) - parser.add_option("--rename", dest="rename", default=None, - help="Give one unused instance name which is taken" - " to start the renaming sequence", - metavar="") - parser.add_option("-t", "--disk-template", dest="disk_template", - choices=("diskless", "file", "plain", "drbd"), - default="drbd", - help="Disk template (diskless, file, plain or drbd)" - " [drbd]") - parser.add_option("-n", "--nodes", dest="nodes", default="", - help="Comma separated list of nodes to perform" - " the burnin on (defaults to all nodes)") - parser.add_option("-I", "--iallocator", dest="iallocator", - default=None, type="string", - help="Perform the allocation using an iallocator" - " instead of fixed node spread (node restrictions no" - " longer apply, therefore -n/--nodes must not be used") - parser.add_option("-p", "--parallel", default=False, action="store_true", - dest="parallel", - help="Enable parallelization of some operations in" - " order to speed burnin or to test granular locking") - parser.add_option("--net-timeout", default=15, type="int", - dest="net_timeout", - help="The instance check network timeout in seconds" - 
" (defaults to 15 seconds)") - parser.add_option("-C", "--http-check", default=False, action="store_true", - dest="http_check", - help="Enable checking of instance status via http," - " looking for /hostname.txt that should contain the" - " name of the instance") - parser.add_option("-K", "--keep-instances", default=False, - action="store_true", - dest="keep_instances", - help="Leave instances on the cluster after burnin," - " for investigation in case of errors or simply" - " to use them") - + version=("%%prog (ganeti) %s" % + constants.RELEASE_VERSION), + option_list=OPTIONS) options, args = parser.parse_args() if len(args) < 1 or options.os is None: @@ -281,6 +444,7 @@ class Burner(object): supported_disk_templates = (constants.DT_DISKLESS, constants.DT_FILE, + constants.DT_SHARED_FILE, constants.DT_PLAIN, constants.DT_DRBD8) if options.disk_template not in supported_disk_templates: @@ -306,42 +470,77 @@ class Burner(object): if options.nodes and options.iallocator: Err("Give either the nodes option or the iallocator option, not both") + if options.http_check and not options.name_check: + Err("Can't enable HTTP checks without name checks") + self.opts = options self.instances = args self.bep = { constants.BE_MEMORY: options.mem_size, - constants.BE_VCPUS: 1, + constants.BE_VCPUS: options.vcpu_count, } + + self.hypervisor = None self.hvp = {} + if options.hypervisor: + self.hypervisor, self.hvp = options.hypervisor + + if options.reboot_types is None: + options.reboot_types = constants.REBOOT_TYPES + else: + options.reboot_types = options.reboot_types.split(",") + rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES) + if rt_diff: + Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff)) socket.setdefaulttimeout(options.net_timeout) def GetState(self): - """Read the cluster state from the config.""" + """Read the cluster state from the master daemon.""" if self.opts.nodes: names = self.opts.nodes.split(",") else: names = [] try: - op = opcodes.OpQueryNodes(output_fields=["name", "offline"], names=names) - result = self.ExecOp(op) + op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"], + names=names, use_locking=True) + result = self.ExecOp(True, op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) Err(msg, exit_code=err_code) - self.nodes = [data[0] for data in result if not data[1]] + self.nodes = [data[0] for data in result if not (data[1] or data[2])] - result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"], - names=[])) + op_diagnose = opcodes.OpOsDiagnose(output_fields=["name", + "variants", + "hidden"], + names=[]) + result = self.ExecOp(True, op_diagnose) if not result: Err("Can't get the OS list") - # filter non-valid OS-es - os_set = [val[0] for val in result if val[1]] + found = False + for (name, variants, _) in result: + if self.opts.os in cli.CalculateOSNames(name, variants): + found = True + break - if self.opts.os not in os_set: + if not found: Err("OS '%s' not found" % self.opts.os) + cluster_info = self.cl.QueryClusterInfo() + self.cluster_info = cluster_info + if not self.cluster_info: + Err("Can't get cluster info") + + default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT] + self.cluster_default_nicparams = default_nic_params + if self.hypervisor is None: + self.hypervisor = self.cluster_info["default_hypervisor"] + self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor) + + @_DoCheckInstances + @_DoBatch(False) def BurnCreateInstances(self): """Create the 
given instances. @@ -353,11 +552,11 @@ class Burner(object): Log("Creating instances") for pnode, snode, instance in mytor: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) if self.opts.iallocator: pnode = snode = None msg = "with iallocator %s" % self.opts.iallocator - elif self.opts.disk_template not in constants.DTS_NET_MIRROR: + elif self.opts.disk_template not in constants.DTS_INT_MIRROR: snode = None msg = "on %s" % pnode else: @@ -365,9 +564,9 @@ class Burner(object): Log(msg, indent=2) - op = opcodes.OpCreateInstance(instance_name=instance, - disks = [ {"size": size} - for size in self.disk_size], + op = opcodes.OpInstanceCreate(instance_name=instance, + disks=[{"size": size} + for size in self.disk_size], disk_template=self.opts.disk_template, nics=self.opts.nics, mode=constants.INSTANCE_CREATE, @@ -375,51 +574,51 @@ class Burner(object): pnode=pnode, snode=snode, start=True, - ip_check=True, + ip_check=self.opts.ip_check, + name_check=self.opts.name_check, wait_for_sync=True, file_driver="loop", file_storage_dir=None, iallocator=self.opts.iallocator, beparams=self.bep, hvparams=self.hvp, + hypervisor=self.hypervisor, + osparams=self.opts.osparams, ) + remove_instance = lambda name: lambda: self.to_rem.append(name) + self.ExecOrQueue(instance, [op], post_process=remove_instance(instance)) - self.ExecOrQueue(instance, op) - self.to_rem.append(instance) - - self.CommitQueue() - - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoBatch(False) def BurnGrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" Log("Growing disks") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) for idx, growth in enumerate(self.disk_growth): if growth > 0: - op = opcodes.OpGrowDisk(instance_name=instance, disk=idx, - amount=growth, wait_for_sync=True) - Log("increase disk/%s by %s MB" % (idx, growth), indent=2) - self.ExecOrQueue(instance, op) - self.CommitQueue() + op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx, + amount=growth, wait_for_sync=True) + Log("increase disk/%s by %s MB", idx, growth, indent=2) + self.ExecOrQueue(instance, [op]) + @_DoBatch(True) def BurnReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" Log("Replacing disks on the same nodes") + early_release = self.opts.early_release for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) ops = [] for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI: - op = opcodes.OpReplaceDisks(instance_name=instance, - mode=mode, - disks=[i for i in range(self.disk_count)]) - Log("run %s" % mode, indent=2) + op = opcodes.OpInstanceReplaceDisks(instance_name=instance, + mode=mode, + disks=list(range(self.disk_count)), + early_release=early_release) + Log("run %s", mode, indent=2) ops.append(op) - self.ExecOrQueue(instance, *ops) - self.CommitQueue() + self.ExecOrQueue(instance, ops) + @_DoBatch(True) def BurnReplaceDisks2(self): """Replace secondary node.""" Log("Changing the secondary node") @@ -428,48 +627,61 @@ class Burner(object): mytor = izip(islice(cycle(self.nodes), 2, None), self.instances) for tnode, instance in mytor: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) if self.opts.iallocator: tnode = None msg = "with iallocator %s" % self.opts.iallocator else: msg = tnode - op = opcodes.OpReplaceDisks(instance_name=instance, - 
mode=mode, - remote_node=tnode, - iallocator=self.opts.iallocator, - disks=[i for i in range(self.disk_count)]) - Log("run %s %s" % (mode, msg), indent=2) - self.ExecOrQueue(instance, op) - self.CommitQueue() - + op = opcodes.OpInstanceReplaceDisks(instance_name=instance, + mode=mode, + remote_node=tnode, + iallocator=self.opts.iallocator, + disks=[], + early_release=self.opts.early_release) + Log("run %s %s", mode, msg, indent=2) + self.ExecOrQueue(instance, [op]) + + @_DoCheckInstances + @_DoBatch(False) def BurnFailover(self): """Failover the instances.""" Log("Failing over instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) - op = opcodes.OpFailoverInstance(instance_name=instance, + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceFailover(instance_name=instance, ignore_consistency=False) + self.ExecOrQueue(instance, [op]) + + @_DoCheckInstances + @_DoBatch(False) + def BurnMove(self): + """Move the instances.""" + Log("Moving instances") + mytor = izip(islice(cycle(self.nodes), 1, None), + self.instances) + for tnode, instance in mytor: + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceMove(instance_name=instance, + target_node=tnode) + self.ExecOrQueue(instance, [op]) - self.ExecOrQueue(instance, op) - self.CommitQueue() - for instance in self.instances: - self._CheckInstanceAlive(instance) - + @_DoBatch(False) def BurnMigrate(self): """Migrate the instances.""" Log("Migrating instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) - op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + Log("instance %s", instance, indent=1) + op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None, cleanup=False) - op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None, cleanup=True) Log("migration and migration cleanup", indent=2) - self.ExecOrQueue(instance, op1, op2) - self.CommitQueue() + self.ExecOrQueue(instance, [op1, op2]) + @_DoCheckInstances + @_DoBatch(False) def BurnImportExport(self): """Export the instance, delete it, and import it back. 

@@ -481,18 +693,18 @@ class Burner(object):
                  self.instances)

     for pnode, snode, enode, instance in mytor:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       # read the full name of the instance
-      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
-                                        names=[instance])
-      full_name = self.ExecOp(nam_op)[0][0]
+      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
+                                       names=[instance], use_locking=True)
+      full_name = self.ExecOp(False, nam_op)[0][0]

       if self.opts.iallocator:
         pnode = snode = None
         import_log_msg = ("import from %s"
                           " with iallocator %s" %
                           (enode, self.opts.iallocator))
-      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
+      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
         snode = None
         import_log_msg = ("import from %s to %s" %
                           (enode, pnode))
@@ -500,15 +712,16 @@ class Burner(object):
         import_log_msg = ("import from %s to %s, %s" %
                           (enode, pnode, snode))

-      exp_op = opcodes.OpExportInstance(instance_name=instance,
-                                        target_node=enode,
-                                        shutdown=True)
-      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
+      exp_op = opcodes.OpBackupExport(instance_name=instance,
+                                      target_node=enode,
+                                      mode=constants.EXPORT_MODE_LOCAL,
+                                      shutdown=True)
+      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                         ignore_failures=True)
-      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
-      imp_op = opcodes.OpCreateInstance(instance_name=instance,
-                                        disks = [ {"size": size}
-                                                  for size in self.disk_size],
+      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
+      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
+                                        disks=[{"size": size}
+                                               for size in self.disk_size],
                                         disk_template=self.opts.disk_template,
                                         nics=self.opts.nics,
                                         mode=constants.INSTANCE_IMPORT,
@@ -517,64 +730,61 @@ class Burner(object):
                                         pnode=pnode,
                                         snode=snode,
                                         start=True,
-                                        ip_check=True,
+                                        ip_check=self.opts.ip_check,
+                                        name_check=self.opts.name_check,
                                         wait_for_sync=True,
                                         file_storage_dir=None,
                                         file_driver="loop",
                                         iallocator=self.opts.iallocator,
                                         beparams=self.bep,
                                         hvparams=self.hvp,
+                                        osparams=self.opts.osparams,
                                         )

-      erem_op = opcodes.OpRemoveExport(instance_name=instance)
+      erem_op = opcodes.OpBackupRemove(instance_name=instance)

-      Log("export to node %s" % enode, indent=2)
+      Log("export to node %s", enode, indent=2)
       Log("remove instance", indent=2)
       Log(import_log_msg, indent=2)
       Log("remove export", indent=2)
-      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
-
-    self.CommitQueue()
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
+      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

-  def StopInstanceOp(self, instance):
+  @staticmethod
+  def StopInstanceOp(instance):
     """Stop given instance."""
-    return opcodes.OpShutdownInstance(instance_name=instance)
+    return opcodes.OpInstanceShutdown(instance_name=instance)

-  def StartInstanceOp(self, instance):
+  @staticmethod
+  def StartInstanceOp(instance):
     """Start given instance."""
-    return opcodes.OpStartupInstance(instance_name=instance, force=False)
+    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

-  def RenameInstanceOp(self, instance, instance_new):
+  @staticmethod
+  def RenameInstanceOp(instance, instance_new):
     """Rename instance."""
-    return opcodes.OpRenameInstance(instance_name=instance,
+    return opcodes.OpInstanceRename(instance_name=instance,
                                     new_name=instance_new)

+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnStopStart(self):
     """Stop/start the instances."""
     Log("Stopping and starting instances")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op1 = self.StopInstanceOp(instance)
       op2 = self.StartInstanceOp(instance)
-      self.ExecOrQueue(instance, op1, op2)
-
-    self.CommitQueue()
-
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
+      self.ExecOrQueue(instance, [op1, op2])

+  @_DoBatch(False)
   def BurnRemove(self):
     """Remove the instances."""
     Log("Removing instances")
     for instance in self.to_rem:
-      Log("instance %s" % instance, indent=1)
-      op = opcodes.OpRemoveInstance(instance_name=instance,
+      Log("instance %s", instance, indent=1)
+      op = opcodes.OpInstanceRemove(instance_name=instance,
                                     ignore_failures=True)
-      self.ExecOrQueue(instance, op)
-
-    self.CommitQueue()
+      self.ExecOrQueue(instance, [op])

   def BurnRename(self):
     """Rename the instances.

@@ -586,104 +796,154 @@ class Burner(object):
     Log("Renaming instances")
     rename = self.opts.rename
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
-      op_stop = self.StopInstanceOp(instance)
+      Log("instance %s", instance, indent=1)
+      op_stop1 = self.StopInstanceOp(instance)
+      op_stop2 = self.StopInstanceOp(rename)
       op_rename1 = self.RenameInstanceOp(instance, rename)
       op_rename2 = self.RenameInstanceOp(rename, instance)
       op_start1 = self.StartInstanceOp(rename)
       op_start2 = self.StartInstanceOp(instance)
-      self.ExecOp(op_stop, op_rename1, op_start1)
+      self.ExecOp(False, op_stop1, op_rename1, op_start1)
       self._CheckInstanceAlive(rename)
-      self.ExecOp(op_stop, op_rename2, op_start2)
+      self.ExecOp(False, op_stop2, op_rename2, op_start2)
       self._CheckInstanceAlive(instance)

+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnReinstall(self):
     """Reinstall the instances."""
     Log("Reinstalling instances")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op1 = self.StopInstanceOp(instance)
-      op2 = opcodes.OpReinstallInstance(instance_name=instance)
+      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
       Log("reinstall without passing the OS", indent=2)
-      op3 = opcodes.OpReinstallInstance(instance_name=instance,
+      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                         os_type=self.opts.os)
       Log("reinstall specifying the OS", indent=2)
       op4 = self.StartInstanceOp(instance)
-      self.ExecOrQueue(instance, op1, op2, op3, op4)
-
-    self.CommitQueue()
-
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
+      self.ExecOrQueue(instance, [op1, op2, op3, op4])

+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnReboot(self):
     """Reboot the instances."""
     Log("Rebooting instances")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       ops = []
-      for reboot_type in constants.REBOOT_TYPES:
-        op = opcodes.OpRebootInstance(instance_name=instance,
+      for reboot_type in self.opts.reboot_types:
+        op = opcodes.OpInstanceReboot(instance_name=instance,
                                       reboot_type=reboot_type,
                                       ignore_secondaries=False)
-        Log("reboot with type '%s'" % reboot_type, indent=2)
+        Log("reboot with type '%s'", reboot_type, indent=2)
         ops.append(op)
-      self.ExecOrQueue(instance, *ops)
-
-    self.CommitQueue()
-
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
+      self.ExecOrQueue(instance, ops)

+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnActivateDisks(self):
     """Activate and deactivate disks of the instances."""
     Log("Activating/deactivating disks")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op_start = self.StartInstanceOp(instance)
-      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
-      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
+      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
+      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
       op_stop = self.StopInstanceOp(instance)
       Log("activate disks when online", indent=2)
       Log("activate disks when offline", indent=2)
       Log("deactivate disks (when offline)", indent=2)
-      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
-    self.CommitQueue()
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
+      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

+  @_DoCheckInstances
+  @_DoBatch(False)
   def BurnAddRemoveDisks(self):
     """Add and remove an extra disk for the instances."""
     Log("Adding and removing disks")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
-      op_add = opcodes.OpSetInstanceParams(\
+      Log("instance %s", instance, indent=1)
+      op_add = opcodes.OpInstanceSetParams(\
         instance_name=instance,
         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
-      op_rem = opcodes.OpSetInstanceParams(\
+      op_rem = opcodes.OpInstanceSetParams(\
         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
       op_stop = self.StopInstanceOp(instance)
       op_start = self.StartInstanceOp(instance)
       Log("adding a disk", indent=2)
       Log("removing last disk", indent=2)
-      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
-    self.CommitQueue()
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
+      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

+  @_DoBatch(False)
   def BurnAddRemoveNICs(self):
     """Add and remove an extra NIC for the instances."""
     Log("Adding and removing NICs")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
-      op_add = opcodes.OpSetInstanceParams(\
+      Log("instance %s", instance, indent=1)
+      op_add = opcodes.OpInstanceSetParams(\
         instance_name=instance, nics=[(constants.DDM_ADD, {})])
-      op_rem = opcodes.OpSetInstanceParams(\
+      op_rem = opcodes.OpInstanceSetParams(\
         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
       Log("adding a NIC", indent=2)
       Log("removing last NIC", indent=2)
-      self.ExecOrQueue(instance, op_add, op_rem)
-    self.CommitQueue()
+      self.ExecOrQueue(instance, [op_add, op_rem])
+
+  def ConfdCallback(self, reply):
+    """Callback for confd queries"""
+    if reply.type == confd_client.UPCALL_REPLY:
+      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
+        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
+                                                    reply.server_reply.status,
+                                                    reply.server_reply))
+      if reply.orig_request.type == constants.CONFD_REQ_PING:
+        Log("Ping: OK", indent=1)
+      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
+        if reply.server_reply.answer == self.cluster_info["master"]:
+          Log("Master: OK", indent=1)
+        else:
+          Err("Master: wrong: %s" % reply.server_reply.answer)
+      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
+        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
+          Log("Node role for master: OK", indent=1)
+        else:
+          Err("Node role for master: wrong: %s" % reply.server_reply.answer)
+
+  def DoConfdRequestReply(self, req):
+    self.confd_counting_callback.RegisterQuery(req.rsalt)
+    self.confd_client.SendRequest(req, async=False)
+    while not self.confd_counting_callback.AllAnswered():
+      if not self.confd_client.ReceiveReply():
+        Err("Did not receive all expected confd replies")
+        break
+
+  def BurnConfd(self):
+    """Run confd queries for our instances.
+
+    The following confd queries are tested:
+      - CONFD_REQ_PING: simple ping
+      - CONFD_REQ_CLUSTER_MASTER: cluster master
+      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
+
+    """
+    Log("Checking confd results")
+
+    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
+    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
+    self.confd_counting_callback = counting_callback
+
+    self.confd_client = confd_client.GetConfdClient(counting_callback)
+
+    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
+    self.DoConfdRequestReply(req)
+
+    req = confd_client.ConfdClientRequest(
+      type=constants.CONFD_REQ_CLUSTER_MASTER)
+    self.DoConfdRequestReply(req)
+
+    req = confd_client.ConfdClientRequest(
+      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
+      query=self.cluster_info["master"])
+    self.DoConfdRequestReply(req)

   def _CheckInstanceAlive(self, instance):
     """Check if an instance is alive by doing http checks.

@@ -726,31 +986,44 @@ class Burner(object):
     if (len(self.nodes) == 1 and
         opts.disk_template not in (constants.DT_DISKLESS,
                                    constants.DT_PLAIN,
-                                   constants.DT_FILE)):
+                                   constants.DT_FILE,
+                                   constants.DT_SHARED_FILE)):
       Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

     has_err = True
     try:
       self.BurnCreateInstances()
-      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
+      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
         self.BurnReplaceDisks1D8()
       if (opts.do_replace2 and len(self.nodes) > 2 and
-          opts.disk_template in constants.DTS_NET_MIRROR) :
+          opts.disk_template in constants.DTS_INT_MIRROR):
         self.BurnReplaceDisks2()

-      if (opts.disk_template != constants.DT_DISKLESS and
-          utils.any(self.disk_growth, lambda n: n > 0)):
+      if (opts.disk_template in constants.DTS_GROWABLE and
+          compat.any(n > 0 for n in self.disk_growth)):
         self.BurnGrowDisks()

-      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
+      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
         self.BurnFailover()

-      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
-        self.BurnMigrate()
+      if opts.do_migrate:
+        if opts.disk_template not in constants.DTS_MIRRORED:
+          Log("Skipping migration (disk template %s does not support it)",
+              opts.disk_template)
+        elif not self.hv_class.CAN_MIGRATE:
+          Log("Skipping migration (hypervisor %s does not support it)",
+              self.hypervisor)
+        else:
+          self.BurnMigrate()
+
+      if (opts.do_move and len(self.nodes) > 1 and
+          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
+        self.BurnMove()

       if (opts.do_importexport and
           opts.disk_template not in (constants.DT_DISKLESS,
+                                     constants.DT_SHARED_FILE,
                                      constants.DT_FILE)):
         self.BurnImportExport()
@@ -763,8 +1036,14 @@ class Burner(object):
       if opts.do_addremove_disks:
         self.BurnAddRemoveDisks()

+      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
+      # Don't add/remove nics in routed mode, as we would need an ip to add
+      # them with
       if opts.do_addremove_nics:
-        self.BurnAddRemoveNICs()
+        if default_nic_mode == constants.NIC_MODE_BRIDGED:
+          self.BurnAddRemoveNICs()
+        else:
+          Log("Skipping nic add/remove as the cluster is not in bridged mode")

       if opts.do_activate_disks:
         self.BurnActivateDisks()
@@ -772,6 +1051,9 @@ class Burner(object):
       if opts.rename:
         self.BurnRename()

+      if opts.do_confd_tests:
+        self.BurnConfd()
+
       if opts.do_startstop:
         self.BurnStopStart()

@@ -782,16 +1064,26 @@ class Burner(object):
           Log(self.GetFeedbackBuf())
           Log("\n\n")
       if not self.opts.keep_instances:
-        self.BurnRemove()
+        try:
+          self.BurnRemove()
+        except Exception, err: # pylint: disable=W0703
+          if has_err: # already detected errors, so errors in removal
+                      # are quite expected
+            Log("Note: error detected during instance remove: %s", err)
+          else: # non-expected error
+            raise

-    return 0
+    return constants.EXIT_SUCCESS


 def main():
-  """Main function"""
+  """Main function.
+
+  """
+  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
+                     debug=False, stderr_logging=True)

-  burner = Burner()
-  return burner.BurninCluster()
+  return Burner().BurninCluster()


 if __name__ == "__main__":
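
The central refactoring in this patch is the batching machinery: each Burn* step now queues its opcodes per instance through ExecOrQueue(), the _DoBatch decorator brackets the step with StartBatch()/CommitQueue(), and _DoCheckInstances re-verifies instance liveness once the batch has been flushed. The following is a minimal standalone sketch of that decorator pattern under those assumptions; MiniBurner and its string "opcodes" are illustrative stand-ins, not code from this patch:

def _DoBatch(retry):
  """Run the decorated method between StartBatch() and CommitQueue()."""
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched
  return wrap


class MiniBurner(object):
  """Illustrative stand-in for the real Burner class."""

  def __init__(self):
    self.queued_ops = []
    self.queue_retry = False

  def StartBatch(self, retry):
    # Reset the queue and remember whether this batch may be retried
    self.queued_ops = []
    self.queue_retry = retry

  def ExecOrQueue(self, name, ops):
    # The real burnin queues Ganeti opcodes here; strings stand in
    self.queued_ops.append((ops, name))

  def CommitQueue(self):
    # The real burnin submits the queue as parallel jobs (cli.JobExecutor)
    for ops, name in self.queued_ops:
      print("job %s: %d opcode(s), retryable=%s"
            % (name, len(ops), self.queue_retry))
    self.queued_ops = []

  @_DoBatch(True)
  def BurnStopStart(self):
    for instance in ("instance1.example.com", "instance2.example.com"):
      self.ExecOrQueue(instance, ["stop", "start"])


if __name__ == "__main__":
  MiniBurner().BurnStopStart()

Because batched() commits the queue before returning, _DoCheckInstances must be the outer decorator (the "@_DoCheckInstances / @_DoBatch(...)" stacking used throughout the patch, and the reason _DoBatch's docstring says it must come after _DoCheckInstances): only then do the liveness checks run after all queued jobs have actually been executed.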