+ sys.stderr.write(msg + "\n")
+ sys.stderr.flush()
+ sys.exit(exit_code)
+
+
class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # Non-interactive URL opener used for instance HTTP liveness checks:
  # authentication prompts are disabled and HTTP errors are converted
  # into InstanceDown exceptions instead of being retried/handled.

  def prompt_user_passwd(self, host, realm, clear_cache = 0):
    """No-interaction version of prompt_user_passwd.

    Returning (None, None) makes authentication fail immediately
    rather than prompting on the terminal.

    """
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))
+
+
+class Burner(object):
+ """Burner class."""
+
  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()  # accumulates job feedback messages
    self.nodes = []
    self.instances = []
    self.to_rem = []             # instances queued for removal at cleanup
    self.queued_ops = []         # pending (ops, name) pairs for parallel runs
    self.opts = None
    # Order matters: ParseOptions fills self.opts (and may sys.exit on bad
    # arguments) before the client is created and the cluster state is read.
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()
+
+ def ClearFeedbackBuf(self):
+ """Clear the feedback buffer."""
+ self._feed_buf.truncate(0)
+
+ def GetFeedbackBuf(self):
+ """Return the contents of the buffer."""
+ return self._feed_buf.getvalue()
+
+ def Feedback(self, msg):
+ """Acumulate feedback in our buffer."""
+ self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
+ msg[2]))
+ if self.opts.verbose:
+ Log(msg, indent=3)
+
+ def ExecOp(self, *ops):
+ """Execute one or more opcodes and manage the exec buffer.
+
+ @result: if only opcode has been passed, we return its result;
+ otherwise we return the list of results
+
+ """
+ job_id = cli.SendJob(ops, cl=self.cl)
+ results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
+ if len(ops) == 1:
+ return results[0]
+ else:
+ return results
+
+ def ExecOrQueue(self, name, *ops):
+ """Execute an opcode and manage the exec buffer."""
+ if self.opts.parallel:
+ self.queued_ops.append((ops, name))
+ else:
+ return self.ExecOp(*ops)
+
+ def CommitQueue(self):
+ """Execute all submitted opcodes in case of parallel burnin"""
+ if not self.opts.parallel:
+ return
+
+ try:
+ results = self.ExecJobSet(self.queued_ops)
+ finally:
+ self.queued_ops = []
+ return results
+
+ def ExecJobSet(self, jobs):
+ """Execute a set of jobs and return once all are done.
+
+ The method will return the list of results, if all jobs are
+ successful. Otherwise, OpExecError will be raised from within
+ cli.py.
+
+ """
+ self.ClearFeedbackBuf()
+ job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
+ Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
+ results = []
+ for jid, (_, iname) in zip(job_ids, jobs):
+ Log("waiting for job %s for %s" % (jid, iname), indent=2)
+ results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+
+ return results
+
  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    Side effects: fills in self.opts, self.instances, self.disk_size,
    self.disk_growth, self.disk_count, self.bep and self.hvp, and sets
    the global socket timeout.

    """

    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version="%%prog (ganeti) %s" %
                                   constants.RELEASE_VERSION,
                                   option_class=cli.CliOption)

    # Instance parameters.
    parser.add_option("-o", "--os", dest="os", default=None,
                      help="OS to use during burnin",
                      metavar="<OS>")
    parser.add_option("--disk-size", dest="disk_size",
                      help="Disk size (determines disk count)",
                      default="128m", type="string", metavar="<size,size,...>")
    parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth",
                      default="128m", type="string", metavar="<size,size,...>")
    parser.add_option("--mem-size", dest="mem_size", help="Memory size",
                      default=128, type="unit", metavar="<size>")
    parser.add_option("-v", "--verbose",
                      action="store_true", dest="verbose", default=False,
                      help="print command execution messages to stdout")
    # Individual burnin steps; all are enabled by default and can be
    # disabled one by one via the corresponding --no-* flag.
    parser.add_option("--no-replace1", dest="do_replace1",
                      help="Skip disk replacement with the same secondary",
                      action="store_false", default=True)
    parser.add_option("--no-replace2", dest="do_replace2",
                      help="Skip disk replacement with a different secondary",
                      action="store_false", default=True)
    parser.add_option("--no-failover", dest="do_failover",
                      help="Skip instance failovers", action="store_false",
                      default=True)
    parser.add_option("--no-migrate", dest="do_migrate",
                      help="Skip instance live migration",
                      action="store_false", default=True)
    parser.add_option("--no-importexport", dest="do_importexport",
                      help="Skip instance export/import", action="store_false",
                      default=True)
    parser.add_option("--no-startstop", dest="do_startstop",
                      help="Skip instance stop/start", action="store_false",
                      default=True)
    parser.add_option("--no-reinstall", dest="do_reinstall",
                      help="Skip instance reinstall", action="store_false",
                      default=True)
    parser.add_option("--no-reboot", dest="do_reboot",
                      help="Skip instance reboot", action="store_false",
                      default=True)
    parser.add_option("--no-activate-disks", dest="do_activate_disks",
                      help="Skip disk activation/deactivation",
                      action="store_false", default=True)
    parser.add_option("--no-add-disks", dest="do_addremove_disks",
                      help="Skip disk addition/removal",
                      action="store_false", default=True)
    parser.add_option("--no-add-nics", dest="do_addremove_nics",
                      help="Skip NIC addition/removal",
                      action="store_false", default=True)
    # --no-nics replaces the default one-NIC spec with an empty list.
    parser.add_option("--no-nics", dest="nics",
                      help="No network interfaces", action="store_const",
                      const=[], default=[{}])
    parser.add_option("--rename", dest="rename", default=None,
                      help="Give one unused instance name which is taken"
                           " to start the renaming sequence",
                      metavar="<instance_name>")
    parser.add_option("-t", "--disk-template", dest="disk_template",
                      choices=("diskless", "file", "plain", "drbd"),
                      default="drbd",
                      help="Disk template (diskless, file, plain or drbd)"
                           " [drbd]")
    parser.add_option("-n", "--nodes", dest="nodes", default="",
                      help="Comma separated list of nodes to perform"
                           " the burnin on (defaults to all nodes)")
    parser.add_option("-I", "--iallocator", dest="iallocator",
                      default=None, type="string",
                      help="Perform the allocation using an iallocator"
                           " instead of fixed node spread (node restrictions no"
                           " longer apply, therefore -n/--nodes must not be used")
    parser.add_option("-p", "--parallel", default=False, action="store_true",
                      dest="parallel",
                      help="Enable parallelization of some operations in"
                           " order to speed burnin or to test granular locking")
    parser.add_option("--net-timeout", default=15, type="int",
                      dest="net_timeout",
                      help="The instance check network timeout in seconds"
                           " (defaults to 15 seconds)")
    parser.add_option("-C", "--http-check", default=False, action="store_true",
                      dest="http_check",
                      help="Enable checking of instance status via http,"
                           " looking for /hostname.txt that should contain the"
                           " name of the instance")
    parser.add_option("-K", "--keep-instances", default=False,
                      action="store_true",
                      dest="keep_instances",
                      help="Leave instances on the cluster after burnin,"
                           " for investigation in case of errors or simply"
                           " to use them")


    options, args = parser.parse_args()
    # At least one instance name and an OS are mandatory.
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      # Diskless instances have no disks to grow/add, so disable those tests.
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    self.opts = options
    self.instances = args
    # Backend parameters used when creating instances.
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    # Apply the timeout globally so the HTTP checks cannot hang forever.
    socket.setdefaulttimeout(options.net_timeout)
+
  def GetState(self):
    """Read the cluster state from the config.

    Fills in self.nodes with the usable (not offline, not drained)
    nodes and validates that the requested OS exists on the cluster.

    """
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # Err() exits the program, so "result" is always bound here.
    # Keep only nodes that are neither offline nor drained.
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
                                              names=[]))

    if not result:
      Err("Can't get the OS list")

    # filter non-valid OS-es
    os_set = [val[0] for val in result if val[1]]

    if self.opts.os not in os_set:
      Err("OS '%s' not found" % self.opts.os)
+
+ def BurnCreateInstances(self):
+ """Create the given instances.
+
+ """
+ self.to_rem = []
+ mytor = izip(cycle(self.nodes),
+ islice(cycle(self.nodes), 1, None),
+ self.instances)
+
+ Log("Creating instances")
+ for pnode, snode, instance in mytor:
+ Log("instance %s" % instance, indent=1)
+ if self.opts.iallocator:
+ pnode = snode = None
+ msg = "with iallocator %s" % self.opts.iallocator
+ elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
+ snode = None
+ msg = "on %s" % pnode