#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))
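
# Note: with the definitions above, import/export is exercised for the
# DT_DRBD8, DT_EXT, DT_PLAIN and DT_RBD templates.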


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its arguments.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
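
# Illustrative example: Log("instance %s", "foo", indent=1) writes
# "  * instance foo"; indent=2 gives four leading spaces and no header;
# any deeper indent falls back to the default "  " header from
# LOG_HEADERS.get().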


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to
    # the BasicURLOpener class, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size",
                 help="Maximum memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size",
                 help="Minimum memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest="1 2 3 4".split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed up burnin or to test granular"
                       " locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
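
# Example invocation (illustrative; node, OS and instance names are
# made up):
#
#   burnin -o debootstrap -t drbd -n node1,node2 --disk-size=1G,512M \
#       --parallel instance1.example.com instance2.example.com
#
# This would create two DRBD-backed instances with two disks each on the
# given node pair and run the full default test sequence on them.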


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
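
# Illustrative sketch of how the two decorators compose on a burnin step:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# _DoBatch opens a queue, runs the method body (which queues or executes
# opcodes via ExecOrQueue) and commits the queue; _DoCheckInstances then
# verifies that every instance is still alive, which is why it must be
# the outermost decorator.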


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the retried result, otherwise it would be silently lost
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
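
  # Illustrative walk-through of the retry counter: with MAX_RETRIES = 3,
  # a retryable call starts at retry_count=3; a failure logs "retry #1/3"
  # and recurses with retry_count=2, then "retry #2/3" with retry_count=1,
  # whose failure aborts. retry_count=0 means the action is never retried.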

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val
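
  # Note (illustrative): in serial mode ExecOrQueue runs the ops right away
  # and returns their result; in parallel mode it only queues them, so the
  # return value is None and the caller must rely on CommitQueue() below.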

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results if all jobs are
    successful; otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)
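
    # Illustrative example: "--disk-size 1G,256M" parses to
    # disk_size == [1024, 256] (MiB, via utils.ParseUnit), hence
    # disk_count == 2; disk_growth must then also list two values.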

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
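    # Illustrative example: with nodes [n1, n2, n3], mytor yields the
    # (pnode, snode) pairs (n1, n2), (n2, n3), (n3, n1), (n1, n2), ...
    # so primaries and secondaries rotate evenly over the node list.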

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow all the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op3 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve /hostname.txt from the instance and check
    that it contains the hostname of the instance. In case we get
    ECONNREFUSED, we retry for up to the network timeout; on any other
    error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
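
  # Illustrative setup note: for -C/--http-check to pass, the burnin OS
  # image must serve the instance's own name at /hostname.txt on port 80,
  # e.g. (hypothetical, run inside the instance):
  #
  #   hostname > hostname.txt && python -m SimpleHTTPServer 80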

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # unexpected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()
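
# The installed burnin wrapper is expected to invoke Main(); a hypothetical
# minimal entry point would be:
#
#   if __name__ == "__main__":
#     sys.exit(Main())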