Add separate module for backported language functionality
[ganeti-local] / tools / burnin
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
24 """
25
26 import sys
27 import optparse
28 import time
29 import socket
30 import urllib
31 from itertools import izip, islice, cycle
32 from cStringIO import StringIO
33
34 from ganeti import opcodes
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import hypervisor
40 from ganeti import compat
41
42 from ganeti.confd import client as confd_client
43
44
45 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
46
47 MAX_RETRIES = 3
48 LOG_HEADERS = {
49   0: "- ",
50   1: "* ",
51   2: ""
52   }
53
54 class InstanceDown(Exception):
55   """The checked instance was not up"""
56
57
58 class BurninFailure(Exception):
59   """Failure detected during burning"""
60
61
62 def Usage():
63   """Shows program usage information and exits the program."""
64
65   print >> sys.stderr, "Usage:"
66   print >> sys.stderr, USAGE
67   sys.exit(2)
68
69
70 def Log(msg, *args, **kwargs):
71   """Simple function that prints out its argument.
72
73   """
74   if args:
75     msg = msg % args
76   indent = kwargs.get('indent', 0)
77   sys.stdout.write("%*s%s%s\n" % (2*indent, "",
78                                   LOG_HEADERS.get(indent, "  "), msg))
79   sys.stdout.flush()
80
81
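For reference, a minimal illustration of how Log() above combines the indent width with the LOG_HEADERS prefixes (the calls and names are purely illustrative):

Log("Creating instances")               # "- Creating instances"
Log("instance %s", "inst1", indent=1)   # "  * instance inst1"
Log("on node1, node2", indent=2)        # "    on node1, node2"
Log("extra detail", indent=3)           # "        extra detail" (unknown levels fall back to two spaces)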
82 def Err(msg, exit_code=1):
83   """Simple error logging that prints to stderr.
84
85   """
86   sys.stderr.write(msg + "\n")
87   sys.stderr.flush()
88   sys.exit(exit_code)
89
90
91 class SimpleOpener(urllib.FancyURLopener):
92   """A simple url opener"""
93   # pylint: disable-msg=W0221
94
95   def prompt_user_passwd(self, host, realm, clear_cache=0):
96     """No-interaction version of prompt_user_passwd."""
97     # we follow parent class' API
98     # pylint: disable-msg=W0613
99     return None, None
100
101   def http_error_default(self, url, fp, errcode, errmsg, headers):
102     """Custom error handling"""
103     # make sure sockets are not left in CLOSE_WAIT, this is similar
104     # but with a different exception to the BasicURLOpener class
105     _ = fp.read() # throw away data
106     fp.close()
107     raise InstanceDown("HTTP error returned: code %s, msg %s" %
108                        (errcode, errmsg))
109
110
111 OPTIONS = [
112   cli.cli_option("-o", "--os", dest="os", default=None,
113                  help="OS to use during burnin",
114                  metavar="<OS>",
115                  completion_suggest=cli.OPT_COMPL_ONE_OS),
116   cli.HYPERVISOR_OPT,
117   cli.cli_option("--disk-size", dest="disk_size",
118                  help="Disk size (determines disk count)",
119                  default="128m", type="string", metavar="<size,size,...>",
120                  completion_suggest=("128M 512M 1G 4G 1G,256M"
121                                      " 4G,1G,1G 10G").split()),
122   cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
123                  default="128m", type="string", metavar="<size,size,...>"),
124   cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
125                  default=128, type="unit", metavar="<size>",
126                  completion_suggest=("128M 256M 512M 1G 4G 8G"
127                                      " 12G 16G").split()),
128   cli.DEBUG_OPT,
129   cli.VERBOSE_OPT,
130   cli.NOIPCHECK_OPT,
131   cli.NONAMECHECK_OPT,
132   cli.EARLY_RELEASE_OPT,
133   cli.cli_option("--no-replace1", dest="do_replace1",
134                  help="Skip disk replacement with the same secondary",
135                  action="store_false", default=True),
136   cli.cli_option("--no-replace2", dest="do_replace2",
137                  help="Skip disk replacement with a different secondary",
138                  action="store_false", default=True),
139   cli.cli_option("--no-failover", dest="do_failover",
140                  help="Skip instance failovers", action="store_false",
141                  default=True),
142   cli.cli_option("--no-migrate", dest="do_migrate",
143                  help="Skip instance live migration",
144                  action="store_false", default=True),
145   cli.cli_option("--no-move", dest="do_move",
146                  help="Skip instance moves", action="store_false",
147                  default=True),
148   cli.cli_option("--no-importexport", dest="do_importexport",
149                  help="Skip instance export/import", action="store_false",
150                  default=True),
151   cli.cli_option("--no-startstop", dest="do_startstop",
152                  help="Skip instance stop/start", action="store_false",
153                  default=True),
154   cli.cli_option("--no-reinstall", dest="do_reinstall",
155                  help="Skip instance reinstall", action="store_false",
156                  default=True),
157   cli.cli_option("--no-reboot", dest="do_reboot",
158                  help="Skip instance reboot", action="store_false",
159                  default=True),
160   cli.cli_option("--no-activate-disks", dest="do_activate_disks",
161                  help="Skip disk activation/deactivation",
162                  action="store_false", default=True),
163   cli.cli_option("--no-add-disks", dest="do_addremove_disks",
164                  help="Skip disk addition/removal",
165                  action="store_false", default=True),
166   cli.cli_option("--no-add-nics", dest="do_addremove_nics",
167                  help="Skip NIC addition/removal",
168                  action="store_false", default=True),
169   cli.cli_option("--no-nics", dest="nics",
170                  help="No network interfaces", action="store_const",
171                  const=[], default=[{}]),
172   cli.cli_option("--no-confd", dest="do_confd_tests",
173                  help="Skip confd queries",
174                  action="store_false", default=True),
175   cli.cli_option("--rename", dest="rename", default=None,
176                  help=("Give one unused instance name which is taken"
177                        " to start the renaming sequence"),
178                  metavar="<instance_name>"),
179   cli.cli_option("-t", "--disk-template", dest="disk_template",
180                  choices=list(constants.DISK_TEMPLATES),
181                  default=constants.DT_DRBD8,
182                  help="Disk template (diskless, file, plain or drbd) [drbd]"),
183   cli.cli_option("-n", "--nodes", dest="nodes", default="",
184                  help=("Comma separated list of nodes to perform"
185                        " the burnin on (defaults to all nodes)"),
186                  completion_suggest=cli.OPT_COMPL_MANY_NODES),
187   cli.cli_option("-I", "--iallocator", dest="iallocator",
188                  default=None, type="string",
189                  help=("Perform the allocation using an iallocator"
190                        " instead of fixed node spread (node restrictions no"
191                        " longer apply, therefore -n/--nodes must not be"
192                        " used)"),
193                  completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
194   cli.cli_option("-p", "--parallel", default=False, action="store_true",
195                  dest="parallel",
196                  help=("Enable parallelization of some operations in"
197                        " order to speed burnin or to test granular locking")),
198   cli.cli_option("--net-timeout", default=15, type="int",
199                  dest="net_timeout",
200                  help=("The instance check network timeout in seconds"
201                        " (defaults to 15 seconds)"),
202                  completion_suggest="15 60 300 900".split()),
203   cli.cli_option("-C", "--http-check", default=False, action="store_true",
204                  dest="http_check",
205                  help=("Enable checking of instance status via http,"
206                        " looking for /hostname.txt that should contain the"
207                        " name of the instance")),
208   cli.cli_option("-K", "--keep-instances", default=False,
209                  action="store_true",
210                  dest="keep_instances",
211                  help=("Leave instances on the cluster after burnin,"
212                        " for investigation in case of errors or simply"
213                        " to use them")),
214   ]
215
216 # Mainly used for bash completion
217 ARGUMENTS = [cli.ArgInstance(min=1)]
218
219
220 def _DoCheckInstances(fn):
221   """Decorator for checking instances.
222
223   """
224   def wrapper(self, *args, **kwargs):
225     val = fn(self, *args, **kwargs)
226     for instance in self.instances:
227       self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
228     return val
229
230   return wrapper
231
232
233 def _DoBatch(retry):
234   """Decorator for possible batch operations.
235
236   Must come after the _DoCheckInstances decorator (if any).
237
238   @param retry: whether this is a retryable batch, will be
239       passed to StartBatch
240
241   """
242   def wrap(fn):
243     def batched(self, *args, **kwargs):
244       self.StartBatch(retry)
245       val = fn(self, *args, **kwargs)
246       self.CommitQueue()
247       return val
248     return batched
249
250   return wrap
251
252
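A minimal sketch (hypothetical method, illustration only) of how the two decorators above are meant to stack on the Burn* methods defined below: _DoCheckInstances must be outermost, so that the per-instance liveness checks run only after _DoBatch has committed the whole queue.

@_DoCheckInstances
@_DoBatch(False)
def BurnExample(self):
  """Hypothetical burn step; liveness checks run after CommitQueue."""
  for instance in self.instances:
    self.ExecOrQueue(instance, [self.StartInstanceOp(instance)])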
253 class Burner(object):
254   """Burner class."""
255
256   def __init__(self):
257     """Constructor."""
258     utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
259     self.url_opener = SimpleOpener()
260     self._feed_buf = StringIO()
261     self.nodes = []
262     self.instances = []
263     self.to_rem = []
264     self.queued_ops = []
265     self.opts = None
266     self.queue_retry = False
267     self.disk_count = self.disk_growth = self.disk_size = None
268     self.hvp = self.bep = None
269     self.ParseOptions()
270     self.cl = cli.GetClient()
271     self.GetState()
272
273   def ClearFeedbackBuf(self):
274     """Clear the feedback buffer."""
275     self._feed_buf.truncate(0)
276
277   def GetFeedbackBuf(self):
278     """Return the contents of the buffer."""
279     return self._feed_buf.getvalue()
280
281   def Feedback(self, msg):
282     """Accumulate feedback in our buffer."""
283     formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
284     self._feed_buf.write(formatted_msg + "\n")
285     if self.opts.verbose:
286       Log(formatted_msg, indent=3)
287
288   def MaybeRetry(self, retry_count, msg, fn, *args):
289     """Possibly retry a given function execution.
290
291     @type retry_count: int
292     @param retry_count: retry counter:
293         - 0: non-retryable action
294         - 1: last retry for a retryable action
295         - MAX_RETRIES: original try for a retryable action
296     @type msg: str
297     @param msg: the kind of the operation
298     @type fn: callable
299     @param fn: the function to be called
300
301     """
302     try:
303       val = fn(*args)
304       if retry_count > 0 and retry_count < MAX_RETRIES:
305         Log("Idempotent %s succeeded after %d retries",
306             msg, MAX_RETRIES - retry_count)
307       return val
308     except Exception, err: # pylint: disable-msg=W0703
309       if retry_count == 0:
310         Log("Non-idempotent %s failed, aborting", msg)
311         raise
312       elif retry_count == 1:
313         Log("Idempotent %s repeated failure, aborting", msg)
314         raise
315       else:
316         Log("Idempotent %s failed, retry #%d/%d: %s",
317             msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
318         return self.MaybeRetry(retry_count - 1, msg, fn, *args)
319
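A worked trace of the retry counter above (MAX_RETRIES is 3), for a hypothetical retryable call that fails twice and then succeeds, starting with retry_count == MAX_RETRIES:

#   attempt 1: retry_count == 3, failure -> "Idempotent opcode failed, retry #1/3"
#   attempt 2: retry_count == 2, failure -> "Idempotent opcode failed, retry #2/3"
#   attempt 3: retry_count == 1, success -> "Idempotent opcode succeeded after 2 retries"
# With retry_count == 0 the very first failure is re-raised (non-retryable
# action); with retry_count == 1 a failure is final and re-raised as well.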
320   def _SetDebug(self, ops):
321     """Set the debug value on the given opcodes"""
322     for op in ops:
323       op.debug_level = self.opts.debug
324
325   def _ExecOp(self, *ops):
326     """Execute one or more opcodes and manage the exec buffer.
327
328     @return: if only one opcode has been passed, we return its result;
329         otherwise we return the list of results
330
331     """
332     job_id = cli.SendJob(ops, cl=self.cl)
333     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
334     if len(ops) == 1:
335       return results[0]
336     else:
337       return results
338
339   def ExecOp(self, retry, *ops):
340     """Execute one or more opcodes and manage the exec buffer.
341
342     @return: if only one opcode has been passed, we return its result;
343         otherwise we return the list of results
344
345     """
346     if retry:
347       rval = MAX_RETRIES
348     else:
349       rval = 0
350     self._SetDebug(ops)
351     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
352
353   def ExecOrQueue(self, name, ops, post_process=None):
354     """Execute opcodes directly, or queue them for a parallel burnin."""
355     if self.opts.parallel:
356       self._SetDebug(ops)
357       self.queued_ops.append((ops, name, post_process))
358     else:
359       val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
360       if post_process is not None:
361         post_process()
362       return val
363
364   def StartBatch(self, retry):
365     """Start a new batch of jobs.
366
367     @param retry: whether this is a retryable batch
368
369     """
370     self.queued_ops = []
371     self.queue_retry = retry
372
373   def CommitQueue(self):
374     """Execute all submitted opcodes in case of parallel burnin"""
375     if not self.opts.parallel:
376       return
377
378     if self.queue_retry:
379       rval = MAX_RETRIES
380     else:
381       rval = 0
382
383     try:
384       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
385                                 self.queued_ops)
386     finally:
387       self.queued_ops = []
388     return results
389
390   def ExecJobSet(self, jobs):
391     """Execute a set of jobs and return once all are done.
392
393     The method will return the list of results, if all jobs are
394     successful. Otherwise, OpExecError will be raised from within
395     cli.py.
396
397     """
398     self.ClearFeedbackBuf()
399     jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
400     for ops, name, _ in jobs:
401       jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
402     try:
403       results = jex.GetResults()
404     except Exception, err: # pylint: disable-msg=W0703
405       Log("Jobs failed: %s", err)
406       raise BurninFailure()
407
408     fail = False
409     val = []
410     for (_, name, post_process), (success, result) in zip(jobs, results):
411       if success:
412         if post_process:
413           try:
414             post_process()
415           except Exception, err: # pylint: disable-msg=W0703
416             Log("Post process call for job %s failed: %s", name, err)
417             fail = True
418         val.append(result)
419       else:
420         fail = True
421
422     if fail:
423       raise BurninFailure()
424
425     return val
426
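For reference, the data shapes that the loop above pairs up: each queued entry is an (ops, name, post_process) tuple and JobExecutor yields one (success, result) pair per job, so zip() matches them positionally. A sketch with hypothetical values:

jobs = [(["op-a"], "inst1", None), (["op-b"], "inst2", None)]
results = [(True, ["job output"]), (False, "error message")]
for (_, name, post_process), (success, result) in zip(jobs, results):
  pass  # "inst1" succeeds and its result is kept; "inst2" sets fail = True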
427   def ParseOptions(self):
428     """Parses the command line options.
429
430     In case of command line errors, it will show the usage and exit the
431     program.
432
433     """
434     parser = optparse.OptionParser(usage="\n%s" % USAGE,
435                                    version=("%%prog (ganeti) %s" %
436                                             constants.RELEASE_VERSION),
437                                    option_list=OPTIONS)
438
439     options, args = parser.parse_args()
440     if len(args) < 1 or options.os is None:
441       Usage()
442
443     supported_disk_templates = (constants.DT_DISKLESS,
444                                 constants.DT_FILE,
445                                 constants.DT_PLAIN,
446                                 constants.DT_DRBD8)
447     if options.disk_template not in supported_disk_templates:
448       Err("Unknown disk template '%s'" % options.disk_template)
449
450     if options.disk_template == constants.DT_DISKLESS:
451       disk_size = disk_growth = []
452       options.do_addremove_disks = False
453     else:
454       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
455       disk_growth = [utils.ParseUnit(v)
456                      for v in options.disk_growth.split(",")]
457       if len(disk_growth) != len(disk_size):
458         Err("Wrong disk sizes/growth combination")
459     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
460         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
461       Err("Wrong disk count/disk template combination")
462
463     self.disk_size = disk_size
464     self.disk_growth = disk_growth
465     self.disk_count = len(disk_size)
466
467     if options.nodes and options.iallocator:
468       Err("Give either the nodes option or the iallocator option, not both")
469
470     if options.http_check and not options.name_check:
471       Err("Can't enable HTTP checks without name checks")
472
473     self.opts = options
474     self.instances = args
475     self.bep = {
476       constants.BE_MEMORY: options.mem_size,
477       constants.BE_VCPUS: 1,
478       }
479
480     self.hypervisor = None
481     self.hvp = {}
482     if options.hypervisor:
483       self.hypervisor, self.hvp = options.hypervisor
484
485     socket.setdefaulttimeout(options.net_timeout)
486
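To illustrate the disk option handling above: with hypothetical values --disk-size 128m,1G and --disk-growth 128m,256m, the parsing would yield roughly the following (utils.ParseUnit normalizes suffixed sizes to mebibytes):

# disk_size   = [utils.ParseUnit("128m"), utils.ParseUnit("1G")]    # e.g. [128, 1024]
# disk_growth = [utils.ParseUnit("128m"), utils.ParseUnit("256m")]  # e.g. [128, 256]
# disk_count  = 2
# Lists of different lengths, or any disks combined with a diskless
# template, abort with an error as shown above.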
487   def GetState(self):
488     """Read the cluster state from the master daemon."""
489     if self.opts.nodes:
490       names = self.opts.nodes.split(",")
491     else:
492       names = []
493     try:
494       op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
495                                 names=names, use_locking=True)
496       result = self.ExecOp(True, op)
497     except errors.GenericError, err:
498       err_code, msg = cli.FormatError(err)
499       Err(msg, exit_code=err_code)
500     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
501
502     op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
503                                                       "variants"], names=[])
504     result = self.ExecOp(True, op_diagnose)
505
506     if not result:
507       Err("Can't get the OS list")
508
509     found = False
510     for (name, valid, variants) in result:
511       if valid and self.opts.os in cli.CalculateOSNames(name, variants):
512         found = True
513         break
514
515     if not found:
516       Err("OS '%s' not found" % self.opts.os)
517
518     cluster_info = self.cl.QueryClusterInfo()
519     self.cluster_info = cluster_info
520     if not self.cluster_info:
521       Err("Can't get cluster info")
522
523     default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
524     self.cluster_default_nicparams = default_nic_params
525     if self.hypervisor is None:
526       self.hypervisor = self.cluster_info["default_hypervisor"]
527     self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)
528
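The OS check above accepts either a bare OS name or an OS+variant name. Assuming cli.CalculateOSNames expands variants in the usual name+variant form, a hypothetical diagnose entry would match like this:

# ("debootstrap", True, ["default", "minimal"])    # hypothetical OS entry
# cli.CalculateOSNames("debootstrap", ["default", "minimal"])
#   -> ["debootstrap+default", "debootstrap+minimal"]
# so "-o debootstrap+minimal" is accepted here, while an OS without
# variants is matched by its bare name.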
529   @_DoCheckInstances
530   @_DoBatch(False)
531   def BurnCreateInstances(self):
532     """Create the given instances.
533
534     """
535     self.to_rem = []
536     mytor = izip(cycle(self.nodes),
537                  islice(cycle(self.nodes), 1, None),
538                  self.instances)
539
540     Log("Creating instances")
541     for pnode, snode, instance in mytor:
542       Log("instance %s", instance, indent=1)
543       if self.opts.iallocator:
544         pnode = snode = None
545         msg = "with iallocator %s" % self.opts.iallocator
546       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
547         snode = None
548         msg = "on %s" % pnode
549       else:
550         msg = "on %s, %s" % (pnode, snode)
551
552       Log(msg, indent=2)
553
554       op = opcodes.OpCreateInstance(instance_name=instance,
555                                     disks = [ {"size": size}
556                                               for size in self.disk_size],
557                                     disk_template=self.opts.disk_template,
558                                     nics=self.opts.nics,
559                                     mode=constants.INSTANCE_CREATE,
560                                     os_type=self.opts.os,
561                                     pnode=pnode,
562                                     snode=snode,
563                                     start=True,
564                                     ip_check=self.opts.ip_check,
565                                     name_check=self.opts.name_check,
566                                     wait_for_sync=True,
567                                     file_driver="loop",
568                                     file_storage_dir=None,
569                                     iallocator=self.opts.iallocator,
570                                     beparams=self.bep,
571                                     hvparams=self.hvp,
572                                     hypervisor=self.hypervisor,
573                                     )
574       remove_instance = lambda name: lambda: self.to_rem.append(name)
575       self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))
576
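The izip/cycle/islice construction above rotates through the node list so that primary and secondary nodes are spread evenly across instances; a standalone illustration with hypothetical names:

from itertools import izip, islice, cycle

nodes = ["node1", "node2", "node3"]
instances = ["inst1", "inst2", "inst3", "inst4"]
for pnode, snode, inst in izip(cycle(nodes),
                               islice(cycle(nodes), 1, None),
                               instances):
  print inst, pnode, snode
# inst1 node1 node2
# inst2 node2 node3
# inst3 node3 node1
# inst4 node1 node2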
577   @_DoBatch(False)
578   def BurnGrowDisks(self):
579     """Grow the instance disks by the requested amount, if any."""
580     Log("Growing disks")
581     for instance in self.instances:
582       Log("instance %s", instance, indent=1)
583       for idx, growth in enumerate(self.disk_growth):
584         if growth > 0:
585           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
586                                   amount=growth, wait_for_sync=True)
587           Log("increase disk/%s by %s MB", idx, growth, indent=2)
588           self.ExecOrQueue(instance, [op])
589
590   @_DoBatch(True)
591   def BurnReplaceDisks1D8(self):
592     """Replace disks on primary and secondary for drbd8."""
593     Log("Replacing disks on the same nodes")
594     for instance in self.instances:
595       Log("instance %s", instance, indent=1)
596       ops = []
597       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
598         op = opcodes.OpReplaceDisks(instance_name=instance,
599                                     mode=mode,
600                                     disks=[i for i in range(self.disk_count)],
601                                     early_release=self.opts.early_release)
602         Log("run %s", mode, indent=2)
603         ops.append(op)
604       self.ExecOrQueue(instance, ops)
605
606   @_DoBatch(True)
607   def BurnReplaceDisks2(self):
608     """Replace secondary node."""
609     Log("Changing the secondary node")
610     mode = constants.REPLACE_DISK_CHG
611
612     mytor = izip(islice(cycle(self.nodes), 2, None),
613                  self.instances)
614     for tnode, instance in mytor:
615       Log("instance %s", instance, indent=1)
616       if self.opts.iallocator:
617         tnode = None
618         msg = "with iallocator %s" % self.opts.iallocator
619       else:
620         msg = tnode
621       op = opcodes.OpReplaceDisks(instance_name=instance,
622                                   mode=mode,
623                                   remote_node=tnode,
624                                   iallocator=self.opts.iallocator,
625                                   disks=[],
626                                   early_release=self.opts.early_release)
627       Log("run %s %s", mode, msg, indent=2)
628       self.ExecOrQueue(instance, [op])
629
630   @_DoCheckInstances
631   @_DoBatch(False)
632   def BurnFailover(self):
633     """Failover the instances."""
634     Log("Failing over instances")
635     for instance in self.instances:
636       Log("instance %s", instance, indent=1)
637       op = opcodes.OpFailoverInstance(instance_name=instance,
638                                       ignore_consistency=False)
639       self.ExecOrQueue(instance, [op])
640
641   @_DoCheckInstances
642   @_DoBatch(False)
643   def BurnMove(self):
644     """Move the instances."""
645     Log("Moving instances")
646     mytor = izip(islice(cycle(self.nodes), 1, None),
647                  self.instances)
648     for tnode, instance in mytor:
649       Log("instance %s", instance, indent=1)
650       op = opcodes.OpMoveInstance(instance_name=instance,
651                                   target_node=tnode)
652       self.ExecOrQueue(instance, [op])
653
654   @_DoBatch(False)
655   def BurnMigrate(self):
656     """Migrate the instances."""
657     Log("Migrating instances")
658     for instance in self.instances:
659       Log("instance %s", instance, indent=1)
660       op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
661                                       cleanup=False)
662
663       op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
664                                       cleanup=True)
665       Log("migration and migration cleanup", indent=2)
666       self.ExecOrQueue(instance, [op1, op2])
667
668   @_DoCheckInstances
669   @_DoBatch(False)
670   def BurnImportExport(self):
671     """Export the instance, delete it, and import it back.
672
673     """
674     Log("Exporting and re-importing instances")
675     mytor = izip(cycle(self.nodes),
676                  islice(cycle(self.nodes), 1, None),
677                  islice(cycle(self.nodes), 2, None),
678                  self.instances)
679
680     for pnode, snode, enode, instance in mytor:
681       Log("instance %s", instance, indent=1)
682       # read the full name of the instance
683       nam_op = opcodes.OpQueryInstances(output_fields=["name"],
684                                         names=[instance], use_locking=True)
685       full_name = self.ExecOp(False, nam_op)[0][0]
686
687       if self.opts.iallocator:
688         pnode = snode = None
689         import_log_msg = ("import from %s"
690                           " with iallocator %s" %
691                           (enode, self.opts.iallocator))
692       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
693         snode = None
694         import_log_msg = ("import from %s to %s" %
695                           (enode, pnode))
696       else:
697         import_log_msg = ("import from %s to %s, %s" %
698                           (enode, pnode, snode))
699
700       exp_op = opcodes.OpExportInstance(instance_name=instance,
701                                         target_node=enode,
702                                         shutdown=True)
703       rem_op = opcodes.OpRemoveInstance(instance_name=instance,
704                                         ignore_failures=True)
705       imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
706       imp_op = opcodes.OpCreateInstance(instance_name=instance,
707                                         disks = [ {"size": size}
708                                                   for size in self.disk_size],
709                                         disk_template=self.opts.disk_template,
710                                         nics=self.opts.nics,
711                                         mode=constants.INSTANCE_IMPORT,
712                                         src_node=enode,
713                                         src_path=imp_dir,
714                                         pnode=pnode,
715                                         snode=snode,
716                                         start=True,
717                                         ip_check=self.opts.ip_check,
718                                         name_check=self.opts.name_check,
719                                         wait_for_sync=True,
720                                         file_storage_dir=None,
721                                         file_driver="loop",
722                                         iallocator=self.opts.iallocator,
723                                         beparams=self.bep,
724                                         hvparams=self.hvp,
725                                         )
726
727       erem_op = opcodes.OpRemoveExport(instance_name=instance)
728
729       Log("export to node %s", enode, indent=2)
730       Log("remove instance", indent=2)
731       Log(import_log_msg, indent=2)
732       Log("remove export", indent=2)
733       self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
734
735   @staticmethod
736   def StopInstanceOp(instance):
737     """Stop given instance."""
738     return opcodes.OpShutdownInstance(instance_name=instance)
739
740   @staticmethod
741   def StartInstanceOp(instance):
742     """Start given instance."""
743     return opcodes.OpStartupInstance(instance_name=instance, force=False)
744
745   @staticmethod
746   def RenameInstanceOp(instance, instance_new):
747     """Rename instance."""
748     return opcodes.OpRenameInstance(instance_name=instance,
749                                     new_name=instance_new)
750
751   @_DoCheckInstances
752   @_DoBatch(True)
753   def BurnStopStart(self):
754     """Stop/start the instances."""
755     Log("Stopping and starting instances")
756     for instance in self.instances:
757       Log("instance %s", instance, indent=1)
758       op1 = self.StopInstanceOp(instance)
759       op2 = self.StartInstanceOp(instance)
760       self.ExecOrQueue(instance, [op1, op2])
761
762   @_DoBatch(False)
763   def BurnRemove(self):
764     """Remove the instances."""
765     Log("Removing instances")
766     for instance in self.to_rem:
767       Log("instance %s", instance, indent=1)
768       op = opcodes.OpRemoveInstance(instance_name=instance,
769                                     ignore_failures=True)
770       self.ExecOrQueue(instance, [op])
771
772   def BurnRename(self):
773     """Rename the instances.
774
775     Note that this function will not execute in parallel, since we
776     only have one target for rename.
777
778     """
779     Log("Renaming instances")
780     rename = self.opts.rename
781     for instance in self.instances:
782       Log("instance %s", instance, indent=1)
783       op_stop1 = self.StopInstanceOp(instance)
784       op_stop2 = self.StopInstanceOp(rename)
785       op_rename1 = self.RenameInstanceOp(instance, rename)
786       op_rename2 = self.RenameInstanceOp(rename, instance)
787       op_start1 = self.StartInstanceOp(rename)
788       op_start2 = self.StartInstanceOp(instance)
789       self.ExecOp(False, op_stop1, op_rename1, op_start1)
790       self._CheckInstanceAlive(rename)
791       self.ExecOp(False, op_stop2, op_rename2, op_start2)
792       self._CheckInstanceAlive(instance)
793
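Each pass above shuffles the instance through the spare name given with --rename and back, which is why a single unused name suffices and why this step cannot run in parallel; an illustrative trace for one instance (names are hypothetical):

#   stop inst1,  rename inst1 -> spare1, start spare1, check spare1 is up
#   stop spare1, rename spare1 -> inst1, start inst1,  check inst1 is up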
794   @_DoCheckInstances
795   @_DoBatch(True)
796   def BurnReinstall(self):
797     """Reinstall the instances."""
798     Log("Reinstalling instances")
799     for instance in self.instances:
800       Log("instance %s", instance, indent=1)
801       op1 = self.StopInstanceOp(instance)
802       op2 = opcodes.OpReinstallInstance(instance_name=instance)
803       Log("reinstall without passing the OS", indent=2)
804       op3 = opcodes.OpReinstallInstance(instance_name=instance,
805                                         os_type=self.opts.os)
806       Log("reinstall specifying the OS", indent=2)
807       op4 = self.StartInstanceOp(instance)
808       self.ExecOrQueue(instance, [op1, op2, op3, op4])
809
810   @_DoCheckInstances
811   @_DoBatch(True)
812   def BurnReboot(self):
813     """Reboot the instances."""
814     Log("Rebooting instances")
815     for instance in self.instances:
816       Log("instance %s", instance, indent=1)
817       ops = []
818       for reboot_type in constants.REBOOT_TYPES:
819         op = opcodes.OpRebootInstance(instance_name=instance,
820                                       reboot_type=reboot_type,
821                                       ignore_secondaries=False)
822         Log("reboot with type '%s'", reboot_type, indent=2)
823         ops.append(op)
824       self.ExecOrQueue(instance, ops)
825
826   @_DoCheckInstances
827   @_DoBatch(True)
828   def BurnActivateDisks(self):
829     """Activate and deactivate disks of the instances."""
830     Log("Activating/deactivating disks")
831     for instance in self.instances:
832       Log("instance %s", instance, indent=1)
833       op_start = self.StartInstanceOp(instance)
834       op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
835       op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
836       op_stop = self.StopInstanceOp(instance)
837       Log("activate disks when online", indent=2)
838       Log("activate disks when offline", indent=2)
839       Log("deactivate disks (when offline)", indent=2)
840       self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
841
842   @_DoCheckInstances
843   @_DoBatch(False)
844   def BurnAddRemoveDisks(self):
845     """Add and remove an extra disk for the instances."""
846     Log("Adding and removing disks")
847     for instance in self.instances:
848       Log("instance %s", instance, indent=1)
849       op_add = opcodes.OpSetInstanceParams(\
850         instance_name=instance,
851         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
852       op_rem = opcodes.OpSetInstanceParams(\
853         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
854       op_stop = self.StopInstanceOp(instance)
855       op_start = self.StartInstanceOp(instance)
856       Log("adding a disk", indent=2)
857       Log("removing last disk", indent=2)
858       self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])
859
860   @_DoBatch(False)
861   def BurnAddRemoveNICs(self):
862     """Add and remove an extra NIC for the instances."""
863     Log("Adding and removing NICs")
864     for instance in self.instances:
865       Log("instance %s", instance, indent=1)
866       op_add = opcodes.OpSetInstanceParams(\
867         instance_name=instance, nics=[(constants.DDM_ADD, {})])
868       op_rem = opcodes.OpSetInstanceParams(\
869         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
870       Log("adding a NIC", indent=2)
871       Log("removing last NIC", indent=2)
872       self.ExecOrQueue(instance, [op_add, op_rem])
873
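Both methods above drive OpSetInstanceParams with the same (action, parameters) modification-list format; for illustration (the size value is hypothetical):

# disks=[(constants.DDM_ADD, {"size": 128})]   # append a 128 MiB disk
# disks=[(constants.DDM_REMOVE, {})]           # drop the last disk
# nics=[(constants.DDM_ADD, {})]               # append a NIC with default params
# nics=[(constants.DDM_REMOVE, {})]            # drop the last NIC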
874   def ConfdCallback(self, reply):
875     """Callback for confd queries"""
876     if reply.type == confd_client.UPCALL_REPLY:
877       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
878         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
879                                                     reply.server_reply.status,
880                                                     reply.server_reply))
881       if reply.orig_request.type == constants.CONFD_REQ_PING:
882         Log("Ping: OK", indent=1)
883       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
884         if reply.server_reply.answer == self.cluster_info["master"]:
885           Log("Master: OK", indent=1)
886         else:
887           Err("Master: wrong: %s" % reply.server_reply.answer)
888       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
889         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
890           Log("Node role for master: OK", indent=1)
891         else:
892           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
893
894   def DoConfdRequestReply(self, req):
895     self.confd_counting_callback.RegisterQuery(req.rsalt)
896     self.confd_client.SendRequest(req, async=False)
897     while not self.confd_counting_callback.AllAnswered():
898       if not self.confd_client.ReceiveReply():
899         Err("Did not receive all expected confd replies")
900         break
901
902   def BurnConfd(self):
903     """Run confd queries for our instances.
904
905     The following confd queries are tested:
906       - CONFD_REQ_PING: simple ping
907       - CONFD_REQ_CLUSTER_MASTER: cluster master
908       - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
909
910     """
911     Log("Checking confd results")
912
913     filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
914     counting_callback = confd_client.ConfdCountingCallback(filter_callback)
915     self.confd_counting_callback = counting_callback
916
917     self.confd_client = confd_client.GetConfdClient(counting_callback)
918
919     req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
920     self.DoConfdRequestReply(req)
921
922     req = confd_client.ConfdClientRequest(
923       type=constants.CONFD_REQ_CLUSTER_MASTER)
924     self.DoConfdRequestReply(req)
925
926     req = confd_client.ConfdClientRequest(
927         type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
928         query=self.cluster_info["master"])
929     self.DoConfdRequestReply(req)
930
931   def _CheckInstanceAlive(self, instance):
932     """Check if an instance is alive by doing http checks.
933
934     This will try to retrieve the URL http://<instance>/hostname.txt
935     and check that it contains the hostname of the instance. Connection
936     errors (e.g. ECONNREFUSED) are retried for up to net_timeout
937     seconds, after which the instance is declared down.
938
939     """
940     if not self.opts.http_check:
941       return
942     end_time = time.time() + self.opts.net_timeout
943     url = None
944     while time.time() < end_time and url is None:
945       try:
946         url = self.url_opener.open("http://%s/hostname.txt" % instance)
947       except IOError:
948         # here we can have connection refused, no route to host, etc.
949         time.sleep(1)
950     if url is None:
951       raise InstanceDown(instance, "Cannot contact instance")
952     hostname = url.read().strip()
953     url.close()
954     if hostname != instance:
955       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
956                                     (instance, hostname)))
957
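A condensed standalone sketch of the same probe, for reference (function and names are hypothetical; the real check above goes through SimpleOpener so that HTTP errors surface as InstanceDown):

import time
import urllib

def check_alive(instance, timeout=15):
  """Poll http://<instance>/hostname.txt until it answers or timeout expires."""
  deadline = time.time() + timeout
  while time.time() < deadline:
    try:
      data = urllib.urlopen("http://%s/hostname.txt" % instance).read()
      return data.strip() == instance
    except IOError:
      # connection refused, no route to host, ... -> keep retrying
      time.sleep(1)
  return False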
958   def BurninCluster(self):
959     """Test a cluster intensively.
960
961     This will create instances and then start/stop/failover them.
962     It is safe for existing instances but could impact performance.
963
964     """
965
966     opts = self.opts
967
968     Log("Testing global parameters")
969
970     if (len(self.nodes) == 1 and
971         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
972                                    constants.DT_FILE)):
973       Err("When one node is available/selected the disk template must"
974           " be 'diskless', 'file' or 'plain'")
975
976     has_err = True
977     try:
978       self.BurnCreateInstances()
979       if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
980         self.BurnReplaceDisks1D8()
981       if (opts.do_replace2 and len(self.nodes) > 2 and
982           opts.disk_template in constants.DTS_NET_MIRROR):
983         self.BurnReplaceDisks2()
984
985       if (opts.disk_template in constants.DTS_GROWABLE and
986           compat.any(self.disk_growth, lambda n: n > 0)):
987         self.BurnGrowDisks()
988
989       if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
990         self.BurnFailover()
991
992       if opts.do_migrate:
993         if opts.disk_template != constants.DT_DRBD8:
994           Log("Skipping migration (disk template not DRBD8)")
995         elif not self.hv_class.CAN_MIGRATE:
996           Log("Skipping migration (hypervisor %s does not support it)",
997               self.hypervisor)
998         else:
999           self.BurnMigrate()
1000
1001       if (opts.do_move and len(self.nodes) > 1 and
1002           opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1003         self.BurnMove()
1004
1005       if (opts.do_importexport and
1006           opts.disk_template not in (constants.DT_DISKLESS,
1007                                      constants.DT_FILE)):
1008         self.BurnImportExport()
1009
1010       if opts.do_reinstall:
1011         self.BurnReinstall()
1012
1013       if opts.do_reboot:
1014         self.BurnReboot()
1015
1016       if opts.do_addremove_disks:
1017         self.BurnAddRemoveDisks()
1018
1019       default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1020       # Don't add/remove nics in routed mode, as we would need an ip to add
1021       # them with
1022       if opts.do_addremove_nics:
1023         if default_nic_mode == constants.NIC_MODE_BRIDGED:
1024           self.BurnAddRemoveNICs()
1025         else:
1026           Log("Skipping NIC add/remove as the cluster is not in bridged mode")
1027
1028       if opts.do_activate_disks:
1029         self.BurnActivateDisks()
1030
1031       if opts.rename:
1032         self.BurnRename()
1033
1034       if opts.do_confd_tests:
1035         self.BurnConfd()
1036
1037       if opts.do_startstop:
1038         self.BurnStopStart()
1039
1040       has_err = False
1041     finally:
1042       if has_err:
1043         Log("Error detected: opcode buffer follows:\n\n")
1044         Log(self.GetFeedbackBuf())
1045         Log("\n\n")
1046       if not self.opts.keep_instances:
1047         try:
1048           self.BurnRemove()
1049         except Exception, err:  # pylint: disable-msg=W0703
1050           if has_err: # already detected errors, so errors in removal
1051                       # are quite expected
1052             Log("Note: error detected during instance remove: %s", err)
1053           else: # non-expected error
1054             raise
1055
1056     return 0
1057
1058
1059 def main():
1060   """Main function"""
1061
1062   burner = Burner()
1063   return burner.BurninCluster()
1064
1065
1066 if __name__ == "__main__":
1067   sys.exit(main())