X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/801cda9486b2fe11da62a2f967e1185bd2f84c22..2d6db53aea63533829fa60ae30c574bcdb7fad3b:/tools/burnin

diff --git a/tools/burnin b/tools/burnin
index 80076b6..381e54a 100755
--- a/tools/burnin
+++ b/tools/burnin
@@ -28,13 +28,11 @@ import sys
 import optparse
 import time
 import socket
-import urllib2
-import errno
+import urllib
 from itertools import izip, islice, cycle
 from cStringIO import StringIO
 
 from ganeti import opcodes
-from ganeti import mcpu
 from ganeti import constants
 from ganeti import cli
 from ganeti import errors
@@ -43,11 +41,16 @@ from ganeti import utils
 
 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
 
+MAX_RETRIES = 3
 
 class InstanceDown(Exception):
   """The checked instance was not up"""
 
 
+class BurninFailure(Exception):
+  """Failure detected during burning"""
+
+
 def Usage():
   """Shows program usage information and exits the program."""
 
@@ -77,19 +80,42 @@ def Err(msg, exit_code=1):
   sys.stderr.flush()
   sys.exit(exit_code)
 
+
+class SimpleOpener(urllib.FancyURLopener):
+  """A simple url opener"""
+
+  def prompt_user_passwd(self, host, realm, clear_cache = 0):
+    """No-interaction version of prompt_user_passwd."""
+    return None, None
+
+  def http_error_default(self, url, fp, errcode, errmsg, headers):
+    """Custom error handling"""
+    # make sure sockets are not left in CLOSE_WAIT; this mirrors the
+    # base URLopener class, but raises a different exception
+    _ = fp.read() # throw away data
+    fp.close()
+    raise InstanceDown("HTTP error returned: code %s, msg %s" %
+                       (errcode, errmsg))
+
+
 class Burner(object):
   """Burner class."""
 
   def __init__(self):
     """Constructor."""
     utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
+    self.url_opener = SimpleOpener()
     self._feed_buf = StringIO()
     self.nodes = []
     self.instances = []
     self.to_rem = []
+    self.queued_ops = []
     self.opts = None
-    self.cl = cli.GetClient()
+    self.queue_retry = False
+    self.disk_count = self.disk_growth = self.disk_size = None
+    self.hvp = self.bep = None
     self.ParseOptions()
+    self.cl = cli.GetClient()
     self.GetState()
 
   def ClearFeedbackBuf(self):
@@ -107,29 +133,151 @@ class Burner(object):
     if self.opts.verbose:
       Log(msg, indent=3)
 
-  def ExecOp(self, op):
+  def MaybeRetry(self, retry_count, msg, fn, *args):
+    """Possibly retry a given function execution.
+
+    @type retry_count: int
+    @param retry_count: retry counter:
+        - 0: non-retryable action
+        - 1: last retry for a retryable action
+        - MAX_RETRIES: original try for a retryable action
+    @type msg: str
+    @param msg: the kind of the operation
+    @type fn: callable
+    @param fn: the function to be called
+
+    """
+    try:
+      val = fn(*args)
+      if retry_count > 0 and retry_count < MAX_RETRIES:
+        Log("Idempotent %s succeeded after %d retries" %
+            (msg, MAX_RETRIES - retry_count))
+      return val
+    except Exception, err:
+      if retry_count == 0:
+        Log("Non-idempotent %s failed, aborting" % (msg, ))
+        raise
+      elif retry_count == 1:
+        Log("Idempotent %s repeated failure, aborting" % (msg, ))
+        raise
+      else:
+        Log("Idempotent %s failed, retry #%d/%d: %s" %
+            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
+        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
+
+  def _ExecOp(self, *ops):
+    """Execute one or more opcodes and manage the exec buffer.
+
+    @return: if only one opcode has been passed, we return its result;
+        otherwise we return the list of results
+
+    """
+    job_id = cli.SendJob(ops, cl=self.cl)
+    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
+    if len(ops) == 1:
+      return results[0]
+    else:
+      return results
+
+  def ExecOp(self, retry, *ops):
+    """Execute one or more opcodes and manage the exec buffer.
+
+    @return: if only one opcode has been passed, we return its result;
+        otherwise we return the list of results
+
+    """
+    if retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
+
+  def ExecOrQueue(self, name, *ops):
     """Execute an opcode and manage the exec buffer."""
-    self.ClearFeedbackBuf()
-    return cli.SubmitOpCode(op, feedback_fn=self.Feedback, cl=self.cl)
+    if self.opts.parallel:
+      self.queued_ops.append((ops, name))
+    else:
+      return self.ExecOp(self.queue_retry, *ops)
+
+  def StartBatch(self, retry):
+    """Start a new batch of jobs.
+
+    @param retry: whether this is a retryable batch
+
+    """
+    self.queued_ops = []
+    self.queue_retry = retry
+
+  def CommitQueue(self):
+    """Execute all submitted opcodes in case of parallel burnin"""
+    if not self.opts.parallel:
+      return
+
+    if self.queue_retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+
+    try:
+      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
+                                self.queued_ops)
+    finally:
+      self.queued_ops = []
+    return results
 
   def ExecJobSet(self, jobs):
     """Execute a set of jobs and return once all are done.
 
     The method will return the list of results, if all jobs are
-    successfull. Otherwise, OpExecError will be raised from within
+    successful. Otherwise, OpExecError will be raised from within
     cli.py.
 
     """
     self.ClearFeedbackBuf()
-    job_ids = [cli.SendJob(job, cl=self.cl) for job in jobs]
-    Log("Submitted job IDs %s" % ", ".join(job_ids), indent=1)
+    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
+    Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
     results = []
-    for jid in job_ids:
-      Log("Waiting for job %s" % jid, indent=2)
-      results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
-
+    for jid, (_, iname) in zip(job_ids, jobs):
+      Log("Waiting for job %s for %s" % (jid, iname), indent=2)
+      try:
+        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+      except Exception, err:
+        Log("Job for %s failed: %s" % (iname, err))
+    if len(results) != len(jobs):
+      raise BurninFailure()
     return results
 
+  def _DoCheckInstances(fn):
+    """Decorator for checking instances.
+
+    """
+    def wrapper(self, *args, **kwargs):
+      val = fn(self, *args, **kwargs)
+      for instance in self.instances:
+        self._CheckInstanceAlive(instance)
+      return val
+
+    return wrapper
+
+  def _DoBatch(retry):
+    """Decorator for possible batch operations.
+
+    Must come after the _DoCheckInstances decorator (if any).
+
+    @param retry: whether this is a retryable batch, will be
+        passed to StartBatch
+
+    """
+    def wrap(fn):
+      def batched(self, *args, **kwargs):
+        self.StartBatch(retry)
+        val = fn(self, *args, **kwargs)
+        self.CommitQueue()
+        return val
+      return batched
+
+    return wrap
+
   def ParseOptions(self):
     """Parses the command line options.
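
The hunk above is the heart of this patch: every opcode execution now goes through a countdown-style retry wrapper (MaybeRetry), and per-instance opcodes can be queued with ExecOrQueue and flushed later as one job set. As an aside, here is the same countdown retry pattern in isolation; it is written for Python 3 (burnin itself is Python 2), and all names in it (retry_call, log) are hypothetical stand-ins rather than Ganeti API:

  MAX_RETRIES = 3

  def log(msg):
      # stand-in for burnin's Log() helper
      print(msg)

  def retry_call(retry_count, msg, fn, *args):
      # retry_count mirrors MaybeRetry's convention:
      #   0           - non-retryable action, fail immediately
      #   1           - last allowed attempt
      #   MAX_RETRIES - first attempt of a retryable action
      try:
          val = fn(*args)
          if 0 < retry_count < MAX_RETRIES:
              log("%s succeeded after %d retries" % (msg, MAX_RETRIES - retry_count))
          return val
      except Exception as err:
          if retry_count == 0:
              log("non-retryable %s failed, aborting" % msg)
              raise
          if retry_count == 1:
              log("%s failed on the last attempt, aborting" % msg)
              raise
          log("%s failed, retry #%d/%d: %s"
              % (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
          # the recursive call must be returned, otherwise the value of a
          # successful retry is silently dropped
          return retry_call(retry_count - 1, msg, fn, *args)

Callers choose the initial count exactly as ExecOp does above: MAX_RETRIES for idempotent operations, 0 for non-idempotent ones.
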
@@ -204,7 +352,7 @@ class Burner(object): parser.add_option("-n", "--nodes", dest="nodes", default="", help="Comma separated list of nodes to perform" " the burnin on (defaults to all nodes)") - parser.add_option("--iallocator", dest="iallocator", + parser.add_option("-I", "--iallocator", dest="iallocator", default=None, type="string", help="Perform the allocation using an iallocator" " instead of fixed node spread (node restrictions no" @@ -222,6 +370,12 @@ class Burner(object): help="Enable checking of instance status via http," " looking for /hostname.txt that should contain the" " name of the instance") + parser.add_option("-K", "--keep-instances", default=False, + action="store_true", + dest="keep_instances", + help="Leave instances on the cluster after burnin," + " for investigation in case of errors or simply" + " to use them") options, args = parser.parse_args() @@ -272,15 +426,16 @@ class Burner(object): else: names = [] try: - op = opcodes.OpQueryNodes(output_fields=["name", "offline"], names=names) - result = self.ExecOp(op) + op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"], + names=names, use_locking=True) + result = self.ExecOp(True, op) except errors.GenericError, err: err_code, msg = cli.FormatError(err) Err(msg, exit_code=err_code) - self.nodes = [data[0] for data in result if not data[1]] + self.nodes = [data[0] for data in result if not (data[1] or data[2])] - result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"], - names=[])) + op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[]) + result = self.ExecOp(True, op_diagos) if not result: Err("Can't get the OS list") @@ -291,7 +446,9 @@ class Burner(object): if self.opts.os not in os_set: Err("OS '%s' not found" % self.opts.os) - def CreateInstances(self): + @_DoCheckInstances + @_DoBatch(False) + def BurnCreateInstances(self): """Create the given instances. 
""" @@ -299,7 +456,6 @@ class Burner(object): mytor = izip(cycle(self.nodes), islice(cycle(self.nodes), 1, None), self.instances) - jobset = [] Log("Creating instances") for pnode, snode, instance in mytor: @@ -334,21 +490,11 @@ class Burner(object): hvparams=self.hvp, ) - if self.opts.parallel: - jobset.append([op]) - # FIXME: here we should not append to to_rem uncoditionally, - # but only when the job is successful - self.to_rem.append(instance) - else: - self.ExecOp(op) - self.to_rem.append(instance) - if self.opts.parallel: - self.ExecJobSet(jobset) - - for instance in self.instances: - self._CheckInstanceAlive(instance) + self.ExecOrQueue(instance, op) + self.to_rem.append(instance) - def GrowDisks(self): + @_DoBatch(False) + def BurnGrowDisks(self): """Grow both the os and the swap disks by the requested amount, if any.""" Log("Growing disks") for instance in self.instances: @@ -358,21 +504,25 @@ class Burner(object): op = opcodes.OpGrowDisk(instance_name=instance, disk=idx, amount=growth, wait_for_sync=True) Log("increase disk/%s by %s MB" % (idx, growth), indent=2) - self.ExecOp(op) + self.ExecOrQueue(instance, op) - def ReplaceDisks1D8(self): + @_DoBatch(True) + def BurnReplaceDisks1D8(self): """Replace disks on primary and secondary for drbd8.""" Log("Replacing disks on the same nodes") for instance in self.instances: Log("instance %s" % instance, indent=1) + ops = [] for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI: op = opcodes.OpReplaceDisks(instance_name=instance, mode=mode, disks=[i for i in range(self.disk_count)]) Log("run %s" % mode, indent=2) - self.ExecOp(op) + ops.append(op) + self.ExecOrQueue(instance, *ops) - def ReplaceDisks2(self): + @_DoBatch(True) + def BurnReplaceDisks2(self): """Replace secondary node.""" Log("Changing the secondary node") mode = constants.REPLACE_DISK_CHG @@ -390,11 +540,13 @@ class Burner(object): mode=mode, remote_node=tnode, iallocator=self.opts.iallocator, - disks=[i for i in range(self.disk_count)]) + disks=[]) Log("run %s %s" % (mode, msg), indent=2) - self.ExecOp(op) + self.ExecOrQueue(instance, op) - def Failover(self): + @_DoCheckInstances + @_DoBatch(False) + def BurnFailover(self): """Failover the instances.""" Log("Failing over instances") for instance in self.instances: @@ -402,26 +554,25 @@ class Burner(object): op = opcodes.OpFailoverInstance(instance_name=instance, ignore_consistency=False) - self.ExecOp(op) - for instance in self.instances: - self._CheckInstanceAlive(instance) + self.ExecOrQueue(instance, op) - def Migrate(self): + @_DoBatch(False) + def BurnMigrate(self): """Migrate the instances.""" Log("Migrating instances") for instance in self.instances: Log("instance %s" % instance, indent=1) - op = opcodes.OpMigrateInstance(instance_name=instance, live=True, - cleanup=False) + op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + cleanup=False) - Log("migration", indent=2) - self.ExecOp(op) - op = opcodes.OpMigrateInstance(instance_name=instance, live=True, - cleanup=True) - Log("migration cleanup", indent=2) - self.ExecOp(op) + op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True, + cleanup=True) + Log("migration and migration cleanup", indent=2) + self.ExecOrQueue(instance, op1, op2) - def ImportExport(self): + @_DoCheckInstances + @_DoBatch(False) + def BurnImportExport(self): """Export the instance, delete it, and import it back. 
""" @@ -433,6 +584,11 @@ class Burner(object): for pnode, snode, enode, instance in mytor: Log("instance %s" % instance, indent=1) + # read the full name of the instance + nam_op = opcodes.OpQueryInstances(output_fields=["name"], + names=[instance], use_locking=True) + full_name = self.ExecOp(False, nam_op)[0][0] + if self.opts.iallocator: pnode = snode = None import_log_msg = ("import from %s" @@ -451,9 +607,6 @@ class Burner(object): shutdown=True) rem_op = opcodes.OpRemoveInstance(instance_name=instance, ignore_failures=True) - nam_op = opcodes.OpQueryInstances(output_fields=["name"], - names=[instance]) - full_name = self.ExecOp(nam_op)[0][0] imp_dir = os.path.join(constants.EXPORT_DIR, full_name) imp_op = opcodes.OpCreateInstance(instance_name=instance, disks = [ {"size": size} @@ -478,125 +631,118 @@ class Burner(object): erem_op = opcodes.OpRemoveExport(instance_name=instance) Log("export to node %s" % enode, indent=2) - self.ExecOp(exp_op) Log("remove instance", indent=2) - self.ExecOp(rem_op) - self.to_rem.remove(instance) Log(import_log_msg, indent=2) - self.ExecOp(imp_op) Log("remove export", indent=2) - self.ExecOp(erem_op) - - self.to_rem.append(instance) + self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op) - for instance in self.instances: - self._CheckInstanceAlive(instance) - - def StopInstance(self, instance): + def StopInstanceOp(self, instance): """Stop given instance.""" - op = opcodes.OpShutdownInstance(instance_name=instance) - Log("shutdown", indent=2) - self.ExecOp(op) + return opcodes.OpShutdownInstance(instance_name=instance) - def StartInstance(self, instance): + def StartInstanceOp(self, instance): """Start given instance.""" - op = opcodes.OpStartupInstance(instance_name=instance, force=False) - Log("startup", indent=2) - self.ExecOp(op) + return opcodes.OpStartupInstance(instance_name=instance, force=False) - def RenameInstance(self, instance, instance_new): + def RenameInstanceOp(self, instance, instance_new): """Rename instance.""" - op = opcodes.OpRenameInstance(instance_name=instance, - new_name=instance_new) - Log("rename to %s" % instance_new, indent=2) - self.ExecOp(op) + return opcodes.OpRenameInstance(instance_name=instance, + new_name=instance_new) - def StopStart(self): + @_DoCheckInstances + @_DoBatch(True) + def BurnStopStart(self): """Stop/start the instances.""" Log("Stopping and starting instances") for instance in self.instances: Log("instance %s" % instance, indent=1) - self.StopInstance(instance) - self.StartInstance(instance) - - for instance in self.instances: - self._CheckInstanceAlive(instance) + op1 = self.StopInstanceOp(instance) + op2 = self.StartInstanceOp(instance) + self.ExecOrQueue(instance, op1, op2) - def Remove(self): + @_DoBatch(False) + def BurnRemove(self): """Remove the instances.""" Log("Removing instances") for instance in self.to_rem: Log("instance %s" % instance, indent=1) op = opcodes.OpRemoveInstance(instance_name=instance, ignore_failures=True) - self.ExecOp(op) + self.ExecOrQueue(instance, op) - def Rename(self): - """Rename the instances.""" + def BurnRename(self): + """Rename the instances. + + Note that this function will not execute in parallel, since we + only have one target for rename. 
+ + """ Log("Renaming instances") rename = self.opts.rename for instance in self.instances: Log("instance %s" % instance, indent=1) - self.StopInstance(instance) - self.RenameInstance(instance, rename) - self.StartInstance(rename) + op_stop1 = self.StopInstanceOp(instance) + op_stop2 = self.StopInstanceOp(rename) + op_rename1 = self.RenameInstanceOp(instance, rename) + op_rename2 = self.RenameInstanceOp(rename, instance) + op_start1 = self.StartInstanceOp(rename) + op_start2 = self.StartInstanceOp(instance) + self.ExecOp(False, op_stop1, op_rename1, op_start1) self._CheckInstanceAlive(rename) - self.StopInstance(rename) - self.RenameInstance(rename, instance) - self.StartInstance(instance) - - for instance in self.instances: + self.ExecOp(False, op_stop2, op_rename2, op_start2) self._CheckInstanceAlive(instance) - def Reinstall(self): + @_DoCheckInstances + @_DoBatch(True) + def BurnReinstall(self): """Reinstall the instances.""" Log("Reinstalling instances") for instance in self.instances: Log("instance %s" % instance, indent=1) - self.StopInstance(instance) - op = opcodes.OpReinstallInstance(instance_name=instance) + op1 = self.StopInstanceOp(instance) + op2 = opcodes.OpReinstallInstance(instance_name=instance) Log("reinstall without passing the OS", indent=2) - self.ExecOp(op) - op = opcodes.OpReinstallInstance(instance_name=instance, - os_type=self.opts.os) + op3 = opcodes.OpReinstallInstance(instance_name=instance, + os_type=self.opts.os) Log("reinstall specifying the OS", indent=2) - self.ExecOp(op) - self.StartInstance(instance) - for instance in self.instances: - self._CheckInstanceAlive(instance) + op4 = self.StartInstanceOp(instance) + self.ExecOrQueue(instance, op1, op2, op3, op4) - def Reboot(self): + @_DoCheckInstances + @_DoBatch(True) + def BurnReboot(self): """Reboot the instances.""" Log("Rebooting instances") for instance in self.instances: Log("instance %s" % instance, indent=1) + ops = [] for reboot_type in constants.REBOOT_TYPES: op = opcodes.OpRebootInstance(instance_name=instance, reboot_type=reboot_type, ignore_secondaries=False) Log("reboot with type '%s'" % reboot_type, indent=2) - self.ExecOp(op) - self._CheckInstanceAlive(instance) + ops.append(op) + self.ExecOrQueue(instance, *ops) - def ActivateDisks(self): + @_DoCheckInstances + @_DoBatch(True) + def BurnActivateDisks(self): """Activate and deactivate disks of the instances.""" Log("Activating/deactivating disks") for instance in self.instances: Log("instance %s" % instance, indent=1) + op_start = self.StartInstanceOp(instance) op_act = opcodes.OpActivateInstanceDisks(instance_name=instance) op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance) + op_stop = self.StopInstanceOp(instance) Log("activate disks when online", indent=2) - self.ExecOp(op_act) - self.StopInstance(instance) Log("activate disks when offline", indent=2) - self.ExecOp(op_act) Log("deactivate disks (when offline)", indent=2) - self.ExecOp(op_deact) - self.StartInstance(instance) - for instance in self.instances: - self._CheckInstanceAlive(instance) + self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start) - def AddRemoveDisks(self): + @_DoCheckInstances + @_DoBatch(False) + def BurnAddRemoveDisks(self): """Add and remove an extra disk for the instances.""" Log("Adding and removing disks") for instance in self.instances: @@ -606,16 +752,14 @@ class Burner(object): disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})]) op_rem = opcodes.OpSetInstanceParams(\ instance_name=instance, 
disks=[(constants.DDM_REMOVE, {})]) + op_stop = self.StopInstanceOp(instance) + op_start = self.StartInstanceOp(instance) Log("adding a disk", indent=2) - self.ExecOp(op_add) - self.StopInstance(instance) Log("removing last disk", indent=2) - self.ExecOp(op_rem) - self.StartInstance(instance) - for instance in self.instances: - self._CheckInstanceAlive(instance) + self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start) - def AddRemoveNICs(self): + @_DoBatch(False) + def BurnAddRemoveNICs(self): """Add and remove an extra NIC for the instances.""" Log("Adding and removing NICs") for instance in self.instances: @@ -625,9 +769,8 @@ class Burner(object): op_rem = opcodes.OpSetInstanceParams(\ instance_name=instance, nics=[(constants.DDM_REMOVE, {})]) Log("adding a NIC", indent=2) - self.ExecOp(op_add) Log("removing last NIC", indent=2) - self.ExecOp(op_rem) + self.ExecOrQueue(instance, op_add, op_rem) def _CheckInstanceAlive(self, instance): """Check if an instance is alive by doing http checks. @@ -640,18 +783,18 @@ class Burner(object): """ if not self.opts.http_check: return - try: - for retries in range(self.opts.net_timeout): - try: - url = urllib2.urlopen("http://%s/hostname.txt" % instance) - except urllib2.URLError, err: - if err.args[0][0] == errno.ECONNREFUSED: - time.sleep(1) - continue - raise - except urllib2.URLError, err: - raise InstanceDown(instance, str(err)) + end_time = time.time() + self.opts.net_timeout + url = None + while time.time() < end_time and url is None: + try: + url = self.url_opener.open("http://%s/hostname.txt" % instance) + except IOError: + # here we can have connection refused, no route to host, etc. + time.sleep(1) + if url is None: + raise InstanceDown(instance, "Cannot contact instance") hostname = url.read().strip() + url.close() if hostname != instance: raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" % (instance, hostname))) @@ -676,47 +819,48 @@ class Burner(object): has_err = True try: - self.CreateInstances() + self.BurnCreateInstances() if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR: - self.ReplaceDisks1D8() + self.BurnReplaceDisks1D8() if (opts.do_replace2 and len(self.nodes) > 2 and opts.disk_template in constants.DTS_NET_MIRROR) : - self.ReplaceDisks2() + self.BurnReplaceDisks2() - if opts.disk_template != constants.DT_DISKLESS: - self.GrowDisks() + if (opts.disk_template != constants.DT_DISKLESS and + utils.any(self.disk_growth, lambda n: n > 0)): + self.BurnGrowDisks() if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR: - self.Failover() + self.BurnFailover() if opts.do_migrate and opts.disk_template == constants.DT_DRBD8: - self.Migrate() + self.BurnMigrate() if (opts.do_importexport and opts.disk_template not in (constants.DT_DISKLESS, constants.DT_FILE)): - self.ImportExport() + self.BurnImportExport() if opts.do_reinstall: - self.Reinstall() + self.BurnReinstall() if opts.do_reboot: - self.Reboot() + self.BurnReboot() if opts.do_addremove_disks: - self.AddRemoveDisks() + self.BurnAddRemoveDisks() if opts.do_addremove_nics: - self.AddRemoveNICs() + self.BurnAddRemoveNICs() if opts.do_activate_disks: - self.ActivateDisks() + self.BurnActivateDisks() if opts.rename: - self.Rename() + self.BurnRename() if opts.do_startstop: - self.StopStart() + self.BurnStopStart() has_err = False finally: @@ -724,7 +868,15 @@ class Burner(object): Log("Error detected: opcode buffer follows:\n\n") Log(self.GetFeedbackBuf()) Log("\n\n") - self.Remove() + if not self.opts.keep_instances: + 
try: + self.BurnRemove() + except Exception, err: + if has_err: # already detected errors, so errors in removal + # are quite expected + Log("Note: error detected during instance remove: %s" % str(err)) + else: # non-expected error + raise return 0
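
A closing note on the decorator plumbing this patch introduces: _DoBatch(retry) opens a batch, runs the method (which queues its per-instance opcodes via ExecOrQueue) and then commits the queue, while _DoCheckInstances re-checks instance liveness afterwards. It must therefore be applied above _DoBatch, as the Burn* methods do. The sketch below reproduces that composition in isolation; it targets Python 3, and every name in it is hypothetical rather than the Ganeti API:

  def do_batch(retry):
      # decorator factory: open a fresh batch, run the method, flush the queue
      def wrap(fn):
          def batched(self, *args, **kwargs):
              self.start_batch(retry)
              val = fn(self, *args, **kwargs)
              self.commit_queue()
              return val
          return batched
      return wrap

  def do_check_instances(fn):
      # decorator: verify every instance after the (possibly batched) step;
      # must be applied above do_batch so the checks run after the flush
      def wrapper(self, *args, **kwargs):
          val = fn(self, *args, **kwargs)
          for name in self.instances:
              self.check_alive(name)
          return val
      return wrapper

  class MiniBurner:
      def __init__(self, parallel, instances):
          self.parallel = parallel
          self.instances = instances
          self.queued = []
          self.queue_retry = False

      def start_batch(self, retry):
          self.queued = []
          self.queue_retry = retry

      def exec_or_queue(self, name, *ops):
          # queue in parallel mode, execute immediately otherwise
          if self.parallel:
              self.queued.append((name, ops))
          else:
              print("executing %s for %s" % (ops, name))

      def commit_queue(self):
          for name, ops in self.queued:
              print("submitting job %s for %s (retryable=%s)"
                    % (ops, name, self.queue_retry))
          self.queued = []

      def check_alive(self, name):
          print("checking that %s is up" % name)

      @do_check_instances
      @do_batch(True)
      def stop_start(self):
          for name in self.instances:
              self.exec_or_queue(name, "stop", "start")

  MiniBurner(parallel=True, instances=["inst1", "inst2"]).stop_start()

Running it shows the queue being flushed before any liveness check, which is exactly the ordering the decorator stacking in the patch guarantees.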