#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

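# Example invocation (illustrative only; the OS name and instance names below
# are hypothetical and must be valid on the target cluster):
#
#   burnin -o debootstrap+default -t drbd -p instance1.example.com \
#     instance2.example.com
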
MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

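# With the headers above, Log() prefixes indent level 0 with "- " and level 1
# with "* ", while higher levels get no marker; every level also adds two
# spaces of indentation (see Log() below). For example,
# Log("instance %s", name, indent=1) prints "  * instance <name>".
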
#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


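# The two decorators above are stacked on the Burn* methods of the Burner
# class; as the _DoBatch docstring notes, _DoCheckInstances must be the outer
# decorator, e.g. (sketch mirroring the real methods further down):
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# so that the queued batch is committed first and only afterwards every
# instance is checked for liveness.
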
class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

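  # In parallel mode (--parallel), the Burn* methods only queue their opcodes
  # through ExecOrQueue; CommitQueue (called from the _DoBatch wrapper) then
  # hands the whole queue to ExecJobSet below, which submits the jobs together
  # and collects their results.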
  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()
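
# burnin is normally run through the wrapper installed by Ganeti, which imports
# this module and calls Main(); that packaging detail is an assumption here,
# not something this file shows. If the module were executed directly, a
# minimal entry point along these lines would do:
#
#   if __name__ == "__main__":
#     sys.exit(Main())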