#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }
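
# For reference (derived from Log() below): a message logged with indent=1 is
# rendered as "  * message", indent=0 as "- message", and indent levels not
# listed here fall back to plain two-space padding.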


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile,"
                 " drbd, rbd or ext) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
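
# Illustrative invocation (node, instance and OS names below are placeholders):
#   burnin -o debian-image -t drbd -p -n node1,node2 instance1 instance2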


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
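

# The two decorators above are meant to be stacked like this (as done on the
# Burn* methods below), so that the whole batch is committed first and only
# then every instance is checked for liveness:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...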


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
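
  # Note: the recursive call above must return its value, otherwise the
  # result of a successful retry would be lost; with retry_count set to
  # MAX_RETRIES an action is attempted at most MAX_RETRIES times in total.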

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val
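
  # In parallel mode the (ops, name, post_process) tuples queued above are
  # executed later by CommitQueue()/ExecJobSet(), and each post_process
  # callback is invoked only after its job has succeeded.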

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8,
                                constants.DT_RBD,
                                constants.DT_EXT,
                                )
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)
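
    # Example (illustrative values): "--disk-size 1G,512M --disk-growth
    # 256M,128M" creates two-disk instances whose disks are later grown by
    # 256MB and 128MB respectively in BurnGrowDisks().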

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
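      # The nested lambda binds the instance name at definition time, so the
      # deferred post_process callback appends the right name even when the
      # job is only queued now and executed later in parallel mode.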
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)
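
    # pnode, snode and the export node rotate through the node list with
    # offsets 0, 1 and 2, so the export target usually differs from both of
    # the instance's own nodes.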

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break
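
  # DoConfdRequestReply() is synchronous: it registers the query's salt with
  # the counting callback and keeps reading replies until all of them have
  # been answered, aborting the burnin if a reply goes missing.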

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve http://<instance>/hostname.txt and check
    that it contains the hostname of the instance. Connection errors
    (refused, no route to host, etc.) are retried until the network
    timeout expires; if the instance cannot be contacted in time,
    InstanceDown is raised.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
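
  # For --http-check to work, the OS image used for burnin is expected to
  # bring up a web server on the instance that serves /hostname.txt
  # containing the instance's name (see the -C/--http-check option above).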

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file', 'sharedfile' or 'plain'")

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()