4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
32 from itertools import izip, islice, cycle
33 from cStringIO import StringIO
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import cli
38 from ganeti import errors
39 from ganeti import utils
# Usage string printed by Usage() below.  (This is a numbered listing with
# elided lines; Python 2 syntax throughout.)
42 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
# Raised when an instance fails its liveness (HTTP) check.
45 class InstanceDown(Exception):
46   """The checked instance was not up"""
# NOTE(review): the `def Usage():` line (original ~49) is elided from this
# listing; the visible body prints the usage text to stderr using Python 2
# print-redirection, and per the docstring presumably exits afterwards —
# confirm against the full source.
50   """Shows program usage information and exits the program."""
52   print >> sys.stderr, "Usage:"
53   print >> sys.stderr, USAGE
# Print a message to stdout, indented two spaces per `indent` level.
57 def Log(msg, indent=0):
58   """Simple function that prints out its argument.
# NOTE(review): `headers` (the per-indent prefix characters used below) is
# defined in lines elided from this listing — confirm against full source.
66   sys.stdout.write("%*s%s%s\n" % (2*indent, "",
67                                   headers.get(indent, " "), msg))
# Report a fatal error to stderr.  NOTE(review): callers (e.g. ParseOptions)
# treat Err() as terminating; the sys.exit(exit_code) call is presumably in
# the elided lines after 74 — confirm against full source.
70 def Err(msg, exit_code=1):
71   """Simple error logging that prints to stderr.
74   sys.stderr.write(msg + "\n")
# URL opener used for the instance HTTP liveness checks; never prompts for
# credentials and converts any HTTP error into InstanceDown.
79 class SimpleOpener(urllib.FancyURLopener):
80   """A simple url opener"""
# Returning no credentials (body elided here) prevents interactive prompts.
82   def prompt_user_passwd(self, host, realm, clear_cache = 0):
83     """No-interaction version of prompt_user_passwd."""
86   def http_error_default(self, url, fp, errcode, errmsg, headers):
87     """Custom error handling"""
88     # make sure sockets are not left in CLOSE_WAIT, this is similar
89     # but with a different exception to the BasicURLOpener class
90     _ = fp.read() # throw away data
# Any HTTP-level error means the instance check failed -> InstanceDown.
92     raise InstanceDown("HTTP error returned: code %s, msg %s" %
# NOTE(review): fragment of Burner.__init__ — the class statement and the
# `def __init__` line are elided from this listing.  Sets up logging, the
# HTTP opener for liveness checks, the feedback buffer and the LUXI client.
101     utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
102     self.url_opener = SimpleOpener()
103     self._feed_buf = StringIO()
110     self.cl = cli.GetClient()
# Reset the accumulated opcode feedback (cStringIO truncate empties it).
113   def ClearFeedbackBuf(self):
114     """Clear the feedback buffer."""
115     self._feed_buf.truncate(0)
# Accessor used when dumping the opcode log after an error (see BurninCluster).
117   def GetFeedbackBuf(self):
118     """Return the contents of the buffer."""
119     return self._feed_buf.getvalue()
# Job feedback callback (passed as feedback_fn to cli.PollJob): timestamps
# each message into the buffer and, if --verbose, presumably echoes it too
# (the echo line is elided from this listing).
121   def Feedback(self, msg):
122     """Accumulate feedback in our buffer."""
123     self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
125     if self.opts.verbose:
# Submit one job containing *ops and wait for it, feeding output into the
# feedback buffer.  The unpacking of `results` for the single-opcode case is
# in lines elided from this listing.
128   def ExecOp(self, *ops):
129     """Execute one or more opcodes and manage the exec buffer.
131     @return: if only one opcode has been passed, we return its result;
132         otherwise we return the list of results
135     job_id = cli.SendJob(ops, cl=self.cl)
136     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
# In parallel mode, queue the opcodes (flushed later by CommitQueue);
# otherwise execute them synchronously via ExecOp.
142   def ExecOrQueue(self, name, *ops):
143     """Execute an opcode and manage the exec buffer."""
144     if self.opts.parallel:
145       self.queued_ops.append((ops, name))
147       return self.ExecOp(*ops)
# Flush the ops queued by ExecOrQueue as one job set; no-op outside parallel
# mode.  (Queue reset / result handling lines are elided from this listing.)
149   def CommitQueue(self):
150     """Execute all submitted opcodes in case of parallel burnin"""
151     if not self.opts.parallel:
155     results = self.ExecJobSet(self.queued_ops)
160   def ExecJobSet(self, jobs):
161     """Execute a set of jobs and return once all are done.
163     The method will return the list of results, if all jobs are
164     successful. Otherwise, OpExecError will be raised from within
# Submit all jobs first (so they run concurrently), then poll each in
# submission order.  `jobs` rows are (ops, name) tuples from ExecOrQueue.
168     self.ClearFeedbackBuf()
169     job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
170     Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
# NOTE(review): the initialization of `results` is elided from this listing.
172     for jid, (_, iname) in zip(job_ids, jobs):
173       Log("waiting for job %s for %s" % (jid, iname), indent=2)
174       results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
# Build the optparse parser, validate the combination of disk template /
# sizes / node selection, and store the results on self (opts, instances,
# disk_size/growth/count, backend params).  Many lines (defaults, some
# validation branches, self.opts assignment) are elided from this listing.
178   def ParseOptions(self):
179     """Parses the command line options.
181     In case of command line errors, it will show the usage and exit the
186     parser = optparse.OptionParser(usage="\n%s" % USAGE,
187                                    version="%%prog (ganeti) %s" %
188                                    constants.RELEASE_VERSION,
189                                    option_class=cli.CliOption)
191     parser.add_option("-o", "--os", dest="os", default=None,
192                       help="OS to use during burnin",
194     parser.add_option("--disk-size", dest="disk_size",
195                       help="Disk size (determines disk count)",
196                       default="128m", type="string", metavar="<size,size,...>")
197     parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth",
198                       default="128m", type="string", metavar="<size,size,...>")
199     parser.add_option("--mem-size", dest="mem_size", help="Memory size",
200                       default=128, type="unit", metavar="<size>")
201     parser.add_option("-v", "--verbose",
202                       action="store_true", dest="verbose", default=False,
203                       help="print command execution messages to stdout")
# The --no-* family below are store_false flags defaulting to True, i.e.
# every burnin phase runs unless explicitly skipped.
204     parser.add_option("--no-replace1", dest="do_replace1",
205                       help="Skip disk replacement with the same secondary",
206                       action="store_false", default=True)
207     parser.add_option("--no-replace2", dest="do_replace2",
208                       help="Skip disk replacement with a different secondary",
209                       action="store_false", default=True)
210     parser.add_option("--no-failover", dest="do_failover",
211                       help="Skip instance failovers", action="store_false",
213     parser.add_option("--no-migrate", dest="do_migrate",
214                       help="Skip instance live migration",
215                       action="store_false", default=True)
216     parser.add_option("--no-importexport", dest="do_importexport",
217                       help="Skip instance export/import", action="store_false",
219     parser.add_option("--no-startstop", dest="do_startstop",
220                       help="Skip instance stop/start", action="store_false",
222     parser.add_option("--no-reinstall", dest="do_reinstall",
223                       help="Skip instance reinstall", action="store_false",
225     parser.add_option("--no-reboot", dest="do_reboot",
226                       help="Skip instance reboot", action="store_false",
228     parser.add_option("--no-activate-disks", dest="do_activate_disks",
229                       help="Skip disk activation/deactivation",
230                       action="store_false", default=True)
231     parser.add_option("--no-add-disks", dest="do_addremove_disks",
232                       help="Skip disk addition/removal",
233                       action="store_false", default=True)
234     parser.add_option("--no-add-nics", dest="do_addremove_nics",
235                       help="Skip NIC addition/removal",
236                       action="store_false", default=True)
# --no-nics switches the default one-NIC spec [{}] to the empty list.
237     parser.add_option("--no-nics", dest="nics",
238                       help="No network interfaces", action="store_const",
239                       const=[], default=[{}])
240     parser.add_option("--rename", dest="rename", default=None,
241                       help="Give one unused instance name which is taken"
242                       " to start the renaming sequence",
243                       metavar="<instance_name>")
244     parser.add_option("-t", "--disk-template", dest="disk_template",
245                       choices=("diskless", "file", "plain", "drbd"),
247                       help="Disk template (diskless, file, plain or drbd)"
249     parser.add_option("-n", "--nodes", dest="nodes", default="",
250                       help="Comma separated list of nodes to perform"
251                       " the burnin on (defaults to all nodes)")
252     parser.add_option("-I", "--iallocator", dest="iallocator",
253                       default=None, type="string",
254                       help="Perform the allocation using an iallocator"
255                       " instead of fixed node spread (node restrictions no"
256                       " longer apply, therefore -n/--nodes must not be used")
257     parser.add_option("-p", "--parallel", default=False, action="store_true",
259                       help="Enable parallelization of some operations in"
260                       " order to speed burnin or to test granular locking")
261     parser.add_option("--net-timeout", default=15, type="int",
263                       help="The instance check network timeout in seconds"
264                       " (defaults to 15 seconds)")
265     parser.add_option("-C", "--http-check", default=False, action="store_true",
267                       help="Enable checking of instance status via http,"
268                       " looking for /hostname.txt that should contain the"
269                       " name of the instance")
270     parser.add_option("-K", "--keep-instances", default=False,
272                       dest="keep_instances",
273                       help="Leave instances on the cluster after burnin,"
274                       " for investigation in case of errors or simply"
# --- validation: at least one instance name and an OS are mandatory ---
278     options, args = parser.parse_args()
279     if len(args) < 1 or options.os is None:
282     supported_disk_templates = (constants.DT_DISKLESS,
286     if options.disk_template not in supported_disk_templates:
287       Err("Unknown disk template '%s'" % options.disk_template)
# Diskless template: no disks, and disk add/remove tests are meaningless.
289     if options.disk_template == constants.DT_DISKLESS:
290       disk_size = disk_growth = []
291       options.do_addremove_disks = False
293       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
294       disk_growth = [utils.ParseUnit(v)
295                      for v in options.disk_growth.split(",")]
296       if len(disk_growth) != len(disk_size):
297         Err("Wrong disk sizes/growth combination")
298     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
299         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
300       Err("Wrong disk count/disk template combination")
302     self.disk_size = disk_size
303     self.disk_growth = disk_growth
304     self.disk_count = len(disk_size)
# -n/--nodes and -I/--iallocator are mutually exclusive.
306     if options.nodes and options.iallocator:
307       Err("Give either the nodes option or the iallocator option, not both")
310     self.instances = args
# NOTE(review): these are entries of a backend-parameters dict whose opening
# brace/assignment is elided from this listing.
312       constants.BE_MEMORY: options.mem_size,
313       constants.BE_VCPUS: 1,
# Global socket timeout so instance HTTP checks cannot hang indefinitely.
317     socket.setdefaulttimeout(options.net_timeout)
# NOTE(review): fragment of a method (its `def` line, original ~319, is
# elided).  Queries node names (filtering out offline ones) and the list of
# valid OSes, aborting via Err() on failure.
320     """Read the cluster state from the config."""
322       names = self.opts.nodes.split(",")
# NOTE(review): this query sits inside a try whose `try:` line is elided.
326       op = opcodes.OpQueryNodes(output_fields=["name", "offline"], names=names)
327       result = self.ExecOp(op)
328     except errors.GenericError, err:
329       err_code, msg = cli.FormatError(err)
330       Err(msg, exit_code=err_code)
# Keep only nodes whose "offline" field (data[1]) is false.
331     self.nodes = [data[0] for data in result if not data[1]]
333     result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
337       Err("Can't get the OS list")
339     # filter non-valid OS-es
340     os_set = [val[0] for val in result if val[1]]
342     if self.opts.os not in os_set:
343       Err("OS '%s' not found" % self.opts.os)
# Create all requested instances, spreading primary/secondary over the node
# list via a rotating izip/cycle pairing, then verify each is alive.
345   def BurnCreateInstances(self):
346     """Create the given instances.
# pnode cycles through the nodes; snode is the next node in the cycle
# (islice offset 1); the third iterable (instances) is in elided lines.
350     mytor = izip(cycle(self.nodes),
351                  islice(cycle(self.nodes), 1, None),
354     Log("Creating instances")
355     for pnode, snode, instance in mytor:
356       Log("instance %s" % instance, indent=1)
357       if self.opts.iallocator:
# With an iallocator, node choice is delegated; pnode/snode are unused here.
359         msg = "with iallocator %s" % self.opts.iallocator
360       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
362         msg = "on %s" % pnode
364         msg = "on %s, %s" % (pnode, snode)
# NOTE(review): several OpCreateInstance keyword arguments (pnode/snode,
# nics, hypervisor params, etc.) are elided from this listing.
368       op = opcodes.OpCreateInstance(instance_name=instance,
369                                     disks = [ {"size": size}
370                                               for size in self.disk_size],
371                                     disk_template=self.opts.disk_template,
373                                     mode=constants.INSTANCE_CREATE,
374                                     os_type=self.opts.os,
381                                     file_storage_dir=None,
382                                     iallocator=self.opts.iallocator,
387       self.ExecOrQueue(instance, op)
# Track for later removal in BurnRemove.
388       self.to_rem.append(instance)
392     for instance in self.instances:
393       self._CheckInstanceAlive(instance)
# Grow every disk of every instance by its configured growth amount,
# waiting for resync; zero-growth entries are presumably skipped by an
# elided `if growth > 0` guard (cf. the check in BurninCluster).
395   def BurnGrowDisks(self):
396     """Grow both the os and the swap disks by the requested amount, if any."""
398     for instance in self.instances:
399       Log("instance %s" % instance, indent=1)
400       for idx, growth in enumerate(self.disk_growth):
402           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
403                                   amount=growth, wait_for_sync=True)
404           Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
405           self.ExecOrQueue(instance, op)
# DRBD8 replace-disks keeping the same nodes: one op per mode (secondary
# first, then primary), queued together per instance.
408   def BurnReplaceDisks1D8(self):
409     """Replace disks on primary and secondary for drbd8."""
410     Log("Replacing disks on the same nodes")
411     for instance in self.instances:
412       Log("instance %s" % instance, indent=1)
# NOTE(review): the `ops = []` initialization and the per-mode append are in
# elided lines; `mode=mode` is presumably among the elided opcode kwargs.
414       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
415         op = opcodes.OpReplaceDisks(instance_name=instance,
417                                     disks=[i for i in range(self.disk_count)])
418         Log("run %s" % mode, indent=2)
420       self.ExecOrQueue(instance, *ops)
# Replace-disks changing the secondary node: target node comes either from
# the iallocator or from a rotation two steps ahead in the node cycle.
423   def BurnReplaceDisks2(self):
424     """Replace secondary node."""
425     Log("Changing the secondary node")
426     mode = constants.REPLACE_DISK_CHG
# tnode is offset 2 in the node cycle; the instances iterable is elided.
428     mytor = izip(islice(cycle(self.nodes), 2, None),
430     for tnode, instance in mytor:
431       Log("instance %s" % instance, indent=1)
432       if self.opts.iallocator:
434         msg = "with iallocator %s" % self.opts.iallocator
# NOTE(review): the non-iallocator branch (using tnode) is elided here.
437       op = opcodes.OpReplaceDisks(instance_name=instance,
440                                   iallocator=self.opts.iallocator,
441                                   disks=[i for i in range(self.disk_count)])
442       Log("run %s %s" % (mode, msg), indent=2)
443       self.ExecOrQueue(instance, op)
# Fail over each instance to its secondary, then verify liveness.
446   def BurnFailover(self):
447     """Failover the instances."""
448     Log("Failing over instances")
449     for instance in self.instances:
450       Log("instance %s" % instance, indent=1)
451       op = opcodes.OpFailoverInstance(instance_name=instance,
452                                       ignore_consistency=False)
454       self.ExecOrQueue(instance, op)
# Liveness check runs after all failovers are queued/executed.
456     for instance in self.instances:
457       self._CheckInstanceAlive(instance)
# Live-migrate each instance, then run a second migrate op — presumably the
# cleanup variant per the log message (the differing kwargs are elided).
459   def BurnMigrate(self):
460     """Migrate the instances."""
461     Log("Migrating instances")
462     for instance in self.instances:
463       Log("instance %s" % instance, indent=1)
464       op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
467       op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
469       Log("migration and migration cleanup", indent=2)
470       self.ExecOrQueue(instance, op1, op2)
# Per instance: export to a third node, remove, re-import from the export
# directory, drop the export — all queued as one job — then verify liveness.
473   def BurnImportExport(self):
474     """Export the instance, delete it, and import it back.
477     Log("Exporting and re-importing instances")
# pnode/snode/enode are three consecutive offsets in the node cycle; the
# instances iterable is in elided lines.
478     mytor = izip(cycle(self.nodes),
479                  islice(cycle(self.nodes), 1, None),
480                  islice(cycle(self.nodes), 2, None),
483     for pnode, snode, enode, instance in mytor:
484       Log("instance %s" % instance, indent=1)
485       # read the full name of the instance
486       nam_op = opcodes.OpQueryInstances(output_fields=["name"],
# full_name (FQDN) determines the export directory path used below.
488       full_name = self.ExecOp(nam_op)[0][0]
490       if self.opts.iallocator:
492         import_log_msg = ("import from %s"
493                           " with iallocator %s" %
494                           (enode, self.opts.iallocator))
495       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
497         import_log_msg = ("import from %s to %s" %
500         import_log_msg = ("import from %s to %s, %s" %
501                           (enode, pnode, snode))
# NOTE(review): several kwargs of the export/import opcodes (target node,
# shutdown flag, src_node/src_path, nodes, nics, ...) are elided here.
503       exp_op = opcodes.OpExportInstance(instance_name=instance,
506       rem_op = opcodes.OpRemoveInstance(instance_name=instance,
507                                         ignore_failures=True)
508       imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
509       imp_op = opcodes.OpCreateInstance(instance_name=instance,
510                                         disks = [ {"size": size}
511                                                   for size in self.disk_size],
512                                         disk_template=self.opts.disk_template,
514                                         mode=constants.INSTANCE_IMPORT,
522                                         file_storage_dir=None,
524                                         iallocator=self.opts.iallocator,
529       erem_op = opcodes.OpRemoveExport(instance_name=instance)
531       Log("export to node %s" % enode, indent=2)
532       Log("remove instance", indent=2)
533       Log(import_log_msg, indent=2)
534       Log("remove export", indent=2)
535       self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
538     for instance in self.instances:
539       self._CheckInstanceAlive(instance)
# Build (not execute) a shutdown opcode for `instance`.
541   def StopInstanceOp(self, instance):
542     """Stop given instance."""
543     return opcodes.OpShutdownInstance(instance_name=instance)
# Build (not execute) a non-forced startup opcode for `instance`.
545   def StartInstanceOp(self, instance):
546     """Start given instance."""
547     return opcodes.OpStartupInstance(instance_name=instance, force=False)
# Build (not execute) a rename opcode: `instance` -> `instance_new`.
549   def RenameInstanceOp(self, instance, instance_new):
550     """Rename instance."""
551     return opcodes.OpRenameInstance(instance_name=instance,
552                                     new_name=instance_new)
# Stop and restart every instance (queued as a stop+start pair), then
# verify liveness.
554   def BurnStopStart(self):
555     """Stop/start the instances."""
556     Log("Stopping and starting instances")
557     for instance in self.instances:
558       Log("instance %s" % instance, indent=1)
559       op1 = self.StopInstanceOp(instance)
560       op2 = self.StartInstanceOp(instance)
561       self.ExecOrQueue(instance, op1, op2)
565     for instance in self.instances:
566       self._CheckInstanceAlive(instance)
# Remove every instance this run created (self.to_rem, filled by
# BurnCreateInstances); failures during removal are ignored so cleanup
# proceeds as far as possible.
568   def BurnRemove(self):
569     """Remove the instances."""
570     Log("Removing instances")
571     for instance in self.to_rem:
572       Log("instance %s" % instance, indent=1)
573       op = opcodes.OpRemoveInstance(instance_name=instance,
574                                     ignore_failures=True)
575       self.ExecOrQueue(instance, op)
# Rename each instance to the single --rename target and back, stopping
# before and starting after each rename, checking liveness at both names.
# Deliberately uses ExecOp (serial), never ExecOrQueue — see docstring.
579   def BurnRename(self):
580     """Rename the instances.
582     Note that this function will not execute in parallel, since we
583     only have one target for rename.
586     Log("Renaming instances")
587     rename = self.opts.rename
588     for instance in self.instances:
589       Log("instance %s" % instance, indent=1)
590       op_stop = self.StopInstanceOp(instance)
591       op_rename1 = self.RenameInstanceOp(instance, rename)
592       op_rename2 = self.RenameInstanceOp(rename, instance)
593       op_start1 = self.StartInstanceOp(rename)
594       op_start2 = self.StartInstanceOp(instance)
595       self.ExecOp(op_stop, op_rename1, op_start1)
596       self._CheckInstanceAlive(rename)
# NOTE(review): op_stop targets the original name; the instance currently
# runs under `rename` here — confirm against the full source whether a
# second stop op for the renamed instance exists in elided lines.
597       self.ExecOp(op_stop, op_rename2, op_start2)
598       self._CheckInstanceAlive(instance)
# Stop, reinstall twice (once with the default OS, once explicitly naming
# --os), restart, then verify liveness.
600   def BurnReinstall(self):
601     """Reinstall the instances."""
602     Log("Reinstalling instances")
603     for instance in self.instances:
604       Log("instance %s" % instance, indent=1)
605       op1 = self.StopInstanceOp(instance)
606       op2 = opcodes.OpReinstallInstance(instance_name=instance)
607       Log("reinstall without passing the OS", indent=2)
608       op3 = opcodes.OpReinstallInstance(instance_name=instance,
609                                         os_type=self.opts.os)
610       Log("reinstall specifying the OS", indent=2)
611       op4 = self.StartInstanceOp(instance)
612       self.ExecOrQueue(instance, op1, op2, op3, op4)
616     for instance in self.instances:
617       self._CheckInstanceAlive(instance)
# Reboot each instance with every supported reboot type, then verify
# liveness.  (The `ops` list initialization/append lines are elided.)
619   def BurnReboot(self):
620     """Reboot the instances."""
621     Log("Rebooting instances")
622     for instance in self.instances:
623       Log("instance %s" % instance, indent=1)
625       for reboot_type in constants.REBOOT_TYPES:
626         op = opcodes.OpRebootInstance(instance_name=instance,
627                                       reboot_type=reboot_type,
628                                       ignore_secondaries=False)
629         Log("reboot with type '%s'" % reboot_type, indent=2)
631       self.ExecOrQueue(instance, *ops)
635     for instance in self.instances:
636       self._CheckInstanceAlive(instance)
# Exercise disk activation in both instance states: activate while online,
# stop, activate again while offline, deactivate, restart; then verify
# liveness.
638   def BurnActivateDisks(self):
639     """Activate and deactivate disks of the instances."""
640     Log("Activating/deactivating disks")
641     for instance in self.instances:
642       Log("instance %s" % instance, indent=1)
643       op_start = self.StartInstanceOp(instance)
644       op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
645       op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
646       op_stop = self.StopInstanceOp(instance)
647       Log("activate disks when online", indent=2)
648       Log("activate disks when offline", indent=2)
649       Log("deactivate disks (when offline)", indent=2)
# op_act appears twice on purpose: once online, once after op_stop.
650       self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
652     for instance in self.instances:
653       self._CheckInstanceAlive(instance)
# Hot-add one disk (sized like the first configured disk), stop the
# instance, remove the last disk, restart; then verify liveness.
655   def BurnAddRemoveDisks(self):
656     """Add and remove an extra disk for the instances."""
657     Log("Adding and removing disks")
658     for instance in self.instances:
659       Log("instance %s" % instance, indent=1)
660       op_add = opcodes.OpSetInstanceParams(\
661         instance_name=instance,
662         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
663       op_rem = opcodes.OpSetInstanceParams(\
664         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
665       op_stop = self.StopInstanceOp(instance)
666       op_start = self.StartInstanceOp(instance)
667       Log("adding a disk", indent=2)
668       Log("removing last disk", indent=2)
# Disk removal requires the instance down, hence stop before remove.
669       self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
671     for instance in self.instances:
672       self._CheckInstanceAlive(instance)
# Hot-add a NIC with default parameters, then remove the last NIC; unlike
# the disk variant, no stop/start is needed.
674   def BurnAddRemoveNICs(self):
675     """Add and remove an extra NIC for the instances."""
676     Log("Adding and removing NICs")
677     for instance in self.instances:
678       Log("instance %s" % instance, indent=1)
679       op_add = opcodes.OpSetInstanceParams(\
680         instance_name=instance, nics=[(constants.DDM_ADD, {})])
681       op_rem = opcodes.OpSetInstanceParams(\
682         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
683       Log("adding a NIC", indent=2)
684       Log("removing last NIC", indent=2)
685       self.ExecOrQueue(instance, op_add, op_rem)
# HTTP liveness probe: fetch http://<instance>/hostname.txt and compare the
# body with the instance name.  Skipped unless --http-check is given.
688   def _CheckInstanceAlive(self, instance):
689     """Check if an instance is alive by doing http checks.
691     This will try to retrieve the url on the instance /hostname.txt
692     and check that it contains the hostname of the instance. In case
693     we get ECONNREFUSED, we retry up to the net timeout seconds, for
694     any other error we abort.
697     if not self.opts.http_check:
699     end_time = time.time() + self.opts.net_timeout
# NOTE(review): `url = None` initialization and the retry try/except
# (ECONNREFUSED handling) are in lines elided from this listing.
701     while time.time() < end_time and url is None:
703         url = self.url_opener.open("http://%s/hostname.txt" % instance)
705         # here we can have connection refused, no route to host, etc.
708       raise InstanceDown(instance, "Cannot contact instance")
709     hostname = url.read().strip()
711     if hostname != instance:
712       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
713                                     (instance, hostname)))
# Top-level driver: validates global constraints, then runs each Burn*
# phase gated by its option flag and disk-template applicability; dumps the
# feedback buffer on error and removes instances unless --keep-instances.
# Many flow-control lines (try/finally, CommitQueue calls, `opts =` binding)
# are elided from this listing.
715   def BurninCluster(self):
716     """Test a cluster intensively.
718     This will create instances and then start/stop/failover them.
719     It is safe for existing instances but could impact performance.
725     Log("Testing global parameters")
# With a single usable node, mirrored templates (drbd) cannot work.
727     if (len(self.nodes) == 1 and
728         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
730       Err("When one node is available/selected the disk template must"
731           " be 'diskless', 'file' or 'plain'")
735       self.BurnCreateInstances()
736       if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
737         self.BurnReplaceDisks1D8()
# Secondary-change replacement needs a third node to move to.
738       if (opts.do_replace2 and len(self.nodes) > 2 and
739           opts.disk_template in constants.DTS_NET_MIRROR) :
740         self.BurnReplaceDisks2()
742       if (opts.disk_template != constants.DT_DISKLESS and
743           utils.any(self.disk_growth, lambda n: n > 0)):
746       if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
# Live migration is DRBD8-only.
749       if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
752       if (opts.do_importexport and
753           opts.disk_template not in (constants.DT_DISKLESS,
755         self.BurnImportExport()
757       if opts.do_reinstall:
763       if opts.do_addremove_disks:
764         self.BurnAddRemoveDisks()
766       if opts.do_addremove_nics:
767         self.BurnAddRemoveNICs()
769       if opts.do_activate_disks:
770         self.BurnActivateDisks()
775       if opts.do_startstop:
# Error path: replay the accumulated opcode feedback for diagnosis.
781         Log("Error detected: opcode buffer follows:\n\n")
782         Log(self.GetFeedbackBuf())
# Cleanup (BurnRemove, presumably in elided lines) is skipped when the user
# asked to keep instances for post-mortem inspection.
784       if not self.opts.keep_instances:
# NOTE(review): fragment of main() — the `def main():` line and the Burner
# construction are elided; the script entry guard's `sys.exit(main())` body
# is likewise elided after line 797.
794   return burner.BurninCluster()
797 if __name__ == "__main__":