burnin: move batch init/commit into a decorator
[ganeti-local] / tools / burnin
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
24 """
25
26 import os
27 import sys
28 import optparse
29 import time
30 import socket
31 import urllib
32 from itertools import izip, islice, cycle
33 from cStringIO import StringIO
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import cli
38 from ganeti import errors
39 from ganeti import utils
40
41
42 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
43
44 MAX_RETRIES = 3
45
46 class InstanceDown(Exception):
47   """The checked instance was not up"""
48
49
50 class BurninFailure(Exception):
51   """Failure detected during burning"""
52
53
54 def Usage():
55   """Shows program usage information and exits the program."""
56
57   print >> sys.stderr, "Usage:"
58   print >> sys.stderr, USAGE
59   sys.exit(2)
60
61
62 def Log(msg, indent=0):
63   """Simple function that prints out its argument.
64
65   """
66   headers = {
67     0: "- ",
68     1: "* ",
69     2: ""
70     }
71   sys.stdout.write("%*s%s%s\n" % (2*indent, "",
72                                    headers.get(indent, "  "), msg))
73   sys.stdout.flush()
74
75 def Err(msg, exit_code=1):
76   """Simple error logging that prints to stderr.
77
78   """
79   sys.stderr.write(msg + "\n")
80   sys.stderr.flush()
81   sys.exit(exit_code)
82
83
84 class SimpleOpener(urllib.FancyURLopener):
85   """A simple url opener"""
86
87   def prompt_user_passwd(self, host, realm, clear_cache = 0):
88     """No-interaction version of prompt_user_passwd."""
89     return None, None
90
91   def http_error_default(self, url, fp, errcode, errmsg, headers):
92     """Custom error handling"""
93     # make sure sockets are not left in CLOSE_WAIT, this is similar
94     # but with a different exception to the BasicURLOpener class
95     _ = fp.read() # throw away data
96     fp.close()
97     raise InstanceDown("HTTP error returned: code %s, msg %s" %
98                        (errcode, errmsg))
99
100
101 class Burner(object):
102   """Burner class."""
103
104   def __init__(self):
105     """Constructor."""
106     utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
107     self.url_opener = SimpleOpener()
108     self._feed_buf = StringIO()
109     self.nodes = []
110     self.instances = []
111     self.to_rem = []
112     self.queued_ops = []
113     self.opts = None
114     self.queue_retry = False
115     self.disk_count = self.disk_growth = self.disk_size = None
116     self.hvp = self.bep = None
117     self.ParseOptions()
118     self.cl = cli.GetClient()
119     self.GetState()
120
121   def ClearFeedbackBuf(self):
122     """Clear the feedback buffer."""
123     self._feed_buf.truncate(0)
124
125   def GetFeedbackBuf(self):
126     """Return the contents of the buffer."""
127     return self._feed_buf.getvalue()
128
129   def Feedback(self, msg):
130     """Acumulate feedback in our buffer."""
131     self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
132                                       msg[2]))
133     if self.opts.verbose:
134       Log(msg, indent=3)
135
136   def MaybeRetry(self, retry_count, msg, fn, *args):
137     """Possibly retry a given function execution.
138
139     @type retry_count: int
140     @param retry_count: retry counter:
141         - 0: non-retryable action
142         - 1: last retry for a retryable action
143         - MAX_RETRIES: original try for a retryable action
144     @type msg: str
145     @param msg: the kind of the operation
146     @type fn: callable
147     @param fn: the function to be called
148
149     """
150     try:
151       val = fn(*args)
152       if retry_count > 0 and retry_count < MAX_RETRIES:
153         Log("Idempotent %s succeeded after %d retries" %
154             (msg, MAX_RETRIES - retry_count))
155       return val
156     except Exception, err:
157       if retry_count == 0:
158         Log("Non-idempotent %s failed, aborting" % (msg, ))
159         raise
160       elif retry_count == 1:
161         Log("Idempotent %s repeated failure, aborting" % (msg, ))
162         raise
163       else:
164         Log("Idempotent %s failed, retry #%d/%d: %s" %
165             (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
166         self.MaybeRetry(retry_count - 1, msg, fn, *args)
167
168   def _ExecOp(self, *ops):
169     """Execute one or more opcodes and manage the exec buffer.
170
171     @result: if only opcode has been passed, we return its result;
172         otherwise we return the list of results
173
174     """
175     job_id = cli.SendJob(ops, cl=self.cl)
176     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
177     if len(ops) == 1:
178       return results[0]
179     else:
180       return results
181
182   def ExecOp(self, retry, *ops):
183     """Execute one or more opcodes and manage the exec buffer.
184
185     @result: if only opcode has been passed, we return its result;
186         otherwise we return the list of results
187
188     """
189     if retry:
190       rval = MAX_RETRIES
191     else:
192       rval = 0
193     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
194
195   def ExecOrQueue(self, name, *ops):
196     """Execute an opcode and manage the exec buffer."""
197     if self.opts.parallel:
198       self.queued_ops.append((ops, name))
199     else:
200       return self.ExecOp(self.queue_retry, *ops)
201
202   def StartBatch(self, retry):
203     """Start a new batch of jobs.
204
205     @param retry: whether this is a retryable batch
206
207     """
208     self.queued_ops = []
209     self.queue_retry = retry
210
211   def CommitQueue(self):
212     """Execute all submitted opcodes in case of parallel burnin"""
213     if not self.opts.parallel:
214       return
215
216     if self.queue_retry:
217       rval = MAX_RETRIES
218     else:
219       rval = 0
220
221     try:
222       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
223                                 self.queued_ops)
224     finally:
225       self.queued_ops = []
226     return results
227
228   def ExecJobSet(self, jobs):
229     """Execute a set of jobs and return once all are done.
230
231     The method will return the list of results, if all jobs are
232     successful. Otherwise, OpExecError will be raised from within
233     cli.py.
234
235     """
236     self.ClearFeedbackBuf()
237     job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
238     Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
239     results = []
240     for jid, (_, iname) in zip(job_ids, jobs):
241       Log("waiting for job %s for %s" % (jid, iname), indent=2)
242       try:
243         results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
244       except Exception, err:
245         Log("Job for %s failed: %s" % (iname, err))
246     if len(results) != len(jobs):
247       raise BurninFailure()
248     return results
249
250   def _DoCheckInstances(fn):
251     """Decorator for checking instances.
252
253     """
254     def wrapper(self, *args, **kwargs):
255       val = fn(self, *args, **kwargs)
256       for instance in self.instances:
257         self._CheckInstanceAlive(instance)
258       return val
259
260     return wrapper
261
262   def _DoBatch(retry):
263     """Decorator for possible batch operations.
264
265     Must come after the _DoCheckInstances decorator (if any).
266
267     @param retry: whether this is a retryable batch, will be
268         passed to StartBatch
269
270     """
271     def wrap(fn):
272       def batched(self, *args, **kwargs):
273         self.StartBatch(retry)
274         val = fn(self, *args, **kwargs)
275         self.CommitQueue()
276         return val
277       return batched
278
279     return wrap
280
281   def ParseOptions(self):
282     """Parses the command line options.
283
284     In case of command line errors, it will show the usage and exit the
285     program.
286
287     """
288
289     parser = optparse.OptionParser(usage="\n%s" % USAGE,
290                                    version="%%prog (ganeti) %s" %
291                                    constants.RELEASE_VERSION,
292                                    option_class=cli.CliOption)
293
294     parser.add_option("-o", "--os", dest="os", default=None,
295                       help="OS to use during burnin",
296                       metavar="<OS>")
297     parser.add_option("--disk-size", dest="disk_size",
298                       help="Disk size (determines disk count)",
299                       default="128m", type="string", metavar="<size,size,...>")
300     parser.add_option("--disk-growth", dest="disk_growth", help="Disk growth",
301                       default="128m", type="string", metavar="<size,size,...>")
302     parser.add_option("--mem-size", dest="mem_size", help="Memory size",
303                       default=128, type="unit", metavar="<size>")
304     parser.add_option("-v", "--verbose",
305                       action="store_true", dest="verbose", default=False,
306                       help="print command execution messages to stdout")
307     parser.add_option("--no-replace1", dest="do_replace1",
308                       help="Skip disk replacement with the same secondary",
309                       action="store_false", default=True)
310     parser.add_option("--no-replace2", dest="do_replace2",
311                       help="Skip disk replacement with a different secondary",
312                       action="store_false", default=True)
313     parser.add_option("--no-failover", dest="do_failover",
314                       help="Skip instance failovers", action="store_false",
315                       default=True)
316     parser.add_option("--no-migrate", dest="do_migrate",
317                       help="Skip instance live migration",
318                       action="store_false", default=True)
319     parser.add_option("--no-importexport", dest="do_importexport",
320                       help="Skip instance export/import", action="store_false",
321                       default=True)
322     parser.add_option("--no-startstop", dest="do_startstop",
323                       help="Skip instance stop/start", action="store_false",
324                       default=True)
325     parser.add_option("--no-reinstall", dest="do_reinstall",
326                       help="Skip instance reinstall", action="store_false",
327                       default=True)
328     parser.add_option("--no-reboot", dest="do_reboot",
329                       help="Skip instance reboot", action="store_false",
330                       default=True)
331     parser.add_option("--no-activate-disks", dest="do_activate_disks",
332                       help="Skip disk activation/deactivation",
333                       action="store_false", default=True)
334     parser.add_option("--no-add-disks", dest="do_addremove_disks",
335                       help="Skip disk addition/removal",
336                       action="store_false", default=True)
337     parser.add_option("--no-add-nics", dest="do_addremove_nics",
338                       help="Skip NIC addition/removal",
339                       action="store_false", default=True)
340     parser.add_option("--no-nics", dest="nics",
341                       help="No network interfaces", action="store_const",
342                       const=[], default=[{}])
343     parser.add_option("--rename", dest="rename", default=None,
344                       help="Give one unused instance name which is taken"
345                            " to start the renaming sequence",
346                       metavar="<instance_name>")
347     parser.add_option("-t", "--disk-template", dest="disk_template",
348                       choices=("diskless", "file", "plain", "drbd"),
349                       default="drbd",
350                       help="Disk template (diskless, file, plain or drbd)"
351                             " [drbd]")
352     parser.add_option("-n", "--nodes", dest="nodes", default="",
353                       help="Comma separated list of nodes to perform"
354                       " the burnin on (defaults to all nodes)")
355     parser.add_option("-I", "--iallocator", dest="iallocator",
356                       default=None, type="string",
357                       help="Perform the allocation using an iallocator"
358                       " instead of fixed node spread (node restrictions no"
359                       " longer apply, therefore -n/--nodes must not be used")
360     parser.add_option("-p", "--parallel", default=False, action="store_true",
361                       dest="parallel",
362                       help="Enable parallelization of some operations in"
363                       " order to speed burnin or to test granular locking")
364     parser.add_option("--net-timeout", default=15, type="int",
365                       dest="net_timeout",
366                       help="The instance check network timeout in seconds"
367                       " (defaults to 15 seconds)")
368     parser.add_option("-C", "--http-check", default=False, action="store_true",
369                       dest="http_check",
370                       help="Enable checking of instance status via http,"
371                       " looking for /hostname.txt that should contain the"
372                       " name of the instance")
373     parser.add_option("-K", "--keep-instances", default=False,
374                       action="store_true",
375                       dest="keep_instances",
376                       help="Leave instances on the cluster after burnin,"
377                       " for investigation in case of errors or simply"
378                       " to use them")
379
380
381     options, args = parser.parse_args()
382     if len(args) < 1 or options.os is None:
383       Usage()
384
385     supported_disk_templates = (constants.DT_DISKLESS,
386                                 constants.DT_FILE,
387                                 constants.DT_PLAIN,
388                                 constants.DT_DRBD8)
389     if options.disk_template not in supported_disk_templates:
390       Err("Unknown disk template '%s'" % options.disk_template)
391
392     if options.disk_template == constants.DT_DISKLESS:
393       disk_size = disk_growth = []
394       options.do_addremove_disks = False
395     else:
396       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
397       disk_growth = [utils.ParseUnit(v)
398                      for v in options.disk_growth.split(",")]
399       if len(disk_growth) != len(disk_size):
400         Err("Wrong disk sizes/growth combination")
401     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
402         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
403       Err("Wrong disk count/disk template combination")
404
405     self.disk_size = disk_size
406     self.disk_growth = disk_growth
407     self.disk_count = len(disk_size)
408
409     if options.nodes and options.iallocator:
410       Err("Give either the nodes option or the iallocator option, not both")
411
412     self.opts = options
413     self.instances = args
414     self.bep = {
415       constants.BE_MEMORY: options.mem_size,
416       constants.BE_VCPUS: 1,
417       }
418     self.hvp = {}
419
420     socket.setdefaulttimeout(options.net_timeout)
421
422   def GetState(self):
423     """Read the cluster state from the config."""
424     if self.opts.nodes:
425       names = self.opts.nodes.split(",")
426     else:
427       names = []
428     try:
429       op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
430                                 names=names, use_locking=True)
431       result = self.ExecOp(True, op)
432     except errors.GenericError, err:
433       err_code, msg = cli.FormatError(err)
434       Err(msg, exit_code=err_code)
435     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
436
437     op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
438     result = self.ExecOp(True, op_diagos)
439
440     if not result:
441       Err("Can't get the OS list")
442
443     # filter non-valid OS-es
444     os_set = [val[0] for val in result if val[1]]
445
446     if self.opts.os not in os_set:
447       Err("OS '%s' not found" % self.opts.os)
448
449   @_DoCheckInstances
450   @_DoBatch(False)
451   def BurnCreateInstances(self):
452     """Create the given instances.
453
454     """
455     self.to_rem = []
456     mytor = izip(cycle(self.nodes),
457                  islice(cycle(self.nodes), 1, None),
458                  self.instances)
459
460     Log("Creating instances")
461     for pnode, snode, instance in mytor:
462       Log("instance %s" % instance, indent=1)
463       if self.opts.iallocator:
464         pnode = snode = None
465         msg = "with iallocator %s" % self.opts.iallocator
466       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
467         snode = None
468         msg = "on %s" % pnode
469       else:
470         msg = "on %s, %s" % (pnode, snode)
471
472       Log(msg, indent=2)
473
474       op = opcodes.OpCreateInstance(instance_name=instance,
475                                     disks = [ {"size": size}
476                                               for size in self.disk_size],
477                                     disk_template=self.opts.disk_template,
478                                     nics=self.opts.nics,
479                                     mode=constants.INSTANCE_CREATE,
480                                     os_type=self.opts.os,
481                                     pnode=pnode,
482                                     snode=snode,
483                                     start=True,
484                                     ip_check=True,
485                                     wait_for_sync=True,
486                                     file_driver="loop",
487                                     file_storage_dir=None,
488                                     iallocator=self.opts.iallocator,
489                                     beparams=self.bep,
490                                     hvparams=self.hvp,
491                                     )
492
493       self.ExecOrQueue(instance, op)
494       self.to_rem.append(instance)
495
496   @_DoBatch(False)
497   def BurnGrowDisks(self):
498     """Grow both the os and the swap disks by the requested amount, if any."""
499     Log("Growing disks")
500     for instance in self.instances:
501       Log("instance %s" % instance, indent=1)
502       for idx, growth in enumerate(self.disk_growth):
503         if growth > 0:
504           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
505                                   amount=growth, wait_for_sync=True)
506           Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
507           self.ExecOrQueue(instance, op)
508
509   @_DoBatch(True)
510   def BurnReplaceDisks1D8(self):
511     """Replace disks on primary and secondary for drbd8."""
512     Log("Replacing disks on the same nodes")
513     for instance in self.instances:
514       Log("instance %s" % instance, indent=1)
515       ops = []
516       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
517         op = opcodes.OpReplaceDisks(instance_name=instance,
518                                     mode=mode,
519                                     disks=[i for i in range(self.disk_count)])
520         Log("run %s" % mode, indent=2)
521         ops.append(op)
522       self.ExecOrQueue(instance, *ops)
523
524   @_DoBatch(True)
525   def BurnReplaceDisks2(self):
526     """Replace secondary node."""
527     Log("Changing the secondary node")
528     mode = constants.REPLACE_DISK_CHG
529
530     mytor = izip(islice(cycle(self.nodes), 2, None),
531                  self.instances)
532     for tnode, instance in mytor:
533       Log("instance %s" % instance, indent=1)
534       if self.opts.iallocator:
535         tnode = None
536         msg = "with iallocator %s" % self.opts.iallocator
537       else:
538         msg = tnode
539       op = opcodes.OpReplaceDisks(instance_name=instance,
540                                   mode=mode,
541                                   remote_node=tnode,
542                                   iallocator=self.opts.iallocator,
543                                   disks=[i for i in range(self.disk_count)])
544       Log("run %s %s" % (mode, msg), indent=2)
545       self.ExecOrQueue(instance, op)
546
547   @_DoCheckInstances
548   @_DoBatch(False)
549   def BurnFailover(self):
550     """Failover the instances."""
551     Log("Failing over instances")
552     for instance in self.instances:
553       Log("instance %s" % instance, indent=1)
554       op = opcodes.OpFailoverInstance(instance_name=instance,
555                                       ignore_consistency=False)
556
557       self.ExecOrQueue(instance, op)
558
559   @_DoBatch(False)
560   def BurnMigrate(self):
561     """Migrate the instances."""
562     Log("Migrating instances")
563     for instance in self.instances:
564       Log("instance %s" % instance, indent=1)
565       op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
566                                       cleanup=False)
567
568       op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
569                                       cleanup=True)
570       Log("migration and migration cleanup", indent=2)
571       self.ExecOrQueue(instance, op1, op2)
572
573   @_DoCheckInstances
574   @_DoBatch(False)
575   def BurnImportExport(self):
576     """Export the instance, delete it, and import it back.
577
578     """
579     Log("Exporting and re-importing instances")
580     mytor = izip(cycle(self.nodes),
581                  islice(cycle(self.nodes), 1, None),
582                  islice(cycle(self.nodes), 2, None),
583                  self.instances)
584
585     for pnode, snode, enode, instance in mytor:
586       Log("instance %s" % instance, indent=1)
587       # read the full name of the instance
588       nam_op = opcodes.OpQueryInstances(output_fields=["name"],
589                                         names=[instance], use_locking=True)
590       full_name = self.ExecOp(False, nam_op)[0][0]
591
592       if self.opts.iallocator:
593         pnode = snode = None
594         import_log_msg = ("import from %s"
595                           " with iallocator %s" %
596                           (enode, self.opts.iallocator))
597       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
598         snode = None
599         import_log_msg = ("import from %s to %s" %
600                           (enode, pnode))
601       else:
602         import_log_msg = ("import from %s to %s, %s" %
603                           (enode, pnode, snode))
604
605       exp_op = opcodes.OpExportInstance(instance_name=instance,
606                                            target_node=enode,
607                                            shutdown=True)
608       rem_op = opcodes.OpRemoveInstance(instance_name=instance,
609                                         ignore_failures=True)
610       imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
611       imp_op = opcodes.OpCreateInstance(instance_name=instance,
612                                         disks = [ {"size": size}
613                                                   for size in self.disk_size],
614                                         disk_template=self.opts.disk_template,
615                                         nics=self.opts.nics,
616                                         mode=constants.INSTANCE_IMPORT,
617                                         src_node=enode,
618                                         src_path=imp_dir,
619                                         pnode=pnode,
620                                         snode=snode,
621                                         start=True,
622                                         ip_check=True,
623                                         wait_for_sync=True,
624                                         file_storage_dir=None,
625                                         file_driver="loop",
626                                         iallocator=self.opts.iallocator,
627                                         beparams=self.bep,
628                                         hvparams=self.hvp,
629                                         )
630
631       erem_op = opcodes.OpRemoveExport(instance_name=instance)
632
633       Log("export to node %s" % enode, indent=2)
634       Log("remove instance", indent=2)
635       Log(import_log_msg, indent=2)
636       Log("remove export", indent=2)
637       self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
638
639   def StopInstanceOp(self, instance):
640     """Stop given instance."""
641     return opcodes.OpShutdownInstance(instance_name=instance)
642
643   def StartInstanceOp(self, instance):
644     """Start given instance."""
645     return opcodes.OpStartupInstance(instance_name=instance, force=False)
646
647   def RenameInstanceOp(self, instance, instance_new):
648     """Rename instance."""
649     return opcodes.OpRenameInstance(instance_name=instance,
650                                     new_name=instance_new)
651
652   @_DoCheckInstances
653   @_DoBatch(True)
654   def BurnStopStart(self):
655     """Stop/start the instances."""
656     Log("Stopping and starting instances")
657     for instance in self.instances:
658       Log("instance %s" % instance, indent=1)
659       op1 = self.StopInstanceOp(instance)
660       op2 = self.StartInstanceOp(instance)
661       self.ExecOrQueue(instance, op1, op2)
662
663   @_DoBatch(False)
664   def BurnRemove(self):
665     """Remove the instances."""
666     Log("Removing instances")
667     for instance in self.to_rem:
668       Log("instance %s" % instance, indent=1)
669       op = opcodes.OpRemoveInstance(instance_name=instance,
670                                     ignore_failures=True)
671       self.ExecOrQueue(instance, op)
672
673   def BurnRename(self):
674     """Rename the instances.
675
676     Note that this function will not execute in parallel, since we
677     only have one target for rename.
678
679     """
680     Log("Renaming instances")
681     rename = self.opts.rename
682     for instance in self.instances:
683       Log("instance %s" % instance, indent=1)
684       op_stop1 = self.StopInstanceOp(instance)
685       op_stop2 = self.StopInstanceOp(rename)
686       op_rename1 = self.RenameInstanceOp(instance, rename)
687       op_rename2 = self.RenameInstanceOp(rename, instance)
688       op_start1 = self.StartInstanceOp(rename)
689       op_start2 = self.StartInstanceOp(instance)
690       self.ExecOp(False, op_stop1, op_rename1, op_start1)
691       self._CheckInstanceAlive(rename)
692       self.ExecOp(False, op_stop2, op_rename2, op_start2)
693       self._CheckInstanceAlive(instance)
694
695   @_DoCheckInstances
696   @_DoBatch(True)
697   def BurnReinstall(self):
698     """Reinstall the instances."""
699     Log("Reinstalling instances")
700     for instance in self.instances:
701       Log("instance %s" % instance, indent=1)
702       op1 = self.StopInstanceOp(instance)
703       op2 = opcodes.OpReinstallInstance(instance_name=instance)
704       Log("reinstall without passing the OS", indent=2)
705       op3 = opcodes.OpReinstallInstance(instance_name=instance,
706                                         os_type=self.opts.os)
707       Log("reinstall specifying the OS", indent=2)
708       op4 = self.StartInstanceOp(instance)
709       self.ExecOrQueue(instance, op1, op2, op3, op4)
710
711   @_DoCheckInstances
712   @_DoBatch(True)
713   def BurnReboot(self):
714     """Reboot the instances."""
715     Log("Rebooting instances")
716     for instance in self.instances:
717       Log("instance %s" % instance, indent=1)
718       ops = []
719       for reboot_type in constants.REBOOT_TYPES:
720         op = opcodes.OpRebootInstance(instance_name=instance,
721                                       reboot_type=reboot_type,
722                                       ignore_secondaries=False)
723         Log("reboot with type '%s'" % reboot_type, indent=2)
724         ops.append(op)
725       self.ExecOrQueue(instance, *ops)
726
727   @_DoCheckInstances
728   @_DoBatch(True)
729   def BurnActivateDisks(self):
730     """Activate and deactivate disks of the instances."""
731     Log("Activating/deactivating disks")
732     for instance in self.instances:
733       Log("instance %s" % instance, indent=1)
734       op_start = self.StartInstanceOp(instance)
735       op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
736       op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
737       op_stop = self.StopInstanceOp(instance)
738       Log("activate disks when online", indent=2)
739       Log("activate disks when offline", indent=2)
740       Log("deactivate disks (when offline)", indent=2)
741       self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
742
743   @_DoCheckInstances
744   @_DoBatch(False)
745   def BurnAddRemoveDisks(self):
746     """Add and remove an extra disk for the instances."""
747     Log("Adding and removing disks")
748     for instance in self.instances:
749       Log("instance %s" % instance, indent=1)
750       op_add = opcodes.OpSetInstanceParams(\
751         instance_name=instance,
752         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
753       op_rem = opcodes.OpSetInstanceParams(\
754         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
755       op_stop = self.StopInstanceOp(instance)
756       op_start = self.StartInstanceOp(instance)
757       Log("adding a disk", indent=2)
758       Log("removing last disk", indent=2)
759       self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
760
761   @_DoBatch(False)
762   def BurnAddRemoveNICs(self):
763     """Add and remove an extra NIC for the instances."""
764     Log("Adding and removing NICs")
765     for instance in self.instances:
766       Log("instance %s" % instance, indent=1)
767       op_add = opcodes.OpSetInstanceParams(\
768         instance_name=instance, nics=[(constants.DDM_ADD, {})])
769       op_rem = opcodes.OpSetInstanceParams(\
770         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
771       Log("adding a NIC", indent=2)
772       Log("removing last NIC", indent=2)
773       self.ExecOrQueue(instance, op_add, op_rem)
774
775   def _CheckInstanceAlive(self, instance):
776     """Check if an instance is alive by doing http checks.
777
778     This will try to retrieve the url on the instance /hostname.txt
779     and check that it contains the hostname of the instance. In case
780     we get ECONNREFUSED, we retry up to the net timeout seconds, for
781     any other error we abort.
782
783     """
784     if not self.opts.http_check:
785       return
786     end_time = time.time() + self.opts.net_timeout
787     url = None
788     while time.time() < end_time and url is None:
789       try:
790         url = self.url_opener.open("http://%s/hostname.txt" % instance)
791       except IOError:
792         # here we can have connection refused, no route to host, etc.
793         time.sleep(1)
794     if url is None:
795       raise InstanceDown(instance, "Cannot contact instance")
796     hostname = url.read().strip()
797     url.close()
798     if hostname != instance:
799       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
800                                     (instance, hostname)))
801
802   def BurninCluster(self):
803     """Test a cluster intensively.
804
805     This will create instances and then start/stop/failover them.
806     It is safe for existing instances but could impact performance.
807
808     """
809
810     opts = self.opts
811
812     Log("Testing global parameters")
813
814     if (len(self.nodes) == 1 and
815         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
816                                    constants.DT_FILE)):
817       Err("When one node is available/selected the disk template must"
818           " be 'diskless', 'file' or 'plain'")
819
820     has_err = True
821     try:
822       self.BurnCreateInstances()
823       if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
824         self.BurnReplaceDisks1D8()
825       if (opts.do_replace2 and len(self.nodes) > 2 and
826           opts.disk_template in constants.DTS_NET_MIRROR) :
827         self.BurnReplaceDisks2()
828
829       if (opts.disk_template != constants.DT_DISKLESS and
830           utils.any(self.disk_growth, lambda n: n > 0)):
831         self.BurnGrowDisks()
832
833       if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
834         self.BurnFailover()
835
836       if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
837         self.BurnMigrate()
838
839       if (opts.do_importexport and
840           opts.disk_template not in (constants.DT_DISKLESS,
841                                      constants.DT_FILE)):
842         self.BurnImportExport()
843
844       if opts.do_reinstall:
845         self.BurnReinstall()
846
847       if opts.do_reboot:
848         self.BurnReboot()
849
850       if opts.do_addremove_disks:
851         self.BurnAddRemoveDisks()
852
853       if opts.do_addremove_nics:
854         self.BurnAddRemoveNICs()
855
856       if opts.do_activate_disks:
857         self.BurnActivateDisks()
858
859       if opts.rename:
860         self.BurnRename()
861
862       if opts.do_startstop:
863         self.BurnStopStart()
864
865       has_err = False
866     finally:
867       if has_err:
868         Log("Error detected: opcode buffer follows:\n\n")
869         Log(self.GetFeedbackBuf())
870         Log("\n\n")
871       if not self.opts.keep_instances:
872         try:
873           self.BurnRemove()
874         except Exception, err:
875           if has_err: # already detected errors, so errors in removal
876                       # are quite expected
877             Log("Note: error detected during instance remove: %s" % str(err))
878           else: # non-expected error
879             raise
880
881     return 0
882
883
884 def main():
885   """Main function"""
886
887   burner = Burner()
888   return burner.BurninCluster()
889
890
891 if __name__ == "__main__":
892   main()