#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
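
# Example invocation (illustrative only; the OS name, disk template and
# node/instance names below are placeholders that must match your cluster):
#   burnin -o your-os -t drbd -n node1,node2 \
#     instance1.example.com instance2.example.com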

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
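
# The two decorators above are stacked on the Burner steps below, e.g.:
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnCreateInstances(self): ...
# so that a step first commits its queued opcodes (_DoBatch) and then
# verifies that all instances are still alive (_DoCheckInstances).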


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the result of the successful retry to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
594
    Log("Growing disks")
595
    for instance in self.instances:
596
      Log("instance %s", instance, indent=1)
597
      for idx, growth in enumerate(self.disk_growth):
598
        if growth > 0:
599
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
600
                                          amount=growth, wait_for_sync=True)
601
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
602
          self.ExecOrQueue(instance, [op])
603

    
604
  @_DoBatch(True)
605
  def BurnReplaceDisks1D8(self):
606
    """Replace disks on primary and secondary for drbd8."""
607
    Log("Replacing disks on the same nodes")
608
    early_release = self.opts.early_release
609
    for instance in self.instances:
610
      Log("instance %s", instance, indent=1)
611
      ops = []
612
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
613
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
614
                                            mode=mode,
615
                                            disks=list(range(self.disk_count)),
616
                                            early_release=early_release)
617
        Log("run %s", mode, indent=2)
618
        ops.append(op)
619
      self.ExecOrQueue(instance, ops)
620

    
621
  @_DoBatch(True)
622
  def BurnReplaceDisks2(self):
623
    """Replace secondary node."""
624
    Log("Changing the secondary node")
625
    mode = constants.REPLACE_DISK_CHG
626

    
627
    mytor = izip(islice(cycle(self.nodes), 2, None),
628
                 self.instances)
629
    for tnode, instance in mytor:
630
      Log("instance %s", instance, indent=1)
631
      if self.opts.iallocator:
632
        tnode = None
633
        msg = "with iallocator %s" % self.opts.iallocator
634
      else:
635
        msg = tnode
636
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
637
                                          mode=mode,
638
                                          remote_node=tnode,
639
                                          iallocator=self.opts.iallocator,
640
                                          disks=[],
641
                                          early_release=self.opts.early_release)
642
      Log("run %s %s", mode, msg, indent=2)
643
      self.ExecOrQueue(instance, [op])
644

    
645
  @_DoCheckInstances
646
  @_DoBatch(False)
647
  def BurnFailover(self):
648
    """Failover the instances."""
649
    Log("Failing over instances")
650
    for instance in self.instances:
651
      Log("instance %s", instance, indent=1)
652
      op = opcodes.OpInstanceFailover(instance_name=instance,
653
                                      ignore_consistency=False)
654
      self.ExecOrQueue(instance, [op])
655

    
656
  @_DoCheckInstances
657
  @_DoBatch(False)
658
  def BurnMove(self):
659
    """Move the instances."""
660
    Log("Moving instances")
661
    mytor = izip(islice(cycle(self.nodes), 1, None),
662
                 self.instances)
663
    for tnode, instance in mytor:
664
      Log("instance %s", instance, indent=1)
665
      op = opcodes.OpInstanceMove(instance_name=instance,
666
                                  target_node=tnode)
667
      self.ExecOrQueue(instance, [op])
668

    
669
  @_DoBatch(False)
670
  def BurnMigrate(self):
671
    """Migrate the instances."""
672
    Log("Migrating instances")
673
    for instance in self.instances:
674
      Log("instance %s", instance, indent=1)
675
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
676
                                      cleanup=False)
677

    
678
      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
679
                                      cleanup=True)
680
      Log("migration and migration cleanup", indent=2)
681
      self.ExecOrQueue(instance, [op1, op2])
682

    
683
  @_DoCheckInstances
684
  @_DoBatch(False)
685
  def BurnImportExport(self):
686
    """Export the instance, delete it, and import it back.
687

    
688
    """
689
    Log("Exporting and re-importing instances")
690
    mytor = izip(cycle(self.nodes),
691
                 islice(cycle(self.nodes), 1, None),
692
                 islice(cycle(self.nodes), 2, None),
693
                 self.instances)
694

    
695
    for pnode, snode, enode, instance in mytor:
696
      Log("instance %s", instance, indent=1)
697
      # read the full name of the instance
698
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
699
                                       names=[instance], use_locking=True)
700
      full_name = self.ExecOp(False, nam_op)[0][0]
701

    
702
      if self.opts.iallocator:
703
        pnode = snode = None
704
        import_log_msg = ("import from %s"
705
                          " with iallocator %s" %
706
                          (enode, self.opts.iallocator))
707
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
708
        snode = None
709
        import_log_msg = ("import from %s to %s" %
710
                          (enode, pnode))
711
      else:
712
        import_log_msg = ("import from %s to %s, %s" %
713
                          (enode, pnode, snode))
714

    
715
      exp_op = opcodes.OpBackupExport(instance_name=instance,
716
                                      target_node=enode,
717
                                      mode=constants.EXPORT_MODE_LOCAL,
718
                                      shutdown=True)
719
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
720
                                        ignore_failures=True)
721
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
722
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
723
                                        disks = [ {"size": size}
724
                                                  for size in self.disk_size],
725
                                        disk_template=self.opts.disk_template,
726
                                        nics=self.opts.nics,
727
                                        mode=constants.INSTANCE_IMPORT,
728
                                        src_node=enode,
729
                                        src_path=imp_dir,
730
                                        pnode=pnode,
731
                                        snode=snode,
732
                                        start=True,
733
                                        ip_check=self.opts.ip_check,
734
                                        name_check=self.opts.name_check,
735
                                        wait_for_sync=True,
736
                                        file_storage_dir=None,
737
                                        file_driver="loop",
738
                                        iallocator=self.opts.iallocator,
739
                                        beparams=self.bep,
740
                                        hvparams=self.hvp,
741
                                        osparams=self.opts.osparams,
742
                                        )
743

    
744
      erem_op = opcodes.OpBackupRemove(instance_name=instance)
745

    
746
      Log("export to node %s", enode, indent=2)
747
      Log("remove instance", indent=2)
748
      Log(import_log_msg, indent=2)
749
      Log("remove export", indent=2)
750
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
751

    
752
  @staticmethod
753
  def StopInstanceOp(instance):
754
    """Stop given instance."""
755
    return opcodes.OpInstanceShutdown(instance_name=instance)
756

    
757
  @staticmethod
758
  def StartInstanceOp(instance):
759
    """Start given instance."""
760
    return opcodes.OpStartupInstance(instance_name=instance, force=False)
761

    
762
  @staticmethod
763
  def RenameInstanceOp(instance, instance_new):
764
    """Rename instance."""
765
    return opcodes.OpInstanceRename(instance_name=instance,
766
                                    new_name=instance_new)
767

    
768
  @_DoCheckInstances
769
  @_DoBatch(True)
770
  def BurnStopStart(self):
771
    """Stop/start the instances."""
772
    Log("Stopping and starting instances")
773
    for instance in self.instances:
774
      Log("instance %s", instance, indent=1)
775
      op1 = self.StopInstanceOp(instance)
776
      op2 = self.StartInstanceOp(instance)
777
      self.ExecOrQueue(instance, [op1, op2])
778

    
779
  @_DoBatch(False)
780
  def BurnRemove(self):
781
    """Remove the instances."""
782
    Log("Removing instances")
783
    for instance in self.to_rem:
784
      Log("instance %s", instance, indent=1)
785
      op = opcodes.OpInstanceRemove(instance_name=instance,
786
                                    ignore_failures=True)
787
      self.ExecOrQueue(instance, [op])
788

    
789
  def BurnRename(self):
790
    """Rename the instances.
791

    
792
    Note that this function will not execute in parallel, since we
793
    only have one target for rename.
794

    
795
    """
796
    Log("Renaming instances")
797
    rename = self.opts.rename
798
    for instance in self.instances:
799
      Log("instance %s", instance, indent=1)
800
      op_stop1 = self.StopInstanceOp(instance)
801
      op_stop2 = self.StopInstanceOp(rename)
802
      op_rename1 = self.RenameInstanceOp(instance, rename)
803
      op_rename2 = self.RenameInstanceOp(rename, instance)
804
      op_start1 = self.StartInstanceOp(rename)
805
      op_start2 = self.StartInstanceOp(instance)
806
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
807
      self._CheckInstanceAlive(rename)
808
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
809
      self._CheckInstanceAlive(instance)
810

    
811
  @_DoCheckInstances
812
  @_DoBatch(True)
813
  def BurnReinstall(self):
814
    """Reinstall the instances."""
815
    Log("Reinstalling instances")
816
    for instance in self.instances:
817
      Log("instance %s", instance, indent=1)
818
      op1 = self.StopInstanceOp(instance)
819
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
820
      Log("reinstall without passing the OS", indent=2)
821
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
822
                                        os_type=self.opts.os)
823
      Log("reinstall specifying the OS", indent=2)
824
      op4 = self.StartInstanceOp(instance)
825
      self.ExecOrQueue(instance, [op1, op2, op3, op4])
826

    
827
  @_DoCheckInstances
828
  @_DoBatch(True)
829
  def BurnReboot(self):
830
    """Reboot the instances."""
831
    Log("Rebooting instances")
832
    for instance in self.instances:
833
      Log("instance %s", instance, indent=1)
834
      ops = []
835
      for reboot_type in self.opts.reboot_types:
836
        op = opcodes.OpInstanceReboot(instance_name=instance,
837
                                      reboot_type=reboot_type,
838
                                      ignore_secondaries=False)
839
        Log("reboot with type '%s'", reboot_type, indent=2)
840
        ops.append(op)
841
      self.ExecOrQueue(instance, ops)
842

    
843
  @_DoCheckInstances
844
  @_DoBatch(True)
845
  def BurnActivateDisks(self):
846
    """Activate and deactivate disks of the instances."""
847
    Log("Activating/deactivating disks")
848
    for instance in self.instances:
849
      Log("instance %s", instance, indent=1)
850
      op_start = self.StartInstanceOp(instance)
851
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
852
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
853
      op_stop = self.StopInstanceOp(instance)
854
      Log("activate disks when online", indent=2)
855
      Log("activate disks when offline", indent=2)
856
      Log("deactivate disks (when offline)", indent=2)
857
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])
858

    
859
  @_DoCheckInstances
860
  @_DoBatch(False)
861
  def BurnAddRemoveDisks(self):
862
    """Add and remove an extra disk for the instances."""
863
    Log("Adding and removing disks")
864
    for instance in self.instances:
865
      Log("instance %s", instance, indent=1)
866
      op_add = opcodes.OpInstanceSetParams(\
867
        instance_name=instance,
868
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
869
      op_rem = opcodes.OpInstanceSetParams(\
870
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
871
      op_stop = self.StopInstanceOp(instance)
872
      op_start = self.StartInstanceOp(instance)
873
      Log("adding a disk", indent=2)
874
      Log("removing last disk", indent=2)
875
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])
876

    
877
  @_DoBatch(False)
878
  def BurnAddRemoveNICs(self):
879
    """Add and remove an extra NIC for the instances."""
880
    Log("Adding and removing NICs")
881
    for instance in self.instances:
882
      Log("instance %s", instance, indent=1)
883
      op_add = opcodes.OpInstanceSetParams(\
884
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
885
      op_rem = opcodes.OpInstanceSetParams(\
886
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
887
      Log("adding a NIC", indent=2)
888
      Log("removing last NIC", indent=2)
889
      self.ExecOrQueue(instance, [op_add, op_rem])
890

    
891
  def ConfdCallback(self, reply):
892
    """Callback for confd queries"""
893
    if reply.type == confd_client.UPCALL_REPLY:
894
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
895
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
896
                                                    reply.server_reply.status,
897
                                                    reply.server_reply))
898
      if reply.orig_request.type == constants.CONFD_REQ_PING:
899
        Log("Ping: OK", indent=1)
900
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
901
        if reply.server_reply.answer == self.cluster_info["master"]:
902
          Log("Master: OK", indent=1)
903
        else:
904
          Err("Master: wrong: %s" % reply.server_reply.answer)
905
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
906
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
907
          Log("Node role for master: OK", indent=1)
908
        else:
909
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)
910

    
911
  def DoConfdRequestReply(self, req):
912
    self.confd_counting_callback.RegisterQuery(req.rsalt)
913
    self.confd_client.SendRequest(req, async=False)
914
    while not self.confd_counting_callback.AllAnswered():
915
      if not self.confd_client.ReceiveReply():
916
        Err("Did not receive all expected confd replies")
917
        break
918

    
919
  def BurnConfd(self):
920
    """Run confd queries for our instances.
921

    
922
    The following confd queries are tested:
923
      - CONFD_REQ_PING: simple ping
924
      - CONFD_REQ_CLUSTER_MASTER: cluster master
925
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
926

    
927
    """
928
    Log("Checking confd results")
929

    
930
    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
931
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
932
    self.confd_counting_callback = counting_callback
933

    
934
    self.confd_client = confd_client.GetConfdClient(counting_callback)
935

    
936
    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
937
    self.DoConfdRequestReply(req)
938

    
939
    req = confd_client.ConfdClientRequest(
940
      type=constants.CONFD_REQ_CLUSTER_MASTER)
941
    self.DoConfdRequestReply(req)
942

    
943
    req = confd_client.ConfdClientRequest(
944
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
945
        query=self.cluster_info["master"])
946
    self.DoConfdRequestReply(req)
947

    
948
  def _CheckInstanceAlive(self, instance):
949
    """Check if an instance is alive by doing http checks.
950

    
951
    This will try to retrieve the url on the instance /hostname.txt
952
    and check that it contains the hostname of the instance. In case
953
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
954
    any other error we abort.
955

    
956
    """
957
    if not self.opts.http_check:
958
      return
959
    end_time = time.time() + self.opts.net_timeout
960
    url = None
961
    while time.time() < end_time and url is None:
962
      try:
963
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
964
      except IOError:
965
        # here we can have connection refused, no route to host, etc.
966
        time.sleep(1)
967
    if url is None:
968
      raise InstanceDown(instance, "Cannot contact instance")
969
    hostname = url.read().strip()
970
    url.close()
971
    if hostname != instance:
972
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
973
                                    (instance, hostname)))
974

    
975
  def BurninCluster(self):
976
    """Test a cluster intensively.
977

    
978
    This will create instances and then start/stop/failover them.
979
    It is safe for existing instances but could impact performance.
980

    
981
    """
982

    
983
    opts = self.opts
984

    
985
    Log("Testing global parameters")
986

    
987
    if (len(self.nodes) == 1 and
988
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
989
                                   constants.DT_FILE)):
990
      Err("When one node is available/selected the disk template must"
991
          " be 'diskless', 'file' or 'plain'")
992

    
993
    has_err = True
994
    try:
995
      self.BurnCreateInstances()
996
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
997
        self.BurnReplaceDisks1D8()
998
      if (opts.do_replace2 and len(self.nodes) > 2 and
999
          opts.disk_template in constants.DTS_NET_MIRROR) :
1000
        self.BurnReplaceDisks2()
1001

    
1002
      if (opts.disk_template in constants.DTS_GROWABLE and
1003
          compat.any(n > 0 for n in self.disk_growth)):
1004
        self.BurnGrowDisks()
1005

    
1006
      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
1007
        self.BurnFailover()
1008

    
1009
      if opts.do_migrate:
1010
        if opts.disk_template != constants.DT_DRBD8:
1011
          Log("Skipping migration (disk template not DRBD8)")
1012
        elif not self.hv_class.CAN_MIGRATE:
1013
          Log("Skipping migration (hypervisor %s does not support it)",
1014
              self.hypervisor)
1015
        else:
1016
          self.BurnMigrate()
1017

    
1018
      if (opts.do_move and len(self.nodes) > 1 and
1019
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1020
        self.BurnMove()
1021

    
1022
      if (opts.do_importexport and
1023
          opts.disk_template not in (constants.DT_DISKLESS,
1024
                                     constants.DT_FILE)):
1025
        self.BurnImportExport()
1026

    
1027
      if opts.do_reinstall:
1028
        self.BurnReinstall()
1029

    
1030
      if opts.do_reboot:
1031
        self.BurnReboot()
1032

    
1033
      if opts.do_addremove_disks:
1034
        self.BurnAddRemoveDisks()
1035

    
1036
      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1037
      # Don't add/remove nics in routed mode, as we would need an ip to add
1038
      # them with
1039
      if opts.do_addremove_nics:
1040
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
1041
          self.BurnAddRemoveNICs()
1042
        else:
1043
          Log("Skipping nic add/remove as the cluster is not in bridged mode")
1044

    
1045
      if opts.do_activate_disks:
1046
        self.BurnActivateDisks()
1047

    
1048
      if opts.rename:
1049
        self.BurnRename()
1050

    
1051
      if opts.do_confd_tests:
1052
        self.BurnConfd()
1053

    
1054
      if opts.do_startstop:
1055
        self.BurnStopStart()
1056

    
1057
      has_err = False
1058
    finally:
1059
      if has_err:
1060
        Log("Error detected: opcode buffer follows:\n\n")
1061
        Log(self.GetFeedbackBuf())
1062
        Log("\n\n")
1063
      if not self.opts.keep_instances:
1064
        try:
1065
          self.BurnRemove()
1066
        except Exception, err:  # pylint: disable-msg=W0703
1067
          if has_err: # already detected errors, so errors in removal
1068
                      # are quite expected
1069
            Log("Note: error detected during instance remove: %s", err)
1070
          else: # non-expected error
1071
            raise
1072

    
1073
    return 0
1074

    
1075

    
1076
def main():
1077
  """Main function"""
1078

    
1079
  burner = Burner()
1080
  return burner.BurninCluster()
1081

    
1082

    
1083
if __name__ == "__main__":
1084
  main()