#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = "\tburnin -o OS_NAME [options...] instance_name ..."

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burnin"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Print an indented, %-formatted message to stdout.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()

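# A minimal usage sketch (values illustrative): the "indent" keyword picks
# a header from LOG_HEADERS and controls the leading whitespace:
#
#   Log("Creating instances")              # "- Creating instances"
#   Log("instance %s", "inst1", indent=1)  # "  * instance inst1"
#   Log("done", indent=2)                  # "    done"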

def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to
    # the BasicURLOpener class, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest="1 2 3 4".split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
235   cli.cli_option("-I", "--iallocator", dest="iallocator",
236                  default=None, type="string",
237                  help=("Perform the allocation using an iallocator"
238                        " instead of fixed node spread (node restrictions no"
239                        " longer apply, therefore -n/--nodes must not be"
240                        " used"),
241                  completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  cli.REASON_OPT,
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]

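# Example invocation (OS, node and instance names are illustrative):
#
#   burnin -o debian-image -t drbd -n node1,node2 instance1 instance2
#
# creates the two instances with the DRBD disk template on the given nodes
# and runs the whole test sequence on them.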

def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap

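# Illustrative decorator stacking (as used by the Burn* methods below):
# the instance check must wrap the batch, so that instances are verified
# only after CommitQueue has executed the queued jobs:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...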

class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if 0 < retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the retried result instead of discarding it
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

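  # Retry semantics (illustrative): MaybeRetry(MAX_RETRIES, "opcode", fn)
  # allows up to MAX_RETRIES attempts of fn, logging each intermediate
  # failure, while MaybeRetry(0, "opcode", fn) re-raises the first failure.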
  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute some opcodes directly or queue them for the parallel batch."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

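  # Each queued entry is an (ops, name, post_process) tuple; ExecJobSet
  # pairs the entries positionally with the JobExecutor results, so a
  # fully successful batch returns the job results in queue order.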
  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
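      # the nested lambda binds "name" at definition time, so each queued
      # post-process call appends its own instance name rather than the
      # loop variable's final value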
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow each of the instance's disks by the requested amount, if any."""
650     Log("Growing disks")
651     for instance in self.instances:
652       Log("instance %s", instance, indent=1)
653       for idx, growth in enumerate(self.disk_growth):
654         if growth > 0:
655           op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
656                                           amount=growth, wait_for_sync=True)
657           Log("increase disk/%s by %s MB", idx, growth, indent=2)
658           self.ExecOrQueue(instance, [op])
659
660   @_DoBatch(True)
661   def BurnReplaceDisks1D8(self):
662     """Replace disks on primary and secondary for drbd8."""
663     Log("Replacing disks on the same nodes")
664     early_release = self.opts.early_release
665     for instance in self.instances:
666       Log("instance %s", instance, indent=1)
667       ops = []
668       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
669         op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
670                                             mode=mode,
671                                             disks=list(range(self.disk_count)),
672                                             early_release=early_release)
673         Log("run %s", mode, indent=2)
674         ops.append(op)
675       self.ExecOrQueue(instance, ops)
676
677   @_DoBatch(True)
678   def BurnReplaceDisks2(self):
679     """Replace secondary node."""
680     Log("Changing the secondary node")
681     mode = constants.REPLACE_DISK_CHG
682
683     mytor = izip(islice(cycle(self.nodes), 2, None),
684                  self.instances)
685     for tnode, instance in mytor:
686       Log("instance %s", instance, indent=1)
687       if self.opts.iallocator:
688         tnode = None
689         msg = "with iallocator %s" % self.opts.iallocator
690       else:
691         msg = tnode
692       op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
693                                           mode=mode,
694                                           remote_node=tnode,
695                                           iallocator=self.opts.iallocator,
696                                           disks=[],
697                                           early_release=self.opts.early_release)
698       Log("run %s %s", mode, msg, indent=2)
699       self.ExecOrQueue(instance, [op])
700
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op3 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing HTTP checks.

    This will try to retrieve the URL http://<instance>/hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds; for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

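  # For the -C/--http-check option to succeed, the instance's OS image must
  # serve its own hostname over HTTP. A minimal sketch of a suitable test
  # payload (illustrative only, Python 2), run from inside the instance:
  #
  #   import BaseHTTPServer, SimpleHTTPServer, socket
  #   open("hostname.txt", "w").write(socket.gethostname())
  #   BaseHTTPServer.HTTPServer(("", 80),
  #       SimpleHTTPServer.SimpleHTTPRequestHandler).serve_forever()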
  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()