Update docstrings in tools/ and enable epydoc
tools/burnin
#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils

from ganeti.confd import client as confd_client

USAGE = "\tburnin -o OS_NAME [options...] instance_name ..."
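# Example invocation (OS and instance names are hypothetical):
#   burnin -o debian-image -t drbd -p instance1 instance2 instance3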

MAX_RETRIES = 3
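# Prefixes used by Log() for each indentation level; unlisted levels
# fall back to a plain two-space prefix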
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Print an indented log message to stdout.

  @param msg: the message (a format string if args are passed)
  @param args: optional arguments interpolated into the message
  @keyword indent: indentation level; mapped to a line prefix via
      LOG_HEADERS

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to
    # the BasicURLOpener class, but we raise a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  After the wrapped function returns, every instance is verified to be
  alive via L{Burner._CheckInstanceAlive}.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
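
# The two decorators above are meant to be stacked, _DoCheckInstances
# outermost:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# so that the queued batch is committed before the liveness checks run.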


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
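    # retry_count counts down: MAX_RETRIES on the original attempt of a
    # retryable action, 1 on the last allowed retry, 0 when the action
    # must not be retried at all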
    try:
      val = fn(*args)
      if 0 < retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @type retry: boolean
    @param retry: whether the opcodes should be retried on failure

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode list directly, or queue it for the current batch."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
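    # pair each instance with a (pnode, snode) tuple by walking the node
    # list round-robin; the secondary cycle is offset by one, so e.g.
    # nodes [n1, n2] yield (n1, n2), (n2, n1), (n1, n2), ...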
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )
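      # double lambda: the outer call binds the current instance name now,
      # while the inner one defers the append until the job has succeeded
      # and post_process is invoked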
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amounts, if any."""
569     Log("Growing disks")
570     for instance in self.instances:
571       Log("instance %s", instance, indent=1)
572       for idx, growth in enumerate(self.disk_growth):
573         if growth > 0:
574           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
575                                   amount=growth, wait_for_sync=True)
576           Log("increase disk/%s by %s MB", idx, growth, indent=2)
577           self.ExecOrQueue(instance, [op])
578
  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count),
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

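    # offset the node cycle by two, so that on clusters with more than
    # two nodes the proposed new secondary differs from the (pnode, snode)
    # pair the instance was created with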
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

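      # the second opcode runs migrate --cleanup; after the successful
      # live migration above it should find nothing left to clean up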
      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
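    # each instance is renamed to the spare name and then back again, so
    # the name given via --rename must not belong to any existing instance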
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
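      # op_act is queued twice: once while the instance is still running
      # and once more after op_stop, matching the log messages below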
826       Log("activate disks when online", indent=2)
827       Log("activate disks when offline", indent=2)
828       Log("deactivate disks (when offline)", indent=2)
829       self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
830
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url http://instance_name/hostname.txt
    and check that it contains the hostname of the instance. In case we
    get an IOError (connection refused, no route to host, etc.) we retry
    up to the network timeout; any other error aborts the check.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove NICs in routed mode, as we would need an IP
      # address to add them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(main())