Remove quotes from CommaJoin and convert to it
tools/burnin
#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import os
import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
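
# example invocation (the OS and node names below are hypothetical):
#   burnin -o debian-etch -t drbd -n node1,node2 -p instance1.example.com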

MAX_RETRIES = 3


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, indent=0):
  """Simple function that prints out its argument.

  """
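  # map the indent level to a line prefix; deeper levels fall back to
  # plain spaces via headers.get() below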
  headers = {
    0: "- ",
    1: "* ",
    2: ""
    }
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  headers.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the base URLopener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.VERBOSE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    # msg is a (timestamp, log_type, text) tuple as relayed by the job
    # feedback mechanism; MergeTime rebuilds the float timestamp
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries" %
            (msg, MAX_RETRIES - retry_count))
      return val
    except Exception, err:
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting" % (msg, ))
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting" % (msg, ))
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s" %
            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
        # propagate the result of the retried call back to our caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
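    # all opcodes are submitted together as a single job; PollJob then
    # blocks until it finishes, relaying log messages through Feedback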
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute an opcode and manage the exec buffer."""
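    # in parallel mode the ops are only queued here; execution (and the
    # result) is deferred until CommitQueue runs the whole batch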
    if self.opts.parallel:
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin."""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
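    # submit all jobs first, then poll them one by one, so that the
    # master can execute them concurrently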
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s" % utils.CommaJoin(job_ids), indent=1)
    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      try:
        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
      except Exception, err:
        Log("Job for %s failed: %s" % (iname, err))
    if len(results) != len(jobs):
      raise BurninFailure()
    return results

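  # note: the two helpers below are plain functions, not methods; they
  # are applied as decorators to the Burn* methods at class definition
  # time, hence no self argument of their own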
  def _DoCheckInstances(fn):
    """Decorator for checking instances.

    """
    def wrapper(self, *args, **kwargs):
      val = fn(self, *args, **kwargs)
      for instance in self.instances:
        self._CheckInstanceAlive(instance)
      return val

    return wrapper

  def _DoBatch(retry):
    """Decorator for possible batch operations.

    Must come after the _DoCheckInstances decorator (if any).

    @param retry: whether this is a retryable batch, will be
        passed to StartBatch

    """
    def wrap(fn):
      def batched(self, *args, **kwargs):
        self.StartBatch(retry)
        val = fn(self, *args, **kwargs)
        self.CommitQueue()
        return val
      return batched

    return wrap

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
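    # keep only nodes that are neither offline nor drained (the second
    # and third fields queried above)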
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
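    # cycle through the node list, pairing each primary node with the
    # next one as secondary, one (pnode, snode) pair per instance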
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=True,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amounts, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count))
        Log("run %s" % mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

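    # offset the node cycle by two so the new secondary differs from
    # both the primary and the secondary chosen at creation time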
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[])
      Log("run %s %s" % (mode, msg), indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
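    # offsets 0, 1 and 2 into the node cycle give the primary,
    # secondary and export-target node for each instance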
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=True,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s" % enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  def StopInstanceOp(self, instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  def StartInstanceOp(self, instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  def RenameInstanceOp(self, instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'" % reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url /hostname.txt on the instance
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry for up to net_timeout seconds; for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s" % str(err))
          else: # non-expected error
            raise

    return 0


def main():
  """Main function."""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(main())