X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/9e32b93bdc08d507998157c15a0fa9ab1478199a..a619a1dd084ee43f5c39c330554d48da08ffc85c:/tools/burnin

diff --git a/tools/burnin b/tools/burnin
index 395edbf..91aae6b 100755
--- a/tools/burnin
+++ b/tools/burnin
@@ -23,7 +23,6 @@
 
 """
 
-import os
 import sys
 import optparse
 import time
@@ -37,11 +36,19 @@ from ganeti import constants
 from ganeti import cli
 from ganeti import errors
 from ganeti import utils
+from ganeti import ssconf
+
+from ganeti.confd import client as confd_client
 
 
 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
 
 MAX_RETRIES = 3
 
+LOG_HEADERS = {
+  0: "- ",
+  1: "* ",
+  2: ""
+  }
 
 class InstanceDown(Exception):
   """The checked instance was not up"""
@@ -59,19 +66,18 @@ def Usage():
   sys.exit(2)
 
 
-def Log(msg, indent=0):
+def Log(msg, *args, **kwargs):
   """Simple function that prints out its argument.
 
   """
-  headers = {
-    0: "- ",
-    1: "* ",
-    2: ""
-    }
+  if args:
+    msg = msg % args
+  indent = kwargs.get('indent', 0)
   sys.stdout.write("%*s%s%s\n" % (2*indent, "",
-                                  headers.get(indent, "  "), msg))
+                                  LOG_HEADERS.get(indent, "  "), msg))
   sys.stdout.flush()
 
+
 def Err(msg, exit_code=1):
   """Simple error logging that prints to stderr.
 
@@ -83,9 +89,12 @@ def Err(msg, exit_code=1):
 
 class SimpleOpener(urllib.FancyURLopener):
   """A simple url opener"""
+  # pylint: disable-msg=W0221
 
-  def prompt_user_passwd(self, host, realm, clear_cache = 0):
+  def prompt_user_passwd(self, host, realm, clear_cache=0):
     """No-interaction version of prompt_user_passwd."""
+    # we follow parent class' API
+    # pylint: disable-msg=W0613
     return None, None
 
   def http_error_default(self, url, fp, errcode, errmsg, headers):
@@ -114,7 +123,11 @@ OPTIONS = [
                  default=128, type="unit", metavar="<size>",
                  completion_suggest=("128M 256M 512M 1G 4G 8G"
                                      " 12G 16G").split()),
+  cli.DEBUG_OPT,
   cli.VERBOSE_OPT,
+  cli.NOIPCHECK_OPT,
+  cli.NONAMECHECK_OPT,
+  cli.EARLY_RELEASE_OPT,
   cli.cli_option("--no-replace1", dest="do_replace1",
                  help="Skip disk replacement with the same secondary",
                  action="store_false", default=True),
@@ -154,6 +167,9 @@ OPTIONS = [
   cli.cli_option("--no-nics", dest="nics",
                  help="No network interfaces", action="store_const",
                  const=[], default=[{}]),
+  cli.cli_option("--no-confd", dest="do_confd_tests",
+                 help="Skip confd queries",
+                 action="store_false", default=True),
   cli.cli_option("--rename", dest="rename", default=None,
                  help=("Give one unused instance name which is taken"
                        " to start the renaming sequence"),
@@ -199,6 +215,39 @@ OPTIONS = [
 ARGUMENTS = [cli.ArgInstance(min=1)]
 
 
+def _DoCheckInstances(fn):
+  """Decorator for checking instances.
+
+  """
+  def wrapper(self, *args, **kwargs):
+    val = fn(self, *args, **kwargs)
+    for instance in self.instances:
+      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
+    return val
+
+  return wrapper
+
+
+def _DoBatch(retry):
+  """Decorator for possible batch operations.
+
+  Must come after the _DoCheckInstances decorator (if any).
+
+  @param retry: whether this is a retryable batch, will be
+      passed to StartBatch
+
+  """
+  def wrap(fn):
+    def batched(self, *args, **kwargs):
+      self.StartBatch(retry)
+      val = fn(self, *args, **kwargs)
+      self.CommitQueue()
+      return val
+    return batched
+
+  return wrap
+
+
 class Burner(object):
   """Burner class."""
 
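The two module-level decorators above replace the per-class copies that are deleted from the Burner class further down. Their stacking order is load-bearing: _DoCheckInstances must be the outer decorator so that the instance liveness checks only run once _DoBatch has committed the queued jobs. A minimal standalone sketch of that interaction, using a hypothetical Demo class in place of Burner (the decorator bodies are the ones from the patch, minus the pylint markers):

def _DoCheckInstances(fn):
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
    return val
  return wrapper


def _DoBatch(retry):
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched
  return wrap


class Demo(object):
  instances = ["inst1.example.com"]  # hypothetical instance list

  def StartBatch(self, retry):
    print("StartBatch(retry=%s)" % retry)

  def CommitQueue(self):
    print("CommitQueue()")

  def _CheckInstanceAlive(self, instance):
    print("checking %s" % instance)

  @_DoCheckInstances  # outer: fires only after the batch below is committed
  @_DoBatch(False)
  def BurnSomething(self):
    print("queueing opcodes")


Demo().BurnSomething()
# StartBatch(retry=False), queueing opcodes, CommitQueue(), checking inst1...

Reversing the two decorators would run the liveness checks while the jobs were still only queued, which is exactly what the docstring warns against.
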
@@ -217,6 +266,7 @@ class Burner(object):
     self.hvp = self.bep = None
     self.ParseOptions()
     self.cl = cli.GetClient()
+    self.ss = ssconf.SimpleStore()
     self.GetState()
 
   def ClearFeedbackBuf(self):
@@ -251,21 +301,26 @@ class Burner(object):
     try:
       val = fn(*args)
       if retry_count > 0 and retry_count < MAX_RETRIES:
-        Log("Idempotent %s succeeded after %d retries" %
-            (msg, MAX_RETRIES - retry_count))
+        Log("Idempotent %s succeeded after %d retries",
+            msg, MAX_RETRIES - retry_count)
       return val
-    except Exception, err:
+    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
-        Log("Non-idempotent %s failed, aborting" % (msg, ))
+        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
-        Log("Idempotent %s repeated failure, aborting" % (msg, ))
+        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
-        Log("Idempotent %s failed, retry #%d/%d: %s" %
-            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
+        Log("Idempotent %s failed, retry #%d/%d: %s",
+            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        self.MaybeRetry(retry_count - 1, msg, fn, *args)
 
+  def _SetDebug(self, ops):
+    """Set the debug value on the given opcodes"""
+    for op in ops:
+      op.debug_level = self.opts.debug
+
   def _ExecOp(self, *ops):
     """Execute one or more opcodes and manage the exec buffer.
 
@@ -291,11 +346,13 @@ class Burner(object):
       rval = MAX_RETRIES
     else:
       rval = 0
+    self._SetDebug(ops)
     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
 
   def ExecOrQueue(self, name, *ops):
     """Execute an opcode and manage the exec buffer."""
     if self.opts.parallel:
+      self._SetDebug(ops)
       self.queued_ops.append((ops, name))
     else:
       return self.ExecOp(self.queue_retry, *ops)
@@ -335,49 +392,19 @@ class Burner(object):
 
     """
     self.ClearFeedbackBuf()
-    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
-    Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
-    results = []
-    for jid, (_, iname) in zip(job_ids, jobs):
-      Log("waiting for job %s for %s" % (jid, iname), indent=2)
-      try:
-        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
-      except Exception, err:
-        Log("Job for %s failed: %s" % (iname, err))
-    if len(results) != len(jobs):
+    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
+    for ops, name in jobs:
+      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
+    try:
+      results = jex.GetResults()
+    except Exception, err: # pylint: disable-msg=W0703
+      Log("Jobs failed: %s", err)
       raise BurninFailure()
-    return results
-
-  def _DoCheckInstances(fn):
-    """Decorator for checking instances.
-
-    """
-    def wrapper(self, *args, **kwargs):
-      val = fn(self, *args, **kwargs)
-      for instance in self.instances:
-        self._CheckInstanceAlive(instance)
-      return val
-
-    return wrapper
-
-  def _DoBatch(retry):
-    """Decorator for possible batch operations.
-
-    Must come after the _DoCheckInstances decorator (if any).
-
-    @param retry: whether this is a retryable batch, will be
-      passed to StartBatch
-
-    """
-    def wrap(fn):
-      def batched(self, *args, **kwargs):
-        self.StartBatch(retry)
-        val = fn(self, *args, **kwargs)
-        self.CommitQueue()
-        return val
-      return batched
+    if utils.any(results, lambda x: not x[0]):
+      raise BurninFailure()
 
-    return wrap
+    return [i[1] for i in results]
 
   def ParseOptions(self):
     """Parses the command line options.
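ExecJobSet now hands submission and polling to cli.JobExecutor instead of driving cli.SendJob and cli.PollJob by hand. The code above relies only on GetResults() returning one (success, payload) tuple per queued job; a hedged sketch of that contract, with StubJobExecutor invented for illustration in place of the real class:

class BurninFailure(Exception):
  """Failure detected during burning"""


class StubJobExecutor(object):
  """Stand-in for cli.JobExecutor, mimicking only the calls used above."""

  def __init__(self):
    self.jobs = []

  def QueueJob(self, name, *ops):
    self.jobs.append((name, ops))

  def GetResults(self):
    # one (success, payload) tuple per job, in submission order
    return [(True, "result for %s" % name) for name, _ in self.jobs]


jex = StubJobExecutor()
jex.QueueJob("inst1.example.com", "stop opcode", "start opcode")
results = jex.GetResults()
if any(not success for success, _ in results):  # the patch uses utils.any
  raise BurninFailure()
print([payload for _, payload in results])

Note the two failure paths: an exception from GetResults() aborts the whole set, while a per-job failure flag only surfaces after every job has finished, so one broken instance no longer cuts short the reporting for the others.
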
@@ -422,6 +449,9 @@ class Burner(object):
     if options.nodes and options.iallocator:
       Err("Give either the nodes option or the iallocator option, not both")
 
+    if options.http_check and not options.name_check:
+      Err("Can't enable HTTP checks without name checks")
+
     self.opts = options
     self.instances = args
     self.bep = {
@@ -433,7 +463,7 @@ class Burner(object):
     socket.setdefaulttimeout(options.net_timeout)
 
   def GetState(self):
-    """Read the cluster state from the config."""
+    """Read the cluster state from the master daemon."""
     if self.opts.nodes:
       names = self.opts.nodes.split(",")
     else:
@@ -463,6 +493,14 @@ class Burner(object):
     if not found:
       Err("OS '%s' not found" % self.opts.os)
 
+    cluster_info = self.cl.QueryClusterInfo()
+    self.cluster_info = cluster_info
+    if not self.cluster_info:
+      Err("Can't get cluster info")
+
+    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
+    self.cluster_default_nicparams = default_nic_params
+
   @_DoCheckInstances
   @_DoBatch(False)
   def BurnCreateInstances(self):
@@ -476,7 +514,7 @@ class Burner(object):
     Log("Creating instances")
     for pnode, snode, instance in mytor:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       if self.opts.iallocator:
         pnode = snode = None
         msg = "with iallocator %s" % self.opts.iallocator
@@ -498,7 +536,8 @@ class Burner(object):
                                     pnode=pnode,
                                     snode=snode,
                                     start=True,
-                                    ip_check=True,
+                                    ip_check=self.opts.ip_check,
+                                    name_check=self.opts.name_check,
                                     wait_for_sync=True,
                                     file_driver="loop",
                                     file_storage_dir=None,
@@ -515,12 +554,12 @@ class Burner(object):
     """Grow both the os and the swap disks by the requested amount, if any."""
     Log("Growing disks")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       for idx, growth in enumerate(self.disk_growth):
         if growth > 0:
           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                   amount=growth, wait_for_sync=True)
-          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
+          Log("increase disk/%s by %s MB", idx, growth, indent=2)
           self.ExecOrQueue(instance, op)
 
   @_DoBatch(True)
@@ -528,15 +567,16 @@ class Burner(object):
     """Replace disks on primary and secondary for drbd8."""
     Log("Replacing disks on the same nodes")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       ops = []
       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
         op = opcodes.OpReplaceDisks(instance_name=instance,
                                     mode=mode,
-                                    disks=[i for i in range(self.disk_count)])
-        Log("run %s" % mode, indent=2)
+                                    disks=[i for i in range(self.disk_count)],
+                                    early_release=self.opts.early_release)
+        Log("run %s", mode, indent=2)
        ops.append(op)
-      self.ExecOrQueue(instance, *ops)
+      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142
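All of the Log call sites in these hunks switch from eager %-formatting to the Log(msg, *args, **kwargs) signature introduced at the top of the patch: interpolation happens inside Log, and indent travels as a keyword that also selects the bullet from LOG_HEADERS. A self-contained sketch of the resulting output (the instance and mode strings are made up):

import sys

LOG_HEADERS = {0: "- ", 1: "* ", 2: ""}


def Log(msg, *args, **kwargs):
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


Log("Replacing disks on the same nodes")           # "- Replacing disks..."
Log("instance %s", "inst1.example.com", indent=1)  # "  * instance inst1..."
Log("run %s", "some-replace-mode", indent=2)       # "    run some-replace-mode"

Deferring the interpolation keeps call sites shorter and lets (msg, args) pairs be passed around without being formatted first.
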
= "with iallocator %s" % self.opts.iallocator @@ -557,8 +597,9 @@ class Burner(object): mode=mode, remote_node=tnode, iallocator=self.opts.iallocator, - disks=[]) - Log("run %s %s" % (mode, msg), indent=2) + disks=[], + early_release=self.opts.early_release) + Log("run %s %s", mode, msg, indent=2) self.ExecOrQueue(instance, op) @_DoCheckInstances @@ -567,7 +608,7 @@ class Burner(object): """Failover the instances.""" Log("Failing over instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op = opcodes.OpFailoverInstance(instance_name=instance, ignore_consistency=False) self.ExecOrQueue(instance, op) @@ -580,7 +621,7 @@ class Burner(object): mytor = izip(islice(cycle(self.nodes), 1, None), self.instances) for tnode, instance in mytor: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op = opcodes.OpMoveInstance(instance_name=instance, target_node=tnode) self.ExecOrQueue(instance, op) @@ -590,7 +631,7 @@ class Burner(object): """Migrate the instances.""" Log("Migrating instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True, cleanup=False) @@ -612,7 +653,7 @@ class Burner(object): self.instances) for pnode, snode, enode, instance in mytor: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) # read the full name of the instance nam_op = opcodes.OpQueryInstances(output_fields=["name"], names=[instance], use_locking=True) @@ -636,7 +677,7 @@ class Burner(object): shutdown=True) rem_op = opcodes.OpRemoveInstance(instance_name=instance, ignore_failures=True) - imp_dir = os.path.join(constants.EXPORT_DIR, full_name) + imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name) imp_op = opcodes.OpCreateInstance(instance_name=instance, disks = [ {"size": size} for size in self.disk_size], @@ -648,7 +689,8 @@ class Burner(object): pnode=pnode, snode=snode, start=True, - ip_check=True, + ip_check=self.opts.ip_check, + name_check=self.opts.name_check, wait_for_sync=True, file_storage_dir=None, file_driver="loop", @@ -659,21 +701,24 @@ class Burner(object): erem_op = opcodes.OpRemoveExport(instance_name=instance) - Log("export to node %s" % enode, indent=2) + Log("export to node %s", enode, indent=2) Log("remove instance", indent=2) Log(import_log_msg, indent=2) Log("remove export", indent=2) self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op) - def StopInstanceOp(self, instance): + @staticmethod + def StopInstanceOp(instance): """Stop given instance.""" return opcodes.OpShutdownInstance(instance_name=instance) - def StartInstanceOp(self, instance): + @staticmethod + def StartInstanceOp(instance): """Start given instance.""" return opcodes.OpStartupInstance(instance_name=instance, force=False) - def RenameInstanceOp(self, instance, instance_new): + @staticmethod + def RenameInstanceOp(instance, instance_new): """Rename instance.""" return opcodes.OpRenameInstance(instance_name=instance, new_name=instance_new) @@ -684,7 +729,7 @@ class Burner(object): """Stop/start the instances.""" Log("Stopping and starting instances") for instance in self.instances: - Log("instance %s" % instance, indent=1) + Log("instance %s", instance, indent=1) op1 = self.StopInstanceOp(instance) op2 = self.StartInstanceOp(instance) self.ExecOrQueue(instance, op1, op2) @@ -694,7 +739,7 @@ class Burner(object): """Remove the instances.""" Log("Removing 
@@ -694,7 +739,7 @@ class Burner(object):
     """Remove the instances."""
     Log("Removing instances")
     for instance in self.to_rem:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op = opcodes.OpRemoveInstance(instance_name=instance,
                                     ignore_failures=True)
       self.ExecOrQueue(instance, op)
@@ -709,7 +754,7 @@ class Burner(object):
     Log("Renaming instances")
     rename = self.opts.rename
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op_stop1 = self.StopInstanceOp(instance)
       op_stop2 = self.StopInstanceOp(rename)
       op_rename1 = self.RenameInstanceOp(instance, rename)
@@ -727,7 +772,7 @@ class Burner(object):
     """Reinstall the instances."""
     Log("Reinstalling instances")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op1 = self.StopInstanceOp(instance)
       op2 = opcodes.OpReinstallInstance(instance_name=instance)
       Log("reinstall without passing the OS", indent=2)
@@ -743,15 +788,15 @@ class Burner(object):
     """Reboot the instances."""
     Log("Rebooting instances")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       ops = []
       for reboot_type in constants.REBOOT_TYPES:
         op = opcodes.OpRebootInstance(instance_name=instance,
                                       reboot_type=reboot_type,
                                       ignore_secondaries=False)
-        Log("reboot with type '%s'" % reboot_type, indent=2)
+        Log("reboot with type '%s'", reboot_type, indent=2)
         ops.append(op)
-      self.ExecOrQueue(instance, *ops)
+      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142
 
   @_DoCheckInstances
   @_DoBatch(True)
@@ -759,7 +804,7 @@ class Burner(object):
     """Activate and deactivate disks of the instances."""
     Log("Activating/deactivating disks")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op_start = self.StartInstanceOp(instance)
       op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
       op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
@@ -775,7 +820,7 @@ class Burner(object):
     """Add and remove an extra disk for the instances."""
     Log("Adding and removing disks")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op_add = opcodes.OpSetInstanceParams(\
         instance_name=instance,
         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
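OpSetInstanceParams expresses disk and NIC changes as a list of (action, parameters) pairs; BurnAddRemoveDisks above and BurnAddRemoveNICs below each queue one add followed by one remove. An illustration of the shapes being built, with stand-in action strings since the exact values of constants.DDM_ADD and constants.DDM_REMOVE are not relied on here:

DDM_ADD, DDM_REMOVE = "add", "remove"  # stand-ins for the ganeti constants

disk_size = [1024]  # burnin's per-instance disk sizes, in mebibytes

add_disk = [(DDM_ADD, {"size": disk_size[0]})]  # attach one extra disk
rem_disk = [(DDM_REMOVE, {})]                   # drop the last disk again
add_nic = [(DDM_ADD, {})]                       # NIC with default parameters
rem_nic = [(DDM_REMOVE, {})]                    # remove the last NIC

print(add_disk, rem_disk, add_nic, rem_nic)

The remove action targets the last device (hence "removing last NIC" in the log below), which is why each add/remove pair is queued as a single job: the removal must not run before its matching addition.
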
@@ -792,7 +837,7 @@ class Burner(object):
     """Add and remove an extra NIC for the instances."""
     Log("Adding and removing NICs")
     for instance in self.instances:
-      Log("instance %s" % instance, indent=1)
+      Log("instance %s", instance, indent=1)
       op_add = opcodes.OpSetInstanceParams(\
         instance_name=instance, nics=[(constants.DDM_ADD, {})])
       op_rem = opcodes.OpSetInstanceParams(\
@@ -801,6 +846,67 @@ class Burner(object):
       Log("removing last NIC", indent=2)
       self.ExecOrQueue(instance, op_add, op_rem)
 
+  def ConfdCallback(self, reply):
+    """Callback for confd queries"""
+    if reply.type == confd_client.UPCALL_REPLY:
+      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
+        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
+                                                    reply.server_reply.status,
+                                                    reply.server_reply))
+      if reply.orig_request.type == constants.CONFD_REQ_PING:
+        Log("Ping: OK", indent=1)
+      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
+        if reply.server_reply.answer == self.cluster_info["master"]:
+          Log("Master: OK", indent=1)
+        else:
+          Err("Master: wrong: %s" % reply.server_reply.answer)
+      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
+        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
+          Log("Node role for master: OK", indent=1)
+        else:
+          Err("Node role for master: wrong: %s" % reply.server_reply.answer)
+
+  def DoConfdRequestReply(self, req):
+    self.confd_counting_callback.RegisterQuery(req.rsalt)
+    self.confd_client.SendRequest(req, async=False)
+    while not self.confd_counting_callback.AllAnswered():
+      if not self.confd_client.ReceiveReply():
+        Err("Did not receive all expected confd replies")
+        break
+
+  def BurnConfd(self):
+    """Run confd queries for our instances.
+
+    The following confd queries are tested:
+      - CONFD_REQ_PING: simple ping
+      - CONFD_REQ_CLUSTER_MASTER: cluster master
+      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
+
+    """
+    Log("Checking confd results")
+
+    hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
+    mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
+    mc_list = utils.ReadFile(mc_file).splitlines()
+    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
+    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
+    self.confd_counting_callback = counting_callback
+
+    self.confd_client = confd_client.ConfdClient(hmac_key, mc_list,
+                                                 counting_callback)
+
+    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
+    self.DoConfdRequestReply(req)
+
+    req = confd_client.ConfdClientRequest(
+      type=constants.CONFD_REQ_CLUSTER_MASTER)
+    self.DoConfdRequestReply(req)
+
+    req = confd_client.ConfdClientRequest(
+      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
+      query=self.cluster_info["master"])
+    self.DoConfdRequestReply(req)
+
   def _CheckInstanceAlive(self, instance):
     """Check if an instance is alive by doing http checks.
 
@@ -883,8 +989,14 @@ class Burner(object):
       if opts.do_addremove_disks:
         self.BurnAddRemoveDisks()
 
+      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
+      # Don't add/remove nics in routed mode, as we would need an ip to add
+      # them with
       if opts.do_addremove_nics:
-        self.BurnAddRemoveNICs()
+        if default_nic_mode == constants.NIC_MODE_BRIDGED:
+          self.BurnAddRemoveNICs()
+        else:
+          Log("Skipping nic add/remove as the cluster is not in bridged mode")
 
       if opts.do_activate_disks:
         self.BurnActivateDisks()
@@ -892,6 +1004,9 @@ class Burner(object):
       if opts.rename:
         self.BurnRename()
 
+      if opts.do_confd_tests:
+        self.BurnConfd()
+
       if opts.do_startstop:
         self.BurnStopStart()
 
@@ -904,10 +1019,10 @@ class Burner(object):
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
-        except Exception, err:
+        except Exception, err: # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
-            Log("Note: error detected during instance remove: %s" % str(err))
+            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise
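The confd test drives an asynchronous client with two nested callbacks: ConfdFilterCallback discards stale or duplicated replies before ConfdCountingCallback tallies the ones still outstanding, and DoConfdRequestReply loops until every registered salt has been answered. A stub-based sketch of that control flow (the Stub* classes stand in for ganeti.confd.client and deliberately simplify its API):

class StubCountingCallback(object):
  """Tracks outstanding request salts, like ConfdCountingCallback."""

  def __init__(self):
    self.outstanding = set()

  def RegisterQuery(self, rsalt):
    self.outstanding.add(rsalt)

  def HandleReply(self, rsalt):
    self.outstanding.discard(rsalt)

  def AllAnswered(self):
    return not self.outstanding


class StubConfdClient(object):
  """Fake client that eventually answers every request it was sent."""

  def __init__(self, callback):
    self.callback = callback
    self.pending = []

  def SendRequest(self, req):
    self.pending.append(req)

  def ReceiveReply(self):
    if not self.pending:
      return False  # nothing left; the caller gives up
    self.callback.HandleReply(self.pending.pop(0)["rsalt"])
    return True


def DoConfdRequestReply(client, counting, req):
  counting.RegisterQuery(req["rsalt"])
  client.SendRequest(req)
  while not counting.AllAnswered():
    if not client.ReceiveReply():
      raise RuntimeError("did not receive all expected confd replies")


counting = StubCountingCallback()
client = StubConfdClient(counting)
DoConfdRequestReply(client, counting, {"rsalt": "salt-1"})
print("all confd queries answered")

Counting by salt rather than by raw reply matters because confd answers over UDP: replies may be duplicated or lost, and a ReceiveReply() that comes back empty is what turns a lost reply into the error path above.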