Add opcode parameter descriptions
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
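Example invocation (the OS, node and instance names are placeholders):
  burnin -o my-os -t drbd -n node1,node2 instance1.example.com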
24 """
25
26 import sys
27 import optparse
28 import time
29 import socket
30 import urllib
31 from itertools import izip, islice, cycle
32 from cStringIO import StringIO
33
34 from ganeti import opcodes
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import hypervisor
40 from ganeti import compat
41
42 from ganeti.confd import client as confd_client
43
44
45 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
46
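# Maximum number of attempts for operations that burnin treats as
# retryable (see Burner.MaybeRetry)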
47 MAX_RETRIES = 3
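# Prefix used by Log() for each indentation level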
48 LOG_HEADERS = {
49   0: "- ",
50   1: "* ",
51   2: ""
52   }
53
54
55 class InstanceDown(Exception):
56   """The checked instance was not up"""
57
58
59 class BurninFailure(Exception):
60   """Failure detected during burning"""
61
62
63 def Usage():
64   """Shows program usage information and exits the program."""
65
66   print >> sys.stderr, "Usage:"
67   print >> sys.stderr, USAGE
68   sys.exit(2)
69
70
71 def Log(msg, *args, **kwargs):
72   """Simple function that prints out its argument.
73
74   """
75   if args:
76     msg = msg % args
77   indent = kwargs.get("indent", 0)
78   sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
79                                   LOG_HEADERS.get(indent, "  "), msg))
80   sys.stdout.flush()
81
82
83 def Err(msg, exit_code=1):
84   """Simple error logging that prints to stderr.
85
86   """
87   sys.stderr.write(msg + "\n")
88   sys.stderr.flush()
89   sys.exit(exit_code)
90
91
92 class SimpleOpener(urllib.FancyURLopener):
93   """A simple url opener"""
94   # pylint: disable=W0221
95
96   def prompt_user_passwd(self, host, realm, clear_cache=0):
97     """No-interaction version of prompt_user_passwd."""
98     # we follow parent class' API
99     # pylint: disable=W0613
100     return None, None
101
102   def http_error_default(self, url, fp, errcode, errmsg, headers):
103     """Custom error handling"""
104     # make sure sockets are not left in CLOSE_WAIT, this is similar
105     # but with a different exception to the BasicURLOpener class
106     _ = fp.read() # throw away data
107     fp.close()
108     raise InstanceDown("HTTP error returned: code %s, msg %s" %
109                        (errcode, errmsg))
110
111
112 OPTIONS = [
113   cli.cli_option("-o", "--os", dest="os", default=None,
114                  help="OS to use during burnin",
115                  metavar="<OS>",
116                  completion_suggest=cli.OPT_COMPL_ONE_OS),
117   cli.HYPERVISOR_OPT,
118   cli.OSPARAMS_OPT,
119   cli.cli_option("--disk-size", dest="disk_size",
120                  help="Disk size (determines disk count)",
121                  default="128m", type="string", metavar="<size,size,...>",
122                  completion_suggest=("128M 512M 1G 4G 1G,256M"
123                                      " 4G,1G,1G 10G").split()),
124   cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
125                  default="128m", type="string", metavar="<size,size,...>"),
126   cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
127                  default=None, type="unit", metavar="<size>",
128                  completion_suggest=("128M 256M 512M 1G 4G 8G"
129                                      " 12G 16G").split()),
130   cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
131                  default=256, type="unit", metavar="<size>",
132                  completion_suggest=("128M 256M 512M 1G 4G 8G"
133                                      " 12G 16G").split()),
134   cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
135                  default=128, type="unit", metavar="<size>",
136                  completion_suggest=("128M 256M 512M 1G 4G 8G"
137                                      " 12G 16G").split()),
138   cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
139                  default=3, type="unit", metavar="<count>",
140                  completion_suggest=("1 2 3 4").split()),
141   cli.DEBUG_OPT,
142   cli.VERBOSE_OPT,
143   cli.NOIPCHECK_OPT,
144   cli.NONAMECHECK_OPT,
145   cli.EARLY_RELEASE_OPT,
146   cli.cli_option("--no-replace1", dest="do_replace1",
147                  help="Skip disk replacement with the same secondary",
148                  action="store_false", default=True),
149   cli.cli_option("--no-replace2", dest="do_replace2",
150                  help="Skip disk replacement with a different secondary",
151                  action="store_false", default=True),
152   cli.cli_option("--no-failover", dest="do_failover",
153                  help="Skip instance failovers", action="store_false",
154                  default=True),
155   cli.cli_option("--no-migrate", dest="do_migrate",
156                  help="Skip instance live migration",
157                  action="store_false", default=True),
158   cli.cli_option("--no-move", dest="do_move",
159                  help="Skip instance moves", action="store_false",
160                  default=True),
161   cli.cli_option("--no-importexport", dest="do_importexport",
162                  help="Skip instance export/import", action="store_false",
163                  default=True),
164   cli.cli_option("--no-startstop", dest="do_startstop",
165                  help="Skip instance stop/start", action="store_false",
166                  default=True),
167   cli.cli_option("--no-reinstall", dest="do_reinstall",
168                  help="Skip instance reinstall", action="store_false",
169                  default=True),
170   cli.cli_option("--no-reboot", dest="do_reboot",
171                  help="Skip instance reboot", action="store_false",
172                  default=True),
173   cli.cli_option("--reboot-types", dest="reboot_types",
174                  help="Specify the reboot types", default=None),
175   cli.cli_option("--no-activate-disks", dest="do_activate_disks",
176                  help="Skip disk activation/deactivation",
177                  action="store_false", default=True),
178   cli.cli_option("--no-add-disks", dest="do_addremove_disks",
179                  help="Skip disk addition/removal",
180                  action="store_false", default=True),
181   cli.cli_option("--no-add-nics", dest="do_addremove_nics",
182                  help="Skip NIC addition/removal",
183                  action="store_false", default=True),
184   cli.cli_option("--no-nics", dest="nics",
185                  help="No network interfaces", action="store_const",
186                  const=[], default=[{}]),
187   cli.cli_option("--no-confd", dest="do_confd_tests",
188                  help="Skip confd queries",
189                  action="store_false", default=constants.ENABLE_CONFD),
190   cli.cli_option("--rename", dest="rename", default=None,
191                  help=("Give one unused instance name which is taken"
192                        " to start the renaming sequence"),
193                  metavar="<instance_name>"),
194   cli.cli_option("-t", "--disk-template", dest="disk_template",
195                  choices=list(constants.DISK_TEMPLATES),
196                  default=constants.DT_DRBD8,
197                  help="Disk template (diskless, file, sharedfile, plain,"
198                  " drbd or rbd) [drbd]"),
199   cli.cli_option("-n", "--nodes", dest="nodes", default="",
200                  help=("Comma separated list of nodes to perform"
201                        " the burnin on (defaults to all nodes)"),
202                  completion_suggest=cli.OPT_COMPL_MANY_NODES),
203   cli.cli_option("-I", "--iallocator", dest="iallocator",
204                  default=None, type="string",
205                  help=("Perform the allocation using an iallocator"
206                        " instead of fixed node spread (node restrictions no"
207                        " longer apply, therefore -n/--nodes must not be"
208                        " used"),
209                  completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
210   cli.cli_option("-p", "--parallel", default=False, action="store_true",
211                  dest="parallel",
212                  help=("Enable parallelization of some operations in"
213                        " order to speed burnin or to test granular locking")),
214   cli.cli_option("--net-timeout", default=15, type="int",
215                  dest="net_timeout",
216                  help=("The instance check network timeout in seconds"
217                        " (defaults to 15 seconds)"),
218                  completion_suggest="15 60 300 900".split()),
219   cli.cli_option("-C", "--http-check", default=False, action="store_true",
220                  dest="http_check",
221                  help=("Enable checking of instance status via http,"
222                        " looking for /hostname.txt that should contain the"
223                        " name of the instance")),
224   cli.cli_option("-K", "--keep-instances", default=False,
225                  action="store_true",
226                  dest="keep_instances",
227                  help=("Leave instances on the cluster after burnin,"
228                        " for investigation in case of errors or simply"
229                        " to use them")),
230   ]
231
232 # Mainly used for bash completion
233 ARGUMENTS = [cli.ArgInstance(min=1)]
234
235
236 def _DoCheckInstances(fn):
237   """Decorator for checking instances.
238
239   """
240   def wrapper(self, *args, **kwargs):
241     val = fn(self, *args, **kwargs)
242     for instance in self.instances:
243       self._CheckInstanceAlive(instance) # pylint: disable=W0212
244     return val
245
246   return wrapper
247
248
249 def _DoBatch(retry):
250   """Decorator for possible batch operations.
251
252   Must come after the _DoCheckInstances decorator (if any).
253
254   @param retry: whether this is a retryable batch, will be
255       passed to StartBatch
256
257   """
258   def wrap(fn):
259     def batched(self, *args, **kwargs):
260       self.StartBatch(retry)
261       val = fn(self, *args, **kwargs)
262       self.CommitQueue()
263       return val
264     return batched
265
266   return wrap
267
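# The two decorators above are meant to be stacked on Burner methods, e.g.:
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...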
268
269 class Burner(object):
270   """Burner class."""
271
272   def __init__(self):
273     """Constructor."""
274     self.url_opener = SimpleOpener()
275     self._feed_buf = StringIO()
276     self.nodes = []
277     self.instances = []
278     self.to_rem = []
279     self.queued_ops = []
280     self.opts = None
281     self.queue_retry = False
282     self.disk_count = self.disk_growth = self.disk_size = None
283     self.hvp = self.bep = None
284     self.ParseOptions()
285     self.cl = cli.GetClient()
286     self.GetState()
287
288   def ClearFeedbackBuf(self):
289     """Clear the feedback buffer."""
290     self._feed_buf.truncate(0)
291
292   def GetFeedbackBuf(self):
293     """Return the contents of the buffer."""
294     return self._feed_buf.getvalue()
295
296   def Feedback(self, msg):
297     """Accumulate feedback in our buffer."""
298     formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
299     self._feed_buf.write(formatted_msg + "\n")
300     if self.opts.verbose:
301       Log(formatted_msg, indent=3)
302
303   def MaybeRetry(self, retry_count, msg, fn, *args):
304     """Possibly retry a given function execution.
305
306     @type retry_count: int
307     @param retry_count: retry counter:
308         - 0: non-retryable action
309         - 1: last retry for a retryable action
310         - MAX_RETRIES: original try for a retryable action
311     @type msg: str
312     @param msg: the kind of the operation
313     @type fn: callable
314     @param fn: the function to be called
315
316     """
317     try:
318       val = fn(*args)
319       if retry_count > 0 and retry_count < MAX_RETRIES:
320         Log("Idempotent %s succeeded after %d retries",
321             msg, MAX_RETRIES - retry_count)
322       return val
323     except Exception, err: # pylint: disable=W0703
324       if retry_count == 0:
325         Log("Non-idempotent %s failed, aborting", msg)
326         raise
327       elif retry_count == 1:
328         Log("Idempotent %s repeated failure, aborting", msg)
329         raise
330       else:
331         Log("Idempotent %s failed, retry #%d/%d: %s",
332             msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
333         return self.MaybeRetry(retry_count - 1, msg, fn, *args)
334
335   def _ExecOp(self, *ops):
336     """Execute one or more opcodes and manage the exec buffer.
337
338     @return: if only one opcode has been passed, we return its result;
339         otherwise we return the list of results
340
341     """
342     job_id = cli.SendJob(ops, cl=self.cl)
343     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
344     if len(ops) == 1:
345       return results[0]
346     else:
347       return results
348
349   def ExecOp(self, retry, *ops):
350     """Execute one or more opcodes and manage the exec buffer.
351
352     @return: if only one opcode has been passed, we return its result;
353         otherwise we return the list of results
354
355     """
356     if retry:
357       rval = MAX_RETRIES
358     else:
359       rval = 0
360     cli.SetGenericOpcodeOpts(ops, self.opts)
361     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
362
363   def ExecOrQueue(self, name, ops, post_process=None):
364     """Execute opcodes directly, or queue them when running in parallel."""
365     if self.opts.parallel:
366       cli.SetGenericOpcodeOpts(ops, self.opts)
367       self.queued_ops.append((ops, name, post_process))
368     else:
369       val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
370       if post_process is not None:
371         post_process()
372       return val
373
374   def StartBatch(self, retry):
375     """Start a new batch of jobs.
376
377     @param retry: whether this is a retryable batch
378
379     """
380     self.queued_ops = []
381     self.queue_retry = retry
382
383   def CommitQueue(self):
384     """Execute all submitted opcodes in case of parallel burnin"""
385     if not self.opts.parallel or not self.queued_ops:
386       return
387
388     if self.queue_retry:
389       rval = MAX_RETRIES
390     else:
391       rval = 0
392
393     try:
394       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
395                                 self.queued_ops)
396     finally:
397       self.queued_ops = []
398     return results
399
400   def ExecJobSet(self, jobs):
401     """Execute a set of jobs and return once all are done.
402
403     The method will return the list of results, if all jobs are
404     successful. Otherwise, a BurninFailure exception is raised; this
405     also covers errors raised by the post-processing callbacks.
406
407     """
408     self.ClearFeedbackBuf()
409     jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
410     for ops, name, _ in jobs:
411       jex.QueueJob(name, *ops) # pylint: disable=W0142
412     try:
413       results = jex.GetResults()
414     except Exception, err: # pylint: disable=W0703
415       Log("Jobs failed: %s", err)
416       raise BurninFailure()
417
418     fail = False
419     val = []
420     for (_, name, post_process), (success, result) in zip(jobs, results):
421       if success:
422         if post_process:
423           try:
424             post_process()
425           except Exception, err: # pylint: disable=W0703
426             Log("Post process call for job %s failed: %s", name, err)
427             fail = True
428         val.append(result)
429       else:
430         fail = True
431
432     if fail:
433       raise BurninFailure()
434
435     return val
436
437   def ParseOptions(self):
438     """Parses the command line options.
439
440     In case of command line errors, it will show the usage and exit the
441     program.
442
443     """
444     parser = optparse.OptionParser(usage="\n%s" % USAGE,
445                                    version=("%%prog (ganeti) %s" %
446                                             constants.RELEASE_VERSION),
447                                    option_list=OPTIONS)
448
449     options, args = parser.parse_args()
450     if len(args) < 1 or options.os is None:
451       Usage()
452
453     if options.mem_size:
454       options.maxmem_size = options.mem_size
455       options.minmem_size = options.mem_size
456     elif options.minmem_size > options.maxmem_size:
457       Err("Maximum memory lower than minimum memory")
458
459     supported_disk_templates = (constants.DT_DISKLESS,
460                                 constants.DT_FILE,
461                                 constants.DT_SHARED_FILE,
462                                 constants.DT_PLAIN,
463                                 constants.DT_DRBD8,
464                                 constants.DT_RBD,
465                                 )
466     if options.disk_template not in supported_disk_templates:
467       Err("Unknown disk template '%s'" % options.disk_template)
468
469     if options.disk_template == constants.DT_DISKLESS:
470       disk_size = disk_growth = []
471       options.do_addremove_disks = False
472     else:
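      # e.g. "--disk-size 1G,256M" requests two disks; --disk-growth must
      # then also contain two values (zero disables growth for a disk)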
473       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
474       disk_growth = [utils.ParseUnit(v)
475                      for v in options.disk_growth.split(",")]
476       if len(disk_growth) != len(disk_size):
477         Err("Wrong disk sizes/growth combination")
478     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
479         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
480       Err("Wrong disk count/disk template combination")
481
482     self.disk_size = disk_size
483     self.disk_growth = disk_growth
484     self.disk_count = len(disk_size)
485
486     if options.nodes and options.iallocator:
487       Err("Give either the nodes option or the iallocator option, not both")
488
489     if options.http_check and not options.name_check:
490       Err("Can't enable HTTP checks without name checks")
491
492     self.opts = options
493     self.instances = args
494     self.bep = {
495       constants.BE_MINMEM: options.minmem_size,
496       constants.BE_MAXMEM: options.maxmem_size,
497       constants.BE_VCPUS: options.vcpu_count,
498       }
499
500     self.hypervisor = None
501     self.hvp = {}
502     if options.hypervisor:
503       self.hypervisor, self.hvp = options.hypervisor
504
505     if options.reboot_types is None:
506       options.reboot_types = constants.REBOOT_TYPES
507     else:
508       options.reboot_types = options.reboot_types.split(",")
509       rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
510       if rt_diff:
511         Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))
512
513     socket.setdefaulttimeout(options.net_timeout)
514
515   def GetState(self):
516     """Read the cluster state from the master daemon."""
517     if self.opts.nodes:
518       names = self.opts.nodes.split(",")
519     else:
520       names = []
521     try:
522       op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
523                                names=names, use_locking=True)
524       result = self.ExecOp(True, op)
525     except errors.GenericError, err:
526       err_code, msg = cli.FormatError(err)
527       Err(msg, exit_code=err_code)
528     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
529
530     op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
531                                                       "variants",
532                                                       "hidden"],
533                                        names=[])
534     result = self.ExecOp(True, op_diagnose)
535
536     if not result:
537       Err("Can't get the OS list")
538
539     found = False
540     for (name, variants, _) in result:
541       if self.opts.os in cli.CalculateOSNames(name, variants):
542         found = True
543         break
544
545     if not found:
546       Err("OS '%s' not found" % self.opts.os)
547
548     cluster_info = self.cl.QueryClusterInfo()
549     self.cluster_info = cluster_info
550     if not self.cluster_info:
551       Err("Can't get cluster info")
552
553     default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
554     self.cluster_default_nicparams = default_nic_params
555     if self.hypervisor is None:
556       self.hypervisor = self.cluster_info["default_hypervisor"]
557     self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)
558
559   @_DoCheckInstances
560   @_DoBatch(False)
561   def BurnCreateInstances(self):
562     """Create the given instances.
563
564     """
565     self.to_rem = []
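    # Spread the instances over the nodes round-robin; the secondary node
    # is simply the next node in the (cycled) node list after the primary.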
566     mytor = izip(cycle(self.nodes),
567                  islice(cycle(self.nodes), 1, None),
568                  self.instances)
569
570     Log("Creating instances")
571     for pnode, snode, instance in mytor:
572       Log("instance %s", instance, indent=1)
573       if self.opts.iallocator:
574         pnode = snode = None
575         msg = "with iallocator %s" % self.opts.iallocator
576       elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
577         snode = None
578         msg = "on %s" % pnode
579       else:
580         msg = "on %s, %s" % (pnode, snode)
581
582       Log(msg, indent=2)
583
584       op = opcodes.OpInstanceCreate(instance_name=instance,
585                                     disks=[{"size": size}
586                                            for size in self.disk_size],
587                                     disk_template=self.opts.disk_template,
588                                     nics=self.opts.nics,
589                                     mode=constants.INSTANCE_CREATE,
590                                     os_type=self.opts.os,
591                                     pnode=pnode,
592                                     snode=snode,
593                                     start=True,
594                                     ip_check=self.opts.ip_check,
595                                     name_check=self.opts.name_check,
596                                     wait_for_sync=True,
597                                     file_driver="loop",
598                                     file_storage_dir=None,
599                                     iallocator=self.opts.iallocator,
600                                     beparams=self.bep,
601                                     hvparams=self.hvp,
602                                     hypervisor=self.hypervisor,
603                                     osparams=self.opts.osparams,
604                                     )
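      # capture the instance name now, but only append it to the removal
      # list once the creation has succeeded (used as post_process below)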
605       remove_instance = lambda name: lambda: self.to_rem.append(name)
606       self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))
607
608   @_DoBatch(False)
609   def BurnModifyRuntimeMemory(self):
610     """Alter the runtime memory."""
611     Log("Setting instance runtime memory")
612     for instance in self.instances:
613       Log("instance %s", instance, indent=1)
614       tgt_mem = self.bep[constants.BE_MINMEM]
615       op = opcodes.OpInstanceSetParams(instance_name=instance,
616                                        runtime_mem=tgt_mem)
617       Log("Set memory to %s MB", tgt_mem, indent=2)
618       self.ExecOrQueue(instance, [op])
619
620   @_DoBatch(False)
621   def BurnGrowDisks(self):
622     """Grow each instance disk by the requested growth amount, if any."""
623     Log("Growing disks")
624     for instance in self.instances:
625       Log("instance %s", instance, indent=1)
626       for idx, growth in enumerate(self.disk_growth):
627         if growth > 0:
628           op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
629                                           amount=growth, wait_for_sync=True)
630           Log("increase disk/%s by %s MB", idx, growth, indent=2)
631           self.ExecOrQueue(instance, [op])
632
633   @_DoBatch(True)
634   def BurnReplaceDisks1D8(self):
635     """Replace disks on primary and secondary for drbd8."""
636     Log("Replacing disks on the same nodes")
637     early_release = self.opts.early_release
638     for instance in self.instances:
639       Log("instance %s", instance, indent=1)
640       ops = []
641       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
642         op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
643                                             mode=mode,
644                                             disks=list(range(self.disk_count)),
645                                             early_release=early_release)
646         Log("run %s", mode, indent=2)
647         ops.append(op)
648       self.ExecOrQueue(instance, ops)
649
650   @_DoBatch(True)
651   def BurnReplaceDisks2(self):
652     """Replace secondary node."""
653     Log("Changing the secondary node")
654     mode = constants.REPLACE_DISK_CHG
655
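    # The new secondary is taken two steps ahead in the node cycle, so it
    # differs from both the primary and the secondary chosen at creation
    # time (the caller only runs this test with more than two nodes).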
656     mytor = izip(islice(cycle(self.nodes), 2, None),
657                  self.instances)
658     for tnode, instance in mytor:
659       Log("instance %s", instance, indent=1)
660       if self.opts.iallocator:
661         tnode = None
662         msg = "with iallocator %s" % self.opts.iallocator
663       else:
664         msg = tnode
665       op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
666                                           mode=mode,
667                                           remote_node=tnode,
668                                           iallocator=self.opts.iallocator,
669                                           disks=[],
670                                           early_release=self.opts.early_release)
671       Log("run %s %s", mode, msg, indent=2)
672       self.ExecOrQueue(instance, [op])
673
674   @_DoCheckInstances
675   @_DoBatch(False)
676   def BurnFailover(self):
677     """Failover the instances."""
678     Log("Failing over instances")
679     for instance in self.instances:
680       Log("instance %s", instance, indent=1)
681       op = opcodes.OpInstanceFailover(instance_name=instance,
682                                       ignore_consistency=False)
683       self.ExecOrQueue(instance, [op])
684
685   @_DoCheckInstances
686   @_DoBatch(False)
687   def BurnMove(self):
688     """Move the instances."""
689     Log("Moving instances")
690     mytor = izip(islice(cycle(self.nodes), 1, None),
691                  self.instances)
692     for tnode, instance in mytor:
693       Log("instance %s", instance, indent=1)
694       op = opcodes.OpInstanceMove(instance_name=instance,
695                                   target_node=tnode)
696       self.ExecOrQueue(instance, [op])
697
698   @_DoBatch(False)
699   def BurnMigrate(self):
700     """Migrate the instances."""
701     Log("Migrating instances")
702     for instance in self.instances:
703       Log("instance %s", instance, indent=1)
704       op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
705                                       cleanup=False)
706
707       op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
708                                       cleanup=True)
709       Log("migration and migration cleanup", indent=2)
710       self.ExecOrQueue(instance, [op1, op2])
711
712   @_DoCheckInstances
713   @_DoBatch(False)
714   def BurnImportExport(self):
715     """Export the instance, delete it, and import it back.
716
717     """
718     Log("Exporting and re-importing instances")
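    # For every instance pick a primary, secondary and export node by
    # walking the node cycle at offsets 0, 1 and 2 respectively.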
719     mytor = izip(cycle(self.nodes),
720                  islice(cycle(self.nodes), 1, None),
721                  islice(cycle(self.nodes), 2, None),
722                  self.instances)
723
724     for pnode, snode, enode, instance in mytor:
725       Log("instance %s", instance, indent=1)
726       # read the full name of the instance
727       nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
728                                        names=[instance], use_locking=True)
729       full_name = self.ExecOp(False, nam_op)[0][0]
730
731       if self.opts.iallocator:
732         pnode = snode = None
733         import_log_msg = ("import from %s"
734                           " with iallocator %s" %
735                           (enode, self.opts.iallocator))
736       elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
737         snode = None
738         import_log_msg = ("import from %s to %s" %
739                           (enode, pnode))
740       else:
741         import_log_msg = ("import from %s to %s, %s" %
742                           (enode, pnode, snode))
743
744       exp_op = opcodes.OpBackupExport(instance_name=instance,
745                                       target_node=enode,
746                                       mode=constants.EXPORT_MODE_LOCAL,
747                                       shutdown=True)
748       rem_op = opcodes.OpInstanceRemove(instance_name=instance,
749                                         ignore_failures=True)
750       imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
751       imp_op = opcodes.OpInstanceCreate(instance_name=instance,
752                                         disks=[{"size": size}
753                                                for size in self.disk_size],
754                                         disk_template=self.opts.disk_template,
755                                         nics=self.opts.nics,
756                                         mode=constants.INSTANCE_IMPORT,
757                                         src_node=enode,
758                                         src_path=imp_dir,
759                                         pnode=pnode,
760                                         snode=snode,
761                                         start=True,
762                                         ip_check=self.opts.ip_check,
763                                         name_check=self.opts.name_check,
764                                         wait_for_sync=True,
765                                         file_storage_dir=None,
766                                         file_driver="loop",
767                                         iallocator=self.opts.iallocator,
768                                         beparams=self.bep,
769                                         hvparams=self.hvp,
770                                         osparams=self.opts.osparams,
771                                         )
772
773       erem_op = opcodes.OpBackupRemove(instance_name=instance)
774
775       Log("export to node %s", enode, indent=2)
776       Log("remove instance", indent=2)
777       Log(import_log_msg, indent=2)
778       Log("remove export", indent=2)
779       self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
780
781   @staticmethod
782   def StopInstanceOp(instance):
783     """Stop given instance."""
784     return opcodes.OpInstanceShutdown(instance_name=instance)
785
786   @staticmethod
787   def StartInstanceOp(instance):
788     """Start given instance."""
789     return opcodes.OpInstanceStartup(instance_name=instance, force=False)
790
791   @staticmethod
792   def RenameInstanceOp(instance, instance_new):
793     """Rename instance."""
794     return opcodes.OpInstanceRename(instance_name=instance,
795                                     new_name=instance_new)
796
797   @_DoCheckInstances
798   @_DoBatch(True)
799   def BurnStopStart(self):
800     """Stop/start the instances."""
801     Log("Stopping and starting instances")
802     for instance in self.instances:
803       Log("instance %s", instance, indent=1)
804       op1 = self.StopInstanceOp(instance)
805       op2 = self.StartInstanceOp(instance)
806       self.ExecOrQueue(instance, [op1, op2])
807
808   @_DoBatch(False)
809   def BurnRemove(self):
810     """Remove the instances."""
811     Log("Removing instances")
812     for instance in self.to_rem:
813       Log("instance %s", instance, indent=1)
814       op = opcodes.OpInstanceRemove(instance_name=instance,
815                                     ignore_failures=True)
816       self.ExecOrQueue(instance, [op])
817
818   def BurnRename(self):
819     """Rename the instances.
820
821     Note that this function will not execute in parallel, since we
822     only have one target for rename.
823
824     """
825     Log("Renaming instances")
826     rename = self.opts.rename
827     for instance in self.instances:
828       Log("instance %s", instance, indent=1)
829       op_stop1 = self.StopInstanceOp(instance)
830       op_stop2 = self.StopInstanceOp(rename)
831       op_rename1 = self.RenameInstanceOp(instance, rename)
832       op_rename2 = self.RenameInstanceOp(rename, instance)
833       op_start1 = self.StartInstanceOp(rename)
834       op_start2 = self.StartInstanceOp(instance)
835       self.ExecOp(False, op_stop1, op_rename1, op_start1)
836       self._CheckInstanceAlive(rename)
837       self.ExecOp(False, op_stop2, op_rename2, op_start2)
838       self._CheckInstanceAlive(instance)
839
840   @_DoCheckInstances
841   @_DoBatch(True)
842   def BurnReinstall(self):
843     """Reinstall the instances."""
844     Log("Reinstalling instances")
845     for instance in self.instances:
846       Log("instance %s", instance, indent=1)
847       op1 = self.StopInstanceOp(instance)
848       op2 = opcodes.OpInstanceReinstall(instance_name=instance)
849       Log("reinstall without passing the OS", indent=2)
850       op3 = opcodes.OpInstanceReinstall(instance_name=instance,
851                                         os_type=self.opts.os)
852       Log("reinstall specifying the OS", indent=2)
853       op4 = self.StartInstanceOp(instance)
854       self.ExecOrQueue(instance, [op1, op2, op3, op4])
855
856   @_DoCheckInstances
857   @_DoBatch(True)
858   def BurnReboot(self):
859     """Reboot the instances."""
860     Log("Rebooting instances")
861     for instance in self.instances:
862       Log("instance %s", instance, indent=1)
863       ops = []
864       for reboot_type in self.opts.reboot_types:
865         op = opcodes.OpInstanceReboot(instance_name=instance,
866                                       reboot_type=reboot_type,
867                                       ignore_secondaries=False)
868         Log("reboot with type '%s'", reboot_type, indent=2)
869         ops.append(op)
870       self.ExecOrQueue(instance, ops)
871
872   @_DoCheckInstances
873   @_DoBatch(True)
874   def BurnActivateDisks(self):
875     """Activate and deactivate disks of the instances."""
876     Log("Activating/deactivating disks")
877     for instance in self.instances:
878       Log("instance %s", instance, indent=1)
879       op_start = self.StartInstanceOp(instance)
880       op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
881       op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
882       op_stop = self.StopInstanceOp(instance)
883       Log("activate disks when online", indent=2)
884       Log("activate disks when offline", indent=2)
885       Log("deactivate disks (when offline)", indent=2)
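      # op sequence: activate while running, stop, activate again while
      # stopped, deactivate, then start so the instance is up again for
      # the _DoCheckInstances wrapper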
886       self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
887
888   @_DoCheckInstances
889   @_DoBatch(False)
890   def BurnAddRemoveDisks(self):
891     """Add and remove an extra disk for the instances."""
892     Log("Adding and removing disks")
893     for instance in self.instances:
894       Log("instance %s", instance, indent=1)
895       op_add = opcodes.OpInstanceSetParams(\
896         instance_name=instance,
897         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
898       op_rem = opcodes.OpInstanceSetParams(\
899         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
900       op_stop = self.StopInstanceOp(instance)
901       op_start = self.StartInstanceOp(instance)
902       Log("adding a disk", indent=2)
903       Log("removing last disk", indent=2)
904       self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])
905
906   @_DoBatch(False)
907   def BurnAddRemoveNICs(self):
908     """Add and remove an extra NIC for the instances."""
909     Log("Adding and removing NICs")
910     for instance in self.instances:
911       Log("instance %s", instance, indent=1)
912       op_add = opcodes.OpInstanceSetParams(\
913         instance_name=instance, nics=[(constants.DDM_ADD, {})])
914       op_rem = opcodes.OpInstanceSetParams(\
915         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
916       Log("adding a NIC", indent=2)
917       Log("removing last NIC", indent=2)
918       self.ExecOrQueue(instance, [op_add, op_rem])
919
920   def ConfdCallback(self, reply):
921     """Callback for confd queries"""
922     if reply.type == confd_client.UPCALL_REPLY:
923       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
924         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
925                                                     reply.server_reply.status,
926                                                     reply.server_reply))
927       if reply.orig_request.type == constants.CONFD_REQ_PING:
928         Log("Ping: OK", indent=1)
929       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
930         if reply.server_reply.answer == self.cluster_info["master"]:
931           Log("Master: OK", indent=1)
932         else:
933           Err("Master: wrong: %s" % reply.server_reply.answer)
934       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
935         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
936           Log("Node role for master: OK", indent=1)
937         else:
938           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
939
940   def DoConfdRequestReply(self, req):
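    """Send a single confd request and wait for all expected replies."""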
941     self.confd_counting_callback.RegisterQuery(req.rsalt)
942     self.confd_client.SendRequest(req, async=False)
943     while not self.confd_counting_callback.AllAnswered():
944       if not self.confd_client.ReceiveReply():
945         Err("Did not receive all expected confd replies")
946         break
947
948   def BurnConfd(self):
949     """Run confd queries for our instances.
950
951     The following confd queries are tested:
952       - CONFD_REQ_PING: simple ping
953       - CONFD_REQ_CLUSTER_MASTER: cluster master
954       - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
955
956     """
957     Log("Checking confd results")
958
959     filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
960     counting_callback = confd_client.ConfdCountingCallback(filter_callback)
961     self.confd_counting_callback = counting_callback
962
963     self.confd_client = confd_client.GetConfdClient(counting_callback)
964
965     req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
966     self.DoConfdRequestReply(req)
967
968     req = confd_client.ConfdClientRequest(
969       type=constants.CONFD_REQ_CLUSTER_MASTER)
970     self.DoConfdRequestReply(req)
971
972     req = confd_client.ConfdClientRequest(
973         type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
974         query=self.cluster_info["master"])
975     self.DoConfdRequestReply(req)
976
977   def _CheckInstanceAlive(self, instance):
978     """Check if an instance is alive by doing http checks.
979
980     This will try to retrieve the /hostname.txt URL from the instance
981     and check that it contains the instance's hostname. Connection
982     errors (refused, no route to host, etc.) are retried until the net
983     timeout expires; any other error or a hostname mismatch aborts.
984
985     """
986     if not self.opts.http_check:
987       return
988     end_time = time.time() + self.opts.net_timeout
989     url = None
990     while time.time() < end_time and url is None:
991       try:
992         url = self.url_opener.open("http://%s/hostname.txt" % instance)
993       except IOError:
994         # here we can have connection refused, no route to host, etc.
995         time.sleep(1)
996     if url is None:
997       raise InstanceDown(instance, "Cannot contact instance")
998     hostname = url.read().strip()
999     url.close()
1000     if hostname != instance:
1001       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
1002                                     (instance, hostname)))
1003
1004   def BurninCluster(self):
1005     """Test a cluster intensively.
1006
1007     This will create instances and then start/stop/failover them.
1008     It is safe for existing instances but could impact performance.
1009
1010     """
1011
1012     opts = self.opts
1013
1014     Log("Testing global parameters")
1015
1016     if (len(self.nodes) == 1 and
1017         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
1018                                    constants.DT_FILE,
1019                                    constants.DT_SHARED_FILE)):
1020       Err("When one node is available/selected the disk template must"
1021           " be 'diskless', 'file', 'sharedfile' or 'plain'")
1022
1023     if opts.do_confd_tests and not constants.ENABLE_CONFD:
1024       Err("You selected confd tests but confd was disabled at configure time")
1025
1026     has_err = True
1027     try:
1028       self.BurnCreateInstances()
1029
1030       if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
1031         self.BurnModifyRuntimeMemory()
1032
1033       if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
1034         self.BurnReplaceDisks1D8()
1035       if (opts.do_replace2 and len(self.nodes) > 2 and
1036           opts.disk_template in constants.DTS_INT_MIRROR):
1037         self.BurnReplaceDisks2()
1038
1039       if (opts.disk_template in constants.DTS_GROWABLE and
1040           compat.any(n > 0 for n in self.disk_growth)):
1041         self.BurnGrowDisks()
1042
1043       if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
1044         self.BurnFailover()
1045
1046       if opts.do_migrate:
1047         if opts.disk_template not in constants.DTS_MIRRORED:
1048           Log("Skipping migration (disk template %s does not support it)",
1049               opts.disk_template)
1050         elif not self.hv_class.CAN_MIGRATE:
1051           Log("Skipping migration (hypervisor %s does not support it)",
1052               self.hypervisor)
1053         else:
1054           self.BurnMigrate()
1055
1056       if (opts.do_move and len(self.nodes) > 1 and
1057           opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1058         self.BurnMove()
1059
1060       if (opts.do_importexport and
1061           opts.disk_template not in (constants.DT_DISKLESS,
1062                                      constants.DT_SHARED_FILE,
1063                                      constants.DT_FILE)):
1064         self.BurnImportExport()
1065
1066       if opts.do_reinstall:
1067         self.BurnReinstall()
1068
1069       if opts.do_reboot:
1070         self.BurnReboot()
1071
1072       if opts.do_addremove_disks:
1073         self.BurnAddRemoveDisks()
1074
1075       default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1076       # Don't add/remove nics in routed mode, as we would need an ip to add
1077       # them with
1078       if opts.do_addremove_nics:
1079         if default_nic_mode == constants.NIC_MODE_BRIDGED:
1080           self.BurnAddRemoveNICs()
1081         else:
1082           Log("Skipping nic add/remove as the cluster is not in bridged mode")
1083
1084       if opts.do_activate_disks:
1085         self.BurnActivateDisks()
1086
1087       if opts.rename:
1088         self.BurnRename()
1089
1090       if opts.do_confd_tests:
1091         self.BurnConfd()
1092
1093       if opts.do_startstop:
1094         self.BurnStopStart()
1095
1096       has_err = False
1097     finally:
1098       if has_err:
1099         Log("Error detected: opcode buffer follows:\n\n")
1100         Log(self.GetFeedbackBuf())
1101         Log("\n\n")
1102       if not self.opts.keep_instances:
1103         try:
1104           self.BurnRemove()
1105         except Exception, err:  # pylint: disable=W0703
1106           if has_err: # already detected errors, so errors in removal
1107                       # are quite expected
1108             Log("Note: error detected during instance remove: %s", err)
1109           else: # non-expected error
1110             raise
1111
1112     return constants.EXIT_SUCCESS
1113
1114
1115 def main():
1116   """Main function.
1117
1118   """
1119   utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
1120                      debug=False, stderr_logging=True)
1121
1122   return Burner().BurninCluster()
1123
1124
1125 if __name__ == "__main__":
1126   main()