Change behaviour of OpDiagnoseOS w.r.t. 'valid'
[ganeti-local] / tools / burnin
#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()

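# For illustration: Log("instance %s", "inst1", indent=1) prints
# "  * instance inst1" -- two spaces per indent level, followed by the
# header from LOG_HEADERS for that level (or "  " for deeper levels).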

def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this mirrors what the
    # basic URLopener class does, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]

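# Example invocation (hypothetical OS and instance names, for illustration
# only):
#   burnin -o debian-installer -t drbd -p inst1.example.com inst2.example.com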

def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap

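# The two decorators above are meant to be stacked on Burner methods, for
# example:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...
#
# so that StartBatch/CommitQueue wrap the method body and the instance
# liveness check runs only after the whole batch has been committed.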

class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
285     """Acumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

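  # Retry accounting, for illustration: ExecOp(retry=True, ...) calls
  # MaybeRetry with retry_count == MAX_RETRIES and each failure recurses
  # with retry_count - 1, so with MAX_RETRIES == 3 an idempotent opcode is
  # attempted at most three times before the final failure is re-raised.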
  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
357     """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "variants"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
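    # Node rotation, for illustration: with self.nodes == [n1, n2, n3], the
    # izip/cycle/islice construct below pairs the instances with the
    # (pnode, snode) tuples (n1, n2), (n2, n3), (n3, n1), ... -- i.e. the
    # secondary is simply the next node in the list.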
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
591     """Grow both the os and the swap disks by the requested amount, if any."""
592     Log("Growing disks")
593     for instance in self.instances:
594       Log("instance %s", instance, indent=1)
595       for idx, growth in enumerate(self.disk_growth):
596         if growth > 0:
597           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
598                                   amount=growth, wait_for_sync=True)
599           Log("increase disk/%s by %s MB", idx, growth, indent=2)
600           self.ExecOrQueue(instance, [op])
601
602   @_DoBatch(True)
603   def BurnReplaceDisks1D8(self):
604     """Replace disks on primary and secondary for drbd8."""
605     Log("Replacing disks on the same nodes")
606     for instance in self.instances:
607       Log("instance %s", instance, indent=1)
608       ops = []
609       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
610         op = opcodes.OpReplaceDisks(instance_name=instance,
611                                     mode=mode,
612                                     disks=[i for i in range(self.disk_count)],
613                                     early_release=self.opts.early_release)
614         Log("run %s", mode, indent=2)
615         ops.append(op)
616       self.ExecOrQueue(instance, ops)
617
618   @_DoBatch(True)
619   def BurnReplaceDisks2(self):
620     """Replace secondary node."""
621     Log("Changing the secondary node")
622     mode = constants.REPLACE_DISK_CHG
623
624     mytor = izip(islice(cycle(self.nodes), 2, None),
625                  self.instances)
626     for tnode, instance in mytor:
627       Log("instance %s", instance, indent=1)
628       if self.opts.iallocator:
629         tnode = None
630         msg = "with iallocator %s" % self.opts.iallocator
631       else:
632         msg = tnode
633       op = opcodes.OpReplaceDisks(instance_name=instance,
634                                   mode=mode,
635                                   remote_node=tnode,
636                                   iallocator=self.opts.iallocator,
637                                   disks=[],
638                                   early_release=self.opts.early_release)
639       Log("run %s %s", mode, msg, indent=2)
640       self.ExecOrQueue(instance, [op])
641
642   @_DoCheckInstances
643   @_DoBatch(False)
644   def BurnFailover(self):
645     """Failover the instances."""
646     Log("Failing over instances")
647     for instance in self.instances:
648       Log("instance %s", instance, indent=1)
649       op = opcodes.OpFailoverInstance(instance_name=instance,
650                                       ignore_consistency=False)
651       self.ExecOrQueue(instance, [op])
652
653   @_DoCheckInstances
654   @_DoBatch(False)
655   def BurnMove(self):
656     """Move the instances."""
657     Log("Moving instances")
658     mytor = izip(islice(cycle(self.nodes), 1, None),
659                  self.instances)
660     for tnode, instance in mytor:
661       Log("instance %s", instance, indent=1)
662       op = opcodes.OpMoveInstance(instance_name=instance,
663                                   target_node=tnode)
664       self.ExecOrQueue(instance, [op])
665
666   @_DoBatch(False)
667   def BurnMigrate(self):
668     """Migrate the instances."""
669     Log("Migrating instances")
670     for instance in self.instances:
671       Log("instance %s", instance, indent=1)
672       op1 = opcodes.OpMigrateInstance(instance_name=instance, mode=None,
673                                       cleanup=False)
674
675       op2 = opcodes.OpMigrateInstance(instance_name=instance, mode=None,
676                                       cleanup=True)
677       Log("migration and migration cleanup", indent=2)
678       self.ExecOrQueue(instance, [op1, op2])
679
680   @_DoCheckInstances
681   @_DoBatch(False)
682   def BurnImportExport(self):
683     """Export the instance, delete it, and import it back.
684
685     """
686     Log("Exporting and re-importing instances")
687     mytor = izip(cycle(self.nodes),
688                  islice(cycle(self.nodes), 1, None),
689                  islice(cycle(self.nodes), 2, None),
690                  self.instances)
691
692     for pnode, snode, enode, instance in mytor:
693       Log("instance %s", instance, indent=1)
694       # read the full name of the instance
695       nam_op = opcodes.OpQueryInstances(output_fields=["name"],
696                                         names=[instance], use_locking=True)
697       full_name = self.ExecOp(False, nam_op)[0][0]
698
699       if self.opts.iallocator:
700         pnode = snode = None
701         import_log_msg = ("import from %s"
702                           " with iallocator %s" %
703                           (enode, self.opts.iallocator))
704       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
705         snode = None
706         import_log_msg = ("import from %s to %s" %
707                           (enode, pnode))
708       else:
709         import_log_msg = ("import from %s to %s, %s" %
710                           (enode, pnode, snode))
711
712       exp_op = opcodes.OpExportInstance(instance_name=instance,
713                                         target_node=enode,
714                                         mode=constants.EXPORT_MODE_LOCAL,
715                                         shutdown=True)
716       rem_op = opcodes.OpRemoveInstance(instance_name=instance,
717                                         ignore_failures=True)
718       imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
719       imp_op = opcodes.OpCreateInstance(instance_name=instance,
720                                         disks = [ {"size": size}
721                                                   for size in self.disk_size],
722                                         disk_template=self.opts.disk_template,
723                                         nics=self.opts.nics,
724                                         mode=constants.INSTANCE_IMPORT,
725                                         src_node=enode,
726                                         src_path=imp_dir,
727                                         pnode=pnode,
728                                         snode=snode,
729                                         start=True,
730                                         ip_check=self.opts.ip_check,
731                                         name_check=self.opts.name_check,
732                                         wait_for_sync=True,
733                                         file_storage_dir=None,
734                                         file_driver="loop",
735                                         iallocator=self.opts.iallocator,
736                                         beparams=self.bep,
737                                         hvparams=self.hvp,
738                                         osparams=self.opts.osparams,
739                                         )
740
741       erem_op = opcodes.OpRemoveExport(instance_name=instance)
742
743       Log("export to node %s", enode, indent=2)
744       Log("remove instance", indent=2)
745       Log(import_log_msg, indent=2)
746       Log("remove export", indent=2)
747       self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
748
749   @staticmethod
750   def StopInstanceOp(instance):
751     """Stop given instance."""
752     return opcodes.OpShutdownInstance(instance_name=instance)
753
754   @staticmethod
755   def StartInstanceOp(instance):
756     """Start given instance."""
757     return opcodes.OpStartupInstance(instance_name=instance, force=False)
758
759   @staticmethod
760   def RenameInstanceOp(instance, instance_new):
761     """Rename instance."""
762     return opcodes.OpRenameInstance(instance_name=instance,
763                                     new_name=instance_new)
764
765   @_DoCheckInstances
766   @_DoBatch(True)
767   def BurnStopStart(self):
768     """Stop/start the instances."""
769     Log("Stopping and starting instances")
770     for instance in self.instances:
771       Log("instance %s", instance, indent=1)
772       op1 = self.StopInstanceOp(instance)
773       op2 = self.StartInstanceOp(instance)
774       self.ExecOrQueue(instance, [op1, op2])
775
776   @_DoBatch(False)
777   def BurnRemove(self):
778     """Remove the instances."""
779     Log("Removing instances")
780     for instance in self.to_rem:
781       Log("instance %s", instance, indent=1)
782       op = opcodes.OpRemoveInstance(instance_name=instance,
783                                     ignore_failures=True)
784       self.ExecOrQueue(instance, [op])
785
786   def BurnRename(self):
787     """Rename the instances.
788
789     Note that this function will not execute in parallel, since we
790     only have one target for rename.
791
792     """
793     Log("Renaming instances")
794     rename = self.opts.rename
795     for instance in self.instances:
796       Log("instance %s", instance, indent=1)
797       op_stop1 = self.StopInstanceOp(instance)
798       op_stop2 = self.StopInstanceOp(rename)
799       op_rename1 = self.RenameInstanceOp(instance, rename)
800       op_rename2 = self.RenameInstanceOp(rename, instance)
801       op_start1 = self.StartInstanceOp(rename)
802       op_start2 = self.StartInstanceOp(instance)
803       self.ExecOp(False, op_stop1, op_rename1, op_start1)
804       self._CheckInstanceAlive(rename)
805       self.ExecOp(False, op_stop2, op_rename2, op_start2)
806       self._CheckInstanceAlive(instance)
807
808   @_DoCheckInstances
809   @_DoBatch(True)
810   def BurnReinstall(self):
811     """Reinstall the instances."""
812     Log("Reinstalling instances")
813     for instance in self.instances:
814       Log("instance %s", instance, indent=1)
815       op1 = self.StopInstanceOp(instance)
816       op2 = opcodes.OpReinstallInstance(instance_name=instance)
817       Log("reinstall without passing the OS", indent=2)
818       op3 = opcodes.OpReinstallInstance(instance_name=instance,
819                                         os_type=self.opts.os)
820       Log("reinstall specifying the OS", indent=2)
821       op4 = self.StartInstanceOp(instance)
822       self.ExecOrQueue(instance, [op1, op2, op3, op4])
823
824   @_DoCheckInstances
825   @_DoBatch(True)
826   def BurnReboot(self):
827     """Reboot the instances."""
828     Log("Rebooting instances")
829     for instance in self.instances:
830       Log("instance %s", instance, indent=1)
831       ops = []
832       for reboot_type in self.opts.reboot_types:
833         op = opcodes.OpRebootInstance(instance_name=instance,
834                                       reboot_type=reboot_type,
835                                       ignore_secondaries=False)
836         Log("reboot with type '%s'", reboot_type, indent=2)
837         ops.append(op)
838       self.ExecOrQueue(instance, ops)
839
840   @_DoCheckInstances
841   @_DoBatch(True)
842   def BurnActivateDisks(self):
843     """Activate and deactivate disks of the instances."""
844     Log("Activating/deactivating disks")
845     for instance in self.instances:
846       Log("instance %s", instance, indent=1)
847       op_start = self.StartInstanceOp(instance)
848       op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
849       op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
850       op_stop = self.StopInstanceOp(instance)
851       Log("activate disks when online", indent=2)
852       Log("activate disks when offline", indent=2)
853       Log("deactivate disks (when offline)", indent=2)
854       self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
855
856   @_DoCheckInstances
857   @_DoBatch(False)
858   def BurnAddRemoveDisks(self):
859     """Add and remove an extra disk for the instances."""
860     Log("Adding and removing disks")
861     for instance in self.instances:
862       Log("instance %s", instance, indent=1)
863       op_add = opcodes.OpSetInstanceParams(\
864         instance_name=instance,
865         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
866       op_rem = opcodes.OpSetInstanceParams(\
867         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
868       op_stop = self.StopInstanceOp(instance)
869       op_start = self.StartInstanceOp(instance)
870       Log("adding a disk", indent=2)
871       Log("removing last disk", indent=2)
872       self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])
873
874   @_DoBatch(False)
875   def BurnAddRemoveNICs(self):
876     """Add and remove an extra NIC for the instances."""
877     Log("Adding and removing NICs")
878     for instance in self.instances:
879       Log("instance %s", instance, indent=1)
880       op_add = opcodes.OpSetInstanceParams(\
881         instance_name=instance, nics=[(constants.DDM_ADD, {})])
882       op_rem = opcodes.OpSetInstanceParams(\
883         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
884       Log("adding a NIC", indent=2)
885       Log("removing last NIC", indent=2)
886       self.ExecOrQueue(instance, [op_add, op_rem])
887
888   def ConfdCallback(self, reply):
889     """Callback for confd queries"""
890     if reply.type == confd_client.UPCALL_REPLY:
891       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
892         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
893                                                     reply.server_reply.status,
894                                                     reply.server_reply))
895       if reply.orig_request.type == constants.CONFD_REQ_PING:
896         Log("Ping: OK", indent=1)
897       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
898         if reply.server_reply.answer == self.cluster_info["master"]:
899           Log("Master: OK", indent=1)
900         else:
901           Err("Master: wrong: %s" % reply.server_reply.answer)
902       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
903         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
904           Log("Node role for master: OK", indent=1)
905         else:
906           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
907
  def DoConfdRequestReply(self, req):
    """Send a confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
917     """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
946     """Check if an instance is alive by doing http checks.
947
948     This will try to retrieve the url on the instance /hostname.txt
949     and check that it contains the hostname of the instance. In case
950     we get ECONNREFUSED, we retry up to the net timeout seconds, for
951     any other error we abort.
952
953     """
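    # Illustrative assumption: the OS image used for the burnin runs a small
    # HTTP server inside each instance that serves the instance's own name
    # at /hostname.txt; this is what the -C/--http-check option relies on.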
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
1040           Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()