+ import_log_msg = ("import from %s to %s, %s" %
+ (enode, pnode, snode))
+
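+ # Build the four opcodes for a full round trip: export the instance to
+ # the export node, remove the original, re-import it from the export
+ # directory, and finally remove the export itself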
+ exp_op = opcodes.OpExportInstance(instance_name=instance,
+ target_node=enode,
+ shutdown=True)
+ rem_op = opcodes.OpRemoveInstance(instance_name=instance,
+ ignore_failures=True)
+ imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
+ imp_op = opcodes.OpCreateInstance(instance_name=instance,
+ disks=[{"size": size}
+ for size in self.disk_size],
+ disk_template=self.opts.disk_template,
+ nics=self.opts.nics,
+ mode=constants.INSTANCE_IMPORT,
+ src_node=enode,
+ src_path=imp_dir,
+ pnode=pnode,
+ snode=snode,
+ start=True,
+ ip_check=self.opts.ip_check,
+ name_check=self.opts.name_check,
+ wait_for_sync=True,
+ file_storage_dir=None,
+ file_driver="loop",
+ iallocator=self.opts.iallocator,
+ beparams=self.bep,
+ hvparams=self.hvp,
+ )
+
+ erem_op = opcodes.OpRemoveExport(instance_name=instance)
+
+ Log("export to node %s", enode, indent=2)
+ Log("remove instance", indent=2)
+ Log(import_log_msg, indent=2)
+ Log("remove export", indent=2)
+ self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
+
+ @staticmethod
+ def StopInstanceOp(instance):
+ """Stop given instance."""
+ return opcodes.OpShutdownInstance(instance_name=instance)
+
+ @staticmethod
+ def StartInstanceOp(instance):
+ """Start given instance."""
+ return opcodes.OpStartupInstance(instance_name=instance, force=False)
+
+ @staticmethod
+ def RenameInstanceOp(instance, instance_new):
+ """Rename instance."""
+ return opcodes.OpRenameInstance(instance_name=instance,
+ new_name=instance_new)
+
+ @_DoCheckInstances
+ @_DoBatch(True)
+ def BurnStopStart(self):
+ """Stop/start the instances."""
+ Log("Stopping and starting instances")
+ for instance in self.instances:
+ Log("instance %s", instance, indent=1)
+ op1 = self.StopInstanceOp(instance)
+ op2 = self.StartInstanceOp(instance)
+ self.ExecOrQueue(instance, op1, op2)
+
+ @_DoBatch(False)
+ def BurnRemove(self):
+ """Remove the instances."""
+ Log("Removing instances")
+ for instance in self.to_rem:
+ Log("instance %s", instance, indent=1)
+ op = opcodes.OpRemoveInstance(instance_name=instance,
+ ignore_failures=True)
+ self.ExecOrQueue(instance, op)
+
+ def BurnRename(self):
+ """Rename the instances.
+
+ Note that this function will not execute in parallel, since we
+ only have one target for rename.
+
+ """
+ Log("Renaming instances")
+ rename = self.opts.rename
+ for instance in self.instances:
+ Log("instance %s", instance, indent=1)
+ op_stop1 = self.StopInstanceOp(instance)
+ op_stop2 = self.StopInstanceOp(rename)
+ op_rename1 = self.RenameInstanceOp(instance, rename)
+ op_rename2 = self.RenameInstanceOp(rename, instance)
+ op_start1 = self.StartInstanceOp(rename)
+ op_start2 = self.StartInstanceOp(instance)
+ self.ExecOp(False, op_stop1, op_rename1, op_start1)
+ self._CheckInstanceAlive(rename)
+ self.ExecOp(False, op_stop2, op_rename2, op_start2)
+ self._CheckInstanceAlive(instance)
+
+ @_DoCheckInstances
+ @_DoBatch(True)
+ def BurnReinstall(self):
+ """Reinstall the instances."""
+ Log("Reinstalling instances")
+ for instance in self.instances:
+ Log("instance %s", instance, indent=1)
+ op1 = self.StopInstanceOp(instance)
+ op2 = opcodes.OpReinstallInstance(instance_name=instance)
+ Log("reinstall without passing the OS", indent=2)
+ op3 = opcodes.OpReinstallInstance(instance_name=instance,
+ os_type=self.opts.os)
+ Log("reinstall specifying the OS", indent=2)
+ op4 = self.StartInstanceOp(instance)
+ self.ExecOrQueue(instance, op1, op2, op3, op4)
+
+ @_DoCheckInstances
+ @_DoBatch(True)
+ def BurnReboot(self):
+ """Reboot the instances."""
+ Log("Rebooting instances")
+ for instance in self.instances:
+ Log("instance %s", instance, indent=1)
+ ops = []
+ for reboot_type in constants.REBOOT_TYPES:
+ op = opcodes.OpRebootInstance(instance_name=instance,
+ reboot_type=reboot_type,
+ ignore_secondaries=False)
+ Log("reboot with type '%s'", reboot_type, indent=2)
+ ops.append(op)
+ self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142
+
+ @_DoCheckInstances
+ @_DoBatch(True)
+ def BurnActivateDisks(self):
+ """Activate and deactivate disks of the instances."""
+ Log("Activating/deactivating disks")
+ for instance in self.instances:
+ Log("instance %s", instance, indent=1)
+ op_start = self.StartInstanceOp(instance)
+ op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
+ op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
+ op_stop = self.StopInstanceOp(instance)
+ Log("activate disks when online", indent=2)
+ Log("activate disks when offline", indent=2)
+ Log("deactivate disks (when offline)", indent=2)
+ self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
+
+ @_DoCheckInstances
+ @_DoBatch(False)
+ def BurnAddRemoveDisks(self):
+ """Add and remove an extra disk for the instances."""
+ Log("Adding and removing disks")
+ for instance in self.instances:
+ Log("instance %s", instance, indent=1)
+ op_add = opcodes.OpSetInstanceParams(
+ instance_name=instance,
+ disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
+ op_rem = opcodes.OpSetInstanceParams(
+ instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
+ op_stop = self.StopInstanceOp(instance)
+ op_start = self.StartInstanceOp(instance)
+ Log("adding a disk", indent=2)
+ Log("removing last disk", indent=2)
+ self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
+
+ @_DoBatch(False)
+ def BurnAddRemoveNICs(self):
+ """Add and remove an extra NIC for the instances."""
+ Log("Adding and removing NICs")
+ for instance in self.instances:
+ Log("instance %s", instance, indent=1)
+ op_add = opcodes.OpSetInstanceParams(
+ instance_name=instance, nics=[(constants.DDM_ADD, {})])
+ op_rem = opcodes.OpSetInstanceParams(
+ instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
+ Log("adding a NIC", indent=2)
+ Log("removing last NIC", indent=2)
+ self.ExecOrQueue(instance, op_add, op_rem)
+
+ def ConfdCallback(self, reply):
+ """Callback for confd queries"""
+ if reply.type == confd_client.UPCALL_REPLY:
+ if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
+ Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
+ reply.server_reply.status,
+ reply.server_reply))
+ if reply.orig_request.type == constants.CONFD_REQ_PING:
+ Log("Ping: OK", indent=1)
+ elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
+ if reply.server_reply.answer == self.cluster_info["master"]:
+ Log("Master: OK", indent=1)
+ else:
+ Err("Master: wrong: %s" % reply.server_reply.answer)
+ elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
+ if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
+ Log("Node role for master: OK", indent=1)
+ else:
+ Err("Node role for master: wrong: %s" % reply.server_reply.answer)
+
+ def DoConfdRequestReply(self, req):
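+ """Send a confd request and wait for all expected replies."""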
+ self.confd_counting_callback.RegisterQuery(req.rsalt)
+ self.confd_client.SendRequest(req, async=False)
+ while not self.confd_counting_callback.AllAnswered():
+ if not self.confd_client.ReceiveReply():
+ Err("Did not receive all expected confd replies")
+ break
+
+ def BurnConfd(self):
+ """Run confd queries for our instances.
+
+ The following confd queries are tested:
+ - CONFD_REQ_PING: simple ping
+ - CONFD_REQ_CLUSTER_MASTER: cluster master
+ - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
+
+ """
+ Log("Checking confd results")
+
+ hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
+ mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
+ mc_list = utils.ReadFile(mc_file).splitlines()
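+ # Chain the callbacks: the counting callback (used by DoConfdRequestReply
+ # to track outstanding replies) wraps the filter callback, which in turn
+ # wraps our ConfdCallback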
+ filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
+ counting_callback = confd_client.ConfdCountingCallback(filter_callback)
+ self.confd_counting_callback = counting_callback
+
+ self.confd_client = confd_client.ConfdClient(hmac_key, mc_list,
+ counting_callback)
+
+ req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
+ self.DoConfdRequestReply(req)
+
+ req = confd_client.ConfdClientRequest(
+ type=constants.CONFD_REQ_CLUSTER_MASTER)
+ self.DoConfdRequestReply(req)
+
+ req = confd_client.ConfdClientRequest(
+ type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
+ query=self.cluster_info["master"])
+ self.DoConfdRequestReply(req)
+
+ def _CheckInstanceAlive(self, instance):
+ """Check if an instance is alive by doing http checks.
+
+ This will try to retrieve the url on the instance /hostname.txt
+ and check that it contains the hostname of the instance. In case
+ we get ECONNREFUSED, we retry up to the net timeout seconds, for
+ any other error we abort.
+
+ """
+ if not self.opts.http_check:
+ return
+ end_time = time.time() + self.opts.net_timeout
+ url = None
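+ # Poll until the instance answers or the network timeout expires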
+ while time.time() < end_time and url is None:
+ try:
+ url = self.url_opener.open("http://%s/hostname.txt" % instance)
+ except IOError:
+ # here we can have connection refused, no route to host, etc.
+ time.sleep(1)
+ if url is None:
+ raise InstanceDown(instance, "Cannot contact instance")
+ hostname = url.read().strip()
+ url.close()
+ if hostname != instance:
+ raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
+ (instance, hostname)))
+
+ def BurninCluster(self):
+ """Test a cluster intensively.
+
+ This will create instances and then start/stop/failover them.
+ It is safe for existing instances but could impact performance.
+
+ """
+
+ opts = self.opts
+
+ Log("Testing global parameters")
+
+ if (len(self.nodes) == 1 and
+ opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
+ constants.DT_FILE)):
+ Err("When one node is available/selected the disk template must"
+ " be 'diskless', 'file' or 'plain'")
+
+ has_err = True
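+ # Assume failure until the whole sequence completes; the flag controls
+ # whether the feedback buffer is dumped and whether errors during the
+ # final instance removal are tolerated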
+ try:
+ self.BurnCreateInstances()
+ if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
+ self.BurnReplaceDisks1D8()
+ if (opts.do_replace2 and len(self.nodes) > 2 and
+ opts.disk_template in constants.DTS_NET_MIRROR):
+ self.BurnReplaceDisks2()
+
+ if (opts.disk_template != constants.DT_DISKLESS and
+ utils.any(self.disk_growth, lambda n: n > 0)):
+ self.BurnGrowDisks()
+
+ if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
+ self.BurnFailover()
+
+ if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
+ self.BurnMigrate()
+
+ if (opts.do_move and len(self.nodes) > 1 and
+ opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
+ self.BurnMove()
+
+ if (opts.do_importexport and
+ opts.disk_template not in (constants.DT_DISKLESS,
+ constants.DT_FILE)):
+ self.BurnImportExport()
+
+ if opts.do_reinstall:
+ self.BurnReinstall()
+
+ if opts.do_reboot:
+ self.BurnReboot()
+
+ if opts.do_addremove_disks:
+ self.BurnAddRemoveDisks()
+
+ default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
+ # Don't add/remove NICs in routed mode, as we would need an IP
+ # address to add them with
+ if opts.do_addremove_nics:
+ if default_nic_mode == constants.NIC_MODE_BRIDGED:
+ self.BurnAddRemoveNICs()
+ else:
+ Log("Skipping nic add/remove as the cluster is not in bridged mode")
+
+ if opts.do_activate_disks:
+ self.BurnActivateDisks()
+
+ if opts.rename:
+ self.BurnRename()
+
+ if opts.do_confd_tests:
+ self.BurnConfd()
+
+ if opts.do_startstop:
+ self.BurnStopStart()
+
+ has_err = False
+ finally:
+ if has_err:
+ Log("Error detected: opcode buffer follows:\n\n")
+ Log(self.GetFeedbackBuf())
+ Log("\n\n")
+ if not self.opts.keep_instances:
+ try:
+ self.BurnRemove()
+ except Exception, err: # pylint: disable-msg=W0703
+ if has_err: # already detected errors, so errors in removal
+ # are quite expected
+ Log("Note: error detected during instance remove: %s", err)
+ else: # non-expected error
+ raise
+
+ return 0
+