Rename OpDiagnoseOS and LUDiagnoseOS
tools/burnin
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
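Burnin creates a number of test instances on a Ganeti cluster and exercises
them: disk replacement and growth, failover, live migration, move, export and
re-import, reinstall, reboot, disk/NIC addition and removal, rename, confd
queries and stop/start.  Unless --keep-instances is given, the instances are
removed again at the end of the run.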
24 """
25
26 import sys
27 import optparse
28 import time
29 import socket
30 import urllib
31 from itertools import izip, islice, cycle
32 from cStringIO import StringIO
33
34 from ganeti import opcodes
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import hypervisor
40 from ganeti import compat
41
42 from ganeti.confd import client as confd_client
43
44
45 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
46
47 MAX_RETRIES = 3
48 LOG_HEADERS = {
49   0: "- ",
50   1: "* ",
51   2: ""
52   }
53
54 class InstanceDown(Exception):
55   """The checked instance was not up"""
56
57
58 class BurninFailure(Exception):
59   """Failure detected during burning"""
60
61
62 def Usage():
63   """Shows program usage information and exits the program."""
64
65   print >> sys.stderr, "Usage:"
66   print >> sys.stderr, USAGE
67   sys.exit(2)
68
69
70 def Log(msg, *args, **kwargs):
71   """Simple function that prints out its argument.
72
73   """
74   if args:
75     msg = msg % args
76   indent = kwargs.get('indent', 0)
77   sys.stdout.write("%*s%s%s\n" % (2*indent, "",
78                                   LOG_HEADERS.get(indent, "  "), msg))
79   sys.stdout.flush()
80
81
82 def Err(msg, exit_code=1):
83   """Simple error logging that prints to stderr.
84
85   """
86   sys.stderr.write(msg + "\n")
87   sys.stderr.flush()
88   sys.exit(exit_code)
89
90
91 class SimpleOpener(urllib.FancyURLopener):
92   """A simple url opener"""
93   # pylint: disable-msg=W0221
94
95   def prompt_user_passwd(self, host, realm, clear_cache=0):
96     """No-interaction version of prompt_user_passwd."""
97     # we follow parent class' API
98     # pylint: disable-msg=W0613
99     return None, None
100
101   def http_error_default(self, url, fp, errcode, errmsg, headers):
102     """Custom error handling"""
103     # make sure sockets are not left in CLOSE_WAIT; this is similar to
104     # the BasicURLOpener class, but we raise a different exception
105     _ = fp.read() # throw away data
106     fp.close()
107     raise InstanceDown("HTTP error returned: code %s, msg %s" %
108                        (errcode, errmsg))
109
110
111 OPTIONS = [
112   cli.cli_option("-o", "--os", dest="os", default=None,
113                  help="OS to use during burnin",
114                  metavar="<OS>",
115                  completion_suggest=cli.OPT_COMPL_ONE_OS),
116   cli.HYPERVISOR_OPT,
117   cli.OSPARAMS_OPT,
118   cli.cli_option("--disk-size", dest="disk_size",
119                  help="Disk size (determines disk count)",
120                  default="128m", type="string", metavar="<size,size,...>",
121                  completion_suggest=("128M 512M 1G 4G 1G,256M"
122                                      " 4G,1G,1G 10G").split()),
123   cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
124                  default="128m", type="string", metavar="<size,size,...>"),
125   cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
126                  default=128, type="unit", metavar="<size>",
127                  completion_suggest=("128M 256M 512M 1G 4G 8G"
128                                      " 12G 16G").split()),
129   cli.DEBUG_OPT,
130   cli.VERBOSE_OPT,
131   cli.NOIPCHECK_OPT,
132   cli.NONAMECHECK_OPT,
133   cli.EARLY_RELEASE_OPT,
134   cli.cli_option("--no-replace1", dest="do_replace1",
135                  help="Skip disk replacement with the same secondary",
136                  action="store_false", default=True),
137   cli.cli_option("--no-replace2", dest="do_replace2",
138                  help="Skip disk replacement with a different secondary",
139                  action="store_false", default=True),
140   cli.cli_option("--no-failover", dest="do_failover",
141                  help="Skip instance failovers", action="store_false",
142                  default=True),
143   cli.cli_option("--no-migrate", dest="do_migrate",
144                  help="Skip instance live migration",
145                  action="store_false", default=True),
146   cli.cli_option("--no-move", dest="do_move",
147                  help="Skip instance moves", action="store_false",
148                  default=True),
149   cli.cli_option("--no-importexport", dest="do_importexport",
150                  help="Skip instance export/import", action="store_false",
151                  default=True),
152   cli.cli_option("--no-startstop", dest="do_startstop",
153                  help="Skip instance stop/start", action="store_false",
154                  default=True),
155   cli.cli_option("--no-reinstall", dest="do_reinstall",
156                  help="Skip instance reinstall", action="store_false",
157                  default=True),
158   cli.cli_option("--no-reboot", dest="do_reboot",
159                  help="Skip instance reboot", action="store_false",
160                  default=True),
161   cli.cli_option("--reboot-types", dest="reboot_types",
162                  help="Specify the reboot types", default=None),
163   cli.cli_option("--no-activate-disks", dest="do_activate_disks",
164                  help="Skip disk activation/deactivation",
165                  action="store_false", default=True),
166   cli.cli_option("--no-add-disks", dest="do_addremove_disks",
167                  help="Skip disk addition/removal",
168                  action="store_false", default=True),
169   cli.cli_option("--no-add-nics", dest="do_addremove_nics",
170                  help="Skip NIC addition/removal",
171                  action="store_false", default=True),
172   cli.cli_option("--no-nics", dest="nics",
173                  help="No network interfaces", action="store_const",
174                  const=[], default=[{}]),
175   cli.cli_option("--no-confd", dest="do_confd_tests",
176                  help="Skip confd queries",
177                  action="store_false", default=True),
178   cli.cli_option("--rename", dest="rename", default=None,
179                  help=("Give one unused instance name which is taken"
180                        " to start the renaming sequence"),
181                  metavar="<instance_name>"),
182   cli.cli_option("-t", "--disk-template", dest="disk_template",
183                  choices=list(constants.DISK_TEMPLATES),
184                  default=constants.DT_DRBD8,
185                  help="Disk template (diskless, file, plain or drbd) [drbd]"),
186   cli.cli_option("-n", "--nodes", dest="nodes", default="",
187                  help=("Comma separated list of nodes to perform"
188                        " the burnin on (defaults to all nodes)"),
189                  completion_suggest=cli.OPT_COMPL_MANY_NODES),
190   cli.cli_option("-I", "--iallocator", dest="iallocator",
191                  default=None, type="string",
192                  help=("Perform the allocation using an iallocator"
193                        " instead of fixed node spread (node restrictions no"
194                        " longer apply, therefore -n/--nodes must not be"
195                        " used)"),
196                  completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
197   cli.cli_option("-p", "--parallel", default=False, action="store_true",
198                  dest="parallel",
199                  help=("Enable parallelization of some operations in"
200                        " order to speed burnin or to test granular locking")),
201   cli.cli_option("--net-timeout", default=15, type="int",
202                  dest="net_timeout",
203                  help=("The instance check network timeout in seconds"
204                        " (defaults to 15 seconds)"),
205                  completion_suggest="15 60 300 900".split()),
206   cli.cli_option("-C", "--http-check", default=False, action="store_true",
207                  dest="http_check",
208                  help=("Enable checking of instance status via http,"
209                        " looking for /hostname.txt that should contain the"
210                        " name of the instance")),
211   cli.cli_option("-K", "--keep-instances", default=False,
212                  action="store_true",
213                  dest="keep_instances",
214                  help=("Leave instances on the cluster after burnin,"
215                        " for investigation in case of errors or simply"
216                        " to use them")),
217   ]
218
219 # Mainly used for bash completion
220 ARGUMENTS = [cli.ArgInstance(min=1)]
221
222
223 def _DoCheckInstances(fn):
224   """Decorator for checking instances.
225
226   """
227   def wrapper(self, *args, **kwargs):
228     val = fn(self, *args, **kwargs)
229     for instance in self.instances:
230       self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
231     return val
232
233   return wrapper
234
235
236 def _DoBatch(retry):
237   """Decorator for possible batch operations.
238
239   Must come after the _DoCheckInstances decorator (if any).
240
241   @param retry: whether this is a retryable batch, will be
242       passed to StartBatch
243
244   """
245   def wrap(fn):
246     def batched(self, *args, **kwargs):
247       self.StartBatch(retry)
248       val = fn(self, *args, **kwargs)
249       self.CommitQueue()
250       return val
251     return batched
252
253   return wrap
254
255
256 class Burner(object):
257   """Burner class."""
258
259   def __init__(self):
260     """Constructor."""
261     utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
262     self.url_opener = SimpleOpener()
263     self._feed_buf = StringIO()
264     self.nodes = []
265     self.instances = []
266     self.to_rem = []
267     self.queued_ops = []
268     self.opts = None
269     self.queue_retry = False
270     self.disk_count = self.disk_growth = self.disk_size = None
271     self.hvp = self.bep = None
272     self.ParseOptions()
273     self.cl = cli.GetClient()
274     self.GetState()
275
276   def ClearFeedbackBuf(self):
277     """Clear the feedback buffer."""
278     self._feed_buf.truncate(0)
279
280   def GetFeedbackBuf(self):
281     """Return the contents of the buffer."""
282     return self._feed_buf.getvalue()
283
284   def Feedback(self, msg):
285     """Accumulate feedback in our buffer."""
286     formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
287     self._feed_buf.write(formatted_msg + "\n")
288     if self.opts.verbose:
289       Log(formatted_msg, indent=3)
290
291   def MaybeRetry(self, retry_count, msg, fn, *args):
292     """Possibly retry a given function execution.
293
294     @type retry_count: int
295     @param retry_count: retry counter:
296         - 0: non-retryable action
297         - 1: last retry for a retryable action
298         - MAX_RETRIES: original try for a retryable action
299     @type msg: str
300     @param msg: the kind of the operation
301     @type fn: callable
302     @param fn: the function to be called
303
304     """
305     try:
306       val = fn(*args)
307       if retry_count > 0 and retry_count < MAX_RETRIES:
308         Log("Idempotent %s succeeded after %d retries",
309             msg, MAX_RETRIES - retry_count)
310       return val
311     except Exception, err: # pylint: disable-msg=W0703
312       if retry_count == 0:
313         Log("Non-idempotent %s failed, aborting", msg)
314         raise
315       elif retry_count == 1:
316         Log("Idempotent %s repeated failure, aborting", msg)
317         raise
318       else:
319         Log("Idempotent %s failed, retry #%d/%d: %s",
320             msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
321         return self.MaybeRetry(retry_count - 1, msg, fn, *args)
322
323   def _SetDebug(self, ops):
324     """Set the debug value on the given opcodes"""
325     for op in ops:
326       op.debug_level = self.opts.debug
327
328   def _ExecOp(self, *ops):
329     """Execute one or more opcodes and manage the exec buffer.
330
331     @return: if only one opcode has been passed, we return its result;
332         otherwise we return the list of results
333
334     """
335     job_id = cli.SendJob(ops, cl=self.cl)
336     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
337     if len(ops) == 1:
338       return results[0]
339     else:
340       return results
341
342   def ExecOp(self, retry, *ops):
343     """Execute one or more opcodes and manage the exec buffer.
344
345     @return: if only one opcode has been passed, we return its result;
346         otherwise we return the list of results
347
348     """
349     if retry:
350       rval = MAX_RETRIES
351     else:
352       rval = 0
353     self._SetDebug(ops)
354     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
355
356   def ExecOrQueue(self, name, ops, post_process=None):
357     """Execute opcodes now, or queue them when running in parallel."""
358     if self.opts.parallel:
359       self._SetDebug(ops)
360       self.queued_ops.append((ops, name, post_process))
361     else:
362       val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
363       if post_process is not None:
364         post_process()
365       return val
366
367   def StartBatch(self, retry):
368     """Start a new batch of jobs.
369
370     @param retry: whether this is a retryable batch
371
372     """
373     self.queued_ops = []
374     self.queue_retry = retry
375
376   def CommitQueue(self):
377     """Execute all submitted opcodes in case of parallel burnin"""
378     if not self.opts.parallel or not self.queued_ops:
379       return
380
381     if self.queue_retry:
382       rval = MAX_RETRIES
383     else:
384       rval = 0
385
386     try:
387       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
388                                 self.queued_ops)
389     finally:
390       self.queued_ops = []
391     return results
392
393   def ExecJobSet(self, jobs):
394     """Execute a set of jobs and return once all are done.
395
396     The method will return the list of results, if all jobs are
397     successful. Otherwise, OpExecError will be raised from within
398     cli.py.
399
400     """
401     self.ClearFeedbackBuf()
402     jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
403     for ops, name, _ in jobs:
404       jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
405     try:
406       results = jex.GetResults()
407     except Exception, err: # pylint: disable-msg=W0703
408       Log("Jobs failed: %s", err)
409       raise BurninFailure()
410
411     fail = False
412     val = []
413     for (_, name, post_process), (success, result) in zip(jobs, results):
414       if success:
415         if post_process:
416           try:
417             post_process()
418           except Exception, err: # pylint: disable-msg=W0703
419             Log("Post process call for job %s failed: %s", name, err)
420             fail = True
421         val.append(result)
422       else:
423         fail = True
424
425     if fail:
426       raise BurninFailure()
427
428     return val
429
430   def ParseOptions(self):
431     """Parses the command line options.
432
433     In case of command line errors, it will show the usage and exit the
434     program.
435
436     """
437     parser = optparse.OptionParser(usage="\n%s" % USAGE,
438                                    version=("%%prog (ganeti) %s" %
439                                             constants.RELEASE_VERSION),
440                                    option_list=OPTIONS)
441
442     options, args = parser.parse_args()
443     if len(args) < 1 or options.os is None:
444       Usage()
445
446     supported_disk_templates = (constants.DT_DISKLESS,
447                                 constants.DT_FILE,
448                                 constants.DT_PLAIN,
449                                 constants.DT_DRBD8)
450     if options.disk_template not in supported_disk_templates:
451       Err("Unknown disk template '%s'" % options.disk_template)
452
453     if options.disk_template == constants.DT_DISKLESS:
454       disk_size = disk_growth = []
455       options.do_addremove_disks = False
456     else:
457       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
458       disk_growth = [utils.ParseUnit(v)
459                      for v in options.disk_growth.split(",")]
460       if len(disk_growth) != len(disk_size):
461         Err("Wrong disk sizes/growth combination")
462     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
463         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
464       Err("Wrong disk count/disk template combination")
465
466     self.disk_size = disk_size
467     self.disk_growth = disk_growth
468     self.disk_count = len(disk_size)
469
470     if options.nodes and options.iallocator:
471       Err("Give either the nodes option or the iallocator option, not both")
472
473     if options.http_check and not options.name_check:
474       Err("Can't enable HTTP checks without name checks")
475
476     self.opts = options
477     self.instances = args
478     self.bep = {
479       constants.BE_MEMORY: options.mem_size,
480       constants.BE_VCPUS: 1,
481       }
482
483     self.hypervisor = None
484     self.hvp = {}
485     if options.hypervisor:
486       self.hypervisor, self.hvp = options.hypervisor
487
488     if options.reboot_types is None:
489       options.reboot_types = constants.REBOOT_TYPES
490     else:
491       options.reboot_types = options.reboot_types.split(",")
492       rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
493       if rt_diff:
494         Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))
495
496     socket.setdefaulttimeout(options.net_timeout)
497
498   def GetState(self):
499     """Read the cluster state from the master daemon."""
500     if self.opts.nodes:
501       names = self.opts.nodes.split(",")
502     else:
503       names = []
504     try:
505       op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
506                                names=names, use_locking=True)
507       result = self.ExecOp(True, op)
508     except errors.GenericError, err:
509       err_code, msg = cli.FormatError(err)
510       Err(msg, exit_code=err_code)
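    # Keep only nodes that are neither offline nor drained.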
511     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
512
513     op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
514                                                       "variants",
515                                                       "hidden"],
516                                        names=[])
517     result = self.ExecOp(True, op_diagnose)
518
519     if not result:
520       Err("Can't get the OS list")
521
522     found = False
523     for (name, variants, _) in result:
524       if self.opts.os in cli.CalculateOSNames(name, variants):
525         found = True
526         break
527
528     if not found:
529       Err("OS '%s' not found" % self.opts.os)
530
531     cluster_info = self.cl.QueryClusterInfo()
532     self.cluster_info = cluster_info
533     if not self.cluster_info:
534       Err("Can't get cluster info")
535
536     default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
537     self.cluster_default_nicparams = default_nic_params
538     if self.hypervisor is None:
539       self.hypervisor = self.cluster_info["default_hypervisor"]
540     self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)
541
542   @_DoCheckInstances
543   @_DoBatch(False)
544   def BurnCreateInstances(self):
545     """Create the given instances.
546
547     """
548     self.to_rem = []
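    # Round-robin over the node list: each instance gets the next node in the
    # cycle as primary and the node after that as secondary.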
549     mytor = izip(cycle(self.nodes),
550                  islice(cycle(self.nodes), 1, None),
551                  self.instances)
552
553     Log("Creating instances")
554     for pnode, snode, instance in mytor:
555       Log("instance %s", instance, indent=1)
556       if self.opts.iallocator:
557         pnode = snode = None
558         msg = "with iallocator %s" % self.opts.iallocator
559       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
560         snode = None
561         msg = "on %s" % pnode
562       else:
563         msg = "on %s, %s" % (pnode, snode)
564
565       Log(msg, indent=2)
566
567       op = opcodes.OpInstanceCreate(instance_name=instance,
568                                     disks=[{"size": size}
569                                            for size in self.disk_size],
570                                     disk_template=self.opts.disk_template,
571                                     nics=self.opts.nics,
572                                     mode=constants.INSTANCE_CREATE,
573                                     os_type=self.opts.os,
574                                     pnode=pnode,
575                                     snode=snode,
576                                     start=True,
577                                     ip_check=self.opts.ip_check,
578                                     name_check=self.opts.name_check,
579                                     wait_for_sync=True,
580                                     file_driver="loop",
581                                     file_storage_dir=None,
582                                     iallocator=self.opts.iallocator,
583                                     beparams=self.bep,
584                                     hvparams=self.hvp,
585                                     hypervisor=self.hypervisor,
586                                     osparams=self.opts.osparams,
587                                     )
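      # Bind the instance name now; the outer lambda runs as post_process
      # only after the creation job succeeds, so the instance is queued for
      # removal only once it actually exists.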
588       remove_instance = lambda name: lambda: self.to_rem.append(name)
589       self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))
590
591   @_DoBatch(False)
592   def BurnGrowDisks(self):
593     """Grow the instance disks by the requested amount, if any."""
594     Log("Growing disks")
595     for instance in self.instances:
596       Log("instance %s", instance, indent=1)
597       for idx, growth in enumerate(self.disk_growth):
598         if growth > 0:
599           op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
600                                           amount=growth, wait_for_sync=True)
601           Log("increase disk/%s by %s MB", idx, growth, indent=2)
602           self.ExecOrQueue(instance, [op])
603
604   @_DoBatch(True)
605   def BurnReplaceDisks1D8(self):
606     """Replace disks on primary and secondary for drbd8."""
607     Log("Replacing disks on the same nodes")
608     early_release = self.opts.early_release
609     for instance in self.instances:
610       Log("instance %s", instance, indent=1)
611       ops = []
612       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
613         op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
614                                             mode=mode,
615                                             disks=list(range(self.disk_count)),
616                                             early_release=early_release)
617         Log("run %s", mode, indent=2)
618         ops.append(op)
619       self.ExecOrQueue(instance, ops)
620
621   @_DoBatch(True)
622   def BurnReplaceDisks2(self):
623     """Replace secondary node."""
624     Log("Changing the secondary node")
625     mode = constants.REPLACE_DISK_CHG
626
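    # Pick the new secondary two positions further along the node cycle, so
    # that (with more than two nodes) it differs from the creation-time
    # primary/secondary pair.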
627     mytor = izip(islice(cycle(self.nodes), 2, None),
628                  self.instances)
629     for tnode, instance in mytor:
630       Log("instance %s", instance, indent=1)
631       if self.opts.iallocator:
632         tnode = None
633         msg = "with iallocator %s" % self.opts.iallocator
634       else:
635         msg = tnode
636       op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
637                                           mode=mode,
638                                           remote_node=tnode,
639                                           iallocator=self.opts.iallocator,
640                                           disks=[],
641                                           early_release=self.opts.early_release)
642       Log("run %s %s", mode, msg, indent=2)
643       self.ExecOrQueue(instance, [op])
644
645   @_DoCheckInstances
646   @_DoBatch(False)
647   def BurnFailover(self):
648     """Failover the instances."""
649     Log("Failing over instances")
650     for instance in self.instances:
651       Log("instance %s", instance, indent=1)
652       op = opcodes.OpInstanceFailover(instance_name=instance,
653                                       ignore_consistency=False)
654       self.ExecOrQueue(instance, [op])
655
656   @_DoCheckInstances
657   @_DoBatch(False)
658   def BurnMove(self):
659     """Move the instances."""
660     Log("Moving instances")
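    # Move each instance to the next node in the cycle, relative to its
    # creation-time primary.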
661     mytor = izip(islice(cycle(self.nodes), 1, None),
662                  self.instances)
663     for tnode, instance in mytor:
664       Log("instance %s", instance, indent=1)
665       op = opcodes.OpInstanceMove(instance_name=instance,
666                                   target_node=tnode)
667       self.ExecOrQueue(instance, [op])
668
669   @_DoBatch(False)
670   def BurnMigrate(self):
671     """Migrate the instances."""
672     Log("Migrating instances")
673     for instance in self.instances:
674       Log("instance %s", instance, indent=1)
675       op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
676                                       cleanup=False)
677
678       op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
679                                       cleanup=True)
680       Log("migration and migration cleanup", indent=2)
681       self.ExecOrQueue(instance, [op1, op2])
682
683   @_DoCheckInstances
684   @_DoBatch(False)
685   def BurnImportExport(self):
686     """Export the instance, delete it, and import it back.
687
688     """
689     Log("Exporting and re-importing instances")
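    # Same primary/secondary pairing as at creation time; the node two
    # positions ahead in the cycle (enode) holds the intermediate export.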
690     mytor = izip(cycle(self.nodes),
691                  islice(cycle(self.nodes), 1, None),
692                  islice(cycle(self.nodes), 2, None),
693                  self.instances)
694
695     for pnode, snode, enode, instance in mytor:
696       Log("instance %s", instance, indent=1)
697       # read the full name of the instance
698       nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
699                                        names=[instance], use_locking=True)
700       full_name = self.ExecOp(False, nam_op)[0][0]
701
702       if self.opts.iallocator:
703         pnode = snode = None
704         import_log_msg = ("import from %s"
705                           " with iallocator %s" %
706                           (enode, self.opts.iallocator))
707       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
708         snode = None
709         import_log_msg = ("import from %s to %s" %
710                           (enode, pnode))
711       else:
712         import_log_msg = ("import from %s to %s, %s" %
713                           (enode, pnode, snode))
714
715       exp_op = opcodes.OpBackupExport(instance_name=instance,
716                                       target_node=enode,
717                                       mode=constants.EXPORT_MODE_LOCAL,
718                                       shutdown=True)
719       rem_op = opcodes.OpInstanceRemove(instance_name=instance,
720                                         ignore_failures=True)
721       imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
722       imp_op = opcodes.OpInstanceCreate(instance_name=instance,
723                                         disks=[{"size": size}
724                                                for size in self.disk_size],
725                                         disk_template=self.opts.disk_template,
726                                         nics=self.opts.nics,
727                                         mode=constants.INSTANCE_IMPORT,
728                                         src_node=enode,
729                                         src_path=imp_dir,
730                                         pnode=pnode,
731                                         snode=snode,
732                                         start=True,
733                                         ip_check=self.opts.ip_check,
734                                         name_check=self.opts.name_check,
735                                         wait_for_sync=True,
736                                         file_storage_dir=None,
737                                         file_driver="loop",
738                                         iallocator=self.opts.iallocator,
739                                         beparams=self.bep,
740                                         hvparams=self.hvp,
741                                         osparams=self.opts.osparams,
742                                         )
743
744       erem_op = opcodes.OpBackupRemove(instance_name=instance)
745
746       Log("export to node %s", enode, indent=2)
747       Log("remove instance", indent=2)
748       Log(import_log_msg, indent=2)
749       Log("remove export", indent=2)
750       self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
751
752   @staticmethod
753   def StopInstanceOp(instance):
754     """Stop given instance."""
755     return opcodes.OpInstanceShutdown(instance_name=instance)
756
757   @staticmethod
758   def StartInstanceOp(instance):
759     """Start given instance."""
760     return opcodes.OpInstanceStartup(instance_name=instance, force=False)
761
762   @staticmethod
763   def RenameInstanceOp(instance, instance_new):
764     """Rename instance."""
765     return opcodes.OpInstanceRename(instance_name=instance,
766                                     new_name=instance_new)
767
768   @_DoCheckInstances
769   @_DoBatch(True)
770   def BurnStopStart(self):
771     """Stop/start the instances."""
772     Log("Stopping and starting instances")
773     for instance in self.instances:
774       Log("instance %s", instance, indent=1)
775       op1 = self.StopInstanceOp(instance)
776       op2 = self.StartInstanceOp(instance)
777       self.ExecOrQueue(instance, [op1, op2])
778
779   @_DoBatch(False)
780   def BurnRemove(self):
781     """Remove the instances."""
782     Log("Removing instances")
783     for instance in self.to_rem:
784       Log("instance %s", instance, indent=1)
785       op = opcodes.OpInstanceRemove(instance_name=instance,
786                                     ignore_failures=True)
787       self.ExecOrQueue(instance, [op])
788
789   def BurnRename(self):
790     """Rename the instances.
791
792     Note that this function will not execute in parallel, since we
793     only have one target for rename.
794
795     """
796     Log("Renaming instances")
797     rename = self.opts.rename
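    # Rename each instance to the spare name and back again, stopping it
    # before each rename and checking it is reachable afterwards.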
798     for instance in self.instances:
799       Log("instance %s", instance, indent=1)
800       op_stop1 = self.StopInstanceOp(instance)
801       op_stop2 = self.StopInstanceOp(rename)
802       op_rename1 = self.RenameInstanceOp(instance, rename)
803       op_rename2 = self.RenameInstanceOp(rename, instance)
804       op_start1 = self.StartInstanceOp(rename)
805       op_start2 = self.StartInstanceOp(instance)
806       self.ExecOp(False, op_stop1, op_rename1, op_start1)
807       self._CheckInstanceAlive(rename)
808       self.ExecOp(False, op_stop2, op_rename2, op_start2)
809       self._CheckInstanceAlive(instance)
810
811   @_DoCheckInstances
812   @_DoBatch(True)
813   def BurnReinstall(self):
814     """Reinstall the instances."""
815     Log("Reinstalling instances")
816     for instance in self.instances:
817       Log("instance %s", instance, indent=1)
818       op1 = self.StopInstanceOp(instance)
819       op2 = opcodes.OpInstanceReinstall(instance_name=instance)
820       Log("reinstall without passing the OS", indent=2)
821       op3 = opcodes.OpInstanceReinstall(instance_name=instance,
822                                         os_type=self.opts.os)
823       Log("reinstall specifying the OS", indent=2)
824       op4 = self.StartInstanceOp(instance)
825       self.ExecOrQueue(instance, [op1, op2, op3, op4])
826
827   @_DoCheckInstances
828   @_DoBatch(True)
829   def BurnReboot(self):
830     """Reboot the instances."""
831     Log("Rebooting instances")
832     for instance in self.instances:
833       Log("instance %s", instance, indent=1)
834       ops = []
835       for reboot_type in self.opts.reboot_types:
836         op = opcodes.OpInstanceReboot(instance_name=instance,
837                                       reboot_type=reboot_type,
838                                       ignore_secondaries=False)
839         Log("reboot with type '%s'", reboot_type, indent=2)
840         ops.append(op)
841       self.ExecOrQueue(instance, ops)
842
843   @_DoCheckInstances
844   @_DoBatch(True)
845   def BurnActivateDisks(self):
846     """Activate and deactivate disks of the instances."""
847     Log("Activating/deactivating disks")
848     for instance in self.instances:
849       Log("instance %s", instance, indent=1)
850       op_start = self.StartInstanceOp(instance)
851       op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
852       op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
853       op_stop = self.StopInstanceOp(instance)
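      # Activate the disks while the instance is running, again after
      # stopping it, then deactivate them and start the instance back up.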
854       Log("activate disks when online", indent=2)
855       Log("activate disks when offline", indent=2)
856       Log("deactivate disks (when offline)", indent=2)
857       self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
858
859   @_DoCheckInstances
860   @_DoBatch(False)
861   def BurnAddRemoveDisks(self):
862     """Add and remove an extra disk for the instances."""
863     Log("Adding and removing disks")
864     for instance in self.instances:
865       Log("instance %s", instance, indent=1)
866       op_add = opcodes.OpInstanceSetParams(\
867         instance_name=instance,
868         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
869       op_rem = opcodes.OpInstanceSetParams(\
870         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
871       op_stop = self.StopInstanceOp(instance)
872       op_start = self.StartInstanceOp(instance)
873       Log("adding a disk", indent=2)
874       Log("removing last disk", indent=2)
875       self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])
876
877   @_DoBatch(False)
878   def BurnAddRemoveNICs(self):
879     """Add and remove an extra NIC for the instances."""
880     Log("Adding and removing NICs")
881     for instance in self.instances:
882       Log("instance %s", instance, indent=1)
883       op_add = opcodes.OpInstanceSetParams(\
884         instance_name=instance, nics=[(constants.DDM_ADD, {})])
885       op_rem = opcodes.OpInstanceSetParams(\
886         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
887       Log("adding a NIC", indent=2)
888       Log("removing last NIC", indent=2)
889       self.ExecOrQueue(instance, [op_add, op_rem])
890
891   def ConfdCallback(self, reply):
892     """Callback for confd queries"""
893     if reply.type == confd_client.UPCALL_REPLY:
894       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
895         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
896                                                     reply.server_reply.status,
897                                                     reply.server_reply))
898       if reply.orig_request.type == constants.CONFD_REQ_PING:
899         Log("Ping: OK", indent=1)
900       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
901         if reply.server_reply.answer == self.cluster_info["master"]:
902           Log("Master: OK", indent=1)
903         else:
904           Err("Master: wrong: %s" % reply.server_reply.answer)
905       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
906         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
907           Log("Node role for master: OK", indent=1)
908         else:
909           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
910
911   def DoConfdRequestReply(self, req):
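    """Send a confd request and wait until all expected replies arrive."""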
912     self.confd_counting_callback.RegisterQuery(req.rsalt)
913     self.confd_client.SendRequest(req, async=False)
914     while not self.confd_counting_callback.AllAnswered():
915       if not self.confd_client.ReceiveReply():
916         Err("Did not receive all expected confd replies")
917         break
918
919   def BurnConfd(self):
920     """Run confd queries for our instances.
921
922     The following confd queries are tested:
923       - CONFD_REQ_PING: simple ping
924       - CONFD_REQ_CLUSTER_MASTER: cluster master
925       - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
926
927     """
928     Log("Checking confd results")
929
930     filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
931     counting_callback = confd_client.ConfdCountingCallback(filter_callback)
932     self.confd_counting_callback = counting_callback
933
934     self.confd_client = confd_client.GetConfdClient(counting_callback)
935
936     req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
937     self.DoConfdRequestReply(req)
938
939     req = confd_client.ConfdClientRequest(
940       type=constants.CONFD_REQ_CLUSTER_MASTER)
941     self.DoConfdRequestReply(req)
942
943     req = confd_client.ConfdClientRequest(
944         type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
945         query=self.cluster_info["master"])
946     self.DoConfdRequestReply(req)
947
948   def _CheckInstanceAlive(self, instance):
949     """Check if an instance is alive by doing http checks.
950
951     This will try to retrieve http://<instance>/hostname.txt and check
952     that it contains the hostname of the instance. Connection errors are
953     retried until the network timeout expires; a persistent failure or a
954     hostname mismatch marks the instance as down.
955
956     """
957     if not self.opts.http_check:
958       return
959     end_time = time.time() + self.opts.net_timeout
960     url = None
961     while time.time() < end_time and url is None:
962       try:
963         url = self.url_opener.open("http://%s/hostname.txt" % instance)
964       except IOError:
965         # here we can have connection refused, no route to host, etc.
966         time.sleep(1)
967     if url is None:
968       raise InstanceDown(instance, "Cannot contact instance")
969     hostname = url.read().strip()
970     url.close()
971     if hostname != instance:
972       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
973                                     (instance, hostname)))
974
975   def BurninCluster(self):
976     """Test a cluster intensively.
977
978     This will create instances and then start/stop/failover them.
979     It is safe for existing instances but could impact performance.
980
981     """
982
983     opts = self.opts
984
985     Log("Testing global parameters")
986
987     if (len(self.nodes) == 1 and
988         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
989                                    constants.DT_FILE)):
990       Err("When one node is available/selected the disk template must"
991           " be 'diskless', 'file' or 'plain'")
992
993     has_err = True
994     try:
995       self.BurnCreateInstances()
996       if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
997         self.BurnReplaceDisks1D8()
998       if (opts.do_replace2 and len(self.nodes) > 2 and
999           opts.disk_template in constants.DTS_NET_MIRROR):
1000         self.BurnReplaceDisks2()
1001
1002       if (opts.disk_template in constants.DTS_GROWABLE and
1003           compat.any(n > 0 for n in self.disk_growth)):
1004         self.BurnGrowDisks()
1005
1006       if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
1007         self.BurnFailover()
1008
1009       if opts.do_migrate:
1010         if opts.disk_template != constants.DT_DRBD8:
1011           Log("Skipping migration (disk template not DRBD8)")
1012         elif not self.hv_class.CAN_MIGRATE:
1013           Log("Skipping migration (hypervisor %s does not support it)",
1014               self.hypervisor)
1015         else:
1016           self.BurnMigrate()
1017
1018       if (opts.do_move and len(self.nodes) > 1 and
1019           opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1020         self.BurnMove()
1021
1022       if (opts.do_importexport and
1023           opts.disk_template not in (constants.DT_DISKLESS,
1024                                      constants.DT_FILE)):
1025         self.BurnImportExport()
1026
1027       if opts.do_reinstall:
1028         self.BurnReinstall()
1029
1030       if opts.do_reboot:
1031         self.BurnReboot()
1032
1033       if opts.do_addremove_disks:
1034         self.BurnAddRemoveDisks()
1035
1036       default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1037       # Don't add/remove nics in routed mode, as we would need an ip to add
1038       # them with
1039       if opts.do_addremove_nics:
1040         if default_nic_mode == constants.NIC_MODE_BRIDGED:
1041           self.BurnAddRemoveNICs()
1042         else:
1043           Log("Skipping nic add/remove as the cluster is not in bridged mode")
1044
1045       if opts.do_activate_disks:
1046         self.BurnActivateDisks()
1047
1048       if opts.rename:
1049         self.BurnRename()
1050
1051       if opts.do_confd_tests:
1052         self.BurnConfd()
1053
1054       if opts.do_startstop:
1055         self.BurnStopStart()
1056
1057       has_err = False
1058     finally:
1059       if has_err:
1060         Log("Error detected: opcode buffer follows:\n\n")
1061         Log(self.GetFeedbackBuf())
1062         Log("\n\n")
1063       if not self.opts.keep_instances:
1064         try:
1065           self.BurnRemove()
1066         except Exception, err:  # pylint: disable-msg=W0703
1067           if has_err: # already detected errors, so errors in removal
1068                       # are quite expected
1069             Log("Note: error detected during instance remove: %s", err)
1070           else: # non-expected error
1071             raise
1072
1073     return 0
1074
1075
1076 def main():
1077   """Main function"""
1078
1079   burner = Burner()
1080   return burner.BurninCluster()
1081
1082
1083 if __name__ == "__main__":
1084   main()