DeprecationWarning fixes for pylint
[ganeti-local] / tools / burnin
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
24 """
25
26 import sys
27 import optparse
28 import time
29 import socket
30 import urllib
31 from itertools import izip, islice, cycle
32 from cStringIO import StringIO
33
34 from ganeti import opcodes
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import hypervisor
40 from ganeti import compat
41
42 from ganeti.confd import client as confd_client
43
44
# Command line synopsis; printed by Usage() and used as the usage text
# of the option parser
USAGE = "\tburnin -o OS_NAME [options...] instance_name ..."

# Initial value of the retry counter for retryable operations
MAX_RETRIES = 3

# Prefix printed by Log() in front of a message, keyed by indent level;
# Log() supplies its own fallback for levels missing from this table
LOG_HEADERS = {0: "- ", 1: "* ", 2: ""}
53
54
class InstanceDown(Exception):
  """Raised when a checked instance turns out not to be up.

  """
57
58
class BurninFailure(Exception):
  """Raised when a failure is detected during the burnin.

  """
61
62
def Usage():
  """Print the usage text to stderr and exit with status 2.

  """
  for line in ("Usage:", USAGE):
    sys.stderr.write(line + "\n")
  sys.exit(2)
69
70
def Log(msg, *args, **kwargs):
  """Write one (optionally formatted and indented) line to stdout.

  @param msg: the message, or a format string if C{args} are given
  @param args: values interpolated into C{msg} using the % operator
  @keyword indent: indentation level (defaults to 0); the line is
      padded with two spaces per level and prefixed with the header
      registered for that level in LOG_HEADERS

  """
  level = kwargs.get("indent", 0)
  if args:
    msg = msg % args
  padding = "%*s" % (2 * level, "")
  header = LOG_HEADERS.get(level, "  ")
  sys.stdout.write("%s%s%s\n" % (padding, header, msg))
  # flush right away so that progress shows up as it happens
  sys.stdout.flush()
81
82
def Err(msg, exit_code=1):
  """Print an error message to stderr and terminate the program.

  @param msg: the message; a newline is appended before writing
  @param exit_code: the status passed to sys.exit (defaults to 1)

  """
  stream = sys.stderr
  stream.write(msg + "\n")
  stream.flush()
  sys.exit(exit_code)
90
91
class SimpleOpener(urllib.FancyURLopener):
  """URL opener that never prompts and reports HTTP errors as InstanceDown.

  """
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """Return empty credentials instead of asking the user.

    """
    # the signature mirrors the parent class' API, hence the unused
    # arguments
    # pylint: disable=W0613
    return (None, None)

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Drain and close the response, then raise L{InstanceDown}.

    """
    # reading the pending data before closing makes sure sockets are
    # not left in CLOSE_WAIT; this is similar to what the
    # BasicURLOpener class does, only with a different exception
    fp.read()
    fp.close()
    message = "HTTP error returned: code %s, msg %s" % (errcode, errmsg)
    raise InstanceDown(message)
110
111
# Command line options understood by burnin; the list is passed as
# option_list to the optparse.OptionParser built in Burner.ParseOptions.
# All the "--no-*" switches store False into a "do_*" destination which
# defaults to True.
OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile"
                 " or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  # the help text used to lack the closing parenthesis
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Positional arguments (at least one instance name); mainly used for
# bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
226
227
def _DoCheckInstances(fn):
  """Decorator that checks all instances after the wrapped method ran.

  The wrapped method is called first; once it has returned, every entry
  of C{self.instances} is passed to C{self._CheckInstanceAlive} and only
  then the method's return value is handed back to the caller.

  """
  def checked(self, *args, **kwargs):
    result = fn(self, *args, **kwargs)
    for name in self.instances:
      self._CheckInstanceAlive(name) # pylint: disable=W0212
    return result

  return checked
239
240
def _DoBatch(retry):
  """Decorator factory wrapping a method into a batch of operations.

  The resulting decorator calls C{self.StartBatch(retry)} before the
  wrapped method and C{self.CommitQueue()} after it, returning the
  method's own return value.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def decorate(fn):
    def run_batch(self, *args, **kwargs):
      self.StartBatch(retry)
      result = fn(self, *args, **kwargs)
      self.CommitQueue()
      return result
    return run_batch

  return decorate
259
260
261 class Burner(object):
262   """Burner class."""
263
264   def __init__(self):
265     """Constructor."""
266     self.url_opener = SimpleOpener()
267     self._feed_buf = StringIO()
268     self.nodes = []
269     self.instances = []
270     self.to_rem = []
271     self.queued_ops = []
272     self.opts = None
273     self.queue_retry = False
274     self.disk_count = self.disk_growth = self.disk_size = None
275     self.hvp = self.bep = None
276     self.ParseOptions()
277     self.cl = cli.GetClient()
278     self.GetState()
279
280   def ClearFeedbackBuf(self):
281     """Clear the feedback buffer."""
282     self._feed_buf.truncate(0)
283
284   def GetFeedbackBuf(self):
285     """Return the contents of the buffer."""
286     return self._feed_buf.getvalue()
287
288   def Feedback(self, msg):
289     """Acumulate feedback in our buffer."""
290     formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
291     self._feed_buf.write(formatted_msg + "\n")
292     if self.opts.verbose:
293       Log(formatted_msg, indent=3)
294
295   def MaybeRetry(self, retry_count, msg, fn, *args):
296     """Possibly retry a given function execution.
297
298     @type retry_count: int
299     @param retry_count: retry counter:
300         - 0: non-retryable action
301         - 1: last retry for a retryable action
302         - MAX_RETRIES: original try for a retryable action
303     @type msg: str
304     @param msg: the kind of the operation
305     @type fn: callable
306     @param fn: the function to be called
307
308     """
309     try:
310       val = fn(*args)
311       if retry_count > 0 and retry_count < MAX_RETRIES:
312         Log("Idempotent %s succeeded after %d retries",
313             msg, MAX_RETRIES - retry_count)
314       return val
315     except Exception, err: # pylint: disable=W0703
316       if retry_count == 0:
317         Log("Non-idempotent %s failed, aborting", msg)
318         raise
319       elif retry_count == 1:
320         Log("Idempotent %s repeated failure, aborting", msg)
321         raise
322       else:
323         Log("Idempotent %s failed, retry #%d/%d: %s",
324             msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
325         self.MaybeRetry(retry_count - 1, msg, fn, *args)
326
327   def _ExecOp(self, *ops):
328     """Execute one or more opcodes and manage the exec buffer.
329
330     @return: if only opcode has been passed, we return its result;
331         otherwise we return the list of results
332
333     """
334     job_id = cli.SendJob(ops, cl=self.cl)
335     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
336     if len(ops) == 1:
337       return results[0]
338     else:
339       return results
340
341   def ExecOp(self, retry, *ops):
342     """Execute one or more opcodes and manage the exec buffer.
343
344     @return: if only opcode has been passed, we return its result;
345         otherwise we return the list of results
346
347     """
348     if retry:
349       rval = MAX_RETRIES
350     else:
351       rval = 0
352     cli.SetGenericOpcodeOpts(ops, self.opts)
353     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
354
355   def ExecOrQueue(self, name, ops, post_process=None):
356     """Execute an opcode and manage the exec buffer."""
357     if self.opts.parallel:
358       cli.SetGenericOpcodeOpts(ops, self.opts)
359       self.queued_ops.append((ops, name, post_process))
360     else:
361       val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
362       if post_process is not None:
363         post_process()
364       return val
365
366   def StartBatch(self, retry):
367     """Start a new batch of jobs.
368
369     @param retry: whether this is a retryable batch
370
371     """
372     self.queued_ops = []
373     self.queue_retry = retry
374
375   def CommitQueue(self):
376     """Execute all submitted opcodes in case of parallel burnin"""
377     if not self.opts.parallel or not self.queued_ops:
378       return
379
380     if self.queue_retry:
381       rval = MAX_RETRIES
382     else:
383       rval = 0
384
385     try:
386       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
387                                 self.queued_ops)
388     finally:
389       self.queued_ops = []
390     return results
391
392   def ExecJobSet(self, jobs):
393     """Execute a set of jobs and return once all are done.
394
395     The method will return the list of results, if all jobs are
396     successful. Otherwise, OpExecError will be raised from within
397     cli.py.
398
399     """
400     self.ClearFeedbackBuf()
401     jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
402     for ops, name, _ in jobs:
403       jex.QueueJob(name, *ops) # pylint: disable=W0142
404     try:
405       results = jex.GetResults()
406     except Exception, err: # pylint: disable=W0703
407       Log("Jobs failed: %s", err)
408       raise BurninFailure()
409
410     fail = False
411     val = []
412     for (_, name, post_process), (success, result) in zip(jobs, results):
413       if success:
414         if post_process:
415           try:
416             post_process()
417           except Exception, err: # pylint: disable=W0703
418             Log("Post process call for job %s failed: %s", name, err)
419             fail = True
420         val.append(result)
421       else:
422         fail = True
423
424     if fail:
425       raise BurninFailure()
426
427     return val
428
  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    On success the following attributes are set: C{opts}, C{instances}
    (the positional arguments), C{disk_size}, C{disk_growth},
    C{disk_count}, C{bep}, C{hypervisor} and C{hvp}. The default socket
    timeout is set from the C{--net-timeout} option.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    # at least one instance name and the OS are mandatory
    if len(args) < 1 or options.os is None:
      Usage()

    # the option accepts every known template, but burnin only handles
    # the ones listed here
    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      # no disks at all, hence nothing to add or remove later either
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      # one comma-separated entry per disk; sizes and growths must pair up
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    # disks and the diskless template exclude each other
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    # backend parameters used when creating instances
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    # if given, the hypervisor option holds a (name, parameters) pair;
    # otherwise the name stays None here
    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    # default to all reboot types, otherwise validate the given ones
    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

497
  def GetState(self):
    """Read the cluster state from the master daemon.

    Sets C{nodes} to the names of the nodes which are neither offline
    nor drained, verifies that the requested OS is available and stores
    the cluster information, the cluster default NIC parameters and the
    hypervisor class. The program exits (via L{Err}) if the node query
    fails, if no OS is returned, if the requested OS is not found or if
    no cluster information is returned.

    """
    # an empty list of names is used when no nodes were given; per the
    # --nodes help text this stands for all nodes
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      # Err exits, so "result" is always bound below
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # rows follow output_fields: (name, offline, drained)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    # the requested OS must match one of the names computed from an OS
    # and its variants
    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    # fall back to the cluster default if no hypervisor was requested
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)
541
542   @_DoCheckInstances
543   @_DoBatch(False)
544   def BurnCreateInstances(self):
545     """Create the given instances.
546
547     """
548     self.to_rem = []
549     mytor = izip(cycle(self.nodes),
550                  islice(cycle(self.nodes), 1, None),
551                  self.instances)
552
553     Log("Creating instances")
554     for pnode, snode, instance in mytor:
555       Log("instance %s", instance, indent=1)
556       if self.opts.iallocator:
557         pnode = snode = None
558         msg = "with iallocator %s" % self.opts.iallocator
559       elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
560         snode = None
561         msg = "on %s" % pnode
562       else:
563         msg = "on %s, %s" % (pnode, snode)
564
565       Log(msg, indent=2)
566
567       op = opcodes.OpInstanceCreate(instance_name=instance,
568                                     disks=[{"size": size}
569                                            for size in self.disk_size],
570                                     disk_template=self.opts.disk_template,
571                                     nics=self.opts.nics,
572                                     mode=constants.INSTANCE_CREATE,
573                                     os_type=self.opts.os,
574                                     pnode=pnode,
575                                     snode=snode,
576                                     start=True,
577                                     ip_check=self.opts.ip_check,
578                                     name_check=self.opts.name_check,
579                                     wait_for_sync=True,
580                                     file_driver="loop",
581                                     file_storage_dir=None,
582                                     iallocator=self.opts.iallocator,
583                                     beparams=self.bep,
584                                     hvparams=self.hvp,
585                                     hypervisor=self.hypervisor,
586                                     osparams=self.opts.osparams,
587                                     )
588       remove_instance = lambda name: lambda: self.to_rem.append(name)
589       self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))
590
591   @_DoBatch(False)
592   def BurnGrowDisks(self):
593     """Grow both the os and the swap disks by the requested amount, if any."""
594     Log("Growing disks")
595     for instance in self.instances:
596       Log("instance %s", instance, indent=1)
597       for idx, growth in enumerate(self.disk_growth):
598         if growth > 0:
599           op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
600                                           amount=growth, wait_for_sync=True)
601           Log("increase disk/%s by %s MB", idx, growth, indent=2)
602           self.ExecOrQueue(instance, [op])
603
604   @_DoBatch(True)
605   def BurnReplaceDisks1D8(self):
606     """Replace disks on primary and secondary for drbd8."""
607     Log("Replacing disks on the same nodes")
608     early_release = self.opts.early_release
609     for instance in self.instances:
610       Log("instance %s", instance, indent=1)
611       ops = []
612       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
613         op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
614                                             mode=mode,
615                                             disks=list(range(self.disk_count)),
616                                             early_release=early_release)
617         Log("run %s", mode, indent=2)
618         ops.append(op)
619       self.ExecOrQueue(instance, ops)
620
621   @_DoBatch(True)
622   def BurnReplaceDisks2(self):
623     """Replace secondary node."""
624     Log("Changing the secondary node")
625     mode = constants.REPLACE_DISK_CHG
626
627     mytor = izip(islice(cycle(self.nodes), 2, None),
628                  self.instances)
629     for tnode, instance in mytor:
630       Log("instance %s", instance, indent=1)
631       if self.opts.iallocator:
632         tnode = None
633         msg = "with iallocator %s" % self.opts.iallocator
634       else:
635         msg = tnode
636       op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
637                                           mode=mode,
638                                           remote_node=tnode,
639                                           iallocator=self.opts.iallocator,
640                                           disks=[],
641                                           early_release=self.opts.early_release)
642       Log("run %s %s", mode, msg, indent=2)
643       self.ExecOrQueue(instance, [op])
644
645   @_DoCheckInstances
646   @_DoBatch(False)
647   def BurnFailover(self):
648     """Failover the instances."""
649     Log("Failing over instances")
650     for instance in self.instances:
651       Log("instance %s", instance, indent=1)
652       op = opcodes.OpInstanceFailover(instance_name=instance,
653                                       ignore_consistency=False)
654       self.ExecOrQueue(instance, [op])
655
656   @_DoCheckInstances
657   @_DoBatch(False)
658   def BurnMove(self):
659     """Move the instances."""
660     Log("Moving instances")
661     mytor = izip(islice(cycle(self.nodes), 1, None),
662                  self.instances)
663     for tnode, instance in mytor:
664       Log("instance %s", instance, indent=1)
665       op = opcodes.OpInstanceMove(instance_name=instance,
666                                   target_node=tnode)
667       self.ExecOrQueue(instance, [op])
668
669   @_DoBatch(False)
670   def BurnMigrate(self):
671     """Migrate the instances."""
672     Log("Migrating instances")
673     for instance in self.instances:
674       Log("instance %s", instance, indent=1)
675       op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
676                                       cleanup=False)
677
678       op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
679                                       cleanup=True)
680       Log("migration and migration cleanup", indent=2)
681       self.ExecOrQueue(instance, [op1, op2])
682
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    For every instance four opcodes are submitted as one job: export
    to a node, removal of the instance, re-creation of the instance
    from the export and removal of the export.

    """
    Log("Exporting and re-importing instances")
    # primary, secondary and export nodes are taken round-robin from the
    # node list, each starting one node further than the previous one
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        # node placement of the import is left to the iallocator
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        # templates without internal mirroring take no secondary node
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      # the import source path is built from the export directory and
      # the full instance name
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      # the messages announce the four opcodes in submission order
      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
751
752   @staticmethod
753   def StopInstanceOp(instance):
754     """Stop given instance."""
755     return opcodes.OpInstanceShutdown(instance_name=instance)
756
757   @staticmethod
758   def StartInstanceOp(instance):
759     """Start given instance."""
760     return opcodes.OpInstanceStartup(instance_name=instance, force=False)
761
762   @staticmethod
763   def RenameInstanceOp(instance, instance_new):
764     """Rename instance."""
765     return opcodes.OpInstanceRename(instance_name=instance,
766                                     new_name=instance_new)
767
768   @_DoCheckInstances
769   @_DoBatch(True)
770   def BurnStopStart(self):
771     """Stop/start the instances."""
772     Log("Stopping and starting instances")
773     for instance in self.instances:
774       Log("instance %s", instance, indent=1)
775       op1 = self.StopInstanceOp(instance)
776       op2 = self.StartInstanceOp(instance)
777       self.ExecOrQueue(instance, [op1, op2])
778
779   @_DoBatch(False)
780   def BurnRemove(self):
781     """Remove the instances."""
782     Log("Removing instances")
783     for instance in self.to_rem:
784       Log("instance %s", instance, indent=1)
785       op = opcodes.OpInstanceRemove(instance_name=instance,
786                                     ignore_failures=True)
787       self.ExecOrQueue(instance, [op])
788
789   def BurnRename(self):
790     """Rename the instances.
791
792     Note that this function will not execute in parallel, since we
793     only have one target for rename.
794
795     """
796     Log("Renaming instances")
797     rename = self.opts.rename
798     for instance in self.instances:
799       Log("instance %s", instance, indent=1)
800       op_stop1 = self.StopInstanceOp(instance)
801       op_stop2 = self.StopInstanceOp(rename)
802       op_rename1 = self.RenameInstanceOp(instance, rename)
803       op_rename2 = self.RenameInstanceOp(rename, instance)
804       op_start1 = self.StartInstanceOp(rename)
805       op_start2 = self.StartInstanceOp(instance)
806       self.ExecOp(False, op_stop1, op_rename1, op_start1)
807       self._CheckInstanceAlive(rename)
808       self.ExecOp(False, op_stop2, op_rename2, op_start2)
809       self._CheckInstanceAlive(instance)
810
811   @_DoCheckInstances
812   @_DoBatch(True)
813   def BurnReinstall(self):
814     """Reinstall the instances."""
815     Log("Reinstalling instances")
816     for instance in self.instances:
817       Log("instance %s", instance, indent=1)
818       op1 = self.StopInstanceOp(instance)
819       op2 = opcodes.OpInstanceReinstall(instance_name=instance)
820       Log("reinstall without passing the OS", indent=2)
821       op3 = opcodes.OpInstanceReinstall(instance_name=instance,
822                                         os_type=self.opts.os)
823       Log("reinstall specifying the OS", indent=2)
824       op4 = self.StartInstanceOp(instance)
825       self.ExecOrQueue(instance, [op1, op2, op3, op4])
826
827   @_DoCheckInstances
828   @_DoBatch(True)
829   def BurnReboot(self):
830     """Reboot the instances."""
831     Log("Rebooting instances")
832     for instance in self.instances:
833       Log("instance %s", instance, indent=1)
834       ops = []
835       for reboot_type in self.opts.reboot_types:
836         op = opcodes.OpInstanceReboot(instance_name=instance,
837                                       reboot_type=reboot_type,
838                                       ignore_secondaries=False)
839         Log("reboot with type '%s'", reboot_type, indent=2)
840         ops.append(op)
841       self.ExecOrQueue(instance, ops)
842
843   @_DoCheckInstances
844   @_DoBatch(True)
845   def BurnActivateDisks(self):
846     """Activate and deactivate disks of the instances."""
847     Log("Activating/deactivating disks")
848     for instance in self.instances:
849       Log("instance %s", instance, indent=1)
850       op_start = self.StartInstanceOp(instance)
851       op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
852       op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
853       op_stop = self.StopInstanceOp(instance)
854       Log("activate disks when online", indent=2)
855       Log("activate disks when offline", indent=2)
856       Log("deactivate disks (when offline)", indent=2)
857       self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
858
859   @_DoCheckInstances
860   @_DoBatch(False)
861   def BurnAddRemoveDisks(self):
862     """Add and remove an extra disk for the instances."""
863     Log("Adding and removing disks")
864     for instance in self.instances:
865       Log("instance %s", instance, indent=1)
866       op_add = opcodes.OpInstanceSetParams(\
867         instance_name=instance,
868         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
869       op_rem = opcodes.OpInstanceSetParams(\
870         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
871       op_stop = self.StopInstanceOp(instance)
872       op_start = self.StartInstanceOp(instance)
873       Log("adding a disk", indent=2)
874       Log("removing last disk", indent=2)
875       self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])
876
877   @_DoBatch(False)
878   def BurnAddRemoveNICs(self):
879     """Add and remove an extra NIC for the instances."""
880     Log("Adding and removing NICs")
881     for instance in self.instances:
882       Log("instance %s", instance, indent=1)
883       op_add = opcodes.OpInstanceSetParams(\
884         instance_name=instance, nics=[(constants.DDM_ADD, {})])
885       op_rem = opcodes.OpInstanceSetParams(\
886         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
887       Log("adding a NIC", indent=2)
888       Log("removing last NIC", indent=2)
889       self.ExecOrQueue(instance, [op_add, op_rem])
890
891   def ConfdCallback(self, reply):
892     """Callback for confd queries"""
893     if reply.type == confd_client.UPCALL_REPLY:
894       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
895         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
896                                                     reply.server_reply.status,
897                                                     reply.server_reply))
898       if reply.orig_request.type == constants.CONFD_REQ_PING:
899         Log("Ping: OK", indent=1)
900       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
901         if reply.server_reply.answer == self.cluster_info["master"]:
902           Log("Master: OK", indent=1)
903         else:
904           Err("Master: wrong: %s" % reply.server_reply.answer)
905       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
906         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
907           Log("Node role for master: OK", indent=1)
908         else:
909           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
910
911   def DoConfdRequestReply(self, req):
912     self.confd_counting_callback.RegisterQuery(req.rsalt)
913     self.confd_client.SendRequest(req, async=False)
914     while not self.confd_counting_callback.AllAnswered():
915       if not self.confd_client.ReceiveReply():
916         Err("Did not receive all expected confd replies")
917         break
918
919   def BurnConfd(self):
920     """Run confd queries for our instances.
921
922     The following confd queries are tested:
923       - CONFD_REQ_PING: simple ping
924       - CONFD_REQ_CLUSTER_MASTER: cluster master
925       - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
926
927     """
928     Log("Checking confd results")
929
930     filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
931     counting_callback = confd_client.ConfdCountingCallback(filter_callback)
932     self.confd_counting_callback = counting_callback
933
934     self.confd_client = confd_client.GetConfdClient(counting_callback)
935
936     req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
937     self.DoConfdRequestReply(req)
938
939     req = confd_client.ConfdClientRequest(
940       type=constants.CONFD_REQ_CLUSTER_MASTER)
941     self.DoConfdRequestReply(req)
942
943     req = confd_client.ConfdClientRequest(
944         type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
945         query=self.cluster_info["master"])
946     self.DoConfdRequestReply(req)
947
948   def _CheckInstanceAlive(self, instance):
949     """Check if an instance is alive by doing http checks.
950
951     This will try to retrieve the url on the instance /hostname.txt
952     and check that it contains the hostname of the instance. In case
953     we get ECONNREFUSED, we retry up to the net timeout seconds, for
954     any other error we abort.
955
956     """
957     if not self.opts.http_check:
958       return
959     end_time = time.time() + self.opts.net_timeout
960     url = None
961     while time.time() < end_time and url is None:
962       try:
963         url = self.url_opener.open("http://%s/hostname.txt" % instance)
964       except IOError:
965         # here we can have connection refused, no route to host, etc.
966         time.sleep(1)
967     if url is None:
968       raise InstanceDown(instance, "Cannot contact instance")
969     hostname = url.read().strip()
970     url.close()
971     if hostname != instance:
972       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
973                                     (instance, hostname)))
974
975   def BurninCluster(self):
976     """Test a cluster intensively.
977
978     This will create instances and then start/stop/failover them.
979     It is safe for existing instances but could impact performance.
980
981     """
982
983     opts = self.opts
984
985     Log("Testing global parameters")
986
987     if (len(self.nodes) == 1 and
988         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
989                                    constants.DT_FILE,
990                                    constants.DT_SHARED_FILE)):
991       Err("When one node is available/selected the disk template must"
992           " be 'diskless', 'file' or 'plain'")
993
994     has_err = True
995     try:
996       self.BurnCreateInstances()
997       if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
998         self.BurnReplaceDisks1D8()
999       if (opts.do_replace2 and len(self.nodes) > 2 and
1000           opts.disk_template in constants.DTS_INT_MIRROR):
1001         self.BurnReplaceDisks2()
1002
1003       if (opts.disk_template in constants.DTS_GROWABLE and
1004           compat.any(n > 0 for n in self.disk_growth)):
1005         self.BurnGrowDisks()
1006
1007       if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
1008         self.BurnFailover()
1009
1010       if opts.do_migrate:
1011         if opts.disk_template not in constants.DTS_MIRRORED:
1012           Log("Skipping migration (disk template %s does not support it)",
1013               opts.disk_template)
1014         elif not self.hv_class.CAN_MIGRATE:
1015           Log("Skipping migration (hypervisor %s does not support it)",
1016               self.hypervisor)
1017         else:
1018           self.BurnMigrate()
1019
1020       if (opts.do_move and len(self.nodes) > 1 and
1021           opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1022         self.BurnMove()
1023
1024       if (opts.do_importexport and
1025           opts.disk_template not in (constants.DT_DISKLESS,
1026                                      constants.DT_SHARED_FILE,
1027                                      constants.DT_FILE)):
1028         self.BurnImportExport()
1029
1030       if opts.do_reinstall:
1031         self.BurnReinstall()
1032
1033       if opts.do_reboot:
1034         self.BurnReboot()
1035
1036       if opts.do_addremove_disks:
1037         self.BurnAddRemoveDisks()
1038
1039       default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1040       # Don't add/remove nics in routed mode, as we would need an ip to add
1041       # them with
1042       if opts.do_addremove_nics:
1043         if default_nic_mode == constants.NIC_MODE_BRIDGED:
1044           self.BurnAddRemoveNICs()
1045         else:
1046           Log("Skipping nic add/remove as the cluster is not in bridged mode")
1047
1048       if opts.do_activate_disks:
1049         self.BurnActivateDisks()
1050
1051       if opts.rename:
1052         self.BurnRename()
1053
1054       if opts.do_confd_tests:
1055         self.BurnConfd()
1056
1057       if opts.do_startstop:
1058         self.BurnStopStart()
1059
1060       has_err = False
1061     finally:
1062       if has_err:
1063         Log("Error detected: opcode buffer follows:\n\n")
1064         Log(self.GetFeedbackBuf())
1065         Log("\n\n")
1066       if not self.opts.keep_instances:
1067         try:
1068           self.BurnRemove()
1069         except Exception, err:  # pylint: disable=W0703
1070           if has_err: # already detected errors, so errors in removal
1071                       # are quite expected
1072             Log("Note: error detected during instance remove: %s", err)
1073           else: # non-expected error
1074             raise
1075
1076     return constants.EXIT_SUCCESS
1077
1078
def main():
  """Set up logging and run the burnin.

  @return: the value returned by L{Burner.BurninCluster}

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     stderr_logging=True, debug=False)

  burner = Burner()
  return burner.BurninCluster()
1087
1088
if __name__ == "__main__":
  # hand the exit code computed by main() to the calling process instead
  # of silently dropping it
  sys.exit(main())