1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
24 """
25
26 import sys
27 import optparse
28 import time
29 import socket
30 import urllib
31 from itertools import izip, islice, cycle
32 from cStringIO import StringIO
33
34 from ganeti import opcodes
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import hypervisor
40 from ganeti import compat
41 from ganeti import pathutils
42
43 from ganeti.confd import client as confd_client
44
45
46 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
47
48 MAX_RETRIES = 3
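# Prefix prepended by Log() to a message, keyed by its indentation level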
49 LOG_HEADERS = {
50   0: "- ",
51   1: "* ",
52   2: ""
53   }
54
55
56 class InstanceDown(Exception):
57   """The checked instance was not up"""
58
59
60 class BurninFailure(Exception):
61   """Failure detected during burning"""
62
63
64 def Usage():
65   """Shows program usage information and exits the program."""
66
67   print >> sys.stderr, "Usage:"
68   print >> sys.stderr, USAGE
69   sys.exit(2)
70
71
72 def Log(msg, *args, **kwargs):
73   """Simple function that prints out its argument.
74
75   """
76   if args:
77     msg = msg % args
78   indent = kwargs.get("indent", 0)
79   sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
80                                   LOG_HEADERS.get(indent, "  "), msg))
81   sys.stdout.flush()
82
83
84 def Err(msg, exit_code=1):
85   """Simple error logging that prints to stderr.
86
87   """
88   sys.stderr.write(msg + "\n")
89   sys.stderr.flush()
90   sys.exit(exit_code)
91
92
93 class SimpleOpener(urllib.FancyURLopener):
94   """A simple URL opener"""
95   # pylint: disable=W0221
96
97   def prompt_user_passwd(self, host, realm, clear_cache=0):
98     """No-interaction version of prompt_user_passwd."""
99     # we follow parent class' API
100     # pylint: disable=W0613
101     return None, None
102
103   def http_error_default(self, url, fp, errcode, errmsg, headers):
104     """Custom error handling"""
105     # make sure sockets are not left in CLOSE_WAIT; this mirrors the
106     # base URLopener handling but raises a different exception
107     _ = fp.read() # throw away data
108     fp.close()
109     raise InstanceDown("HTTP error returned: code %s, msg %s" %
110                        (errcode, errmsg))
111
112
113 OPTIONS = [
114   cli.cli_option("-o", "--os", dest="os", default=None,
115                  help="OS to use during burnin",
116                  metavar="<OS>",
117                  completion_suggest=cli.OPT_COMPL_ONE_OS),
118   cli.HYPERVISOR_OPT,
119   cli.OSPARAMS_OPT,
120   cli.cli_option("--disk-size", dest="disk_size",
121                  help="Disk size (determines disk count)",
122                  default="128m", type="string", metavar="<size,size,...>",
123                  completion_suggest=("128M 512M 1G 4G 1G,256M"
124                                      " 4G,1G,1G 10G").split()),
125   cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
126                  default="128m", type="string", metavar="<size,size,...>"),
127   cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
128                  default=None, type="unit", metavar="<size>",
129                  completion_suggest=("128M 256M 512M 1G 4G 8G"
130                                      " 12G 16G").split()),
131   cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
132                  default=256, type="unit", metavar="<size>",
133                  completion_suggest=("128M 256M 512M 1G 4G 8G"
134                                      " 12G 16G").split()),
135   cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
136                  default=128, type="unit", metavar="<size>",
137                  completion_suggest=("128M 256M 512M 1G 4G 8G"
138                                      " 12G 16G").split()),
139   cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
140                  default=3, type="unit", metavar="<count>",
141                  completion_suggest=("1 2 3 4").split()),
142   cli.DEBUG_OPT,
143   cli.VERBOSE_OPT,
144   cli.NOIPCHECK_OPT,
145   cli.NONAMECHECK_OPT,
146   cli.EARLY_RELEASE_OPT,
147   cli.cli_option("--no-replace1", dest="do_replace1",
148                  help="Skip disk replacement with the same secondary",
149                  action="store_false", default=True),
150   cli.cli_option("--no-replace2", dest="do_replace2",
151                  help="Skip disk replacement with a different secondary",
152                  action="store_false", default=True),
153   cli.cli_option("--no-failover", dest="do_failover",
154                  help="Skip instance failovers", action="store_false",
155                  default=True),
156   cli.cli_option("--no-migrate", dest="do_migrate",
157                  help="Skip instance live migration",
158                  action="store_false", default=True),
159   cli.cli_option("--no-move", dest="do_move",
160                  help="Skip instance moves", action="store_false",
161                  default=True),
162   cli.cli_option("--no-importexport", dest="do_importexport",
163                  help="Skip instance export/import", action="store_false",
164                  default=True),
165   cli.cli_option("--no-startstop", dest="do_startstop",
166                  help="Skip instance stop/start", action="store_false",
167                  default=True),
168   cli.cli_option("--no-reinstall", dest="do_reinstall",
169                  help="Skip instance reinstall", action="store_false",
170                  default=True),
171   cli.cli_option("--no-reboot", dest="do_reboot",
172                  help="Skip instance reboot", action="store_false",
173                  default=True),
174   cli.cli_option("--reboot-types", dest="reboot_types",
175                  help="Specify the reboot types", default=None),
176   cli.cli_option("--no-activate-disks", dest="do_activate_disks",
177                  help="Skip disk activation/deactivation",
178                  action="store_false", default=True),
179   cli.cli_option("--no-add-disks", dest="do_addremove_disks",
180                  help="Skip disk addition/removal",
181                  action="store_false", default=True),
182   cli.cli_option("--no-add-nics", dest="do_addremove_nics",
183                  help="Skip NIC addition/removal",
184                  action="store_false", default=True),
185   cli.cli_option("--no-nics", dest="nics",
186                  help="No network interfaces", action="store_const",
187                  const=[], default=[{}]),
188   cli.cli_option("--no-confd", dest="do_confd_tests",
189                  help="Skip confd queries",
190                  action="store_false", default=constants.ENABLE_CONFD),
191   cli.cli_option("--rename", dest="rename", default=None,
192                  help=("Give one unused instance name to be used as the"
193                        " temporary target of the rename sequence"),
194                  metavar="<instance_name>"),
195   cli.cli_option("-t", "--disk-template", dest="disk_template",
196                  choices=list(constants.DISK_TEMPLATES),
197                  default=constants.DT_DRBD8,
198                  help="Disk template (diskless, file, plain, sharedfile,"
199                  " drbd or rbd) [drbd]"),
200   cli.cli_option("-n", "--nodes", dest="nodes", default="",
201                  help=("Comma separated list of nodes to perform"
202                        " the burnin on (defaults to all nodes)"),
203                  completion_suggest=cli.OPT_COMPL_MANY_NODES),
204   cli.cli_option("-I", "--iallocator", dest="iallocator",
205                  default=None, type="string",
206                  help=("Perform the allocation using an iallocator"
207                        " instead of fixed node spread (node restrictions no"
208                        " longer apply, therefore -n/--nodes must not be"
209                        " used)"),
210                  completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
211   cli.cli_option("-p", "--parallel", default=False, action="store_true",
212                  dest="parallel",
213                  help=("Enable parallelization of some operations in"
214                        " order to speed burnin or to test granular locking")),
215   cli.cli_option("--net-timeout", default=15, type="int",
216                  dest="net_timeout",
217                  help=("The instance check network timeout in seconds"
218                        " (defaults to 15 seconds)"),
219                  completion_suggest="15 60 300 900".split()),
220   cli.cli_option("-C", "--http-check", default=False, action="store_true",
221                  dest="http_check",
222                  help=("Enable checking of instance status via http,"
223                        " looking for /hostname.txt that should contain the"
224                        " name of the instance")),
225   cli.cli_option("-K", "--keep-instances", default=False,
226                  action="store_true",
227                  dest="keep_instances",
228                  help=("Leave instances on the cluster after burnin,"
229                        " for investigation in case of errors or simply"
230                        " to use them")),
231   ]
232
233 # Mainly used for bash completion
234 ARGUMENTS = [cli.ArgInstance(min=1)]
235
236
237 def _DoCheckInstances(fn):
238   """Decorator that checks all instances are alive after the wrapped call.
239
240   """
241   def wrapper(self, *args, **kwargs):
242     val = fn(self, *args, **kwargs)
243     for instance in self.instances:
244       self._CheckInstanceAlive(instance) # pylint: disable=W0212
245     return val
246
247   return wrapper
248
249
250 def _DoBatch(retry):
251   """Decorator for possible batch operations.
252
253   Must come after the _DoCheckInstances decorator (if any).
254
255   @param retry: whether this is a retryable batch, will be
256       passed to StartBatch
257
258   """
259   def wrap(fn):
260     def batched(self, *args, **kwargs):
261       self.StartBatch(retry)
262       val = fn(self, *args, **kwargs)
263       self.CommitQueue()
264       return val
265     return batched
266
267   return wrap
268
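# Typical (illustrative) stacking of the two decorators, as used by the
# Burn* methods below, with _DoCheckInstances outermost:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# so that the queued batch is committed before the liveness checks run.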
269
270 class Burner(object):
271   """Burner class."""
272
273   def __init__(self):
274     """Constructor."""
275     self.url_opener = SimpleOpener()
276     self._feed_buf = StringIO()
277     self.nodes = []
278     self.instances = []
279     self.to_rem = []
280     self.queued_ops = []
281     self.opts = None
282     self.queue_retry = False
283     self.disk_count = self.disk_growth = self.disk_size = None
284     self.hvp = self.bep = None
285     self.ParseOptions()
286     self.cl = cli.GetClient()
287     self.GetState()
288
289   def ClearFeedbackBuf(self):
290     """Clear the feedback buffer."""
291     self._feed_buf.truncate(0)
292
293   def GetFeedbackBuf(self):
294     """Return the contents of the buffer."""
295     return self._feed_buf.getvalue()
296
297   def Feedback(self, msg):
298     """Accumulate feedback in our buffer."""
299     formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
300     self._feed_buf.write(formatted_msg + "\n")
301     if self.opts.verbose:
302       Log(formatted_msg, indent=3)
303
304   def MaybeRetry(self, retry_count, msg, fn, *args):
305     """Possibly retry a given function execution.
306
307     @type retry_count: int
308     @param retry_count: retry counter:
309         - 0: non-retryable action
310         - 1: last retry for a retryable action
311         - MAX_RETRIES: original try for a retryable action
312     @type msg: str
313     @param msg: the kind of the operation
314     @type fn: callable
315     @param fn: the function to be called
316
317     """
318     try:
319       val = fn(*args)
320       if retry_count > 0 and retry_count < MAX_RETRIES:
321         Log("Idempotent %s succeeded after %d retries",
322             msg, MAX_RETRIES - retry_count)
323       return val
324     except Exception, err: # pylint: disable=W0703
325       if retry_count == 0:
326         Log("Non-idempotent %s failed, aborting", msg)
327         raise
328       elif retry_count == 1:
329         Log("Idempotent %s repeated failure, aborting", msg)
330         raise
331       else:
332         Log("Idempotent %s failed, retry #%d/%d: %s",
333             msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
334         return self.MaybeRetry(retry_count - 1, msg, fn, *args)
335
336   def _ExecOp(self, *ops):
337     """Execute one or more opcodes and manage the exec buffer.
338
339     @return: if only one opcode has been passed, we return its result;
340         otherwise we return the list of results
341
342     """
343     job_id = cli.SendJob(ops, cl=self.cl)
344     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
345     if len(ops) == 1:
346       return results[0]
347     else:
348       return results
349
350   def ExecOp(self, retry, *ops):
351     """Execute one or more opcodes and manage the exec buffer.
352
353     @return: if only one opcode has been passed, we return its result;
354         otherwise we return the list of results
355
356     """
357     if retry:
358       rval = MAX_RETRIES
359     else:
360       rval = 0
361     cli.SetGenericOpcodeOpts(ops, self.opts)
362     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
363
364   def ExecOrQueue(self, name, ops, post_process=None):
365     """Execute opcodes directly, or queue them for a parallel batch."""
366     if self.opts.parallel:
367       cli.SetGenericOpcodeOpts(ops, self.opts)
368       self.queued_ops.append((ops, name, post_process))
369     else:
370       val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
371       if post_process is not None:
372         post_process()
373       return val
374
375   def StartBatch(self, retry):
376     """Start a new batch of jobs.
377
378     @param retry: whether this is a retryable batch
379
380     """
381     self.queued_ops = []
382     self.queue_retry = retry
383
384   def CommitQueue(self):
385     """Execute all submitted opcodes in case of parallel burnin"""
386     if not self.opts.parallel or not self.queued_ops:
387       return
388
389     if self.queue_retry:
390       rval = MAX_RETRIES
391     else:
392       rval = 0
393
394     try:
395       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
396                                 self.queued_ops)
397     finally:
398       self.queued_ops = []
399     return results
400
401   def ExecJobSet(self, jobs):
402     """Execute a set of jobs and return once all are done.
403
404     The method will return the list of results if all jobs are
405     successful. Otherwise, OpExecError will be raised from within
406     cli.py.
407
408     """
409     self.ClearFeedbackBuf()
410     jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
411     for ops, name, _ in jobs:
412       jex.QueueJob(name, *ops) # pylint: disable=W0142
413     try:
414       results = jex.GetResults()
415     except Exception, err: # pylint: disable=W0703
416       Log("Jobs failed: %s", err)
417       raise BurninFailure()
418
419     fail = False
420     val = []
421     for (_, name, post_process), (success, result) in zip(jobs, results):
422       if success:
423         if post_process:
424           try:
425             post_process()
426           except Exception, err: # pylint: disable=W0703
427             Log("Post process call for job %s failed: %s", name, err)
428             fail = True
429         val.append(result)
430       else:
431         fail = True
432
433     if fail:
434       raise BurninFailure()
435
436     return val
437
438   def ParseOptions(self):
439     """Parses the command line options.
440
441     In case of command line errors, it will show the usage and exit the
442     program.
443
444     """
445     parser = optparse.OptionParser(usage="\n%s" % USAGE,
446                                    version=("%%prog (ganeti) %s" %
447                                             constants.RELEASE_VERSION),
448                                    option_list=OPTIONS)
449
450     options, args = parser.parse_args()
451     if len(args) < 1 or options.os is None:
452       Usage()
453
454     if options.mem_size:
455       options.maxmem_size = options.mem_size
456       options.minmem_size = options.mem_size
457     elif options.minmem_size > options.maxmem_size:
458       Err("Maximum memory lower than minimum memory")
459
460     supported_disk_templates = (constants.DT_DISKLESS,
461                                 constants.DT_FILE,
462                                 constants.DT_SHARED_FILE,
463                                 constants.DT_PLAIN,
464                                 constants.DT_DRBD8,
465                                 constants.DT_RBD,
466                                 )
467     if options.disk_template not in supported_disk_templates:
468       Err("Unknown disk template '%s'" % options.disk_template)
469
470     if options.disk_template == constants.DT_DISKLESS:
471       disk_size = disk_growth = []
472       options.do_addremove_disks = False
473     else:
474       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
475       disk_growth = [utils.ParseUnit(v)
476                      for v in options.disk_growth.split(",")]
477       if len(disk_growth) != len(disk_size):
478         Err("Wrong disk sizes/growth combination")
479     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
480         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
481       Err("Wrong disk count/disk template combination")
482
483     self.disk_size = disk_size
484     self.disk_growth = disk_growth
485     self.disk_count = len(disk_size)
486
487     if options.nodes and options.iallocator:
488       Err("Give either the nodes option or the iallocator option, not both")
489
490     if options.http_check and not options.name_check:
491       Err("Can't enable HTTP checks without name checks")
492
493     self.opts = options
494     self.instances = args
495     self.bep = {
496       constants.BE_MINMEM: options.minmem_size,
497       constants.BE_MAXMEM: options.maxmem_size,
498       constants.BE_VCPUS: options.vcpu_count,
499       }
500
501     self.hypervisor = None
502     self.hvp = {}
503     if options.hypervisor:
504       self.hypervisor, self.hvp = options.hypervisor
505
506     if options.reboot_types is None:
507       options.reboot_types = constants.REBOOT_TYPES
508     else:
509       options.reboot_types = options.reboot_types.split(",")
510       rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
511       if rt_diff:
512         Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))
513
514     socket.setdefaulttimeout(options.net_timeout)
515
516   def GetState(self):
517     """Read the cluster state from the master daemon."""
518     if self.opts.nodes:
519       names = self.opts.nodes.split(",")
520     else:
521       names = []
522     try:
523       op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
524                                names=names, use_locking=True)
525       result = self.ExecOp(True, op)
526     except errors.GenericError, err:
527       err_code, msg = cli.FormatError(err)
528       Err(msg, exit_code=err_code)
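    # keep only the nodes that are neither offline nor drained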
529     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
530
531     op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
532                                                       "variants",
533                                                       "hidden"],
534                                        names=[])
535     result = self.ExecOp(True, op_diagnose)
536
537     if not result:
538       Err("Can't get the OS list")
539
540     found = False
541     for (name, variants, _) in result:
542       if self.opts.os in cli.CalculateOSNames(name, variants):
543         found = True
544         break
545
546     if not found:
547       Err("OS '%s' not found" % self.opts.os)
548
549     cluster_info = self.cl.QueryClusterInfo()
550     self.cluster_info = cluster_info
551     if not self.cluster_info:
552       Err("Can't get cluster info")
553
554     default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
555     self.cluster_default_nicparams = default_nic_params
556     if self.hypervisor is None:
557       self.hypervisor = self.cluster_info["default_hypervisor"]
558     self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)
559
560   @_DoCheckInstances
561   @_DoBatch(False)
562   def BurnCreateInstances(self):
563     """Create the given instances.
564
565     """
566     self.to_rem = []
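    # Walk the node list in creation order: the primary node comes from the
    # plain cycle and the secondary from the same cycle shifted by one, so
    # (with more than one node) each instance gets two distinct nodes.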
567     mytor = izip(cycle(self.nodes),
568                  islice(cycle(self.nodes), 1, None),
569                  self.instances)
570
571     Log("Creating instances")
572     for pnode, snode, instance in mytor:
573       Log("instance %s", instance, indent=1)
574       if self.opts.iallocator:
575         pnode = snode = None
576         msg = "with iallocator %s" % self.opts.iallocator
577       elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
578         snode = None
579         msg = "on %s" % pnode
580       else:
581         msg = "on %s, %s" % (pnode, snode)
582
583       Log(msg, indent=2)
584
585       op = opcodes.OpInstanceCreate(instance_name=instance,
586                                     disks=[{"size": size}
587                                            for size in self.disk_size],
588                                     disk_template=self.opts.disk_template,
589                                     nics=self.opts.nics,
590                                     mode=constants.INSTANCE_CREATE,
591                                     os_type=self.opts.os,
592                                     pnode=pnode,
593                                     snode=snode,
594                                     start=True,
595                                     ip_check=self.opts.ip_check,
596                                     name_check=self.opts.name_check,
597                                     wait_for_sync=True,
598                                     file_driver="loop",
599                                     file_storage_dir=None,
600                                     iallocator=self.opts.iallocator,
601                                     beparams=self.bep,
602                                     hvparams=self.hvp,
603                                     hypervisor=self.hypervisor,
604                                     osparams=self.opts.osparams,
605                                     )
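      # Double lambda: bind the current instance name immediately, but defer
      # appending it to self.to_rem until the creation job has succeeded (it
      # runs as the post_process callback).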
606       remove_instance = lambda name: lambda: self.to_rem.append(name)
607       self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))
608
609   @_DoBatch(False)
610   def BurnModifyRuntimeMemory(self):
611     """Alter the runtime memory."""
612     Log("Setting instance runtime memory")
613     for instance in self.instances:
614       Log("instance %s", instance, indent=1)
615       tgt_mem = self.bep[constants.BE_MINMEM]
616       op = opcodes.OpInstanceSetParams(instance_name=instance,
617                                        runtime_mem=tgt_mem)
618       Log("Set memory to %s MB", tgt_mem, indent=2)
619       self.ExecOrQueue(instance, [op])
620
621   @_DoBatch(False)
622   def BurnGrowDisks(self):
623     """Grow the instance disks by the requested amounts, if any."""
624     Log("Growing disks")
625     for instance in self.instances:
626       Log("instance %s", instance, indent=1)
627       for idx, growth in enumerate(self.disk_growth):
628         if growth > 0:
629           op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
630                                           amount=growth, wait_for_sync=True)
631           Log("increase disk/%s by %s MB", idx, growth, indent=2)
632           self.ExecOrQueue(instance, [op])
633
634   @_DoBatch(True)
635   def BurnReplaceDisks1D8(self):
636     """Replace disks on primary and secondary for drbd8."""
637     Log("Replacing disks on the same nodes")
638     early_release = self.opts.early_release
639     for instance in self.instances:
640       Log("instance %s", instance, indent=1)
641       ops = []
642       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
643         op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
644                                             mode=mode,
645                                             disks=list(range(self.disk_count)),
646                                             early_release=early_release)
647         Log("run %s", mode, indent=2)
648         ops.append(op)
649       self.ExecOrQueue(instance, ops)
650
651   @_DoBatch(True)
652   def BurnReplaceDisks2(self):
653     """Replace secondary node."""
654     Log("Changing the secondary node")
655     mode = constants.REPLACE_DISK_CHG
656
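    # Choose the new secondary two steps ahead in the node cycle, so it
    # normally differs from both the original primary and secondary.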
657     mytor = izip(islice(cycle(self.nodes), 2, None),
658                  self.instances)
659     for tnode, instance in mytor:
660       Log("instance %s", instance, indent=1)
661       if self.opts.iallocator:
662         tnode = None
663         msg = "with iallocator %s" % self.opts.iallocator
664       else:
665         msg = tnode
666       op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
667                                           mode=mode,
668                                           remote_node=tnode,
669                                           iallocator=self.opts.iallocator,
670                                           disks=[],
671                                           early_release=self.opts.early_release)
672       Log("run %s %s", mode, msg, indent=2)
673       self.ExecOrQueue(instance, [op])
674
675   @_DoCheckInstances
676   @_DoBatch(False)
677   def BurnFailover(self):
678     """Failover the instances."""
679     Log("Failing over instances")
680     for instance in self.instances:
681       Log("instance %s", instance, indent=1)
682       op = opcodes.OpInstanceFailover(instance_name=instance,
683                                       ignore_consistency=False)
684       self.ExecOrQueue(instance, [op])
685
686   @_DoCheckInstances
687   @_DoBatch(False)
688   def BurnMove(self):
689     """Move the instances."""
690     Log("Moving instances")
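    # The move target is the next node in the cycle after the instance's
    # original primary, i.e. a different node whenever the cluster has more
    # than one node.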
691     mytor = izip(islice(cycle(self.nodes), 1, None),
692                  self.instances)
693     for tnode, instance in mytor:
694       Log("instance %s", instance, indent=1)
695       op = opcodes.OpInstanceMove(instance_name=instance,
696                                   target_node=tnode)
697       self.ExecOrQueue(instance, [op])
698
699   @_DoBatch(False)
700   def BurnMigrate(self):
701     """Migrate the instances."""
702     Log("Migrating instances")
703     for instance in self.instances:
704       Log("instance %s", instance, indent=1)
705       op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
706                                       cleanup=False)
707
708       op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
709                                       cleanup=True)
710       Log("migration and migration cleanup", indent=2)
711       self.ExecOrQueue(instance, [op1, op2])
712
713   @_DoCheckInstances
714   @_DoBatch(False)
715   def BurnImportExport(self):
716     """Export the instance, delete it, and import it back.
717
718     """
719     Log("Exporting and re-importing instances")
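    # Cycle the node list with offsets 0, 1 and 2 to pick, for each instance,
    # the new primary, the new secondary and the node holding the export.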
720     mytor = izip(cycle(self.nodes),
721                  islice(cycle(self.nodes), 1, None),
722                  islice(cycle(self.nodes), 2, None),
723                  self.instances)
724
725     for pnode, snode, enode, instance in mytor:
726       Log("instance %s", instance, indent=1)
727       # read the full name of the instance
728       nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
729                                        names=[instance], use_locking=True)
730       full_name = self.ExecOp(False, nam_op)[0][0]
731
732       if self.opts.iallocator:
733         pnode = snode = None
734         import_log_msg = ("import from %s"
735                           " with iallocator %s" %
736                           (enode, self.opts.iallocator))
737       elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
738         snode = None
739         import_log_msg = ("import from %s to %s" %
740                           (enode, pnode))
741       else:
742         import_log_msg = ("import from %s to %s, %s" %
743                           (enode, pnode, snode))
744
745       exp_op = opcodes.OpBackupExport(instance_name=instance,
746                                       target_node=enode,
747                                       mode=constants.EXPORT_MODE_LOCAL,
748                                       shutdown=True)
749       rem_op = opcodes.OpInstanceRemove(instance_name=instance,
750                                         ignore_failures=True)
751       imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
752       imp_op = opcodes.OpInstanceCreate(instance_name=instance,
753                                         disks=[{"size": size}
754                                                for size in self.disk_size],
755                                         disk_template=self.opts.disk_template,
756                                         nics=self.opts.nics,
757                                         mode=constants.INSTANCE_IMPORT,
758                                         src_node=enode,
759                                         src_path=imp_dir,
760                                         pnode=pnode,
761                                         snode=snode,
762                                         start=True,
763                                         ip_check=self.opts.ip_check,
764                                         name_check=self.opts.name_check,
765                                         wait_for_sync=True,
766                                         file_storage_dir=None,
767                                         file_driver="loop",
768                                         iallocator=self.opts.iallocator,
769                                         beparams=self.bep,
770                                         hvparams=self.hvp,
771                                         osparams=self.opts.osparams,
772                                         )
773
774       erem_op = opcodes.OpBackupRemove(instance_name=instance)
775
776       Log("export to node %s", enode, indent=2)
777       Log("remove instance", indent=2)
778       Log(import_log_msg, indent=2)
779       Log("remove export", indent=2)
780       self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
781
782   @staticmethod
783   def StopInstanceOp(instance):
784     """Stop given instance."""
785     return opcodes.OpInstanceShutdown(instance_name=instance)
786
787   @staticmethod
788   def StartInstanceOp(instance):
789     """Start given instance."""
790     return opcodes.OpInstanceStartup(instance_name=instance, force=False)
791
792   @staticmethod
793   def RenameInstanceOp(instance, instance_new):
794     """Rename instance."""
795     return opcodes.OpInstanceRename(instance_name=instance,
796                                     new_name=instance_new)
797
798   @_DoCheckInstances
799   @_DoBatch(True)
800   def BurnStopStart(self):
801     """Stop/start the instances."""
802     Log("Stopping and starting instances")
803     for instance in self.instances:
804       Log("instance %s", instance, indent=1)
805       op1 = self.StopInstanceOp(instance)
806       op2 = self.StartInstanceOp(instance)
807       self.ExecOrQueue(instance, [op1, op2])
808
809   @_DoBatch(False)
810   def BurnRemove(self):
811     """Remove the instances."""
812     Log("Removing instances")
813     for instance in self.to_rem:
814       Log("instance %s", instance, indent=1)
815       op = opcodes.OpInstanceRemove(instance_name=instance,
816                                     ignore_failures=True)
817       self.ExecOrQueue(instance, [op])
818
819   def BurnRename(self):
820     """Rename the instances.
821
822     Note that this function will not execute in parallel, since we
823     only have one target for rename.
824
825     """
826     Log("Renaming instances")
827     rename = self.opts.rename
828     for instance in self.instances:
829       Log("instance %s", instance, indent=1)
830       op_stop1 = self.StopInstanceOp(instance)
831       op_stop2 = self.StopInstanceOp(rename)
832       op_rename1 = self.RenameInstanceOp(instance, rename)
833       op_rename2 = self.RenameInstanceOp(rename, instance)
834       op_start1 = self.StartInstanceOp(rename)
835       op_start2 = self.StartInstanceOp(instance)
836       self.ExecOp(False, op_stop1, op_rename1, op_start1)
837       self._CheckInstanceAlive(rename)
838       self.ExecOp(False, op_stop2, op_rename2, op_start2)
839       self._CheckInstanceAlive(instance)
840
841   @_DoCheckInstances
842   @_DoBatch(True)
843   def BurnReinstall(self):
844     """Reinstall the instances."""
845     Log("Reinstalling instances")
846     for instance in self.instances:
847       Log("instance %s", instance, indent=1)
848       op1 = self.StopInstanceOp(instance)
849       op2 = opcodes.OpInstanceReinstall(instance_name=instance)
850       Log("reinstall without passing the OS", indent=2)
851       op3 = opcodes.OpInstanceReinstall(instance_name=instance,
852                                         os_type=self.opts.os)
853       Log("reinstall specifying the OS", indent=2)
854       op4 = self.StartInstanceOp(instance)
855       self.ExecOrQueue(instance, [op1, op2, op3, op4])
856
857   @_DoCheckInstances
858   @_DoBatch(True)
859   def BurnReboot(self):
860     """Reboot the instances."""
861     Log("Rebooting instances")
862     for instance in self.instances:
863       Log("instance %s", instance, indent=1)
864       ops = []
865       for reboot_type in self.opts.reboot_types:
866         op = opcodes.OpInstanceReboot(instance_name=instance,
867                                       reboot_type=reboot_type,
868                                       ignore_secondaries=False)
869         Log("reboot with type '%s'", reboot_type, indent=2)
870         ops.append(op)
871       self.ExecOrQueue(instance, ops)
872
873   @_DoCheckInstances
874   @_DoBatch(True)
875   def BurnActivateDisks(self):
876     """Activate and deactivate disks of the instances."""
877     Log("Activating/deactivating disks")
878     for instance in self.instances:
879       Log("instance %s", instance, indent=1)
880       op_start = self.StartInstanceOp(instance)
881       op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
882       op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
883       op_stop = self.StopInstanceOp(instance)
884       Log("activate disks when online", indent=2)
885       Log("activate disks when offline", indent=2)
886       Log("deactivate disks (when offline)", indent=2)
887       self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
888
889   @_DoCheckInstances
890   @_DoBatch(False)
891   def BurnAddRemoveDisks(self):
892     """Add and remove an extra disk for the instances."""
893     Log("Adding and removing disks")
894     for instance in self.instances:
895       Log("instance %s", instance, indent=1)
896       op_add = opcodes.OpInstanceSetParams(
897         instance_name=instance,
898         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
899       op_rem = opcodes.OpInstanceSetParams(
900         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
901       op_stop = self.StopInstanceOp(instance)
902       op_start = self.StartInstanceOp(instance)
903       Log("adding a disk", indent=2)
904       Log("removing last disk", indent=2)
905       self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])
906
907   @_DoBatch(False)
908   def BurnAddRemoveNICs(self):
909     """Add, change and remove an extra NIC for the instances."""
910     Log("Adding and removing NICs")
911     for instance in self.instances:
912       Log("instance %s", instance, indent=1)
913       op_add = opcodes.OpInstanceSetParams(
914         instance_name=instance, nics=[(constants.DDM_ADD, {})])
915       op_chg = opcodes.OpInstanceSetParams(
916         instance_name=instance, nics=[(constants.DDM_MODIFY,
917                                        -1, {"mac": constants.VALUE_GENERATE})])
918       op_rem = opcodes.OpInstanceSetParams(
919         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
920       Log("adding a NIC", indent=2)
921       Log("changing a NIC", indent=2)
922       Log("removing last NIC", indent=2)
923       self.ExecOrQueue(instance, [op_add, op_chg, op_rem])
924
925   def ConfdCallback(self, reply):
926     """Callback for confd queries"""
927     if reply.type == confd_client.UPCALL_REPLY:
928       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
929         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
930                                                     reply.server_reply.status,
931                                                     reply.server_reply))
932       if reply.orig_request.type == constants.CONFD_REQ_PING:
933         Log("Ping: OK", indent=1)
934       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
935         if reply.server_reply.answer == self.cluster_info["master"]:
936           Log("Master: OK", indent=1)
937         else:
938           Err("Master: wrong: %s" % reply.server_reply.answer)
939       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
940         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
941           Log("Node role for master: OK", indent=1)
942         else:
943           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
944
945   def DoConfdRequestReply(self, req):
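    """Send a single confd request and wait for all expected replies."""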
946     self.confd_counting_callback.RegisterQuery(req.rsalt)
947     self.confd_client.SendRequest(req, async=False)
948     while not self.confd_counting_callback.AllAnswered():
949       if not self.confd_client.ReceiveReply():
950         Err("Did not receive all expected confd replies")
951         break
952
953   def BurnConfd(self):
954     """Run confd queries for our instances.
955
956     The following confd queries are tested:
957       - CONFD_REQ_PING: simple ping
958       - CONFD_REQ_CLUSTER_MASTER: cluster master
959       - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
960
961     """
962     Log("Checking confd results")
963
964     filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
965     counting_callback = confd_client.ConfdCountingCallback(filter_callback)
966     self.confd_counting_callback = counting_callback
967
968     self.confd_client = confd_client.GetConfdClient(counting_callback)
969
970     req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
971     self.DoConfdRequestReply(req)
972
973     req = confd_client.ConfdClientRequest(
974       type=constants.CONFD_REQ_CLUSTER_MASTER)
975     self.DoConfdRequestReply(req)
976
977     req = confd_client.ConfdClientRequest(
978         type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
979         query=self.cluster_info["master"])
980     self.DoConfdRequestReply(req)
981
982   def _CheckInstanceAlive(self, instance):
983     """Check if an instance is alive by doing http checks.
984
985     This will try to retrieve the url on the instance /hostname.txt
986     and check that it contains the hostname of the instance. In case
987     we get ECONNREFUSED, we retry up to the net timeout seconds, for
988     any other error we abort.
989
990     """
991     if not self.opts.http_check:
992       return
993     end_time = time.time() + self.opts.net_timeout
994     url = None
995     while time.time() < end_time and url is None:
996       try:
997         url = self.url_opener.open("http://%s/hostname.txt" % instance)
998       except IOError:
999         # here we can have connection refused, no route to host, etc.
1000         time.sleep(1)
1001     if url is None:
1002       raise InstanceDown(instance, "Cannot contact instance")
1003     hostname = url.read().strip()
1004     url.close()
1005     if hostname != instance:
1006       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
1007                                     (instance, hostname)))
1008
1009   def BurninCluster(self):
1010     """Test a cluster intensively.
1011
1012     This will create instances and then start/stop/failover them.
1013     It is safe for existing instances but could impact performance.
1014
1015     """
1016
1017     opts = self.opts
1018
1019     Log("Testing global parameters")
1020
1021     if (len(self.nodes) == 1 and
1022         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
1023                                    constants.DT_FILE,
1024                                    constants.DT_SHARED_FILE)):
1025       Err("When one node is available/selected the disk template must"
1026           " be 'diskless', 'file', 'sharedfile' or 'plain'")
1027
1028     if opts.do_confd_tests and not constants.ENABLE_CONFD:
1029       Err("You selected confd tests but confd was disabled at configure time")
1030
1031     has_err = True
1032     try:
1033       self.BurnCreateInstances()
1034
1035       if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
1036         self.BurnModifyRuntimeMemory()
1037
1038       if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
1039         self.BurnReplaceDisks1D8()
1040       if (opts.do_replace2 and len(self.nodes) > 2 and
1041           opts.disk_template in constants.DTS_INT_MIRROR):
1042         self.BurnReplaceDisks2()
1043
1044       if (opts.disk_template in constants.DTS_GROWABLE and
1045           compat.any(n > 0 for n in self.disk_growth)):
1046         self.BurnGrowDisks()
1047
1048       if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
1049         self.BurnFailover()
1050
1051       if opts.do_migrate:
1052         if opts.disk_template not in constants.DTS_MIRRORED:
1053           Log("Skipping migration (disk template %s does not support it)",
1054               opts.disk_template)
1055         elif not self.hv_class.CAN_MIGRATE:
1056           Log("Skipping migration (hypervisor %s does not support it)",
1057               self.hypervisor)
1058         else:
1059           self.BurnMigrate()
1060
1061       if (opts.do_move and len(self.nodes) > 1 and
1062           opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1063         self.BurnMove()
1064
1065       if (opts.do_importexport and
1066           opts.disk_template not in (constants.DT_DISKLESS,
1067                                      constants.DT_SHARED_FILE,
1068                                      constants.DT_FILE)):
1069         self.BurnImportExport()
1070
1071       if opts.do_reinstall:
1072         self.BurnReinstall()
1073
1074       if opts.do_reboot:
1075         self.BurnReboot()
1076
1077       if opts.do_addremove_disks:
1078         self.BurnAddRemoveDisks()
1079
1080       default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1081       # Don't add/remove nics in routed mode, as we would need an ip to add
1082       # them with
1083       if opts.do_addremove_nics:
1084         if default_nic_mode == constants.NIC_MODE_BRIDGED:
1085           self.BurnAddRemoveNICs()
1086         else:
1087           Log("Skipping nic add/remove as the cluster is not in bridged mode")
1088
1089       if opts.do_activate_disks:
1090         self.BurnActivateDisks()
1091
1092       if opts.rename:
1093         self.BurnRename()
1094
1095       if opts.do_confd_tests:
1096         self.BurnConfd()
1097
1098       if opts.do_startstop:
1099         self.BurnStopStart()
1100
1101       has_err = False
1102     finally:
1103       if has_err:
1104         Log("Error detected: opcode buffer follows:\n\n")
1105         Log(self.GetFeedbackBuf())
1106         Log("\n\n")
1107       if not self.opts.keep_instances:
1108         try:
1109           self.BurnRemove()
1110         except Exception, err:  # pylint: disable=W0703
1111           if has_err: # already detected errors, so errors in removal
1112                       # are quite expected
1113             Log("Note: error detected during instance remove: %s", err)
1114           else: # non-expected error
1115             raise
1116
1117     return constants.EXIT_SUCCESS
1118
1119
1120 def main():
1121   """Main function.
1122
1123   """
1124   utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
1125                      debug=False, stderr_logging=True)
1126
1127   return Burner().BurninCluster()
1128
1129
1130 if __name__ == "__main__":
1131   main()