#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib

from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }
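
# Example invocation (OS and instance names are hypothetical; any names
# resolvable on the cluster would do):
#   burnin -o debian-etch -p instance1.example.com instance2.example.com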


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""
63 """Shows program usage information and exits the program."""
65 print >> sys.stderr, "Usage:"
66 print >> sys.stderr, USAGE


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
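
# With the LOG_HEADERS table above, Log("instance %s", "web1", indent=1)
# prints "  * instance web1": 2*indent leading spaces, the per-level header,
# then the %-formatted message.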


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))
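
# Rough usage sketch: SimpleOpener().open("http://%s/hostname.txt" % name)
# returns a file-like object on success and raises InstanceDown on any
# HTTP-level error; _CheckInstanceAlive below relies on both behaviours.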
112 cli.cli_option("-o", "--os", dest="os", default=None,
113 help="OS to use during burnin",
115 completion_suggest=cli.OPT_COMPL_ONE_OS),
117 cli.cli_option("--disk-size", dest="disk_size",
118 help="Disk size (determines disk count)",
119 default="128m", type="string", metavar="<size,size,...>",
120 completion_suggest=("128M 512M 1G 4G 1G,256M"
121 " 4G,1G,1G 10G").split()),
122 cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
123 default="128m", type="string", metavar="<size,size,...>"),
124 cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
125 default=128, type="unit", metavar="<size>",
126 completion_suggest=("128M 256M 512M 1G 4G 8G"
127 " 12G 16G").split()),
132 cli.EARLY_RELEASE_OPT,
133 cli.cli_option("--no-replace1", dest="do_replace1",
134 help="Skip disk replacement with the same secondary",
135 action="store_false", default=True),
136 cli.cli_option("--no-replace2", dest="do_replace2",
137 help="Skip disk replacement with a different secondary",
138 action="store_false", default=True),
139 cli.cli_option("--no-failover", dest="do_failover",
140 help="Skip instance failovers", action="store_false",
142 cli.cli_option("--no-migrate", dest="do_migrate",
143 help="Skip instance live migration",
144 action="store_false", default=True),
145 cli.cli_option("--no-move", dest="do_move",
146 help="Skip instance moves", action="store_false",
148 cli.cli_option("--no-importexport", dest="do_importexport",
149 help="Skip instance export/import", action="store_false",
151 cli.cli_option("--no-startstop", dest="do_startstop",
152 help="Skip instance stop/start", action="store_false",
154 cli.cli_option("--no-reinstall", dest="do_reinstall",
155 help="Skip instance reinstall", action="store_false",
157 cli.cli_option("--no-reboot", dest="do_reboot",
158 help="Skip instance reboot", action="store_false",
160 cli.cli_option("--no-activate-disks", dest="do_activate_disks",
161 help="Skip disk activation/deactivation",
162 action="store_false", default=True),
163 cli.cli_option("--no-add-disks", dest="do_addremove_disks",
164 help="Skip disk addition/removal",
165 action="store_false", default=True),
166 cli.cli_option("--no-add-nics", dest="do_addremove_nics",
167 help="Skip NIC addition/removal",
168 action="store_false", default=True),
169 cli.cli_option("--no-nics", dest="nics",
170 help="No network interfaces", action="store_const",
171 const=[], default=[{}]),
172 cli.cli_option("--no-confd", dest="do_confd_tests",
173 help="Skip confd queries",
174 action="store_false", default=True),
175 cli.cli_option("--rename", dest="rename", default=None,
176 help=("Give one unused instance name which is taken"
177 " to start the renaming sequence"),
178 metavar="<instance_name>"),
179 cli.cli_option("-t", "--disk-template", dest="disk_template",
180 choices=list(constants.DISK_TEMPLATES),
181 default=constants.DT_DRBD8,
182 help="Disk template (diskless, file, plain or drbd) [drbd]"),
183 cli.cli_option("-n", "--nodes", dest="nodes", default="",
184 help=("Comma separated list of nodes to perform"
185 " the burnin on (defaults to all nodes)"),
186 completion_suggest=cli.OPT_COMPL_MANY_NODES),
187 cli.cli_option("-I", "--iallocator", dest="iallocator",
188 default=None, type="string",
189 help=("Perform the allocation using an iallocator"
190 " instead of fixed node spread (node restrictions no"
191 " longer apply, therefore -n/--nodes must not be"
193 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
194 cli.cli_option("-p", "--parallel", default=False, action="store_true",
196 help=("Enable parallelization of some operations in"
197 " order to speed burnin or to test granular locking")),
198 cli.cli_option("--net-timeout", default=15, type="int",
200 help=("The instance check network timeout in seconds"
201 " (defaults to 15 seconds)"),
202 completion_suggest="15 60 300 900".split()),
203 cli.cli_option("-C", "--http-check", default=False, action="store_true",
205 help=("Enable checking of instance status via http,"
206 " looking for /hostname.txt that should contain the"
207 " name of the instance")),
208 cli.cli_option("-K", "--keep-instances", default=False,
210 dest="keep_instances",
211 help=("Leave instances on the cluster after burnin,"
212 " for investigation in case of errors or simply"

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper
234 """Decorator for possible batch operations.
236 Must come after the _DoCheckInstances decorator (if any).
238 @param retry: whether this is a retryable batch, will be
243 def batched(self, *args, **kwargs):
244 self.StartBatch(retry)
245 val = fn(self, *args, **kwargs)
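
# Typical stacking on Burner methods (order matters, per the note above):
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...
# _DoBatch queues and commits the batch of jobs; _DoCheckInstances then
# verifies that every instance is still alive.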


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        self.MaybeRetry(retry_count - 1, msg, fn, *args)
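
  # Retry accounting example: with MAX_RETRIES = 3, a retryable call starts
  # at retry_count=3 (the original try) and recurses through 2 and 1, so the
  # logged attempts are "#1/3" and "#2/3" before the final failure aborts.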

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    this function.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)
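    # Example: --disk-size=1G,512M --disk-growth=2G,0 yields disk_count == 2,
    # disk_size == [1024, 512] and disk_growth == [2048, 0], since
    # utils.ParseUnit normalizes all sizes to mebibytes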

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    socket.setdefaulttimeout(options.net_timeout)
488 """Read the cluster state from the master daemon."""
490 names = self.opts.nodes.split(",")
494 op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
495 names=names, use_locking=True)
496 result = self.ExecOp(True, op)
497 except errors.GenericError, err:
498 err_code, msg = cli.FormatError(err)
499 Err(msg, exit_code=err_code)
500 self.nodes = [data[0] for data in result if not (data[1] or data[2])]
502 op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
503 "variants"], names=[])
504 result = self.ExecOp(True, op_diagnose)
507 Err("Can't get the OS list")
510 for (name, valid, variants) in result:
511 if valid and self.opts.os in cli.CalculateOSNames(name, variants):
516 Err("OS '%s' not found" % self.opts.os)
518 cluster_info = self.cl.QueryClusterInfo()
519 self.cluster_info = cluster_info
520 if not self.cluster_info:
521 Err("Can't get cluster info")
523 default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
524 self.cluster_default_nicparams = default_nic_params
525 if self.hypervisor is None:
526 self.hypervisor = self.cluster_info["default_hypervisor"]
527 self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    )
      # remove_instance returns a closure: calling it here binds the current
      # instance name by value, so the queued post_process appends the right
      # name even though the loop variable keeps changing
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))
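
  # Placement example: with nodes [n1, n2, n3], the izip/cycle pairing in
  # BurnCreateInstances assigns (pnode, snode) = (n1, n2), (n2, n3),
  # (n3, n1), ... to the instances in order, spreading DRBD pairs evenly.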

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])
644 """Move the instances."""
645 Log("Moving instances")
646 mytor = izip(islice(cycle(self.nodes), 1, None),
648 for tnode, instance in mytor:
649 Log("instance %s", instance, indent=1)
650 op = opcodes.OpMoveInstance(instance_name=instance,
652 self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])
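
  # Note on BurnMigrate: op2 repeats the migration with cleanup=True, so the
  # batch also exercises the "gnt-instance migrate --cleanup" recovery path
  # right after the normal live migration done by op1.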

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
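
  # The queued list [op_act, op_stop, op_act, op_deact, op_start] maps onto
  # the three log lines in BurnActivateDisks: activate while online, stop,
  # activate again while offline, deactivate, then start the instance back up.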

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a single confd request and wait for the expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break
903 """Run confd queries for our instances.
905 The following confd queries are tested:
906 - CONFD_REQ_PING: simple ping
907 - CONFD_REQ_CLUSTER_MASTER: cluster master
908 - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
911 Log("Checking confd results")
913 filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
914 counting_callback = confd_client.ConfdCountingCallback(filter_callback)
915 self.confd_counting_callback = counting_callback
917 self.confd_client = confd_client.GetConfdClient(counting_callback)
919 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
920 self.DoConfdRequestReply(req)
922 req = confd_client.ConfdClientRequest(
923 type=constants.CONFD_REQ_CLUSTER_MASTER)
924 self.DoConfdRequestReply(req)
926 req = confd_client.ConfdClientRequest(
927 type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
928 query=self.cluster_info["master"])
929 self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
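
  # Contract for -C/--http-check: each instance must serve its own name at
  # http://<instance>/hostname.txt, e.g. via a boot script along the lines of
  # "hostname > /var/www/hostname.txt" plus any web server on port 80; the
  # exact setup is up to the OS image used for the burnin.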

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()

      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err: # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
            # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def Main():
  """Main function"""
  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(Main())