4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
31 from itertools import izip, islice, cycle
32 from cStringIO import StringIO
34 from ganeti import opcodes
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssconf
41 from ganeti.confd import client as confd_client
# One-line usage synopsis, printed by Usage() and embedded in the option parser.
44 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
class InstanceDown(Exception):
  """Raised when a liveness check finds that an instance is not up."""
class BurninFailure(Exception):
  """Raised when a failure is detected during the burnin run."""
62   """Shows program usage information and exits the program."""
# NOTE(review): the enclosing "def Usage():" header and the final exit call
# are outside this excerpt.
64   print >> sys.stderr, "Usage:"
65   print >> sys.stderr, USAGE
69 def Log(msg, *args, **kwargs):
70   """Simple function that prints out its argument.
# NOTE(review): this excerpt is non-contiguous; the docstring close and the
# argument-interpolation lines are missing here.
75   indent = kwargs.get('indent', 0)
  # Indent two spaces per level and prefix with a per-level header marker
  # looked up in LOG_HEADERS (defined elsewhere in this file).
76   sys.stdout.write("%*s%s%s\n" % (2*indent, "",
77                                   LOG_HEADERS.get(indent, " "), msg))
81 def Err(msg, exit_code=1):
82   """Simple error logging that prints to stderr.
# NOTE(review): the docstring close and the process-exit call (presumably
# using exit_code) are outside this excerpt — confirm against the full file.
85   sys.stderr.write(msg + "\n")
90 class SimpleOpener(urllib.FancyURLopener):
91   """A simple url opener"""
92   # pylint: disable-msg=W0221
94   def prompt_user_passwd(self, host, realm, clear_cache=0):
95     """No-interaction version of prompt_user_passwd."""
96     # we follow parent class' API
97     # pylint: disable-msg=W0613
# NOTE(review): the return statement of prompt_user_passwd is outside this
# excerpt.
100   def http_error_default(self, url, fp, errcode, errmsg, headers):
101     """Custom error handling"""
102     # make sure sockets are not left in CLOSE_WAIT, this is similar
103     # but with a different exception to the BasicURLOpener class
104     _ = fp.read() # throw away data
    # Any HTTP error is reported as InstanceDown so the liveness check fails.
106     raise InstanceDown("HTTP error returned: code %s, msg %s" %
# Command-line option declarations for the burnin tool.
# NOTE(review): this excerpt is non-contiguous; the "OPTIONS = [" opener,
# several entries and some continuation/default lines are missing here.
111   cli.cli_option("-o", "--os", dest="os", default=None,
112                  help="OS to use during burnin",
114                  completion_suggest=cli.OPT_COMPL_ONE_OS),
115   cli.cli_option("--disk-size", dest="disk_size",
116                  help="Disk size (determines disk count)",
117                  default="128m", type="string", metavar="<size,size,...>",
118                  completion_suggest=("128M 512M 1G 4G 1G,256M"
119                                      " 4G,1G,1G 10G").split()),
120   cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
121                  default="128m", type="string", metavar="<size,size,...>"),
122   cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
123                  default=128, type="unit", metavar="<size>",
124                  completion_suggest=("128M 256M 512M 1G 4G 8G"
125                                      " 12G 16G").split()),
130   cli.EARLY_RELEASE_OPT,
# The --no-* options below disable individual burnin phases (default: on).
131   cli.cli_option("--no-replace1", dest="do_replace1",
132                  help="Skip disk replacement with the same secondary",
133                  action="store_false", default=True),
134   cli.cli_option("--no-replace2", dest="do_replace2",
135                  help="Skip disk replacement with a different secondary",
136                  action="store_false", default=True),
137   cli.cli_option("--no-failover", dest="do_failover",
138                  help="Skip instance failovers", action="store_false",
140   cli.cli_option("--no-migrate", dest="do_migrate",
141                  help="Skip instance live migration",
142                  action="store_false", default=True),
143   cli.cli_option("--no-move", dest="do_move",
144                  help="Skip instance moves", action="store_false",
146   cli.cli_option("--no-importexport", dest="do_importexport",
147                  help="Skip instance export/import", action="store_false",
149   cli.cli_option("--no-startstop", dest="do_startstop",
150                  help="Skip instance stop/start", action="store_false",
152   cli.cli_option("--no-reinstall", dest="do_reinstall",
153                  help="Skip instance reinstall", action="store_false",
155   cli.cli_option("--no-reboot", dest="do_reboot",
156                  help="Skip instance reboot", action="store_false",
158   cli.cli_option("--no-activate-disks", dest="do_activate_disks",
159                  help="Skip disk activation/deactivation",
160                  action="store_false", default=True),
161   cli.cli_option("--no-add-disks", dest="do_addremove_disks",
162                  help="Skip disk addition/removal",
163                  action="store_false", default=True),
164   cli.cli_option("--no-add-nics", dest="do_addremove_nics",
165                  help="Skip NIC addition/removal",
166                  action="store_false", default=True),
# --no-nics stores an empty NIC list; the default is one NIC with defaults.
167   cli.cli_option("--no-nics", dest="nics",
168                  help="No network interfaces", action="store_const",
169                  const=[], default=[{}]),
170   cli.cli_option("--no-confd", dest="do_confd_tests",
171                  help="Skip confd queries",
172                  action="store_false", default=True),
173   cli.cli_option("--rename", dest="rename", default=None,
174                  help=("Give one unused instance name which is taken"
175                        " to start the renaming sequence"),
176                  metavar="<instance_name>"),
177   cli.cli_option("-t", "--disk-template", dest="disk_template",
178                  choices=list(constants.DISK_TEMPLATES),
179                  default=constants.DT_DRBD8,
180                  help="Disk template (diskless, file, plain or drbd) [drbd]"),
181   cli.cli_option("-n", "--nodes", dest="nodes", default="",
182                  help=("Comma separated list of nodes to perform"
183                        " the burnin on (defaults to all nodes)"),
184                  completion_suggest=cli.OPT_COMPL_MANY_NODES),
185   cli.cli_option("-I", "--iallocator", dest="iallocator",
186                  default=None, type="string",
187                  help=("Perform the allocation using an iallocator"
188                        " instead of fixed node spread (node restrictions no"
189                        " longer apply, therefore -n/--nodes must not be"
191                  completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
192   cli.cli_option("-p", "--parallel", default=False, action="store_true",
194                  help=("Enable parallelization of some operations in"
195                        " order to speed burnin or to test granular locking")),
196   cli.cli_option("--net-timeout", default=15, type="int",
198                  help=("The instance check network timeout in seconds"
199                        " (defaults to 15 seconds)"),
200                  completion_suggest="15 60 300 900".split()),
201   cli.cli_option("-C", "--http-check", default=False, action="store_true",
203                  help=("Enable checking of instance status via http,"
204                        " looking for /hostname.txt that should contain the"
205                        " name of the instance")),
206   cli.cli_option("-K", "--keep-instances", default=False,
208                  dest="keep_instances",
209                  help=("Leave instances on the cluster after burnin,"
210                        " for investigation in case of errors or simply"
214 # Mainly used for bash completion
# Positional arguments: at least one instance name is required.
215 ARGUMENTS = [cli.ArgInstance(min=1)]
218 def _DoCheckInstances(fn):
219   """Decorator for checking instances.
222   def wrapper(self, *args, **kwargs):
223     val = fn(self, *args, **kwargs)
    # After the wrapped burnin step completes, verify that every instance is
    # still reachable/alive.
224     for instance in self.instances:
225       self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
# NOTE(review): the return of the wrapped value and of the wrapper function
# are outside this excerpt.
232   """Decorator for possible batch operations.
234   Must come after the _DoCheckInstances decorator (if any).
236   @param retry: whether this is a retryable batch, will be
# NOTE(review): this is the interior of _DoBatch(retry); the "def" header,
# the docstring close and the batch-commit lines are outside this excerpt.
241   def batched(self, *args, **kwargs):
242     self.StartBatch(retry)
243     val = fn(self, *args, **kwargs)
251 class Burner(object):
# NOTE(review): the class docstring and the start of __init__ are outside
# this excerpt; the following lines belong to __init__.
256     utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
257     self.url_opener = SimpleOpener()
    # Buffer collecting job feedback messages (see Feedback/GetFeedbackBuf).
258     self._feed_buf = StringIO()
264     self.queue_retry = False
265     self.disk_count = self.disk_growth = self.disk_size = None
266     self.hvp = self.bep = None
    # LUXI client for submitting jobs and a simple-store reader for ssconf.
268     self.cl = cli.GetClient()
269     self.ss = ssconf.SimpleStore()
272 def ClearFeedbackBuf(self):
273 """Clear the feedback buffer."""
274 self._feed_buf.truncate(0)
276 def GetFeedbackBuf(self):
277 """Return the contents of the buffer."""
278 return self._feed_buf.getvalue()
280 def Feedback(self, msg):
281 """Acumulate feedback in our buffer."""
282 formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
283 self._feed_buf.write(formatted_msg + "\n")
284 if self.opts.verbose:
285 Log(formatted_msg, indent=3)
287   def MaybeRetry(self, retry_count, msg, fn, *args):
288     """Possibly retry a given function execution.
290     @type retry_count: int
291     @param retry_count: retry counter:
292         - 0: non-retryable action
293         - 1: last retry for a retryable action
294         - MAX_RETRIES: original try for a retryable action
296     @param msg: the kind of the operation
298     @param fn: the function to be called
# NOTE(review): this excerpt is non-contiguous; the "try:"/fn(*args) call and
# some branch lines are missing here.
303       if retry_count > 0 and retry_count < MAX_RETRIES:
304         Log("Idempotent %s succeeded after %d retries",
305             msg, MAX_RETRIES - retry_count)
307     except Exception, err: # pylint: disable-msg=W0703
309         Log("Non-idempotent %s failed, aborting", msg)
311       elif retry_count == 1:
312         Log("Idempotent %s repeated failure, aborting", msg)
315         Log("Idempotent %s failed, retry #%d/%d: %s",
316             msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # Recurse with a decremented counter to retry the same call.
317         self.MaybeRetry(retry_count - 1, msg, fn, *args)
319   def _SetDebug(self, ops):
320     """Set the debug value on the given opcodes"""
# NOTE(review): the loop header over "ops" is outside this excerpt.
322       op.debug_level = self.opts.debug
324   def _ExecOp(self, *ops):
325     """Execute one or more opcodes and manage the exec buffer.
327     @result: if only opcode has been passed, we return its result;
328         otherwise we return the list of results
331     job_id = cli.SendJob(ops, cl=self.cl)
    # Poll the job, routing feedback messages into our buffer via
    # self.Feedback.
332     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
# NOTE(review): the single-opcode unwrapping/return lines are outside this
# excerpt.
338   def ExecOp(self, retry, *ops):
339     """Execute one or more opcodes and manage the exec buffer.
341     @result: if only opcode has been passed, we return its result;
342         otherwise we return the list of results
# NOTE(review): the computation of "rval" (the retry-count selection based on
# the "retry" flag) is outside this excerpt.
350     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
352   def ExecOrQueue(self, name, *ops):
353     """Execute an opcode and manage the exec buffer."""
354     if self.opts.parallel:
      # In parallel mode defer execution: ops are queued and later run as a
      # batch (see CommitQueue/ExecJobSet).
356       self.queued_ops.append((ops, name))
# NOTE(review): the "else:" line is outside this excerpt; serial mode
# executes immediately below.
358       return self.ExecOp(self.queue_retry, *ops)
360   def StartBatch(self, retry):
361     """Start a new batch of jobs.
363     @param retry: whether this is a retryable batch
# NOTE(review): the reset of the queued-ops list is outside this excerpt.
367     self.queue_retry = retry
369   def CommitQueue(self):
370     """Execute all submitted opcodes in case of parallel burnin"""
    # Nothing to commit in serial mode (early-return body outside excerpt).
371     if not self.opts.parallel:
# NOTE(review): this excerpt is non-contiguous; the rval computation and the
# remaining arguments/cleanup of the MaybeRetry call are missing here.
380       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
386   def ExecJobSet(self, jobs):
387     """Execute a set of jobs and return once all are done.
389     The method will return the list of results, if all jobs are
390     successful. Otherwise, OpExecError will be raised from within
394     self.ClearFeedbackBuf()
395     jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
396     for ops, name in jobs:
397       jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
# NOTE(review): the "try:" opener is outside this excerpt.
399       results = jex.GetResults()
400     except Exception, err: # pylint: disable-msg=W0703
401       Log("Jobs failed: %s", err)
402       raise BurninFailure()
    # Each result is a (success, payload) pair; any failure fails the set.
404     if utils.any(results, lambda x: not x[0]):
405       raise BurninFailure()
407     return [i[1] for i in results]
409   def ParseOptions(self):
410     """Parses the command line options.
412     In case of command line errors, it will show the usage and exit the
# NOTE(review): this excerpt is non-contiguous; several lines (docstring
# close, parser setup tail, usage/exit calls, bep/hvp dicts) are missing.
416     parser = optparse.OptionParser(usage="\n%s" % USAGE,
417                                    version=("%%prog (ganeti) %s" %
418                                             constants.RELEASE_VERSION),
421     options, args = parser.parse_args()
    # At least one instance name and an OS are mandatory.
422     if len(args) < 1 or options.os is None:
425     supported_disk_templates = (constants.DT_DISKLESS,
429     if options.disk_template not in supported_disk_templates:
430       Err("Unknown disk template '%s'" % options.disk_template)
432     if options.disk_template == constants.DT_DISKLESS:
      # Diskless instances get no disks and cannot do disk add/remove.
433       disk_size = disk_growth = []
434       options.do_addremove_disks = False
436       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
437       disk_growth = [utils.ParseUnit(v)
438                      for v in options.disk_growth.split(",")]
439       if len(disk_growth) != len(disk_size):
440         Err("Wrong disk sizes/growth combination")
441     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
442         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
443       Err("Wrong disk count/disk template combination")
445     self.disk_size = disk_size
446     self.disk_growth = disk_growth
447     self.disk_count = len(disk_size)
449     if options.nodes and options.iallocator:
450       Err("Give either the nodes option or the iallocator option, not both")
452     if options.http_check and not options.name_check:
453       Err("Can't enable HTTP checks without name checks")
456     self.instances = args
458       constants.BE_MEMORY: options.mem_size,
459       constants.BE_VCPUS: 1,
    # Apply the network timeout globally for the HTTP liveness checks.
463     socket.setdefaulttimeout(options.net_timeout)
466     """Read the cluster state from the master daemon."""
# NOTE(review): the enclosing method header is outside this excerpt, as are
# several lines of this non-contiguous body.
468       names = self.opts.nodes.split(",")
472       op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
473                                 names=names, use_locking=True)
474       result = self.ExecOp(True, op)
475     except errors.GenericError, err:
476       err_code, msg = cli.FormatError(err)
477       Err(msg, exit_code=err_code)
    # Keep only nodes that are neither offline nor drained.
478     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
480     op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
481                                                       "variants"], names=[])
482     result = self.ExecOp(True, op_diagnose)
485       Err("Can't get the OS list")
    # Verify the requested OS (including variants) exists on the cluster.
488     for (name, valid, variants) in result:
489       if valid and self.opts.os in cli.CalculateOSNames(name, variants):
494       Err("OS '%s' not found" % self.opts.os)
496     cluster_info = self.cl.QueryClusterInfo()
497     self.cluster_info = cluster_info
498     if not self.cluster_info:
499       Err("Can't get cluster info")
501     default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
502     self.cluster_default_nicparams = default_nic_params
506   def BurnCreateInstances(self):
507     """Create the given instances.
# NOTE(review): this excerpt is non-contiguous; several opcode keyword
# arguments and branch lines are missing.
    # Rotate primary/secondary assignments over the node list.
511     mytor = izip(cycle(self.nodes),
512                  islice(cycle(self.nodes), 1, None),
515     Log("Creating instances")
516     for pnode, snode, instance in mytor:
517       Log("instance %s", instance, indent=1)
518       if self.opts.iallocator:
520         msg = "with iallocator %s" % self.opts.iallocator
521       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
523         msg = "on %s" % pnode
525         msg = "on %s, %s" % (pnode, snode)
529       op = opcodes.OpCreateInstance(instance_name=instance,
530                                     disks = [ {"size": size}
531                                               for size in self.disk_size],
532                                     disk_template=self.opts.disk_template,
534                                     mode=constants.INSTANCE_CREATE,
535                                     os_type=self.opts.os,
539                                     ip_check=self.opts.ip_check,
540                                     name_check=self.opts.name_check,
543                                     file_storage_dir=None,
544                                     iallocator=self.opts.iallocator,
549       self.ExecOrQueue(instance, op)
      # Remember the instance so BurnRemove can clean it up later.
550       self.to_rem.append(instance)
553   def BurnGrowDisks(self):
554     """Grow both the os and the swap disks by the requested amount, if any."""
# NOTE(review): this excerpt is non-contiguous; at least one line (likely a
# guard on the growth amount) is missing — confirm against the full file.
556     for instance in self.instances:
557       Log("instance %s", instance, indent=1)
558       for idx, growth in enumerate(self.disk_growth):
560         op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
561                                 amount=growth, wait_for_sync=True)
562         Log("increase disk/%s by %s MB", idx, growth, indent=2)
563         self.ExecOrQueue(instance, op)
566   def BurnReplaceDisks1D8(self):
567     """Replace disks on primary and secondary for drbd8."""
568     Log("Replacing disks on the same nodes")
# NOTE(review): this excerpt is non-contiguous; the "ops" list creation,
# the mode keyword argument and the append call are missing here.
569     for instance in self.instances:
570       Log("instance %s", instance, indent=1)
      # Replace on the secondary first, then on the primary.
572       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
573         op = opcodes.OpReplaceDisks(instance_name=instance,
575                                     disks=[i for i in range(self.disk_count)],
576                                     early_release=self.opts.early_release)
577         Log("run %s", mode, indent=2)
579       self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142
582   def BurnReplaceDisks2(self):
583     """Replace secondary node."""
584     Log("Changing the secondary node")
585     mode = constants.REPLACE_DISK_CHG
# NOTE(review): this excerpt is non-contiguous; several lines (tail of the
# izip call, else-branch, further opcode arguments) are missing.
587     mytor = izip(islice(cycle(self.nodes), 2, None),
589     for tnode, instance in mytor:
590       Log("instance %s", instance, indent=1)
591       if self.opts.iallocator:
593         msg = "with iallocator %s" % self.opts.iallocator
596       op = opcodes.OpReplaceDisks(instance_name=instance,
599                                   iallocator=self.opts.iallocator,
601                                   early_release=self.opts.early_release)
602       Log("run %s %s", mode, msg, indent=2)
603       self.ExecOrQueue(instance, op)
607 def BurnFailover(self):
608 """Failover the instances."""
609 Log("Failing over instances")
610 for instance in self.instances:
611 Log("instance %s", instance, indent=1)
612 op = opcodes.OpFailoverInstance(instance_name=instance,
613 ignore_consistency=False)
614 self.ExecOrQueue(instance, op)
619     """Move the instances."""
# NOTE(review): the enclosing method header and some lines (tail of the izip
# call, target-node keyword argument) are outside this excerpt.
620     Log("Moving instances")
621     mytor = izip(islice(cycle(self.nodes), 1, None),
623     for tnode, instance in mytor:
624       Log("instance %s", instance, indent=1)
625       op = opcodes.OpMoveInstance(instance_name=instance,
627       self.ExecOrQueue(instance, op)
630   def BurnMigrate(self):
631     """Migrate the instances."""
632     Log("Migrating instances")
# NOTE(review): this excerpt is non-contiguous; the continuation lines of the
# two OpMigrateInstance calls (e.g. the cleanup flags) are missing here.
633     for instance in self.instances:
634       Log("instance %s", instance, indent=1)
635       op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
638       op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
640       Log("migration and migration cleanup", indent=2)
641       self.ExecOrQueue(instance, op1, op2)
645   def BurnImportExport(self):
646     """Export the instance, delete it, and import it back.
# NOTE(review): this excerpt is non-contiguous; the docstring close and many
# opcode keyword arguments are missing.
649     Log("Exporting and re-importing instances")
    # Rotate primary/secondary/export nodes over the node list.
650     mytor = izip(cycle(self.nodes),
651                  islice(cycle(self.nodes), 1, None),
652                  islice(cycle(self.nodes), 2, None),
655     for pnode, snode, enode, instance in mytor:
656       Log("instance %s", instance, indent=1)
657       # read the full name of the instance
658       nam_op = opcodes.OpQueryInstances(output_fields=["name"],
659                                         names=[instance], use_locking=True)
660       full_name = self.ExecOp(False, nam_op)[0][0]
662       if self.opts.iallocator:
664         import_log_msg = ("import from %s"
665                           " with iallocator %s" %
666                           (enode, self.opts.iallocator))
667       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
669         import_log_msg = ("import from %s to %s" %
672         import_log_msg = ("import from %s to %s, %s" %
673                           (enode, pnode, snode))
675       exp_op = opcodes.OpExportInstance(instance_name=instance,
678       rem_op = opcodes.OpRemoveInstance(instance_name=instance,
679                                         ignore_failures=True)
      # The export lands under EXPORT_DIR/<full instance name>.
680       imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
681       imp_op = opcodes.OpCreateInstance(instance_name=instance,
682                                         disks = [ {"size": size}
683                                                   for size in self.disk_size],
684                                         disk_template=self.opts.disk_template,
686                                         mode=constants.INSTANCE_IMPORT,
692                                         ip_check=self.opts.ip_check,
693                                         name_check=self.opts.name_check,
695                                         file_storage_dir=None,
697                                         iallocator=self.opts.iallocator,
702       erem_op = opcodes.OpRemoveExport(instance_name=instance)
704       Log("export to node %s", enode, indent=2)
705       Log("remove instance", indent=2)
706       Log(import_log_msg, indent=2)
707       Log("remove export", indent=2)
708       self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
711 def StopInstanceOp(instance):
712 """Stop given instance."""
713 return opcodes.OpShutdownInstance(instance_name=instance)
716 def StartInstanceOp(instance):
717 """Start given instance."""
718 return opcodes.OpStartupInstance(instance_name=instance, force=False)
721 def RenameInstanceOp(instance, instance_new):
722 """Rename instance."""
723 return opcodes.OpRenameInstance(instance_name=instance,
724 new_name=instance_new)
728 def BurnStopStart(self):
729 """Stop/start the instances."""
730 Log("Stopping and starting instances")
731 for instance in self.instances:
732 Log("instance %s", instance, indent=1)
733 op1 = self.StopInstanceOp(instance)
734 op2 = self.StartInstanceOp(instance)
735 self.ExecOrQueue(instance, op1, op2)
738 def BurnRemove(self):
739 """Remove the instances."""
740 Log("Removing instances")
741 for instance in self.to_rem:
742 Log("instance %s", instance, indent=1)
743 op = opcodes.OpRemoveInstance(instance_name=instance,
744 ignore_failures=True)
745 self.ExecOrQueue(instance, op)
747   def BurnRename(self):
748     """Rename the instances.
750     Note that this function will not execute in parallel, since we
751     only have one target for rename.
# NOTE(review): this excerpt is non-contiguous; the docstring close is
# missing here.
754     Log("Renaming instances")
755     rename = self.opts.rename
756     for instance in self.instances:
757       Log("instance %s", instance, indent=1)
758       op_stop1 = self.StopInstanceOp(instance)
759       op_stop2 = self.StopInstanceOp(rename)
760       op_rename1 = self.RenameInstanceOp(instance, rename)
761       op_rename2 = self.RenameInstanceOp(rename, instance)
762       op_start1 = self.StartInstanceOp(rename)
763       op_start2 = self.StartInstanceOp(instance)
      # Rename to the spare name, check liveness, then rename back.
764       self.ExecOp(False, op_stop1, op_rename1, op_start1)
765       self._CheckInstanceAlive(rename)
766       self.ExecOp(False, op_stop2, op_rename2, op_start2)
767       self._CheckInstanceAlive(instance)
771 def BurnReinstall(self):
772 """Reinstall the instances."""
773 Log("Reinstalling instances")
774 for instance in self.instances:
775 Log("instance %s", instance, indent=1)
776 op1 = self.StopInstanceOp(instance)
777 op2 = opcodes.OpReinstallInstance(instance_name=instance)
778 Log("reinstall without passing the OS", indent=2)
779 op3 = opcodes.OpReinstallInstance(instance_name=instance,
780 os_type=self.opts.os)
781 Log("reinstall specifying the OS", indent=2)
782 op4 = self.StartInstanceOp(instance)
783 self.ExecOrQueue(instance, op1, op2, op3, op4)
787   def BurnReboot(self):
788     """Reboot the instances."""
789     Log("Rebooting instances")
# NOTE(review): this excerpt is non-contiguous; the creation of the "ops"
# list and the append of each opcode are missing here.
790     for instance in self.instances:
791       Log("instance %s", instance, indent=1)
      # One reboot opcode per supported reboot type.
793       for reboot_type in constants.REBOOT_TYPES:
794         op = opcodes.OpRebootInstance(instance_name=instance,
795                                       reboot_type=reboot_type,
796                                       ignore_secondaries=False)
797         Log("reboot with type '%s'", reboot_type, indent=2)
799       self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142
803 def BurnActivateDisks(self):
804 """Activate and deactivate disks of the instances."""
805 Log("Activating/deactivating disks")
806 for instance in self.instances:
807 Log("instance %s", instance, indent=1)
808 op_start = self.StartInstanceOp(instance)
809 op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
810 op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
811 op_stop = self.StopInstanceOp(instance)
812 Log("activate disks when online", indent=2)
813 Log("activate disks when offline", indent=2)
814 Log("deactivate disks (when offline)", indent=2)
815 self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
819 def BurnAddRemoveDisks(self):
820 """Add and remove an extra disk for the instances."""
821 Log("Adding and removing disks")
822 for instance in self.instances:
823 Log("instance %s", instance, indent=1)
824 op_add = opcodes.OpSetInstanceParams(\
825 instance_name=instance,
826 disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
827 op_rem = opcodes.OpSetInstanceParams(\
828 instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
829 op_stop = self.StopInstanceOp(instance)
830 op_start = self.StartInstanceOp(instance)
831 Log("adding a disk", indent=2)
832 Log("removing last disk", indent=2)
833 self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
836 def BurnAddRemoveNICs(self):
837 """Add and remove an extra NIC for the instances."""
838 Log("Adding and removing NICs")
839 for instance in self.instances:
840 Log("instance %s", instance, indent=1)
841 op_add = opcodes.OpSetInstanceParams(\
842 instance_name=instance, nics=[(constants.DDM_ADD, {})])
843 op_rem = opcodes.OpSetInstanceParams(\
844 instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
845 Log("adding a NIC", indent=2)
846 Log("removing last NIC", indent=2)
847 self.ExecOrQueue(instance, op_add, op_rem)
849   def ConfdCallback(self, reply):
850     """Callback for confd queries"""
# NOTE(review): this excerpt is non-contiguous; the continuation of the Err
# call and the "else:" lines are missing here.
851     if reply.type == confd_client.UPCALL_REPLY:
852       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
853         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
854                                                     reply.server_reply.status,
      # Dispatch on the original request type to validate each answer.
856       if reply.orig_request.type == constants.CONFD_REQ_PING:
857         Log("Ping: OK", indent=1)
858       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
859         if reply.server_reply.answer == self.cluster_info["master"]:
860           Log("Master: OK", indent=1)
862           Err("Master: wrong: %s" % reply.server_reply.answer)
863       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
864         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
865           Log("Node role for master: OK", indent=1)
867           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
869   def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrived."""
870     self.confd_counting_callback.RegisterQuery(req.rsalt)
871     self.confd_client.SendRequest(req, async=False)
    # Keep receiving until the counting callback has seen every answer.
872     while not self.confd_counting_callback.AllAnswered():
873       if not self.confd_client.ReceiveReply():
874         Err("Did not receive all expected confd replies")
# NOTE(review): the loop-termination line after the Err call is outside this
# excerpt.
878     """Run confd queries for our instances.
880     The following confd queries are tested:
881       - CONFD_REQ_PING: simple ping
882       - CONFD_REQ_CLUSTER_MASTER: cluster master
883       - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
# NOTE(review): the enclosing method header, docstring close and some lines
# (e.g. the tail of the ConfdClient constructor) are outside this excerpt.
886     Log("Checking confd results")
888     hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
889     mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
890     mc_list = utils.ReadFile(mc_file).splitlines()
    # Chain the validation callback through filter + counting wrappers.
891     filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
892     counting_callback = confd_client.ConfdCountingCallback(filter_callback)
893     self.confd_counting_callback = counting_callback
895     self.confd_client = confd_client.ConfdClient(hmac_key, mc_list,
898     req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
899     self.DoConfdRequestReply(req)
901     req = confd_client.ConfdClientRequest(
902       type=constants.CONFD_REQ_CLUSTER_MASTER)
903     self.DoConfdRequestReply(req)
905     req = confd_client.ConfdClientRequest(
906       type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
907       query=self.cluster_info["master"])
908     self.DoConfdRequestReply(req)
910   def _CheckInstanceAlive(self, instance):
911     """Check if an instance is alive by doing http checks.
913     This will try to retrieve the url on the instance /hostname.txt
914     and check that it contains the hostname of the instance. In case
915     we get ECONNREFUSED, we retry up to the net timeout seconds, for
916     any other error we abort.
# NOTE(review): this excerpt is non-contiguous; the early return, "url"
# initialization and the try/except of the retry loop are missing here.
919     if not self.opts.http_check:
921     end_time = time.time() + self.opts.net_timeout
    # Retry the fetch until the deadline or until a response is obtained.
923     while time.time() < end_time and url is None:
925         url = self.url_opener.open("http://%s/hostname.txt" % instance)
927         # here we can have connection refused, no route to host, etc.
930       raise InstanceDown(instance, "Cannot contact instance")
931     hostname = url.read().strip()
933     if hostname != instance:
934       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
935                                     (instance, hostname)))
937   def BurninCluster(self):
938     """Test a cluster intensively.
940     This will create instances and then start/stop/failover them.
941     It is safe for existing instances but could impact performance.
# NOTE(review): this excerpt is non-contiguous; the docstring close, the
# "opts" binding, the try opener and several phase calls are missing here.
947     Log("Testing global parameters")
949     if (len(self.nodes) == 1 and
950         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
952       Err("When one node is available/selected the disk template must"
953           " be 'diskless', 'file' or 'plain'")
    # Each burnin phase below is gated on its option and on template/node
    # constraints where relevant.
957       self.BurnCreateInstances()
958       if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
959         self.BurnReplaceDisks1D8()
960       if (opts.do_replace2 and len(self.nodes) > 2 and
961           opts.disk_template in constants.DTS_NET_MIRROR) :
962         self.BurnReplaceDisks2()
964       if (opts.disk_template in constants.DTS_GROWABLE and
965           utils.any(self.disk_growth, lambda n: n > 0)):
968       if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
971       if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
974       if (opts.do_move and len(self.nodes) > 1 and
975           opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
978       if (opts.do_importexport and
979           opts.disk_template not in (constants.DT_DISKLESS,
981         self.BurnImportExport()
983       if opts.do_reinstall:
989       if opts.do_addremove_disks:
990         self.BurnAddRemoveDisks()
992       default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
993       # Don't add/remove nics in routed mode, as we would need an ip to add
995       if opts.do_addremove_nics:
996         if default_nic_mode == constants.NIC_MODE_BRIDGED:
997           self.BurnAddRemoveNICs()
999           Log("Skipping nic add/remove as the cluster is not in bridged mode")
1001      if opts.do_activate_disks:
1002        self.BurnActivateDisks()
1007      if opts.do_confd_tests:
1010      if opts.do_startstop:
1011        self.BurnStopStart()
      # On error, dump the buffered opcode feedback to aid debugging.
1016        Log("Error detected: opcode buffer follows:\n\n")
1017        Log(self.GetFeedbackBuf())
1019      if not self.opts.keep_instances:
1022        except Exception, err: # pylint: disable-msg=W0703
1023          if has_err: # already detected errors, so errors in removal
1024            # are quite expected
1025            Log("Note: error detected during instance remove: %s", err)
1026          else: # non-expected error
# NOTE(review): tail of main(); the burner construction above and the call
# under the entry guard below are outside this excerpt.
1036   return burner.BurninCluster()
1039 if __name__ == "__main__":