X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/265e624459a420b31ddab0d848e4160ce55c5a84..ea9c753d1c5e376f14b30198f68124369cc2a09f:/tools/burnin diff --git a/tools/burnin b/tools/burnin index a17e852..c5d1612 100755 --- a/tools/burnin +++ b/tools/burnin @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -23,7 +23,6 @@ """ -import os import sys import optparse import time @@ -37,11 +36,21 @@ from ganeti import constants from ganeti import cli from ganeti import errors from ganeti import utils +from ganeti import hypervisor +from ganeti import compat + +from ganeti.confd import client as confd_client USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...") MAX_RETRIES = 3 +LOG_HEADERS = { + 0: "- ", + 1: "* ", + 2: "" + } + class InstanceDown(Exception): """The checked instance was not up""" @@ -59,19 +68,18 @@ def Usage(): sys.exit(2) -def Log(msg, indent=0): +def Log(msg, *args, **kwargs): """Simple function that prints out its argument. """ - headers = { - 0: "- ", - 1: "* ", - 2: "" - } - sys.stdout.write("%*s%s%s\n" % (2*indent, "", - headers.get(indent, " "), msg)) + if args: + msg = msg % args + indent = kwargs.get("indent", 0) + sys.stdout.write("%*s%s%s\n" % (2 * indent, "", + LOG_HEADERS.get(indent, " "), msg)) sys.stdout.flush() + def Err(msg, exit_code=1): """Simple error logging that prints to stderr. @@ -83,12 +91,12 @@ def Err(msg, exit_code=1): class SimpleOpener(urllib.FancyURLopener): """A simple url opener""" - # pylint: disable-msg=W0221 + # pylint: disable=W0221 def prompt_user_passwd(self, host, realm, clear_cache=0): """No-interaction version of prompt_user_passwd.""" # we follow parent class' API - # pylint: disable-msg=W0613 + # pylint: disable=W0613 return None, None def http_error_default(self, url, fp, errcode, errmsg, headers): @@ -106,6 +114,8 @@ OPTIONS = [ help="OS to use during burnin", metavar="", completion_suggest=cli.OPT_COMPL_ONE_OS), + cli.HYPERVISOR_OPT, + cli.OSPARAMS_OPT, cli.cli_option("--disk-size", dest="disk_size", help="Disk size (determines disk count)", default="128m", type="string", metavar="", @@ -117,6 +127,9 @@ OPTIONS = [ default=128, type="unit", metavar="", completion_suggest=("128M 256M 512M 1G 4G 8G" " 12G 16G").split()), + cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count", + default=3, type="unit", metavar="", + completion_suggest=("1 2 3 4").split()), cli.DEBUG_OPT, cli.VERBOSE_OPT, cli.NOIPCHECK_OPT, @@ -149,6 +162,8 @@ OPTIONS = [ cli.cli_option("--no-reboot", dest="do_reboot", help="Skip instance reboot", action="store_false", default=True), + cli.cli_option("--reboot-types", dest="reboot_types", + help="Specify the reboot types", default=None), cli.cli_option("--no-activate-disks", dest="do_activate_disks", help="Skip disk activation/deactivation", action="store_false", default=True), @@ -161,6 +176,9 @@ OPTIONS = [ cli.cli_option("--no-nics", dest="nics", help="No network interfaces", action="store_const", const=[], default=[{}]), + cli.cli_option("--no-confd", dest="do_confd_tests", + help="Skip confd queries", + action="store_false", default=True), cli.cli_option("--rename", dest="rename", default=None, help=("Give one unused instance name which is taken" " to start the renaming sequence"), @@ -168,7 +186,8 @@ OPTIONS = [ cli.cli_option("-t", 
"--disk-template", dest="disk_template", choices=list(constants.DISK_TEMPLATES), default=constants.DT_DRBD8, - help="Disk template (diskless, file, plain or drbd) [drbd]"), + help="Disk template (diskless, file, plain, sharedfile" + " or drbd) [drbd]"), cli.cli_option("-n", "--nodes", dest="nodes", default="", help=("Comma separated list of nodes to perform" " the burnin on (defaults to all nodes)"), @@ -213,7 +232,7 @@ def _DoCheckInstances(fn): def wrapper(self, *args, **kwargs): val = fn(self, *args, **kwargs) for instance in self.instances: - self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212 + self._CheckInstanceAlive(instance) # pylint: disable=W0212 return val return wrapper @@ -244,7 +263,6 @@ class Burner(object): def __init__(self): """Constructor.""" - utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True) self.url_opener = SimpleOpener() self._feed_buf = StringIO() self.nodes = [] @@ -291,30 +309,25 @@ class Burner(object): try: val = fn(*args) if retry_count > 0 and retry_count < MAX_RETRIES: - Log("Idempotent %s succeeded after %d retries" % - (msg, MAX_RETRIES - retry_count)) + Log("Idempotent %s succeeded after %d retries", + msg, MAX_RETRIES - retry_count) return val - except Exception, err: # pylint: disable-msg=W0703 + except Exception, err: # pylint: disable=W0703 if retry_count == 0: - Log("Non-idempotent %s failed, aborting" % (msg, )) + Log("Non-idempotent %s failed, aborting", msg) raise elif retry_count == 1: - Log("Idempotent %s repeated failure, aborting" % (msg, )) + Log("Idempotent %s repeated failure, aborting", msg) raise else: - Log("Idempotent %s failed, retry #%d/%d: %s" % - (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)) + Log("Idempotent %s failed, retry #%d/%d: %s", + msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err) self.MaybeRetry(retry_count - 1, msg, fn, *args) - def _SetDebug(self, ops): - """Set the debug value on the given opcodes""" - for op in ops: - op.debug_level = self.opts.debug - def _ExecOp(self, *ops): """Execute one or more opcodes and manage the exec buffer. - @result: if only opcode has been passed, we return its result; + @return: if only opcode has been passed, we return its result; otherwise we return the list of results """ @@ -328,7 +341,7 @@ class Burner(object): def ExecOp(self, retry, *ops): """Execute one or more opcodes and manage the exec buffer. - @result: if only opcode has been passed, we return its result; + @return: if only opcode has been passed, we return its result; otherwise we return the list of results """ @@ -336,16 +349,19 @@ class Burner(object): rval = MAX_RETRIES else: rval = 0 - self._SetDebug(ops) + cli.SetGenericOpcodeOpts(ops, self.opts) return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops) - def ExecOrQueue(self, name, *ops): + def ExecOrQueue(self, name, ops, post_process=None): """Execute an opcode and manage the exec buffer.""" if self.opts.parallel: - self._SetDebug(ops) - self.queued_ops.append((ops, name)) + cli.SetGenericOpcodeOpts(ops, self.opts) + self.queued_ops.append((ops, name, post_process)) else: - return self.ExecOp(self.queue_retry, *ops) + val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142 + if post_process is not None: + post_process() + return val def StartBatch(self, retry): """Start a new batch of jobs. 
@@ -358,7 +374,7 @@ class Burner(object): def CommitQueue(self): """Execute all submitted opcodes in case of parallel burnin""" - if not self.opts.parallel: + if not self.opts.parallel or not self.queued_ops: return if self.queue_retry: @@ -382,18 +398,33 @@ class Burner(object): """ self.ClearFeedbackBuf() - job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs] - Log("Submitted job ID(s) %s" % utils.CommaJoin(job_ids), indent=1) - results = [] - for jid, (_, iname) in zip(job_ids, jobs): - Log("waiting for job %s for %s" % (jid, iname), indent=2) - try: - results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)) - except Exception, err: # pylint: disable-msg=W0703 - Log("Job for %s failed: %s" % (iname, err)) - if len(results) != len(jobs): + jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback) + for ops, name, _ in jobs: + jex.QueueJob(name, *ops) # pylint: disable=W0142 + try: + results = jex.GetResults() + except Exception, err: # pylint: disable=W0703 + Log("Jobs failed: %s", err) raise BurninFailure() - return results + + fail = False + val = [] + for (_, name, post_process), (success, result) in zip(jobs, results): + if success: + if post_process: + try: + post_process() + except Exception, err: # pylint: disable=W0703 + Log("Post process call for job %s failed: %s", name, err) + fail = True + val.append(result) + else: + fail = True + + if fail: + raise BurninFailure() + + return val def ParseOptions(self): """Parses the command line options. @@ -413,6 +444,7 @@ class Burner(object): supported_disk_templates = (constants.DT_DISKLESS, constants.DT_FILE, + constants.DT_SHARED_FILE, constants.DT_PLAIN, constants.DT_DRBD8) if options.disk_template not in supported_disk_templates: @@ -445,43 +477,68 @@ class Burner(object): self.instances = args self.bep = { constants.BE_MEMORY: options.mem_size, - constants.BE_VCPUS: 1, + constants.BE_VCPUS: options.vcpu_count, } + + self.hypervisor = None self.hvp = {} + if options.hypervisor: + self.hypervisor, self.hvp = options.hypervisor + + if options.reboot_types is None: + options.reboot_types = constants.REBOOT_TYPES + else: + options.reboot_types = options.reboot_types.split(",") + rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES) + if rt_diff: + Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff)) socket.setdefaulttimeout(options.net_timeout) def GetState(self): - """Read the cluster state from the config.""" + """Read the cluster state from the master daemon.""" if self.opts.nodes: names = self.opts.nodes.split(",") else: names = [] try: - op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"], - names=names, use_locking=True) + op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"], + names=names, use_locking=True) result = self.ExecOp(True, op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) Err(msg, exit_code=err_code) self.nodes = [data[0] for data in result if not (data[1] or data[2])] - op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid", - "variants"], names=[]) + op_diagnose = opcodes.OpOsDiagnose(output_fields=["name", + "variants", + "hidden"], + names=[]) result = self.ExecOp(True, op_diagnose) if not result: Err("Can't get the OS list") found = False - for (name, valid, variants) in result: - if valid and self.opts.os in cli.CalculateOSNames(name, variants): + for (name, variants, _) in result: + if self.opts.os in cli.CalculateOSNames(name, variants): found = True break if not found: Err("OS '%s' 
not found" % self.opts.os) + cluster_info = self.cl.QueryClusterInfo() + self.cluster_info = cluster_info + if not self.cluster_info: + Err("Can't get cluster info") + + default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT] + self.cluster_default_nicparams = default_nic_params + if self.hypervisor is None: + self.hypervisor = self.cluster_info["default_hypervisor"] + self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor) + @_DoCheckInstances @_DoBatch(False) def BurnCreateInstances(self): @@ -495,11 +552,11 @@ class Burner(object): Log("Creating instances") for pnode, snode, instance in mytor: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) if self.opts.iallocator: pnode = snode = None msg = "with iallocator %s" % self.opts.iallocator - elif self.opts.disk_template not in constants.DTS_NET_MIRROR: + elif self.opts.disk_template not in constants.DTS_INT_MIRROR: snode = None msg = "on %s" % pnode else: @@ -507,9 +564,9 @@ class Burner(object): Log(msg, indent=2) - op = opcodes.OpCreateInstance(instance_name=instance, - disks = [ {"size": size} - for size in self.disk_size], + op = opcodes.OpInstanceCreate(instance_name=instance, + disks=[{"size": size} + for size in self.disk_size], disk_template=self.opts.disk_template, nics=self.opts.nics, mode=constants.INSTANCE_CREATE, @@ -525,39 +582,41 @@ class Burner(object): iallocator=self.opts.iallocator, beparams=self.bep, hvparams=self.hvp, + hypervisor=self.hypervisor, + osparams=self.opts.osparams, ) - - self.ExecOrQueue(instance, op) - self.to_rem.append(instance) + remove_instance = lambda name: lambda: self.to_rem.append(name) + self.ExecOrQueue(instance, [op], post_process=remove_instance(instance)) @_DoBatch(False) def BurnGrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" Log("Growing disks") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) for idx, growth in enumerate(self.disk_growth): if growth > 0: - op = opcodes.OpGrowDisk(instance_name=instance, disk=idx, - amount=growth, wait_for_sync=True) - Log("increase disk/%s by %s MB" % (idx, growth), indent=2) - self.ExecOrQueue(instance, op) + op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx, + amount=growth, wait_for_sync=True) + Log("increase disk/%s by %s MB", idx, growth, indent=2) + self.ExecOrQueue(instance, [op]) @_DoBatch(True) def BurnReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" Log("Replacing disks on the same nodes") + early_release = self.opts.early_release for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) ops = [] for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI: - op = opcodes.OpReplaceDisks(instance_name=instance, - mode=mode, - disks=[i for i in range(self.disk_count)], - early_release=self.opts.early_release) - Log("run %s" % mode, indent=2) + op = opcodes.OpInstanceReplaceDisks(instance_name=instance, + mode=mode, + disks=list(range(self.disk_count)), + early_release=early_release) + Log("run %s", mode, indent=2) ops.append(op) - self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142 + self.ExecOrQueue(instance, ops) @_DoBatch(True) def BurnReplaceDisks2(self): @@ -568,20 +627,20 @@ class Burner(object): mytor = izip(islice(cycle(self.nodes), 2, None), self.instances) for tnode, instance in mytor: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, 
indent=1) if self.opts.iallocator: tnode = None msg = "with iallocator %s" % self.opts.iallocator else: msg = tnode - op = opcodes.OpReplaceDisks(instance_name=instance, - mode=mode, - remote_node=tnode, - iallocator=self.opts.iallocator, - disks=[], - early_release=self.opts.early_release) - Log("run %s %s" % (mode, msg), indent=2) - self.ExecOrQueue(instance, op) + op = opcodes.OpInstanceReplaceDisks(instance_name=instance, + mode=mode, + remote_node=tnode, + iallocator=self.opts.iallocator, + disks=[], + early_release=self.opts.early_release) + Log("run %s %s", mode, msg, indent=2) + self.ExecOrQueue(instance, [op]) @_DoCheckInstances @_DoBatch(False) @@ -589,10 +648,10 @@ class Burner(object): """Failover the instances.""" Log("Failing over instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) - op = opcodes.OpFailoverInstance(instance_name=instance, + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceFailover(instance_name=instance, ignore_consistency=False) - self.ExecOrQueue(instance, op) + self.ExecOrQueue(instance, [op]) @_DoCheckInstances @_DoBatch(False) @@ -602,24 +661,24 @@ class Burner(object): mytor = izip(islice(cycle(self.nodes), 1, None), self.instances) for tnode, instance in mytor: - Log("instance %s" % instance, indent=1) - op = opcodes.OpMoveInstance(instance_name=instance, + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceMove(instance_name=instance, target_node=tnode) - self.ExecOrQueue(instance, op) + self.ExecOrQueue(instance, [op]) @_DoBatch(False) def BurnMigrate(self): """Migrate the instances.""" Log("Migrating instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) - op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + Log("instance %s", instance, indent=1) + op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None, cleanup=False) - op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None, cleanup=True) Log("migration and migration cleanup", indent=2) - self.ExecOrQueue(instance, op1, op2) + self.ExecOrQueue(instance, [op1, op2]) @_DoCheckInstances @_DoBatch(False) @@ -634,10 +693,10 @@ class Burner(object): self.instances) for pnode, snode, enode, instance in mytor: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) # read the full name of the instance - nam_op = opcodes.OpQueryInstances(output_fields=["name"], - names=[instance], use_locking=True) + nam_op = opcodes.OpInstanceQuery(output_fields=["name"], + names=[instance], use_locking=True) full_name = self.ExecOp(False, nam_op)[0][0] if self.opts.iallocator: @@ -645,7 +704,7 @@ class Burner(object): import_log_msg = ("import from %s" " with iallocator %s" % (enode, self.opts.iallocator)) - elif self.opts.disk_template not in constants.DTS_NET_MIRROR: + elif self.opts.disk_template not in constants.DTS_INT_MIRROR: snode = None import_log_msg = ("import from %s to %s" % (enode, pnode)) @@ -653,15 +712,16 @@ class Burner(object): import_log_msg = ("import from %s to %s, %s" % (enode, pnode, snode)) - exp_op = opcodes.OpExportInstance(instance_name=instance, - target_node=enode, - shutdown=True) - rem_op = opcodes.OpRemoveInstance(instance_name=instance, + exp_op = opcodes.OpBackupExport(instance_name=instance, + target_node=enode, + mode=constants.EXPORT_MODE_LOCAL, + shutdown=True) + rem_op = opcodes.OpInstanceRemove(instance_name=instance, ignore_failures=True) - imp_dir = 
os.path.join(constants.EXPORT_DIR, full_name) - imp_op = opcodes.OpCreateInstance(instance_name=instance, - disks = [ {"size": size} - for size in self.disk_size], + imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name) + imp_op = opcodes.OpInstanceCreate(instance_name=instance, + disks=[{"size": size} + for size in self.disk_size], disk_template=self.opts.disk_template, nics=self.opts.nics, mode=constants.INSTANCE_IMPORT, @@ -678,30 +738,31 @@ class Burner(object): iallocator=self.opts.iallocator, beparams=self.bep, hvparams=self.hvp, + osparams=self.opts.osparams, ) - erem_op = opcodes.OpRemoveExport(instance_name=instance) + erem_op = opcodes.OpBackupRemove(instance_name=instance) - Log("export to node %s" % enode, indent=2) + Log("export to node %s", enode, indent=2) Log("remove instance", indent=2) Log(import_log_msg, indent=2) Log("remove export", indent=2) - self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op) + self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op]) @staticmethod def StopInstanceOp(instance): """Stop given instance.""" - return opcodes.OpShutdownInstance(instance_name=instance) + return opcodes.OpInstanceShutdown(instance_name=instance) @staticmethod def StartInstanceOp(instance): """Start given instance.""" - return opcodes.OpStartupInstance(instance_name=instance, force=False) + return opcodes.OpInstanceStartup(instance_name=instance, force=False) @staticmethod def RenameInstanceOp(instance, instance_new): """Rename instance.""" - return opcodes.OpRenameInstance(instance_name=instance, + return opcodes.OpInstanceRename(instance_name=instance, new_name=instance_new) @_DoCheckInstances @@ -710,20 +771,20 @@ class Burner(object): """Stop/start the instances.""" Log("Stopping and starting instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op1 = self.StopInstanceOp(instance) op2 = self.StartInstanceOp(instance) - self.ExecOrQueue(instance, op1, op2) + self.ExecOrQueue(instance, [op1, op2]) @_DoBatch(False) def BurnRemove(self): """Remove the instances.""" Log("Removing instances") for instance in self.to_rem: - Log("instance %s" % instance, indent=1) - op = opcodes.OpRemoveInstance(instance_name=instance, + Log("instance %s", instance, indent=1) + op = opcodes.OpInstanceRemove(instance_name=instance, ignore_failures=True) - self.ExecOrQueue(instance, op) + self.ExecOrQueue(instance, [op]) def BurnRename(self): """Rename the instances. 
@@ -735,7 +796,7 @@ class Burner(object): Log("Renaming instances") rename = self.opts.rename for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op_stop1 = self.StopInstanceOp(instance) op_stop2 = self.StopInstanceOp(rename) op_rename1 = self.RenameInstanceOp(instance, rename) @@ -753,15 +814,15 @@ class Burner(object): """Reinstall the instances.""" Log("Reinstalling instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op1 = self.StopInstanceOp(instance) - op2 = opcodes.OpReinstallInstance(instance_name=instance) + op2 = opcodes.OpInstanceReinstall(instance_name=instance) Log("reinstall without passing the OS", indent=2) - op3 = opcodes.OpReinstallInstance(instance_name=instance, + op3 = opcodes.OpInstanceReinstall(instance_name=instance, os_type=self.opts.os) Log("reinstall specifying the OS", indent=2) op4 = self.StartInstanceOp(instance) - self.ExecOrQueue(instance, op1, op2, op3, op4) + self.ExecOrQueue(instance, [op1, op2, op3, op4]) @_DoCheckInstances @_DoBatch(True) @@ -769,15 +830,15 @@ class Burner(object): """Reboot the instances.""" Log("Rebooting instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) ops = [] - for reboot_type in constants.REBOOT_TYPES: - op = opcodes.OpRebootInstance(instance_name=instance, + for reboot_type in self.opts.reboot_types: + op = opcodes.OpInstanceReboot(instance_name=instance, reboot_type=reboot_type, ignore_secondaries=False) - Log("reboot with type '%s'" % reboot_type, indent=2) + Log("reboot with type '%s'", reboot_type, indent=2) ops.append(op) - self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142 + self.ExecOrQueue(instance, ops) @_DoCheckInstances @_DoBatch(True) @@ -785,15 +846,15 @@ class Burner(object): """Activate and deactivate disks of the instances.""" Log("Activating/deactivating disks") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op_start = self.StartInstanceOp(instance) - op_act = opcodes.OpActivateInstanceDisks(instance_name=instance) - op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance) + op_act = opcodes.OpInstanceActivateDisks(instance_name=instance) + op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance) op_stop = self.StopInstanceOp(instance) Log("activate disks when online", indent=2) Log("activate disks when offline", indent=2) Log("deactivate disks (when offline)", indent=2) - self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start) + self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start]) @_DoCheckInstances @_DoBatch(False) @@ -801,31 +862,88 @@ class Burner(object): """Add and remove an extra disk for the instances.""" Log("Adding and removing disks") for instance in self.instances: - Log("instance %s" % instance, indent=1) - op_add = opcodes.OpSetInstanceParams(\ + Log("instance %s", instance, indent=1) + op_add = opcodes.OpInstanceSetParams(\ instance_name=instance, disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})]) - op_rem = opcodes.OpSetInstanceParams(\ + op_rem = opcodes.OpInstanceSetParams(\ instance_name=instance, disks=[(constants.DDM_REMOVE, {})]) op_stop = self.StopInstanceOp(instance) op_start = self.StartInstanceOp(instance) Log("adding a disk", indent=2) Log("removing last disk", indent=2) - self.ExecOrQueue(instance, op_add, op_stop, op_rem, 
op_start) + self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start]) @_DoBatch(False) def BurnAddRemoveNICs(self): """Add and remove an extra NIC for the instances.""" Log("Adding and removing NICs") for instance in self.instances: - Log("instance %s" % instance, indent=1) - op_add = opcodes.OpSetInstanceParams(\ + Log("instance %s", instance, indent=1) + op_add = opcodes.OpInstanceSetParams(\ instance_name=instance, nics=[(constants.DDM_ADD, {})]) - op_rem = opcodes.OpSetInstanceParams(\ + op_rem = opcodes.OpInstanceSetParams(\ instance_name=instance, nics=[(constants.DDM_REMOVE, {})]) Log("adding a NIC", indent=2) Log("removing last NIC", indent=2) - self.ExecOrQueue(instance, op_add, op_rem) + self.ExecOrQueue(instance, [op_add, op_rem]) + + def ConfdCallback(self, reply): + """Callback for confd queries""" + if reply.type == confd_client.UPCALL_REPLY: + if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK: + Err("Query %s gave non-ok status %s: %s" % (reply.orig_request, + reply.server_reply.status, + reply.server_reply)) + if reply.orig_request.type == constants.CONFD_REQ_PING: + Log("Ping: OK", indent=1) + elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER: + if reply.server_reply.answer == self.cluster_info["master"]: + Log("Master: OK", indent=1) + else: + Err("Master: wrong: %s" % reply.server_reply.answer) + elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME: + if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER: + Log("Node role for master: OK", indent=1) + else: + Err("Node role for master: wrong: %s" % reply.server_reply.answer) + + def DoConfdRequestReply(self, req): + self.confd_counting_callback.RegisterQuery(req.rsalt) + self.confd_client.SendRequest(req, async=False) + while not self.confd_counting_callback.AllAnswered(): + if not self.confd_client.ReceiveReply(): + Err("Did not receive all expected confd replies") + break + + def BurnConfd(self): + """Run confd queries for our instances. + + The following confd queries are tested: + - CONFD_REQ_PING: simple ping + - CONFD_REQ_CLUSTER_MASTER: cluster master + - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master + + """ + Log("Checking confd results") + + filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback) + counting_callback = confd_client.ConfdCountingCallback(filter_callback) + self.confd_counting_callback = counting_callback + + self.confd_client = confd_client.GetConfdClient(counting_callback) + + req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_CLUSTER_MASTER) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_NODE_ROLE_BYNAME, + query=self.cluster_info["master"]) + self.DoConfdRequestReply(req) def _CheckInstanceAlive(self, instance): """Check if an instance is alive by doing http checks. 
@@ -868,28 +986,36 @@ class Burner(object): if (len(self.nodes) == 1 and opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN, - constants.DT_FILE)): + constants.DT_FILE, + constants.DT_SHARED_FILE)): Err("When one node is available/selected the disk template must" " be 'diskless', 'file' or 'plain'") has_err = True try: self.BurnCreateInstances() - if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR: + if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR: self.BurnReplaceDisks1D8() if (opts.do_replace2 and len(self.nodes) > 2 and - opts.disk_template in constants.DTS_NET_MIRROR) : + opts.disk_template in constants.DTS_INT_MIRROR): self.BurnReplaceDisks2() - if (opts.disk_template != constants.DT_DISKLESS and - utils.any(self.disk_growth, lambda n: n > 0)): + if (opts.disk_template in constants.DTS_GROWABLE and + compat.any(n > 0 for n in self.disk_growth)): self.BurnGrowDisks() - if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR: + if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED: self.BurnFailover() - if opts.do_migrate and opts.disk_template == constants.DT_DRBD8: - self.BurnMigrate() + if opts.do_migrate: + if opts.disk_template not in constants.DTS_MIRRORED: + Log("Skipping migration (disk template %s does not support it)", + opts.disk_template) + elif not self.hv_class.CAN_MIGRATE: + Log("Skipping migration (hypervisor %s does not support it)", + self.hypervisor) + else: + self.BurnMigrate() if (opts.do_move and len(self.nodes) > 1 and opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]): @@ -897,6 +1023,7 @@ class Burner(object): if (opts.do_importexport and opts.disk_template not in (constants.DT_DISKLESS, + constants.DT_SHARED_FILE, constants.DT_FILE)): self.BurnImportExport() @@ -909,8 +1036,14 @@ class Burner(object): if opts.do_addremove_disks: self.BurnAddRemoveDisks() + default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE] + # Don't add/remove nics in routed mode, as we would need an ip to add + # them with if opts.do_addremove_nics: - self.BurnAddRemoveNICs() + if default_nic_mode == constants.NIC_MODE_BRIDGED: + self.BurnAddRemoveNICs() + else: + Log("Skipping nic add/remove as the cluster is not in bridged mode") if opts.do_activate_disks: self.BurnActivateDisks() @@ -918,6 +1051,9 @@ class Burner(object): if opts.rename: self.BurnRename() + if opts.do_confd_tests: + self.BurnConfd() + if opts.do_startstop: self.BurnStopStart() @@ -930,21 +1066,24 @@ class Burner(object): if not self.opts.keep_instances: try: self.BurnRemove() - except Exception, err: # pylint: disable-msg=W0703 + except Exception, err: # pylint: disable=W0703 if has_err: # already detected errors, so errors in removal # are quite expected - Log("Note: error detected during instance remove: %s" % str(err)) + Log("Note: error detected during instance remove: %s", err) else: # non-expected error raise - return 0 + return constants.EXIT_SUCCESS def main(): - """Main function""" + """Main function. + + """ + utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0], + debug=False, stderr_logging=True) - burner = Burner() - return burner.BurninCluster() + return Burner().BurninCluster() if __name__ == "__main__":
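
A note on the ExecOrQueue rework in this patch: the method now takes an
explicit list of opcodes plus an optional post_process callable, so that
side effects such as marking an instance for later removal happen only
once its job has actually succeeded (CommitQueue/ExecJobSet call the hook
only for successful jobs). BurnCreateInstances builds that hook with a
two-step lambda, remove_instance = lambda name: lambda: ..., because a
plain closure over the loop variable would be late-bound and every hook
would act on the last instance. Below is a minimal, self-contained sketch
of the pattern; the names make_hook and run_jobs and the sample instance
list are illustrative only, not part of burnin.

#!/usr/bin/python

to_rem = []

def make_hook(name):
  # Calling make_hook binds 'name' immediately; the returned thunk runs
  # later, once the corresponding job is known to have succeeded.
  return lambda: to_rem.append(name)

def run_jobs(jobs):
  # Stand-in for the batch commit: run each queued job and fire its
  # post-process hook only on success, mirroring ExecJobSet above.
  for name, job, hook in jobs:
    if job():  # pretend this submits and polls a real job
      hook()

queued = []
for instance in ["inst1.example.com", "inst2.example.com"]:
  queued.append((instance, lambda: True, make_hook(instance)))

run_jobs(queued)
assert to_rem == ["inst1.example.com", "inst2.example.com"]

Had the hooks been written as lambda: to_rem.append(instance) directly
inside the loop, both hooks would have appended "inst2.example.com": the
lambda captures the variable itself, not its value at creation time, which
is exactly what the remove_instance factory in BurnCreateInstances avoids.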