#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client
from ganeti.runtime import (GetClient)


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
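
# Example invocation (illustrative names; "drbd" is the default template):
#   burnin -o debootstrap -t drbd -n node1,node2 instance1.example.com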

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  constants.DT_GLUSTER
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  constants.DT_GLUSTER
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_GLUSTER
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
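  # two spaces of indentation per level, then the header matching the level
  # from LOG_HEADERS (falling back to two spaces for unknown levels)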
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  cli.REASON_OPT,
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the retried call's result instead of dropping it
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
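      # one disk is created per comma-separated size value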
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    # create the client outside the try block so it is always defined
    # when the finally clause runs
    qcl = GetClient()
    try:
      result = qcl.QueryNodes(names, ["name", "offline", "drained"], False)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    finally:
      qcl.Close()
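    # keep only nodes that are neither offline nor drained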
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
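    # walk the node list as a ring: each instance gets one node as primary
    # and the following node as secondary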
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
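      # the nested lambda binds the instance name at definition time, so the
      # deferred callback records the right name once the job succeeds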
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG
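
    # offset two in the node ring yields a node distinct from both the
    # original primary and secondary (this test only runs with more than
    # two nodes)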
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    qcl = GetClient()
    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      ((full_name, ), ) = qcl.QueryInstances([instance], ["name"], False)

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
    qcl.Close()

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        self.opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if self.opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")
    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if self.opts.do_replace1 and \
           self.opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (self.opts.do_replace2 and len(self.nodes) > 2 and
          self.opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (self.opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if self.opts.do_failover and \
           self.opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if self.opts.do_migrate:
        if self.opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              self.opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (self.opts.do_move and len(self.nodes) > 1 and
          self.opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (self.opts.do_importexport and
          self.opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if self.opts.do_reinstall:
        self.BurnReinstall()

      if self.opts.do_reboot:
        self.BurnReboot()

      if self.opts.do_renamesame:
        self.BurnRenameSame()

      if self.opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if self.opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if self.opts.do_activate_disks:
        self.BurnActivateDisks()

      if self.opts.rename:
        self.BurnRename()

      if self.opts.do_confd_tests:
        self.BurnConfd()

      if self.opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()