4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
32 from itertools import izip, islice, cycle
33 from cStringIO import StringIO
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import cli
38 from ganeti import errors
39 from ganeti import utils
# One-line usage synopsis printed by Usage() and embedded in the option parser.
USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
class InstanceDown(Exception):
  """Raised when an instance that should be running is found to be down."""
  # NOTE(review): fragment of Usage() -- the 'def Usage():' line and the
  # trailing sys.exit() call are not visible in this chunk.
  """Shows program usage information and exits the program."""
  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
def Log(msg, indent=0):
  """Simple function that prints out its argument.
  # NOTE(review): the docstring tail and the 'headers' mapping used below are
  # on lines not visible in this chunk; 'headers' presumably maps an indent
  # level to a marker string -- confirm against the full source.
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  headers.get(indent, " "), msg))
def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.
  # NOTE(review): the docstring close and the process-exit tail (presumably
  # sys.exit(exit_code), given the parameter) are not visible in this chunk.
  sys.stderr.write(msg + "\n")
class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""

  def prompt_user_passwd(self, host, realm, clear_cache = 0):
    """No-interaction version of prompt_user_passwd."""
    # NOTE(review): the return statement supplying the canned credentials is
    # on lines not visible in this chunk.

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    # NOTE(review): an fp.close() and the (errcode, errmsg) argument tuple
    # continuation of the raise below are not visible in this chunk.
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
    # NOTE(review): fragment of the burner's __init__ -- the 'def __init__'
    # line and several other attribute initialisations (queued_ops, to_rem,
    # option parsing, ...) are not visible in this chunk.
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()  # used by _CheckInstanceAlive http checks
    self._feed_buf = StringIO()       # accumulates opcode feedback messages
    self.cl = cli.GetClient()         # client used to submit and poll jobs
  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    # NOTE(review): relies on Python 2 cStringIO semantics, where truncate(0)
    # empties the buffer so subsequent writes start from the beginning --
    # this differs from Python 3 io.StringIO, which keeps the position.
    self._feed_buf.truncate(0)
117 def GetFeedbackBuf(self):
118 """Return the contents of the buffer."""
119 return self._feed_buf.getvalue()
  def Feedback(self, msg):
    """Accumulate feedback in our buffer.

    @param msg: a job feedback message tuple; msg[0] is a timestamp pair
        decoded via utils.MergeTime -- presumably as produced by cli.PollJob;
        confirm against the caller.
    """
    self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
    # NOTE(review): the second format argument (the message text) and the
    # body of the verbose branch below are on lines not visible in this chunk.
    if self.opts.verbose:
  def ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @result: if only opcode has been passed, we return its result;
        otherwise we return the list of results
    """
    # Submit all ops as one job and poll it to completion, feeding output
    # into our feedback buffer.
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    # NOTE(review): the single-result unwrapping/return logic is on lines not
    # visible in this chunk.
  def ExecOrQueue(self, name, *ops):
    """Execute an opcode and manage the exec buffer.

    In parallel mode the ops are only queued (tagged with the given name)
    for a later CommitQueue() call; otherwise they run immediately.
    """
    if self.opts.parallel:
      self.queued_ops.append((ops, name))
    # NOTE(review): the 'else:' line is not visible in this chunk; the return
    # below is the non-parallel branch.
      return self.ExecOp(*ops)
  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    # No-op in serial mode -- ExecOrQueue already ran everything.
    if not self.opts.parallel:
    # NOTE(review): the early-return body, any try/finally wrapper and the
    # queue reset are on lines not visible in this chunk.
      results = self.ExecJobSet(self.queued_ops)
  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    the job-polling code.

    @param jobs: list of (ops, name) tuples as queued by ExecOrQueue
    """
    self.ClearFeedbackBuf()
    # Submit everything first so the jobs can run concurrently...
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
    # NOTE(review): the 'results' accumulator initialisation and the final
    # return are on lines not visible in this chunk.
    # ...then wait for each in submission order.
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    # NOTE(review): several option-definition continuation lines (closing
    # parentheses, 'default=' keywords) and validation branches of this
    # method are not visible in this chunk.
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version="%%prog (ganeti) %s" %
                                   constants.RELEASE_VERSION,
                                   option_class=cli.CliOption)

    parser.add_option("-o", "--os", dest="os", default=None,
                      help="OS to use during burnin",
    parser.add_option("--disk-size", dest="disk_size",
                      help="Disk size (determines disk count)",
                      default="128m", type="string", metavar="<size,size,...>")
    parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth",
                      default="128m", type="string", metavar="<size,size,...>")
    parser.add_option("--mem-size", dest="mem_size", help="Memory size",
                      default=128, type="unit", metavar="<size>")
    parser.add_option("-v", "--verbose",
                      action="store_true", dest="verbose", default=False,
                      help="print command execution messages to stdout")
    # Each --no-* flag below clears the corresponding do_* attribute, which
    # BurninCluster consults to decide which tests to run.
    parser.add_option("--no-replace1", dest="do_replace1",
                      help="Skip disk replacement with the same secondary",
                      action="store_false", default=True)
    parser.add_option("--no-replace2", dest="do_replace2",
                      help="Skip disk replacement with a different secondary",
                      action="store_false", default=True)
    parser.add_option("--no-failover", dest="do_failover",
                      help="Skip instance failovers", action="store_false",
    parser.add_option("--no-migrate", dest="do_migrate",
                      help="Skip instance live migration",
                      action="store_false", default=True)
    parser.add_option("--no-importexport", dest="do_importexport",
                      help="Skip instance export/import", action="store_false",
    parser.add_option("--no-startstop", dest="do_startstop",
                      help="Skip instance stop/start", action="store_false",
    parser.add_option("--no-reinstall", dest="do_reinstall",
                      help="Skip instance reinstall", action="store_false",
    parser.add_option("--no-reboot", dest="do_reboot",
                      help="Skip instance reboot", action="store_false",
    parser.add_option("--no-activate-disks", dest="do_activate_disks",
                      help="Skip disk activation/deactivation",
                      action="store_false", default=True)
    parser.add_option("--no-add-disks", dest="do_addremove_disks",
                      help="Skip disk addition/removal",
                      action="store_false", default=True)
    parser.add_option("--no-add-nics", dest="do_addremove_nics",
                      help="Skip NIC addition/removal",
                      action="store_false", default=True)
    # default is one NIC with default parameters; --no-nics switches to none
    parser.add_option("--no-nics", dest="nics",
                      help="No network interfaces", action="store_const",
                      const=[], default=[{}])
    parser.add_option("--rename", dest="rename", default=None,
                      help="Give one unused instance name which is taken"
                      " to start the renaming sequence",
                      metavar="<instance_name>")
    parser.add_option("-t", "--disk-template", dest="disk_template",
                      choices=("diskless", "file", "plain", "drbd"),
                      help="Disk template (diskless, file, plain or drbd)"
    parser.add_option("-n", "--nodes", dest="nodes", default="",
                      help="Comma separated list of nodes to perform"
                      " the burnin on (defaults to all nodes)")
    parser.add_option("-I", "--iallocator", dest="iallocator",
                      default=None, type="string",
                      help="Perform the allocation using an iallocator"
                      " instead of fixed node spread (node restrictions no"
                      " longer apply, therefore -n/--nodes must not be used")
    parser.add_option("-p", "--parallel", default=False, action="store_true",
                      help="Enable parallelization of some operations in"
                      " order to speed burnin or to test granular locking")
    parser.add_option("--net-timeout", default=15, type="int",
                      help="The instance check network timeout in seconds"
                      " (defaults to 15 seconds)")
    parser.add_option("-C", "--http-check", default=False, action="store_true",
                      help="Enable checking of instance status via http,"
                      " looking for /hostname.txt that should contain the"
                      " name of the instance")
    parser.add_option("-K", "--keep-instances", default=False,
                      dest="keep_instances",
                      help="Leave instances on the cluster after burnin,"
                      " for investigation in case of errors or simply"

    options, args = parser.parse_args()
    # need at least one instance name and a mandatory OS
    if len(args) < 1 or options.os is None:

    supported_disk_templates = (constants.DT_DISKLESS,
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    # NOTE(review): the 'else:' opening the branch below is not visible in
    # this chunk.
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    self.instances = args
    # NOTE(review): the enclosing backend-parameters dict assignment for the
    # two lines below is on a line not visible in this chunk.
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,

    # global socket timeout governs the http instance checks
    socket.setdefaulttimeout(options.net_timeout)
    # NOTE(review): fragment of GetState() -- the 'def' line, the
    # nodes-vs-all-nodes branching and the 'try:' opener are not visible in
    # this chunk.
    """Read the cluster state from the config."""
    names = self.opts.nodes.split(",")
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # keep only nodes that are neither offline nor drained
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
    # NOTE(review): the OpDiagnoseOS continuation and the 'if not result:'
    # guard presumably sit on the missing lines above this error.
      Err("Can't get the OS list")

    # filter non-valid OS-es
    os_set = [val[0] for val in result if val[1]]

    if self.opts.os not in os_set:
      Err("OS '%s' not found" % self.opts.os)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    # NOTE(review): several lines of this method (the third izip() argument,
    # 'else:' branch openers, many OpCreateInstance keyword continuations and
    # the CommitQueue() call) are not visible in this chunk.
    # Rotate primary/secondary over the node list, one pair per instance.
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        msg = "on %s" % pnode
        msg = "on %s, %s" % (pnode, snode)
      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
      self.ExecOrQueue(instance, op)
      # record for later cleanup in BurnRemove
      self.to_rem.append(instance)

    for instance in self.instances:
      self._CheckInstanceAlive(instance)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    # NOTE(review): the leading Log() call, a guard line before the OpGrowDisk
    # (presumably skipping non-positive growth) and the CommitQueue() call are
    # on lines not visible in this chunk.
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                amount=growth, wait_for_sync=True)
        Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
        self.ExecOrQueue(instance, op)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    # NOTE(review): the per-instance 'ops' list initialisation, the opcode
    # 'mode=' continuation and the ops-append line are not visible in this
    # chunk; the final ExecOrQueue consumes that 'ops' list.
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      # exercise both replace-on-secondary and replace-on-primary
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    disks=[i for i in range(self.disk_count)])
        Log("run %s" % mode, indent=2)
      self.ExecOrQueue(instance, *ops)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    # NOTE(review): the izip() second argument, the else-branch building the
    # non-iallocator message, the 'mode='/'remote_node=' opcode keywords and
    # the CommitQueue() call are not visible in this chunk.
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    # pair each instance with a target node two positions ahead in the ring
    mytor = izip(islice(cycle(self.nodes), 2, None),
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        msg = "with iallocator %s" % self.opts.iallocator
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  iallocator=self.opts.iallocator,
                                  disks=[i for i in range(self.disk_count)])
      Log("run %s %s" % (mode, msg), indent=2)
      self.ExecOrQueue(instance, op)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)
    # NOTE(review): the CommitQueue() call is on a line not visible in this
    # chunk; after all failovers, verify every instance came back up.
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      # A live migration followed by a cleanup run of the same opcode (per
      # the log message below); the remaining keyword continuations of both
      # opcodes are on lines not visible in this chunk.
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    # NOTE(review): multiple lines of this method (docstring tail, the last
    # izip() argument, 'else:' branch openers, several opcode keyword
    # continuations and the CommitQueue() call) are not visible in this chunk.
    Log("Exporting and re-importing instances")
    # rotate primary, secondary and export node over the node list
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
    for pnode, snode, enode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(nam_op)[0][0]

      if self.opts.iallocator:
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        import_log_msg = ("import from %s to %s" %
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      # export -> remove -> import-from-export -> drop the export
      exp_op = opcodes.OpExportInstance(instance_name=instance,
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        mode=constants.INSTANCE_IMPORT,
                                        file_storage_dir=None,
                                        iallocator=self.opts.iallocator,
      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s" % enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

    for instance in self.instances:
      self._CheckInstanceAlive(instance)
542 def StopInstanceOp(self, instance):
543 """Stop given instance."""
544 return opcodes.OpShutdownInstance(instance_name=instance)
546 def StartInstanceOp(self, instance):
547 """Start given instance."""
548 return opcodes.OpStartupInstance(instance_name=instance, force=False)
550 def RenameInstanceOp(self, instance, instance_new):
551 """Rename instance."""
552 return opcodes.OpRenameInstance(instance_name=instance,
553 new_name=instance_new)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)
    # NOTE(review): the CommitQueue() call is on a line not visible in this
    # chunk; the loop below verifies every instance is up again afterwards.
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
  def BurnRemove(self):
    """Remove the instances."""
    # Only instances recorded in self.to_rem (those created by
    # BurnCreateInstances) are removed; ignore_failures keeps the cleanup
    # best-effort.
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)
    # NOTE(review): the method tail (presumably a CommitQueue() call) is on
    # lines not visible in this chunk.
580 def BurnRename(self):
581 """Rename the instances.
583 Note that this function will not execute in parallel, since we
584 only have one target for rename.
587 Log("Renaming instances")
588 rename = self.opts.rename
589 for instance in self.instances:
590 Log("instance %s" % instance, indent=1)
591 op_stop1 = self.StopInstanceOp(instance)
592 op_stop2 = self.StopInstanceOp(rename)
593 op_rename1 = self.RenameInstanceOp(instance, rename)
594 op_rename2 = self.RenameInstanceOp(rename, instance)
595 op_start1 = self.StartInstanceOp(rename)
596 op_start2 = self.StartInstanceOp(instance)
597 self.ExecOp(op_stop1, op_rename1, op_start1)
598 self._CheckInstanceAlive(rename)
599 self.ExecOp(op_stop2, op_rename2, op_start2)
600 self._CheckInstanceAlive(instance)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      # once with the instance's current OS, once explicitly naming the OS
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)
    # NOTE(review): the CommitQueue() call is on a line not visible in this
    # chunk.
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      # NOTE(review): the per-instance 'ops' list initialisation and the
      # ops-append line are not visible in this chunk; ExecOrQueue below
      # consumes that list.
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'" % reboot_type, indent=2)
      self.ExecOrQueue(instance, *ops)
    # NOTE(review): the CommitQueue() call is on a line not visible here.
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      # activate while running, stop, activate + deactivate while down,
      # then start again
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
    # NOTE(review): the CommitQueue() call is on a line not visible here.
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      # add while running, then stop before removing -- presumably because
      # disk removal needs the instance down; confirm against OpSetInstanceParams
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
    # NOTE(review): the CommitQueue() call is on a line not visible here.
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)
    # NOTE(review): the method tail (presumably a CommitQueue() call) is on
    # lines not visible in this chunk.
  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    # NOTE(review): the 'url = None' initialisation, the try/except wrapping
    # the open() call, the errno-based retry handling and the early-return
    # body of the guard below are on lines not visible in this chunk.
    if not self.opts.http_check:
    end_time = time.time() + self.opts.net_timeout
    # poll until the instance answers or the deadline passes
    while time.time() < end_time and url is None:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
        # here we can have connection refused, no route to host, etc.
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """
    # NOTE(review): many lines of this method are not visible in this chunk:
    # the 'opts' local binding, several self.Burn*() calls inside the guards,
    # the try/except-or-finally structure around the test sequence (the two
    # error Log() calls near the end are presumably inside its error branch),
    # and the final BurnRemove()/return.
    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    self.BurnCreateInstances()
    if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
      self.BurnReplaceDisks1D8()
    if (opts.do_replace2 and len(self.nodes) > 2 and
        opts.disk_template in constants.DTS_NET_MIRROR) :
      self.BurnReplaceDisks2()

    # grow disks only when a template has disks and growth was requested
    if (opts.disk_template != constants.DT_DISKLESS and
        utils.any(self.disk_growth, lambda n: n > 0)):

    if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:

    # live migration is drbd-only
    if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:

    if (opts.do_importexport and
        opts.disk_template not in (constants.DT_DISKLESS,
      self.BurnImportExport()

    if opts.do_reinstall:

    if opts.do_addremove_disks:
      self.BurnAddRemoveDisks()

    if opts.do_addremove_nics:
      self.BurnAddRemoveNICs()

    if opts.do_activate_disks:
      self.BurnActivateDisks()

    if opts.do_startstop:

    Log("Error detected: opcode buffer follows:\n\n")
    Log(self.GetFeedbackBuf())

    if not self.opts.keep_instances:
  # NOTE(review): fragment of main() -- the burner construction above this
  # return is not visible in this chunk.
  return burner.BurninCluster()

# NOTE(review): the entry-point body (presumably sys.exit(main())) is not
# visible in this chunk.
if __name__ == "__main__":