#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)
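
# Note: Log() indents by two spaces per "indent" level and prefixes the
# matching LOG_HEADERS entry, so (illustrative output only):
#
#   Log("Creating instances")            -> "- Creating instances"
#   Log("instance %s", "foo", indent=1)  -> "  * instance foo"
#   Log("on node1", indent=2)            -> "    on node1"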


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))
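
# SimpleOpener is what Burner._CheckInstanceAlive below uses to fetch
# http://<instance>/hostname.txt: HTTP errors become InstanceDown, and
# prompt_user_passwd is overridden so burnin never blocks on credentials.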


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  cli.REASON_OPT,
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
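
# A typical invocation, matching USAGE above (the OS, node and instance
# names are placeholders):
#
#   burnin -o debian-image -t drbd -n node1,node2 instance1 instance2
#
# All tests are enabled by default; the --no-* options above switch
# individual burnin steps off.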


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
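
# The two decorators compose; the Burn* methods below use, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...
#
# _DoBatch wraps the method in StartBatch()/CommitQueue(), so any queued
# opcodes are flushed before _DoCheckInstances verifies that every
# instance is still alive.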


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the recursive call's value so that a successful retry
        # propagates its result to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
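
  # With MAX_RETRIES = 3, a retryable action is therefore attempted at
  # most three times: the original try (retry_count == MAX_RETRIES) plus
  # two retries, the last of which runs with retry_count == 1.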

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results
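
  # Execution model: without --parallel, ExecOrQueue runs each job
  # immediately; with --parallel, jobs accumulate in queued_ops and the
  # _DoBatch-decorated caller submits them all at once via
  # CommitQueue/ExecJobSet.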

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))
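
  # The izip/cycle/islice pattern above spreads instances over the nodes
  # with rotating (pnode, snode) pairs; e.g. with nodes [n1, n2, n3] the
  # placement is (n1, n2), (n2, n3), (n3, n1), (n1, n2), ...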

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a single confd request and wait until all replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. If we get
    ECONNREFUSED, we retry for up to the network timeout; on any other
    error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
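
  # For -C/--http-check to pass, the OS image used for burnin must serve
  # the instance's own hostname at /hostname.txt (for instance via a
  # trivial HTTP server started at boot); see the --http-check option.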

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()