Switch burnin to cli.JobExecutor
tools/burnin
#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple stdout logger that %-formats its arguments and honours an
  'indent' keyword argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to
    # the BasicURLOpener class, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if 0 < retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
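    # a single job is submitted synchronously and polled until done;
    # parallel batches go through ExecJobSet/cli.JobExecutor instead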
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute opcodes directly, or queue them for a parallel batch."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results if all jobs are
    successful. Otherwise, BurninFailure is raised.

    """
    self.ClearFeedbackBuf()
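    # all jobs are queued through a single cli.JobExecutor and waited
    # for as a set; GetResults yields a (success, result) pair per job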
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    if utils.any(results, lambda x: not x[0]):
      raise BurninFailure()

    return [i[1] for i in results]

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

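    # the default socket timeout bounds the HTTP instance checks done
    # via SimpleOpener in _CheckInstanceAlive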
    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state (node and OS lists) via queries."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
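    # pair each instance with a (primary, secondary) node tuple by
    # walking the node list round-robin, offset by one for secondaries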
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amounts, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count),
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

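    # the candidate secondary is the node two positions ahead in the
    # rotation, distinct from the creation-time pair on 3+ node clusters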
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
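    # each instance is moved to the next node in the rotation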
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
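    # rotate three nodes per instance: primary, secondary, and the
    # export node used as intermediate storage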
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
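      # queue order: activate while running, stop, activate offline,
      # deactivate offline, then restart the instance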
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve http://instance/hostname.txt and check
    that it contains the hostname of the instance. Connection-level
    errors are retried until the network timeout expires; any other
    error aborts the check.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # unexpected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(main())