#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))
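# i.e., of the supported templates above: DT_DRBD8, DT_EXT, DT_PLAIN, DT_RBD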


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
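
# Illustration of the indentation scheme in Log (examples, not executed):
#   Log("Creating instances")           -> "- Creating instances"
#   Log("instance %s", name, indent=1)  -> "  * instance <name>"
#   Log("on %s", node, indent=2)        -> "    on <node>"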


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this mirrors the
    # BasicURLOpener class, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
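
# A typical invocation might look like this (hypothetical OS and node
# names, shown only as an example):
#   burnin -o debian-image -t drbd -n node1,node2 instance1 instance2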


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
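
# How the two decorators compose in the Burn* methods below (a sketch of
# the pattern actually used in this file, not additional program flow):
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     for instance in self.instances:
#       self.ExecOrQueue(instance, [op])
#
# _DoBatch brackets the method with StartBatch/CommitQueue so queued
# opcodes are submitted together, and _DoCheckInstances then verifies
# that every instance is still alive once the batch has run.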


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the retried result; without the return, a
        # successful retry would be reported as None to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
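
  # Countdown example with MAX_RETRIES = 3: the original call passes
  # retry_count=3 (the first try), a failure recurses with 2, then 1
  # (the last allowed retry), so a retryable action is attempted at
  # most MAX_RETRIES times in total.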

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val
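
  # Note: in parallel mode ExecOrQueue only queues the job and returns
  # None; the queued opcodes (and their post_process callbacks) run
  # later, when CommitQueue is called by the _DoBatch decorator.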
403

    
404
  def StartBatch(self, retry):
405
    """Start a new batch of jobs.
406

    
407
    @param retry: whether this is a retryable batch
408

    
409
    """
410
    self.queued_ops = []
411
    self.queue_retry = retry
412

    
413
  def CommitQueue(self):
414
    """Execute all submitted opcodes in case of parallel burnin"""
415
    if not self.opts.parallel or not self.queued_ops:
416
      return
417

    
418
    if self.queue_retry:
419
      rval = MAX_RETRIES
420
    else:
421
      rval = 0
422

    
423
    try:
424
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
425
                                self.queued_ops)
426
    finally:
427
      self.queued_ops = []
428
    return results
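
  # In serial (non-parallel) runs CommitQueue is effectively a no-op,
  # since ExecOrQueue has already executed every opcode on the spot.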

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)
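    # (setdefaulttimeout is process-wide; it is what bounds each
    # connection attempt made by _CheckInstanceAlive's HTTP probe)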

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
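    # With nodes [n1, n2, n3] this pairs each instance round-robin with
    # adjacent (pnode, snode) tuples: (n1, n2), (n2, n3), (n3, n1), ...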

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
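      # The nested lambda binds the current name at call time, so each
      # post_process callback appends its own instance to self.to_rem
      # (a plain closure over the loop variable would be late-binding).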
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL /hostname.txt on the instance
    and check that it contains the instance's hostname. In case we get
    ECONNREFUSED (or a similar IOError), we retry for up to
    net_timeout seconds; any other error aborts the check.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()