USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
+MAX_RETRIES = 3
class InstanceDown(Exception):
"""The checked instance was not up"""
+class BurninFailure(Exception):
+ """Failure detected during burning"""
+
+
def Usage():
"""Shows program usage information and exits the program."""
(errcode, errmsg))
+OPTIONS = [
+ cli.cli_option("-o", "--os", dest="os", default=None,
+ help="OS to use during burnin",
+ metavar="<OS>",
+ completion_suggest=cli.OPT_COMPL_ONE_OS),
+ cli.cli_option("--disk-size", dest="disk_size",
+ help="Disk size (determines disk count)",
+ default="128m", type="string", metavar="<size,size,...>",
+ completion_suggest=("128M 512M 1G 4G 1G,256M"
+ " 4G,1G,1G 10G").split()),
+ cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
+ default="128m", type="string", metavar="<size,size,...>"),
+ cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
+ default=128, type="unit", metavar="<size>",
+ completion_suggest=("128M 256M 512M 1G 4G 8G"
+ " 12G 16G").split()),
+ cli.VERBOSE_OPT,
+ cli.NOIPCHECK_OPT,
+ cli.NONAMECHECK_OPT,
+ cli.cli_option("--no-replace1", dest="do_replace1",
+ help="Skip disk replacement with the same secondary",
+ action="store_false", default=True),
+ cli.cli_option("--no-replace2", dest="do_replace2",
+ help="Skip disk replacement with a different secondary",
+ action="store_false", default=True),
+ cli.cli_option("--no-failover", dest="do_failover",
+ help="Skip instance failovers", action="store_false",
+ default=True),
+ cli.cli_option("--no-migrate", dest="do_migrate",
+ help="Skip instance live migration",
+ action="store_false", default=True),
+ cli.cli_option("--no-move", dest="do_move",
+ help="Skip instance moves", action="store_false",
+ default=True),
+ cli.cli_option("--no-importexport", dest="do_importexport",
+ help="Skip instance export/import", action="store_false",
+ default=True),
+ cli.cli_option("--no-startstop", dest="do_startstop",
+ help="Skip instance stop/start", action="store_false",
+ default=True),
+ cli.cli_option("--no-reinstall", dest="do_reinstall",
+ help="Skip instance reinstall", action="store_false",
+ default=True),
+ cli.cli_option("--no-reboot", dest="do_reboot",
+ help="Skip instance reboot", action="store_false",
+ default=True),
+ cli.cli_option("--no-activate-disks", dest="do_activate_disks",
+ help="Skip disk activation/deactivation",
+ action="store_false", default=True),
+ cli.cli_option("--no-add-disks", dest="do_addremove_disks",
+ help="Skip disk addition/removal",
+ action="store_false", default=True),
+ cli.cli_option("--no-add-nics", dest="do_addremove_nics",
+ help="Skip NIC addition/removal",
+ action="store_false", default=True),
+ cli.cli_option("--no-nics", dest="nics",
+ help="No network interfaces", action="store_const",
+ const=[], default=[{}]),
+ cli.cli_option("--rename", dest="rename", default=None,
+ help=("Give one unused instance name which is taken"
+ " to start the renaming sequence"),
+ metavar="<instance_name>"),
+ cli.cli_option("-t", "--disk-template", dest="disk_template",
+ choices=list(constants.DISK_TEMPLATES),
+ default=constants.DT_DRBD8,
+ help="Disk template (diskless, file, plain or drbd) [drbd]"),
+ cli.cli_option("-n", "--nodes", dest="nodes", default="",
+ help=("Comma separated list of nodes to perform"
+ " the burnin on (defaults to all nodes)"),
+ completion_suggest=cli.OPT_COMPL_MANY_NODES),
+ cli.cli_option("-I", "--iallocator", dest="iallocator",
+ default=None, type="string",
+ help=("Perform the allocation using an iallocator"
+ " instead of fixed node spread (node restrictions no"
+ " longer apply, therefore -n/--nodes must not be"
+                       " used)"),
+ completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
+ cli.cli_option("-p", "--parallel", default=False, action="store_true",
+ dest="parallel",
+ help=("Enable parallelization of some operations in"
+ " order to speed burnin or to test granular locking")),
+ cli.cli_option("--net-timeout", default=15, type="int",
+ dest="net_timeout",
+ help=("The instance check network timeout in seconds"
+ " (defaults to 15 seconds)"),
+ completion_suggest="15 60 300 900".split()),
+ cli.cli_option("-C", "--http-check", default=False, action="store_true",
+ dest="http_check",
+ help=("Enable checking of instance status via http,"
+ " looking for /hostname.txt that should contain the"
+ " name of the instance")),
+ cli.cli_option("-K", "--keep-instances", default=False,
+ action="store_true",
+ dest="keep_instances",
+ help=("Leave instances on the cluster after burnin,"
+ " for investigation in case of errors or simply"
+ " to use them")),
+ ]
+
+# Mainly used for bash completion
+ARGUMENTS = [cli.ArgInstance(min=1)]
+
+
+def _DoCheckInstances(fn):
+  """Decorator for checking instances.
+
+  After the wrapped call returns, every instance is checked to be alive.
+
+  """
+ def wrapper(self, *args, **kwargs):
+ val = fn(self, *args, **kwargs)
+ for instance in self.instances:
+ self._CheckInstanceAlive(instance)
+ return val
+
+ return wrapper
+
+
+def _DoBatch(retry):
+ """Decorator for possible batch operations.
+
+ Must come after the _DoCheckInstances decorator (if any).
+
+ @param retry: whether this is a retryable batch, will be
+ passed to StartBatch
+
+ """
+ def wrap(fn):
+ def batched(self, *args, **kwargs):
+ self.StartBatch(retry)
+ val = fn(self, *args, **kwargs)
+ self.CommitQueue()
+ return val
+ return batched
+
+ return wrap
+
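+# The two decorators are stacked with _DoCheckInstances outermost, e.g.:
+#   @_DoCheckInstances
+#   @_DoBatch(False)
+#   def BurnCreateInstances(self): ...
+# so that the batch is committed before the instance liveness checks run.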
+
class Burner(object):
"""Burner class."""
self.to_rem = []
self.queued_ops = []
self.opts = None
+ self.queue_retry = False
+ self.disk_count = self.disk_growth = self.disk_size = None
+ self.hvp = self.bep = None
self.ParseOptions()
self.cl = cli.GetClient()
self.GetState()
def Feedback(self, msg):
"""Acumulate feedback in our buffer."""
- self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
- msg[2]))
+ formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
+ self._feed_buf.write(formatted_msg + "\n")
if self.opts.verbose:
- Log(msg, indent=3)
+ Log(formatted_msg, indent=3)
+
+ def MaybeRetry(self, retry_count, msg, fn, *args):
+    """Possibly retry a given function execution.
+
+    @type retry_count: int
+ @param retry_count: retry counter:
+ - 0: non-retryable action
+ - 1: last retry for a retryable action
+ - MAX_RETRIES: original try for a retryable action
+ @type msg: str
+ @param msg: the kind of the operation
+ @type fn: callable
+ @param fn: the function to be called
+
+ """
+ try:
+ val = fn(*args)
+ if retry_count > 0 and retry_count < MAX_RETRIES:
+ Log("Idempotent %s succeeded after %d retries" %
+ (msg, MAX_RETRIES - retry_count))
+ return val
+ except Exception, err:
+ if retry_count == 0:
+ Log("Non-idempotent %s failed, aborting" % (msg, ))
+ raise
+ elif retry_count == 1:
+ Log("Idempotent %s repeated failure, aborting" % (msg, ))
+ raise
+ else:
+ Log("Idempotent %s failed, retry #%d/%d: %s" %
+ (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
+        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
+
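+  # Callers pass retry_count=MAX_RETRIES for retryable (idempotent) operations
+  # and retry_count=0 for non-retryable ones; see ExecOp and CommitQueue below.
+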
-  def ExecOp(self, *ops):
+  def _ExecOp(self, *ops):
"""Execute one or more opcodes and manage the exec buffer.
@result: if only opcode has been passed, we return its result;
else:
return results
+ def ExecOp(self, retry, *ops):
+ """Execute one or more opcodes and manage the exec buffer.
+
+    @result: if only one opcode has been passed, we return its result;
+ otherwise we return the list of results
+
+ """
+ if retry:
+ rval = MAX_RETRIES
+ else:
+ rval = 0
+ return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
+
def ExecOrQueue(self, name, *ops):
"""Execute an opcode and manage the exec buffer."""
if self.opts.parallel:
self.queued_ops.append((ops, name))
else:
- return self.ExecOp(*ops)
+ return self.ExecOp(self.queue_retry, *ops)
+
+ def StartBatch(self, retry):
+ """Start a new batch of jobs.
+
+ @param retry: whether this is a retryable batch
+
+ """
+ self.queued_ops = []
+ self.queue_retry = retry
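+
+  # Batched operations follow the pattern driven by the _DoBatch decorator:
+  # StartBatch(retry), one ExecOrQueue() call per instance, then CommitQueue().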
def CommitQueue(self):
"""Execute all submitted opcodes in case of parallel burnin"""
if not self.opts.parallel:
return
+ if self.queue_retry:
+ rval = MAX_RETRIES
+ else:
+ rval = 0
+
try:
- results = self.ExecJobSet(self.queued_ops)
+ results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
+ self.queued_ops)
finally:
self.queued_ops = []
return results
"""
self.ClearFeedbackBuf()
job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
- Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
+ Log("Submitted job ID(s) %s" % utils.CommaJoin(job_ids), indent=1)
results = []
for jid, (_, iname) in zip(job_ids, jobs):
Log("waiting for job %s for %s" % (jid, iname), indent=2)
- results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
-
+ try:
+ results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+ except Exception, err:
+ Log("Job for %s failed: %s" % (iname, err))
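+    # if any job failed, raise so that a retryable batch (CommitQueue via
+    # MaybeRetry) can resubmit the whole job set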
+ if len(results) != len(jobs):
+ raise BurninFailure()
return results
def ParseOptions(self):
program.
"""
-
parser = optparse.OptionParser(usage="\n%s" % USAGE,
- version="%%prog (ganeti) %s" %
- constants.RELEASE_VERSION,
- option_class=cli.CliOption)
-
- parser.add_option("-o", "--os", dest="os", default=None,
- help="OS to use during burnin",
- metavar="<OS>")
- parser.add_option("--disk-size", dest="disk_size",
- help="Disk size (determines disk count)",
- default="128m", type="string", metavar="<size,size,...>")
- parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth",
- default="128m", type="string", metavar="<size,size,...>")
- parser.add_option("--mem-size", dest="mem_size", help="Memory size",
- default=128, type="unit", metavar="<size>")
- parser.add_option("-v", "--verbose",
- action="store_true", dest="verbose", default=False,
- help="print command execution messages to stdout")
- parser.add_option("--no-replace1", dest="do_replace1",
- help="Skip disk replacement with the same secondary",
- action="store_false", default=True)
- parser.add_option("--no-replace2", dest="do_replace2",
- help="Skip disk replacement with a different secondary",
- action="store_false", default=True)
- parser.add_option("--no-failover", dest="do_failover",
- help="Skip instance failovers", action="store_false",
- default=True)
- parser.add_option("--no-migrate", dest="do_migrate",
- help="Skip instance live migration",
- action="store_false", default=True)
- parser.add_option("--no-importexport", dest="do_importexport",
- help="Skip instance export/import", action="store_false",
- default=True)
- parser.add_option("--no-startstop", dest="do_startstop",
- help="Skip instance stop/start", action="store_false",
- default=True)
- parser.add_option("--no-reinstall", dest="do_reinstall",
- help="Skip instance reinstall", action="store_false",
- default=True)
- parser.add_option("--no-reboot", dest="do_reboot",
- help="Skip instance reboot", action="store_false",
- default=True)
- parser.add_option("--no-activate-disks", dest="do_activate_disks",
- help="Skip disk activation/deactivation",
- action="store_false", default=True)
- parser.add_option("--no-add-disks", dest="do_addremove_disks",
- help="Skip disk addition/removal",
- action="store_false", default=True)
- parser.add_option("--no-add-nics", dest="do_addremove_nics",
- help="Skip NIC addition/removal",
- action="store_false", default=True)
- parser.add_option("--no-nics", dest="nics",
- help="No network interfaces", action="store_const",
- const=[], default=[{}])
- parser.add_option("--rename", dest="rename", default=None,
- help="Give one unused instance name which is taken"
- " to start the renaming sequence",
- metavar="<instance_name>")
- parser.add_option("-t", "--disk-template", dest="disk_template",
- choices=("diskless", "file", "plain", "drbd"),
- default="drbd",
- help="Disk template (diskless, file, plain or drbd)"
- " [drbd]")
- parser.add_option("-n", "--nodes", dest="nodes", default="",
- help="Comma separated list of nodes to perform"
- " the burnin on (defaults to all nodes)")
- parser.add_option("-I", "--iallocator", dest="iallocator",
- default=None, type="string",
- help="Perform the allocation using an iallocator"
- " instead of fixed node spread (node restrictions no"
- " longer apply, therefore -n/--nodes must not be used")
- parser.add_option("-p", "--parallel", default=False, action="store_true",
- dest="parallel",
- help="Enable parallelization of some operations in"
- " order to speed burnin or to test granular locking")
- parser.add_option("--net-timeout", default=15, type="int",
- dest="net_timeout",
- help="The instance check network timeout in seconds"
- " (defaults to 15 seconds)")
- parser.add_option("-C", "--http-check", default=False, action="store_true",
- dest="http_check",
- help="Enable checking of instance status via http,"
- " looking for /hostname.txt that should contain the"
- " name of the instance")
- parser.add_option("-K", "--keep-instances", default=False,
- action="store_true",
- dest="keep_instances",
- help="Leave instances on the cluster after burnin,"
- " for investigation in case of errors or simply"
- " to use them")
-
+ version=("%%prog (ganeti) %s" %
+ constants.RELEASE_VERSION),
+ option_list=OPTIONS)
options, args = parser.parse_args()
if len(args) < 1 or options.os is None:
if options.nodes and options.iallocator:
Err("Give either the nodes option or the iallocator option, not both")
+ if options.http_check and not options.name_check:
+ Err("Can't enable HTTP checks without name checks")
+
self.opts = options
self.instances = args
self.bep = {
else:
names = []
try:
- op = opcodes.OpQueryNodes(output_fields=["name", "offline"],
+ op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
names=names, use_locking=True)
- result = self.ExecOp(op)
+ result = self.ExecOp(True, op)
except errors.GenericError, err:
err_code, msg = cli.FormatError(err)
Err(msg, exit_code=err_code)
- self.nodes = [data[0] for data in result if not data[1]]
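+    # keep only nodes that are neither offline nor drained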
+ self.nodes = [data[0] for data in result if not (data[1] or data[2])]
- result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
- names=[]))
+ op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
+ "variants"], names=[])
+ result = self.ExecOp(True, op_diagnose)
if not result:
Err("Can't get the OS list")
- # filter non-valid OS-es
- os_set = [val[0] for val in result if val[1]]
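+    # accept the requested OS if it matches any valid OS name, including the
+    # per-variant names computed by cli.CalculateOSNames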
+ found = False
+ for (name, valid, variants) in result:
+ if valid and self.opts.os in cli.CalculateOSNames(name, variants):
+ found = True
+ break
- if self.opts.os not in os_set:
+ if not found:
Err("OS '%s' not found" % self.opts.os)
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnCreateInstances(self):
"""Create the given instances.
pnode=pnode,
snode=snode,
start=True,
- ip_check=True,
+ ip_check=self.opts.ip_check,
+ name_check=self.opts.name_check,
wait_for_sync=True,
file_driver="loop",
file_storage_dir=None,
self.ExecOrQueue(instance, op)
self.to_rem.append(instance)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoBatch(False)
def BurnGrowDisks(self):
"""Grow both the os and the swap disks by the requested amount, if any."""
Log("Growing disks")
amount=growth, wait_for_sync=True)
Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
+ @_DoBatch(True)
def BurnReplaceDisks1D8(self):
"""Replace disks on primary and secondary for drbd8."""
Log("Replacing disks on the same nodes")
Log("run %s" % mode, indent=2)
ops.append(op)
self.ExecOrQueue(instance, *ops)
- self.CommitQueue()
+ @_DoBatch(True)
def BurnReplaceDisks2(self):
"""Replace secondary node."""
Log("Changing the secondary node")
mode=mode,
remote_node=tnode,
iallocator=self.opts.iallocator,
- disks=[i for i in range(self.disk_count)])
+ disks=[])
Log("run %s %s" % (mode, msg), indent=2)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnFailover(self):
"""Failover the instances."""
Log("Failing over instances")
Log("instance %s" % instance, indent=1)
op = opcodes.OpFailoverInstance(instance_name=instance,
ignore_consistency=False)
+ self.ExecOrQueue(instance, op)
+ @_DoCheckInstances
+ @_DoBatch(False)
+ def BurnMove(self):
+ """Move the instances."""
+ Log("Moving instances")
+ mytor = izip(islice(cycle(self.nodes), 1, None),
+ self.instances)
+ for tnode, instance in mytor:
+ Log("instance %s" % instance, indent=1)
+ op = opcodes.OpMoveInstance(instance_name=instance,
+ target_node=tnode)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
+ @_DoBatch(False)
def BurnMigrate(self):
"""Migrate the instances."""
Log("Migrating instances")
cleanup=True)
Log("migration and migration cleanup", indent=2)
self.ExecOrQueue(instance, op1, op2)
- self.CommitQueue()
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnImportExport(self):
"""Export the instance, delete it, and import it back.
# read the full name of the instance
nam_op = opcodes.OpQueryInstances(output_fields=["name"],
names=[instance], use_locking=True)
- full_name = self.ExecOp(nam_op)[0][0]
+ full_name = self.ExecOp(False, nam_op)[0][0]
if self.opts.iallocator:
pnode = snode = None
pnode=pnode,
snode=snode,
start=True,
- ip_check=True,
+ ip_check=self.opts.ip_check,
+ name_check=self.opts.name_check,
wait_for_sync=True,
file_storage_dir=None,
file_driver="loop",
Log("remove export", indent=2)
self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
def StopInstanceOp(self, instance):
"""Stop given instance."""
return opcodes.OpShutdownInstance(instance_name=instance)
return opcodes.OpRenameInstance(instance_name=instance,
new_name=instance_new)
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnStopStart(self):
"""Stop/start the instances."""
Log("Stopping and starting instances")
op2 = self.StartInstanceOp(instance)
self.ExecOrQueue(instance, op1, op2)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoBatch(False)
def BurnRemove(self):
"""Remove the instances."""
Log("Removing instances")
ignore_failures=True)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
-
def BurnRename(self):
"""Rename the instances.
rename = self.opts.rename
for instance in self.instances:
Log("instance %s" % instance, indent=1)
- op_stop = self.StopInstanceOp(instance)
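+      # each stop opcode must target the instance's current name, hence two
+      # separate stop ops for the two rename directions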
+ op_stop1 = self.StopInstanceOp(instance)
+ op_stop2 = self.StopInstanceOp(rename)
op_rename1 = self.RenameInstanceOp(instance, rename)
op_rename2 = self.RenameInstanceOp(rename, instance)
op_start1 = self.StartInstanceOp(rename)
op_start2 = self.StartInstanceOp(instance)
- self.ExecOp(op_stop, op_rename1, op_start1)
+ self.ExecOp(False, op_stop1, op_rename1, op_start1)
self._CheckInstanceAlive(rename)
- self.ExecOp(op_stop, op_rename2, op_start2)
+ self.ExecOp(False, op_stop2, op_rename2, op_start2)
self._CheckInstanceAlive(instance)
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnReinstall(self):
"""Reinstall the instances."""
Log("Reinstalling instances")
op4 = self.StartInstanceOp(instance)
self.ExecOrQueue(instance, op1, op2, op3, op4)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnReboot(self):
"""Reboot the instances."""
Log("Rebooting instances")
ops.append(op)
self.ExecOrQueue(instance, *ops)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnActivateDisks(self):
"""Activate and deactivate disks of the instances."""
Log("Activating/deactivating disks")
Log("activate disks when offline", indent=2)
Log("deactivate disks (when offline)", indent=2)
self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnAddRemoveDisks(self):
"""Add and remove an extra disk for the instances."""
Log("Adding and removing disks")
Log("adding a disk", indent=2)
Log("removing last disk", indent=2)
self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
+ @_DoBatch(False)
def BurnAddRemoveNICs(self):
"""Add and remove an extra NIC for the instances."""
Log("Adding and removing NICs")
Log("adding a NIC", indent=2)
Log("removing last NIC", indent=2)
self.ExecOrQueue(instance, op_add, op_rem)
- self.CommitQueue()
def _CheckInstanceAlive(self, instance):
"""Check if an instance is alive by doing http checks.
if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
self.BurnMigrate()
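+    # an instance move copies the disks to the target node, so it is only
+    # attempted for non-mirrored disk templates (plain, file) and when there
+    # is more than one node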
+ if (opts.do_move and len(self.nodes) > 1 and
+ opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
+ self.BurnMove()
+
if (opts.do_importexport and
opts.disk_template not in (constants.DT_DISKLESS,
constants.DT_FILE)):
Log(self.GetFeedbackBuf())
Log("\n\n")
if not self.opts.keep_instances:
- self.BurnRemove()
+ try:
+ self.BurnRemove()
+ except Exception, err:
+ if has_err: # already detected errors, so errors in removal
+ # are quite expected
+ Log("Note: error detected during instance remove: %s" % str(err))
+        else: # unexpected error
+ raise
return 0