Rename OpMoveInstance and LUMoveInstance
tools/burnin
#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = "\tburnin -o OS_NAME [options...] instance_name ..."

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
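
# Example (illustrative): Log("instance %s", "inst1", indent=1) prints
# "  * instance inst1"; indent=0 uses the "- " header, and deeper levels
# indent two spaces per level with no header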


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to
    # what the BasicURLOpener class does, but we raise a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name to be used as the"
                       " temporary target of the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]
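
# Example invocations (illustrative sketch; the OS and instance names below
# are placeholders, not part of this tool):
#   burnin -o my-os inst1.example.com inst2.example.com
#   burnin -o my-os -t plain -p --net-timeout 60 inst3.example.com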

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
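
# A minimal sketch of how the two decorators stack (hypothetical method name,
# for illustration only):
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# _DoBatch brackets the method with StartBatch/CommitQueue, and
# _DoCheckInstances then verifies that every instance is still alive once
# the whole batch has been committed.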


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
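    # msg is a (timestamp, type, text) tuple as emitted by the job poller;
    # the timestamp itself is a (seconds, microseconds) pair, hence the
    # utils.MergeTime call below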
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if 0 < retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the retried value, otherwise a successful retry would
        # yield None to callers that use the result
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
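
  # Example (illustrative): self.MaybeRetry(MAX_RETRIES, "opcode", fn, arg)
  # retries fn(arg) until it succeeds or MAX_RETRIES attempts have failed,
  # while self.MaybeRetry(0, "opcode", fn, arg) re-raises on the first error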

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val
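
  # Example (illustrative): self.ExecOrQueue(name, [op1, op2]) runs the two
  # opcodes as a single job right away in serial mode; with --parallel it
  # only queues them until CommitQueue() submits the whole batch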

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count),
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
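    """Send one confd request and wait for the expected replies."""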
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL /hostname.txt on the instance and
    check that it contains the instance's hostname. On connection errors
    (e.g. ECONNREFUSED) we keep retrying for up to the network timeout;
    any other error aborts.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
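
  # Example (illustrative): for an instance named "inst1.example.com" the
  # check above fetches http://inst1.example.com/hostname.txt and expects
  # the body to be exactly "inst1.example.com"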

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()