#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
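
# Example invocation (illustrative only; substitute an OS name and instance
# names that exist on your cluster):
#   burnin -o debian-installer -p --disk-size=1G instance1 instance2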

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }
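# Log() prefixes each message according to its indent level using the map
# above; indent levels not listed here fall back to a plain two-space prefix.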


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile,"
                 " drbd, rbd or ext) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
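
# The two decorators above are meant to be stacked the way the Burn* methods
# below do, e.g.:
#   @_DoCheckInstances
#   @_DoBatch(False)
# so that a queued batch is committed before the instance liveness checks run.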


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the retried call's value so callers see the final result
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8,
                                constants.DT_RBD,
                                constants.DT_EXT,
                                )
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

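    # The default socket timeout also bounds the HTTP liveness checks done
    # through url_opener in _CheckInstanceAlive.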
    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
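    # Pair each instance with a (primary, secondary) node tuple, walking the
    # node list round-robin so that the secondary is always the node after
    # the primary.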
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
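      # remove_instance is a small factory: binding "name" in the outer
      # lambda avoids the late-binding pitfall of capturing the loop variable
      # directly.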
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file', 'sharedfile' or 'plain'")

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()