#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib

from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""
63 """Shows program usage information and exits the program."""
65 print >> sys.stderr, "Usage:"
66 print >> sys.stderr, USAGE


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
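
# Example outputs, assuming the LOG_HEADERS mapping defined above:
#   Log("Creating instances")           -> "- Creating instances"
#   Log("instance %s", name, indent=1)  -> "  * instance <name>"
#   Log("run %s", mode, indent=2)       -> "    run <mode>"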


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))
112 cli.cli_option("-o", "--os", dest="os", default=None,
113 help="OS to use during burnin",
115 completion_suggest=cli.OPT_COMPL_ONE_OS),
118 cli.cli_option("--disk-size", dest="disk_size",
119 help="Disk size (determines disk count)",
120 default="128m", type="string", metavar="<size,size,...>",
121 completion_suggest=("128M 512M 1G 4G 1G,256M"
122 " 4G,1G,1G 10G").split()),
123 cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
124 default="128m", type="string", metavar="<size,size,...>"),
125 cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
126 default=128, type="unit", metavar="<size>",
127 completion_suggest=("128M 256M 512M 1G 4G 8G"
128 " 12G 16G").split()),
133 cli.EARLY_RELEASE_OPT,
134 cli.cli_option("--no-replace1", dest="do_replace1",
135 help="Skip disk replacement with the same secondary",
136 action="store_false", default=True),
137 cli.cli_option("--no-replace2", dest="do_replace2",
138 help="Skip disk replacement with a different secondary",
139 action="store_false", default=True),
140 cli.cli_option("--no-failover", dest="do_failover",
141 help="Skip instance failovers", action="store_false",
143 cli.cli_option("--no-migrate", dest="do_migrate",
144 help="Skip instance live migration",
145 action="store_false", default=True),
146 cli.cli_option("--no-move", dest="do_move",
147 help="Skip instance moves", action="store_false",
149 cli.cli_option("--no-importexport", dest="do_importexport",
150 help="Skip instance export/import", action="store_false",
152 cli.cli_option("--no-startstop", dest="do_startstop",
153 help="Skip instance stop/start", action="store_false",
155 cli.cli_option("--no-reinstall", dest="do_reinstall",
156 help="Skip instance reinstall", action="store_false",
158 cli.cli_option("--no-reboot", dest="do_reboot",
159 help="Skip instance reboot", action="store_false",
161 cli.cli_option("--reboot-types", dest="reboot_types",
162 help="Specify the reboot types", default=None),
163 cli.cli_option("--no-activate-disks", dest="do_activate_disks",
164 help="Skip disk activation/deactivation",
165 action="store_false", default=True),
166 cli.cli_option("--no-add-disks", dest="do_addremove_disks",
167 help="Skip disk addition/removal",
168 action="store_false", default=True),
169 cli.cli_option("--no-add-nics", dest="do_addremove_nics",
170 help="Skip NIC addition/removal",
171 action="store_false", default=True),
172 cli.cli_option("--no-nics", dest="nics",
173 help="No network interfaces", action="store_const",
174 const=[], default=[{}]),
175 cli.cli_option("--no-confd", dest="do_confd_tests",
176 help="Skip confd queries",
177 action="store_false", default=True),
178 cli.cli_option("--rename", dest="rename", default=None,
179 help=("Give one unused instance name which is taken"
180 " to start the renaming sequence"),
181 metavar="<instance_name>"),
182 cli.cli_option("-t", "--disk-template", dest="disk_template",
183 choices=list(constants.DISK_TEMPLATES),
184 default=constants.DT_DRBD8,
185 help="Disk template (diskless, file, plain or drbd) [drbd]"),
186 cli.cli_option("-n", "--nodes", dest="nodes", default="",
187 help=("Comma separated list of nodes to perform"
188 " the burnin on (defaults to all nodes)"),
189 completion_suggest=cli.OPT_COMPL_MANY_NODES),
190 cli.cli_option("-I", "--iallocator", dest="iallocator",
191 default=None, type="string",
192 help=("Perform the allocation using an iallocator"
193 " instead of fixed node spread (node restrictions no"
194 " longer apply, therefore -n/--nodes must not be"
196 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
197 cli.cli_option("-p", "--parallel", default=False, action="store_true",
199 help=("Enable parallelization of some operations in"
200 " order to speed burnin or to test granular locking")),
201 cli.cli_option("--net-timeout", default=15, type="int",
203 help=("The instance check network timeout in seconds"
204 " (defaults to 15 seconds)"),
205 completion_suggest="15 60 300 900".split()),
206 cli.cli_option("-C", "--http-check", default=False, action="store_true",
208 help=("Enable checking of instance status via http,"
209 " looking for /hostname.txt that should contain the"
210 " name of the instance")),
211 cli.cli_option("-K", "--keep-instances", default=False,
213 dest="keep_instances",
214 help=("Leave instances on the cluster after burnin,"
215 " for investigation in case of errors or simply"

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper
237 """Decorator for possible batch operations.
239 Must come after the _DoCheckInstances decorator (if any).
241 @param retry: whether this is a retryable batch, will be
246 def batched(self, *args, **kwargs):
247 self.StartBatch(retry)
248 val = fn(self, *args, **kwargs)
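
# The two decorators are stacked as on the Burn* methods below:
#   @_DoCheckInstances
#   @_DoBatch(False)
# so the inner decorator commits the queued batch first, after which the
# outer one verifies that every instance is still alive.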


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
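    # retry_count counts down: it starts at MAX_RETRIES for the original
    # attempt of a retryable action, reaches 1 on the last allowed retry,
    # and is 0 for actions that must not be retried at all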
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
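      # parallel mode: only queue the job here; CommitQueue()/ExecJobSet()
      # will submit everything queued since the last StartBatch() in one go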
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)
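    # Example: "--disk-size 4G,1G --disk-growth 2G,0" requests two disks of
    # 4096 and 1024 MiB which BurnGrowDisks will later grow by 2048 and
    # 0 MiB respectively (utils.ParseUnit normalizes sizes to MiB)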

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
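      # i.e. all supported types: soft, hard and full reboot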
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)
499 """Read the cluster state from the master daemon."""
501 names = self.opts.nodes.split(",")
505 op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
506 names=names, use_locking=True)
507 result = self.ExecOp(True, op)
508 except errors.GenericError, err:
509 err_code, msg = cli.FormatError(err)
510 Err(msg, exit_code=err_code)
511 self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)
      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG
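
    # pick as new secondary the node two positions down the (cyclic) node
    # list, so it differs from both the current primary and secondary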
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])
658 """Move the instances."""
659 Log("Moving instances")
660 mytor = izip(islice(cycle(self.nodes), 1, None),
662 for tnode, instance in mytor:
663 Log("instance %s", instance, indent=1)
664 op = opcodes.OpInstanceMove(instance_name=instance,
666 self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)
      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)
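
    # each instance is exported to the node after its secondary, removed,
    # and then re-imported onto its original primary/secondary pair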
    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
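    # each instance is renamed to the spare name and back again, with a
    # liveness check after each leg of the swap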
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply.answer))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break
919 """Run confd queries for our instances.
921 The following confd queries are tested:
922 - CONFD_REQ_PING: simple ping
923 - CONFD_REQ_CLUSTER_MASTER: cluster master
924 - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
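    # the callbacks are chained: the filter callback weeds out stale or
    # duplicate replies before ConfdCallback sees them, while the counting
    # callback tracks outstanding answers for DoConfdRequestReply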
927 Log("Checking confd results")
929 filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
930 counting_callback = confd_client.ConfdCountingCallback(filter_callback)
931 self.confd_counting_callback = counting_callback
933 self.confd_client = confd_client.GetConfdClient(counting_callback)
935 req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
936 self.DoConfdRequestReply(req)
938 req = confd_client.ConfdClientRequest(
939 type=constants.CONFD_REQ_CLUSTER_MASTER)
940 self.DoConfdRequestReply(req)
942 req = confd_client.ConfdClientRequest(
943 type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
944 query=self.cluster_info["master"])
945 self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
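    # poll until the instance answers or net_timeout expires; connection
    # errors are retried once a second, while HTTP-level errors abort via
    # SimpleOpener.http_error_default raising InstanceDown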
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """
    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
1059 Log("Error detected: opcode buffer follows:\n\n")
1060 Log(self.GetFeedbackBuf())
1062 if not self.opts.keep_instances:
1065 except Exception, err: # pylint: disable-msg=W0703
1066 if has_err: # already detected errors, so errors in removal
1067 # are quite expected
1068 Log("Note: error detected during instance remove: %s", err)
1069 else: # non-expected error
1079 return burner.BurninCluster()
1082 if __name__ == "__main__":