Fix burnin error when trying to grow a file volume
tools/burnin
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
24 """
25
26 import sys
27 import optparse
28 import time
29 import socket
30 import urllib
31 from itertools import izip, islice, cycle
32 from cStringIO import StringIO
33
34 from ganeti import opcodes
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssconf
40
41 from ganeti.confd import client as confd_client
42
43
44 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
45
46 MAX_RETRIES = 3
47 LOG_HEADERS = {
48   0: "- ",
49   1: "* ",
50   2: ""
51   }
52
53 class InstanceDown(Exception):
54   """The checked instance was not up"""
55
56
57 class BurninFailure(Exception):
58   """Failure detected during burning"""
59
60
61 def Usage():
62   """Shows program usage information and exits the program."""
63
64   print >> sys.stderr, "Usage:"
65   print >> sys.stderr, USAGE
66   sys.exit(2)
67
68
69 def Log(msg, *args, **kwargs):
70   """Simple function that prints out its arguments.
71
72   """
73   if args:
74     msg = msg % args
75   indent = kwargs.get('indent', 0)
76   sys.stdout.write("%*s%s%s\n" % (2*indent, "",
77                                   LOG_HEADERS.get(indent, "  "), msg))
78   sys.stdout.flush()
79
80
81 def Err(msg, exit_code=1):
82   """Simple error logging that prints to stderr.
83
84   """
85   sys.stderr.write(msg + "\n")
86   sys.stderr.flush()
87   sys.exit(exit_code)
88
89
90 class SimpleOpener(urllib.FancyURLopener):
91   """A simple url opener"""
92   # pylint: disable-msg=W0221
93
94   def prompt_user_passwd(self, host, realm, clear_cache=0):
95     """No-interaction version of prompt_user_passwd."""
96     # we follow parent class' API
97     # pylint: disable-msg=W0613
98     return None, None
99
100   def http_error_default(self, url, fp, errcode, errmsg, headers):
101     """Custom error handling"""
102     # make sure sockets are not left in CLOSE_WAIT; this mirrors the
103     # parent class' handling but raises a different exception
104     _ = fp.read() # throw away data
105     fp.close()
106     raise InstanceDown("HTTP error returned: code %s, msg %s" %
107                        (errcode, errmsg))
108
109
110 OPTIONS = [
111   cli.cli_option("-o", "--os", dest="os", default=None,
112                  help="OS to use during burnin",
113                  metavar="<OS>",
114                  completion_suggest=cli.OPT_COMPL_ONE_OS),
115   cli.cli_option("--disk-size", dest="disk_size",
116                  help="Disk size (determines disk count)",
117                  default="128m", type="string", metavar="<size,size,...>",
118                  completion_suggest=("128M 512M 1G 4G 1G,256M"
119                                      " 4G,1G,1G 10G").split()),
120   cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
121                  default="128m", type="string", metavar="<size,size,...>"),
122   cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
123                  default=128, type="unit", metavar="<size>",
124                  completion_suggest=("128M 256M 512M 1G 4G 8G"
125                                      " 12G 16G").split()),
126   cli.DEBUG_OPT,
127   cli.VERBOSE_OPT,
128   cli.NOIPCHECK_OPT,
129   cli.NONAMECHECK_OPT,
130   cli.EARLY_RELEASE_OPT,
131   cli.cli_option("--no-replace1", dest="do_replace1",
132                  help="Skip disk replacement with the same secondary",
133                  action="store_false", default=True),
134   cli.cli_option("--no-replace2", dest="do_replace2",
135                  help="Skip disk replacement with a different secondary",
136                  action="store_false", default=True),
137   cli.cli_option("--no-failover", dest="do_failover",
138                  help="Skip instance failovers", action="store_false",
139                  default=True),
140   cli.cli_option("--no-migrate", dest="do_migrate",
141                  help="Skip instance live migration",
142                  action="store_false", default=True),
143   cli.cli_option("--no-move", dest="do_move",
144                  help="Skip instance moves", action="store_false",
145                  default=True),
146   cli.cli_option("--no-importexport", dest="do_importexport",
147                  help="Skip instance export/import", action="store_false",
148                  default=True),
149   cli.cli_option("--no-startstop", dest="do_startstop",
150                  help="Skip instance stop/start", action="store_false",
151                  default=True),
152   cli.cli_option("--no-reinstall", dest="do_reinstall",
153                  help="Skip instance reinstall", action="store_false",
154                  default=True),
155   cli.cli_option("--no-reboot", dest="do_reboot",
156                  help="Skip instance reboot", action="store_false",
157                  default=True),
158   cli.cli_option("--no-activate-disks", dest="do_activate_disks",
159                  help="Skip disk activation/deactivation",
160                  action="store_false", default=True),
161   cli.cli_option("--no-add-disks", dest="do_addremove_disks",
162                  help="Skip disk addition/removal",
163                  action="store_false", default=True),
164   cli.cli_option("--no-add-nics", dest="do_addremove_nics",
165                  help="Skip NIC addition/removal",
166                  action="store_false", default=True),
167   cli.cli_option("--no-nics", dest="nics",
168                  help="No network interfaces", action="store_const",
169                  const=[], default=[{}]),
170   cli.cli_option("--no-confd", dest="do_confd_tests",
171                  help="Skip confd queries",
172                  action="store_false", default=True),
173   cli.cli_option("--rename", dest="rename", default=None,
174                  help=("Give one unused instance name, to be used as the"
175                        " temporary target of the rename sequence"),
176                  metavar="<instance_name>"),
177   cli.cli_option("-t", "--disk-template", dest="disk_template",
178                  choices=list(constants.DISK_TEMPLATES),
179                  default=constants.DT_DRBD8,
180                  help="Disk template (diskless, file, plain or drbd) [drbd]"),
181   cli.cli_option("-n", "--nodes", dest="nodes", default="",
182                  help=("Comma separated list of nodes to perform"
183                        " the burnin on (defaults to all nodes)"),
184                  completion_suggest=cli.OPT_COMPL_MANY_NODES),
185   cli.cli_option("-I", "--iallocator", dest="iallocator",
186                  default=None, type="string",
187                  help=("Perform the allocation using an iallocator"
188                        " instead of fixed node spread (node restrictions no"
189                        " longer apply, therefore -n/--nodes must not be"
190                        " used)"),
191                  completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
192   cli.cli_option("-p", "--parallel", default=False, action="store_true",
193                  dest="parallel",
194                  help=("Enable parallelization of some operations in"
195                        " order to speed up burnin or to test granular locking")),
196   cli.cli_option("--net-timeout", default=15, type="int",
197                  dest="net_timeout",
198                  help=("The instance check network timeout in seconds"
199                        " (defaults to 15 seconds)"),
200                  completion_suggest="15 60 300 900".split()),
201   cli.cli_option("-C", "--http-check", default=False, action="store_true",
202                  dest="http_check",
203                  help=("Enable checking of instance status via http,"
204                        " looking for /hostname.txt that should contain the"
205                        " name of the instance")),
206   cli.cli_option("-K", "--keep-instances", default=False,
207                  action="store_true",
208                  dest="keep_instances",
209                  help=("Leave instances on the cluster after burnin,"
210                        " for investigation in case of errors or simply"
211                        " to use them")),
212   ]
213
214 # Mainly used for bash completion
215 ARGUMENTS = [cli.ArgInstance(min=1)]
216
217
218 def _DoCheckInstances(fn):
219   """Decorator for checking instances.
220
221   """
222   def wrapper(self, *args, **kwargs):
223     val = fn(self, *args, **kwargs)
224     for instance in self.instances:
225       self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
226     return val
227
228   return wrapper
229
230
231 def _DoBatch(retry):
232   """Decorator for possible batch operations.
233
234   Must come after the _DoCheckInstances decorator (if any).
235
236   @param retry: whether this is a retryable batch, will be
237       passed to StartBatch
238
239   """
240   def wrap(fn):
241     def batched(self, *args, **kwargs):
242       self.StartBatch(retry)
243       val = fn(self, *args, **kwargs)
244       self.CommitQueue()
245       return val
246     return batched
247
248   return wrap
249
250
251 class Burner(object):
252   """Burner class."""
253
254   def __init__(self):
255     """Constructor."""
256     utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
257     self.url_opener = SimpleOpener()
258     self._feed_buf = StringIO()
259     self.nodes = []
260     self.instances = []
261     self.to_rem = []
262     self.queued_ops = []
263     self.opts = None
264     self.queue_retry = False
265     self.disk_count = self.disk_growth = self.disk_size = None
266     self.hvp = self.bep = None
267     self.ParseOptions()
268     self.cl = cli.GetClient()
269     self.ss = ssconf.SimpleStore()
270     self.GetState()
271
272   def ClearFeedbackBuf(self):
273     """Clear the feedback buffer."""
274     self._feed_buf.truncate(0)
275
276   def GetFeedbackBuf(self):
277     """Return the contents of the buffer."""
278     return self._feed_buf.getvalue()
279
280   def Feedback(self, msg):
281     """Accumulate feedback in our buffer."""
282     formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
283     self._feed_buf.write(formatted_msg + "\n")
284     if self.opts.verbose:
285       Log(formatted_msg, indent=3)
286
287   def MaybeRetry(self, retry_count, msg, fn, *args):
288     """Possibly retry a given function execution.
289
290     @type retry_count: int
291     @param retry_count: retry counter:
292         - 0: non-retryable action
293         - 1: last retry for a retryable action
294         - MAX_RETRIES: original try for a retryable action
295     @type msg: str
296     @param msg: the kind of the operation
297     @type fn: callable
298     @param fn: the function to be called
299
300     """
301     try:
302       val = fn(*args)
303       if retry_count > 0 and retry_count < MAX_RETRIES:
304         Log("Idempotent %s succeeded after %d retries",
305             msg, MAX_RETRIES - retry_count)
306       return val
307     except Exception, err: # pylint: disable-msg=W0703
308       if retry_count == 0:
309         Log("Non-idempotent %s failed, aborting", msg)
310         raise
311       elif retry_count == 1:
312         Log("Idempotent %s repeated failure, aborting", msg)
313         raise
314       else:
315         Log("Idempotent %s failed, retry #%d/%d: %s",
316             msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
317         return self.MaybeRetry(retry_count - 1, msg, fn, *args)
318
319   def _SetDebug(self, ops):
320     """Set the debug value on the given opcodes"""
321     for op in ops:
322       op.debug_level = self.opts.debug
323
324   def _ExecOp(self, *ops):
325     """Execute one or more opcodes and manage the exec buffer.
326
327     @return: if only one opcode has been passed, we return its result;
328         otherwise we return the list of results
329
330     """
331     job_id = cli.SendJob(ops, cl=self.cl)
332     results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
333     if len(ops) == 1:
334       return results[0]
335     else:
336       return results
337
338   def ExecOp(self, retry, *ops):
339     """Execute one or more opcodes and manage the exec buffer.
340
341     @return: if only one opcode has been passed, we return its result;
342         otherwise we return the list of results
343
344     """
345     if retry:
346       rval = MAX_RETRIES
347     else:
348       rval = 0
349     self._SetDebug(ops)
350     return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
351
352   def ExecOrQueue(self, name, *ops):
353     """Execute an opcode directly, or queue it if running in parallel."""
354     if self.opts.parallel:
355       self._SetDebug(ops)
356       self.queued_ops.append((ops, name))
357     else:
358       return self.ExecOp(self.queue_retry, *ops)
359
360   def StartBatch(self, retry):
361     """Start a new batch of jobs.
362
363     @param retry: whether this is a retryable batch
364
365     """
366     self.queued_ops = []
367     self.queue_retry = retry
368
369   def CommitQueue(self):
370     """Execute all submitted opcodes in case of parallel burnin"""
371     if not self.opts.parallel:
372       return
373
374     if self.queue_retry:
375       rval = MAX_RETRIES
376     else:
377       rval = 0
378
379     try:
380       results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
381                                 self.queued_ops)
382     finally:
383       self.queued_ops = []
384     return results
385
386   def ExecJobSet(self, jobs):
387     """Execute a set of jobs and return once all are done.
388
389     The method will return the list of job results (one per job) if all
390     jobs are successful; otherwise a BurninFailure exception is raised,
391     after logging the errors reported by the job executor.
392
393     """
394     self.ClearFeedbackBuf()
395     jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
396     for ops, name in jobs:
397       jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
398     try:
399       results = jex.GetResults()
400     except Exception, err: # pylint: disable-msg=W0703
401       Log("Jobs failed: %s", err)
402       raise BurninFailure()
403
404     if utils.any(results, lambda x: not x[0]):
405       raise BurninFailure()
406
407     return [i[1] for i in results]
408
409   def ParseOptions(self):
410     """Parses the command line options.
411
412     In case of command line errors, it will show the usage and exit the
413     program.
414
415     """
416     parser = optparse.OptionParser(usage="\n%s" % USAGE,
417                                    version=("%%prog (ganeti) %s" %
418                                             constants.RELEASE_VERSION),
419                                    option_list=OPTIONS)
420
421     options, args = parser.parse_args()
422     if len(args) < 1 or options.os is None:
423       Usage()
424
425     supported_disk_templates = (constants.DT_DISKLESS,
426                                 constants.DT_FILE,
427                                 constants.DT_PLAIN,
428                                 constants.DT_DRBD8)
429     if options.disk_template not in supported_disk_templates:
430       Err("Unknown disk template '%s'" % options.disk_template)
431
432     if options.disk_template == constants.DT_DISKLESS:
433       disk_size = disk_growth = []
434       options.do_addremove_disks = False
435     else:
436       disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
437       disk_growth = [utils.ParseUnit(v)
438                      for v in options.disk_growth.split(",")]
439       if len(disk_growth) != len(disk_size):
440         Err("Wrong disk sizes/growth combination")
441     if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
442         (not disk_size and options.disk_template != constants.DT_DISKLESS)):
443       Err("Wrong disk count/disk template combination")
444
445     self.disk_size = disk_size
446     self.disk_growth = disk_growth
447     self.disk_count = len(disk_size)
448
449     if options.nodes and options.iallocator:
450       Err("Give either the nodes option or the iallocator option, not both")
451
452     if options.http_check and not options.name_check:
453       Err("Can't enable HTTP checks without name checks")
454
455     self.opts = options
456     self.instances = args
457     self.bep = {
458       constants.BE_MEMORY: options.mem_size,
459       constants.BE_VCPUS: 1,
460       }
461     self.hvp = {}
462
463     socket.setdefaulttimeout(options.net_timeout)
464
465   def GetState(self):
466     """Read the cluster state from the master daemon."""
467     if self.opts.nodes:
468       names = self.opts.nodes.split(",")
469     else:
470       names = []
471     try:
472       op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
473                                 names=names, use_locking=True)
474       result = self.ExecOp(True, op)
475     except errors.GenericError, err:
476       err_code, msg = cli.FormatError(err)
477       Err(msg, exit_code=err_code)
478     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
479
480     op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
481                                                       "variants"], names=[])
482     result = self.ExecOp(True, op_diagnose)
483
484     if not result:
485       Err("Can't get the OS list")
486
487     found = False
488     for (name, valid, variants) in result:
489       if valid and self.opts.os in cli.CalculateOSNames(name, variants):
490         found = True
491         break
492
493     if not found:
494       Err("OS '%s' not found" % self.opts.os)
495
496     cluster_info = self.cl.QueryClusterInfo()
497     self.cluster_info = cluster_info
498     if not self.cluster_info:
499       Err("Can't get cluster info")
500
501     default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
502     self.cluster_default_nicparams = default_nic_params
503
504   @_DoCheckInstances
505   @_DoBatch(False)
506   def BurnCreateInstances(self):
507     """Create the given instances.
508
509     """
510     self.to_rem = []
511     mytor = izip(cycle(self.nodes),
512                  islice(cycle(self.nodes), 1, None),
513                  self.instances)
514
515     Log("Creating instances")
516     for pnode, snode, instance in mytor:
517       Log("instance %s", instance, indent=1)
518       if self.opts.iallocator:
519         pnode = snode = None
520         msg = "with iallocator %s" % self.opts.iallocator
521       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
522         snode = None
523         msg = "on %s" % pnode
524       else:
525         msg = "on %s, %s" % (pnode, snode)
526
527       Log(msg, indent=2)
528
529       op = opcodes.OpCreateInstance(instance_name=instance,
530                                     disks = [ {"size": size}
531                                               for size in self.disk_size],
532                                     disk_template=self.opts.disk_template,
533                                     nics=self.opts.nics,
534                                     mode=constants.INSTANCE_CREATE,
535                                     os_type=self.opts.os,
536                                     pnode=pnode,
537                                     snode=snode,
538                                     start=True,
539                                     ip_check=self.opts.ip_check,
540                                     name_check=self.opts.name_check,
541                                     wait_for_sync=True,
542                                     file_driver="loop",
543                                     file_storage_dir=None,
544                                     iallocator=self.opts.iallocator,
545                                     beparams=self.bep,
546                                     hvparams=self.hvp,
547                                     )
548
549       self.ExecOrQueue(instance, op)
550       self.to_rem.append(instance)
551
552   @_DoBatch(False)
553   def BurnGrowDisks(self):
554     """Grow the instance disks by the requested amounts, if any."""
555     Log("Growing disks")
556     for instance in self.instances:
557       Log("instance %s", instance, indent=1)
558       for idx, growth in enumerate(self.disk_growth):
559         if growth > 0:
560           op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
561                                   amount=growth, wait_for_sync=True)
562           Log("increase disk/%s by %s MB", idx, growth, indent=2)
563           self.ExecOrQueue(instance, op)
564
565   @_DoBatch(True)
566   def BurnReplaceDisks1D8(self):
567     """Replace disks on primary and secondary for drbd8."""
568     Log("Replacing disks on the same nodes")
569     for instance in self.instances:
570       Log("instance %s", instance, indent=1)
571       ops = []
572       for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
573         op = opcodes.OpReplaceDisks(instance_name=instance,
574                                     mode=mode,
575                                     disks=[i for i in range(self.disk_count)],
576                                     early_release=self.opts.early_release)
577         Log("run %s", mode, indent=2)
578         ops.append(op)
579       self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142
580
581   @_DoBatch(True)
582   def BurnReplaceDisks2(self):
583     """Replace secondary node."""
584     Log("Changing the secondary node")
585     mode = constants.REPLACE_DISK_CHG
586
587     mytor = izip(islice(cycle(self.nodes), 2, None),
588                  self.instances)
589     for tnode, instance in mytor:
590       Log("instance %s", instance, indent=1)
591       if self.opts.iallocator:
592         tnode = None
593         msg = "with iallocator %s" % self.opts.iallocator
594       else:
595         msg = tnode
596       op = opcodes.OpReplaceDisks(instance_name=instance,
597                                   mode=mode,
598                                   remote_node=tnode,
599                                   iallocator=self.opts.iallocator,
600                                   disks=[],
601                                   early_release=self.opts.early_release)
602       Log("run %s %s", mode, msg, indent=2)
603       self.ExecOrQueue(instance, op)
604
605   @_DoCheckInstances
606   @_DoBatch(False)
607   def BurnFailover(self):
608     """Failover the instances."""
609     Log("Failing over instances")
610     for instance in self.instances:
611       Log("instance %s", instance, indent=1)
612       op = opcodes.OpFailoverInstance(instance_name=instance,
613                                       ignore_consistency=False)
614       self.ExecOrQueue(instance, op)
615
616   @_DoCheckInstances
617   @_DoBatch(False)
618   def BurnMove(self):
619     """Move the instances."""
620     Log("Moving instances")
621     mytor = izip(islice(cycle(self.nodes), 1, None),
622                  self.instances)
623     for tnode, instance in mytor:
624       Log("instance %s", instance, indent=1)
625       op = opcodes.OpMoveInstance(instance_name=instance,
626                                   target_node=tnode)
627       self.ExecOrQueue(instance, op)
628
629   @_DoBatch(False)
630   def BurnMigrate(self):
631     """Migrate the instances."""
632     Log("Migrating instances")
633     for instance in self.instances:
634       Log("instance %s", instance, indent=1)
635       op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
636                                       cleanup=False)
637
638       op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
639                                       cleanup=True)
640       Log("migration and migration cleanup", indent=2)
641       self.ExecOrQueue(instance, op1, op2)
642
643   @_DoCheckInstances
644   @_DoBatch(False)
645   def BurnImportExport(self):
646     """Export the instance, delete it, and import it back.
647
648     """
649     Log("Exporting and re-importing instances")
650     mytor = izip(cycle(self.nodes),
651                  islice(cycle(self.nodes), 1, None),
652                  islice(cycle(self.nodes), 2, None),
653                  self.instances)
654
655     for pnode, snode, enode, instance in mytor:
656       Log("instance %s", instance, indent=1)
657       # read the full name of the instance
658       nam_op = opcodes.OpQueryInstances(output_fields=["name"],
659                                         names=[instance], use_locking=True)
660       full_name = self.ExecOp(False, nam_op)[0][0]
661
662       if self.opts.iallocator:
663         pnode = snode = None
664         import_log_msg = ("import from %s"
665                           " with iallocator %s" %
666                           (enode, self.opts.iallocator))
667       elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
668         snode = None
669         import_log_msg = ("import from %s to %s" %
670                           (enode, pnode))
671       else:
672         import_log_msg = ("import from %s to %s, %s" %
673                           (enode, pnode, snode))
674
675       exp_op = opcodes.OpExportInstance(instance_name=instance,
676                                         target_node=enode,
677                                         shutdown=True)
678       rem_op = opcodes.OpRemoveInstance(instance_name=instance,
679                                         ignore_failures=True)
680       imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
681       imp_op = opcodes.OpCreateInstance(instance_name=instance,
682                                         disks = [ {"size": size}
683                                                   for size in self.disk_size],
684                                         disk_template=self.opts.disk_template,
685                                         nics=self.opts.nics,
686                                         mode=constants.INSTANCE_IMPORT,
687                                         src_node=enode,
688                                         src_path=imp_dir,
689                                         pnode=pnode,
690                                         snode=snode,
691                                         start=True,
692                                         ip_check=self.opts.ip_check,
693                                         name_check=self.opts.name_check,
694                                         wait_for_sync=True,
695                                         file_storage_dir=None,
696                                         file_driver="loop",
697                                         iallocator=self.opts.iallocator,
698                                         beparams=self.bep,
699                                         hvparams=self.hvp,
700                                         )
701
702       erem_op = opcodes.OpRemoveExport(instance_name=instance)
703
704       Log("export to node %s", enode, indent=2)
705       Log("remove instance", indent=2)
706       Log(import_log_msg, indent=2)
707       Log("remove export", indent=2)
708       self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
709
710   @staticmethod
711   def StopInstanceOp(instance):
712     """Stop given instance."""
713     return opcodes.OpShutdownInstance(instance_name=instance)
714
715   @staticmethod
716   def StartInstanceOp(instance):
717     """Start given instance."""
718     return opcodes.OpStartupInstance(instance_name=instance, force=False)
719
720   @staticmethod
721   def RenameInstanceOp(instance, instance_new):
722     """Rename instance."""
723     return opcodes.OpRenameInstance(instance_name=instance,
724                                     new_name=instance_new)
725
726   @_DoCheckInstances
727   @_DoBatch(True)
728   def BurnStopStart(self):
729     """Stop/start the instances."""
730     Log("Stopping and starting instances")
731     for instance in self.instances:
732       Log("instance %s", instance, indent=1)
733       op1 = self.StopInstanceOp(instance)
734       op2 = self.StartInstanceOp(instance)
735       self.ExecOrQueue(instance, op1, op2)
736
737   @_DoBatch(False)
738   def BurnRemove(self):
739     """Remove the instances."""
740     Log("Removing instances")
741     for instance in self.to_rem:
742       Log("instance %s", instance, indent=1)
743       op = opcodes.OpRemoveInstance(instance_name=instance,
744                                     ignore_failures=True)
745       self.ExecOrQueue(instance, op)
746
747   def BurnRename(self):
748     """Rename the instances.
749
750     Note that this function will not execute in parallel, since we
751     only have one target for rename.
752
753     """
754     Log("Renaming instances")
755     rename = self.opts.rename
756     for instance in self.instances:
757       Log("instance %s", instance, indent=1)
758       op_stop1 = self.StopInstanceOp(instance)
759       op_stop2 = self.StopInstanceOp(rename)
760       op_rename1 = self.RenameInstanceOp(instance, rename)
761       op_rename2 = self.RenameInstanceOp(rename, instance)
762       op_start1 = self.StartInstanceOp(rename)
763       op_start2 = self.StartInstanceOp(instance)
764       self.ExecOp(False, op_stop1, op_rename1, op_start1)
765       self._CheckInstanceAlive(rename)
766       self.ExecOp(False, op_stop2, op_rename2, op_start2)
767       self._CheckInstanceAlive(instance)
768
769   @_DoCheckInstances
770   @_DoBatch(True)
771   def BurnReinstall(self):
772     """Reinstall the instances."""
773     Log("Reinstalling instances")
774     for instance in self.instances:
775       Log("instance %s", instance, indent=1)
776       op1 = self.StopInstanceOp(instance)
777       op2 = opcodes.OpReinstallInstance(instance_name=instance)
778       Log("reinstall without passing the OS", indent=2)
779       op3 = opcodes.OpReinstallInstance(instance_name=instance,
780                                         os_type=self.opts.os)
781       Log("reinstall specifying the OS", indent=2)
782       op4 = self.StartInstanceOp(instance)
783       self.ExecOrQueue(instance, op1, op2, op3, op4)
784
785   @_DoCheckInstances
786   @_DoBatch(True)
787   def BurnReboot(self):
788     """Reboot the instances."""
789     Log("Rebooting instances")
790     for instance in self.instances:
791       Log("instance %s", instance, indent=1)
792       ops = []
793       for reboot_type in constants.REBOOT_TYPES:
794         op = opcodes.OpRebootInstance(instance_name=instance,
795                                       reboot_type=reboot_type,
796                                       ignore_secondaries=False)
797         Log("reboot with type '%s'", reboot_type, indent=2)
798         ops.append(op)
799       self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142
800
801   @_DoCheckInstances
802   @_DoBatch(True)
803   def BurnActivateDisks(self):
804     """Activate and deactivate disks of the instances."""
805     Log("Activating/deactivating disks")
806     for instance in self.instances:
807       Log("instance %s", instance, indent=1)
808       op_start = self.StartInstanceOp(instance)
809       op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
810       op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
811       op_stop = self.StopInstanceOp(instance)
812       Log("activate disks when online", indent=2)
813       Log("activate disks when offline", indent=2)
814       Log("deactivate disks (when offline)", indent=2)
815       self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
816
817   @_DoCheckInstances
818   @_DoBatch(False)
819   def BurnAddRemoveDisks(self):
820     """Add and remove an extra disk for the instances."""
821     Log("Adding and removing disks")
822     for instance in self.instances:
823       Log("instance %s", instance, indent=1)
824       op_add = opcodes.OpSetInstanceParams(\
825         instance_name=instance,
826         disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
827       op_rem = opcodes.OpSetInstanceParams(\
828         instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
829       op_stop = self.StopInstanceOp(instance)
830       op_start = self.StartInstanceOp(instance)
831       Log("adding a disk", indent=2)
832       Log("removing last disk", indent=2)
833       self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
834
835   @_DoBatch(False)
836   def BurnAddRemoveNICs(self):
837     """Add and remove an extra NIC for the instances."""
838     Log("Adding and removing NICs")
839     for instance in self.instances:
840       Log("instance %s", instance, indent=1)
841       op_add = opcodes.OpSetInstanceParams(\
842         instance_name=instance, nics=[(constants.DDM_ADD, {})])
843       op_rem = opcodes.OpSetInstanceParams(\
844         instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
845       Log("adding a NIC", indent=2)
846       Log("removing last NIC", indent=2)
847       self.ExecOrQueue(instance, op_add, op_rem)
848
849   def ConfdCallback(self, reply):
850     """Callback for confd queries"""
851     if reply.type == confd_client.UPCALL_REPLY:
852       if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
853         Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
854                                                     reply.server_reply.status,
855                                                     reply.server_reply))
856       if reply.orig_request.type == constants.CONFD_REQ_PING:
857         Log("Ping: OK", indent=1)
858       elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
859         if reply.server_reply.answer == self.cluster_info["master"]:
860           Log("Master: OK", indent=1)
861         else:
862           Err("Master: wrong: %s" % reply.server_reply.answer)
863       elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
864         if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
865           Log("Node role for master: OK", indent=1)
866         else:
867           Err("Node role for master: wrong: %s" % reply.server_reply.answer)
868
869   def DoConfdRequestReply(self, req):
870     self.confd_counting_callback.RegisterQuery(req.rsalt)
871     self.confd_client.SendRequest(req, async=False)
872     while not self.confd_counting_callback.AllAnswered():
873       if not self.confd_client.ReceiveReply():
874         Err("Did not receive all expected confd replies")
875         break
876
877   def BurnConfd(self):
878     """Run confd queries for our instances.
879
880     The following confd queries are tested:
881     - CONFD_REQ_PING: simple ping
882     - CONFD_REQ_CLUSTER_MASTER: cluster master
883     - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
884
885     """
886     Log("Checking confd results")
887
888     hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
889     mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
890     mc_list = utils.ReadFile(mc_file).splitlines()
891     filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
892     counting_callback = confd_client.ConfdCountingCallback(filter_callback)
893     self.confd_counting_callback = counting_callback
894
895     self.confd_client = confd_client.ConfdClient(hmac_key, mc_list,
896                                                  counting_callback)
897
898     req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
899     self.DoConfdRequestReply(req)
900
901     req = confd_client.ConfdClientRequest(
902       type=constants.CONFD_REQ_CLUSTER_MASTER)
903     self.DoConfdRequestReply(req)
904
905     req = confd_client.ConfdClientRequest(
906         type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
907         query=self.cluster_info["master"])
908     self.DoConfdRequestReply(req)
909
910   def _CheckInstanceAlive(self, instance):
911     """Check if an instance is alive by doing http checks.
912
913     This will try to retrieve http://<instance>/hostname.txt and check
914     that it contains the hostname of the instance. Connection errors are
915     retried until the network timeout expires; failure to contact the
916     instance or a hostname mismatch raises InstanceDown.
917
918     """
919     if not self.opts.http_check:
920       return
921     end_time = time.time() + self.opts.net_timeout
922     url = None
923     while time.time() < end_time and url is None:
924       try:
925         url = self.url_opener.open("http://%s/hostname.txt" % instance)
926       except IOError:
927         # here we can have connection refused, no route to host, etc.
928         time.sleep(1)
929     if url is None:
930       raise InstanceDown(instance, "Cannot contact instance")
931     hostname = url.read().strip()
932     url.close()
933     if hostname != instance:
934       raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
935                                     (instance, hostname)))
936
937   def BurninCluster(self):
938     """Test a cluster intensively.
939
940     This will create instances and then start/stop/failover them.
941     It is safe for existing instances but could impact performance.
942
943     """
944
945     opts = self.opts
946
947     Log("Testing global parameters")
948
949     if (len(self.nodes) == 1 and
950         opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
951                                    constants.DT_FILE)):
952       Err("When one node is available/selected the disk template must"
953           " be 'diskless', 'file' or 'plain'")
954
955     has_err = True
956     try:
957       self.BurnCreateInstances()
958       if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
959         self.BurnReplaceDisks1D8()
960       if (opts.do_replace2 and len(self.nodes) > 2 and
961           opts.disk_template in constants.DTS_NET_MIRROR):
962         self.BurnReplaceDisks2()
963
964       if (opts.disk_template in constants.DTS_GROWABLE and
965           utils.any(self.disk_growth, lambda n: n > 0)):
966         self.BurnGrowDisks()
967
968       if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
969         self.BurnFailover()
970
971       if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
972         self.BurnMigrate()
973
974       if (opts.do_move and len(self.nodes) > 1 and
975           opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
976         self.BurnMove()
977
978       if (opts.do_importexport and
979           opts.disk_template not in (constants.DT_DISKLESS,
980                                      constants.DT_FILE)):
981         self.BurnImportExport()
982
983       if opts.do_reinstall:
984         self.BurnReinstall()
985
986       if opts.do_reboot:
987         self.BurnReboot()
988
989       if opts.do_addremove_disks:
990         self.BurnAddRemoveDisks()
991
992       default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
993       # Don't add/remove NICs in routed mode, as we would need an IP
994       # address to add them with
995       if opts.do_addremove_nics:
996         if default_nic_mode == constants.NIC_MODE_BRIDGED:
997           self.BurnAddRemoveNICs()
998         else:
999           Log("Skipping nic add/remove as the cluster is not in bridged mode")
1000
1001       if opts.do_activate_disks:
1002         self.BurnActivateDisks()
1003
1004       if opts.rename:
1005         self.BurnRename()
1006
1007       if opts.do_confd_tests:
1008         self.BurnConfd()
1009
1010       if opts.do_startstop:
1011         self.BurnStopStart()
1012
1013       has_err = False
1014     finally:
1015       if has_err:
1016         Log("Error detected: opcode buffer follows:\n\n")
1017         Log(self.GetFeedbackBuf())
1018         Log("\n\n")
1019       if not self.opts.keep_instances:
1020         try:
1021           self.BurnRemove()
1022         except Exception, err:  # pylint: disable-msg=W0703
1023           if has_err: # already detected errors, so errors in removal
1024                       # are quite expected
1025             Log("Note: error detected during instance remove: %s", err)
1026           else: # non-expected error
1027             raise
1028
1029     return 0
1030
1031
1032 def main():
1033   """Main function"""
1034
1035   burner = Burner()
1036   return burner.BurninCluster()
1037
1038
1039 if __name__ == "__main__":
1040   sys.exit(main())