tools/burnin
#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


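# Log() below prefixes each message according to its indentation level,
# using the LOG_HEADERS map above; levels deeper than 2 fall back to a
# plain two-space prefix.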
def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


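# Command line options understood by burnin.  Most individual tests are
# enabled by default and can be switched off with the corresponding
# --no-* option (implemented via action="store_false").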
OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


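# The two decorators below wrap the Burn* methods of the Burner class:
# _DoBatch opens a new opcode batch and commits it once the method returns
# (submitting the queued jobs when running in parallel mode), while
# _DoCheckInstances runs the HTTP liveness check on every instance after
# the batch has completed.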
def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the value obtained by the (possibly successful) retry
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute opcodes directly or queue them, depending on --parallel."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

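    # GetResults() gives back one (success, result) pair per queued job, in
    # the order the jobs were submitted, so zipping with the original job
    # list pairs each post_process callback with its job's outcome.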
    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
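    # Keep only nodes that are neither offline nor drained (fields 1 and 2
    # of the node query above).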
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
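    # Walk the node list round-robin: each instance gets a primary node and,
    # one position further along the cycle, a different secondary node.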
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
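      # The nested lambda binds the current instance name at definition time,
      # so the post-processing callback queued below records the right
      # instance for later removal even when it runs much later (e.g. when
      # jobs are executed in parallel).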
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

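    # Offset the node cycle by two, so the new secondary differs both from
    # the primary and from the secondary chosen at creation time.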
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
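    # Pick three consecutive nodes from the cycle for each instance: the new
    # primary, the new secondary and the node that will hold the export.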
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
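    # Each instance is stopped, renamed to the spare name given via --rename,
    # started and checked, then renamed back and checked again; this is why
    # the spare name must belong to no existing instance.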
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a single confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

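    # Chain the callbacks: the filter callback is meant to weed out
    # duplicated or outdated replies before they reach ConfdCallback, while
    # the counting callback tracks how many of the registered queries have
    # been answered so far.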
    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()