lib/tools/burnin.py @ cfe9bed3
#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client
from ganeti.runtime import (GetClient)


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
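#: Output prefixes used by Log() for each indentation level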
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }

#: Disk templates supporting a single node
_SINGLE_NODE_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_PLAIN,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  constants.DT_EXT,
  constants.DT_RBD,
  ])

_SUPPORTED_DISK_TEMPLATES = compat.UniqueFrozenset([
  constants.DT_DISKLESS,
  constants.DT_DRBD8,
  constants.DT_EXT,
  constants.DT_FILE,
  constants.DT_PLAIN,
  constants.DT_RBD,
  constants.DT_SHARED_FILE,
  ])

#: Disk templates for which import/export is tested
_IMPEXP_DISK_TEMPLATES = (_SUPPORTED_DISK_TEMPLATES - frozenset([
  constants.DT_DISKLESS,
  constants.DT_FILE,
  constants.DT_SHARED_FILE,
  ]))


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


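#: Command line options accepted by burnin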
OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(_SUPPORTED_DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help=("Disk template (default %s, otherwise one of %s)" %
                       (constants.DT_DRBD8,
                        utils.CommaJoin(_SUPPORTED_DISK_TEMPLATES)))),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  cli.REASON_OPT,
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    if options.disk_template not in _SUPPORTED_DISK_TEMPLATES:
      Err("Unknown or unsupported disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      qcl = GetClient(query=True)
      result = qcl.QueryNodes(names, ["name", "offline", "drained"], False)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    finally:
      qcl.Close()
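    # Keep only nodes that are neither offline nor drained.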
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_can_migrate = \
      hypervisor.GetHypervisorClass(self.hypervisor).CAN_MIGRATE

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
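    # Cycle through the node list, giving each instance a primary node and
    # the following node as its secondary.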
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
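      # Nested lambda so each post_process callback captures its own instance
      # name and records it for later removal.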
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

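    # The new secondary is taken two steps ahead in the node cycle, so it
    # differs from the nodes the instance was created on.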
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
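    # Pair each instance with a primary, secondary and export node taken from
    # consecutive positions in the node cycle.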
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    qcl = GetClient(query=True)
    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      ((full_name, ), ) = qcl.QueryInstances([instance], ["name"], False)

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
    qcl.Close()

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
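    # Each instance is renamed to the spare name and back, with a liveness
    # check after each half of the swap.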
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
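    """Send a confd request and wait until all expected replies arrive."""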
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        self.opts.disk_template not in _SINGLE_NODE_DISK_TEMPLATES):
      Err("When one node is available/selected the disk template must"
          " be one of %s" % utils.CommaJoin(_SINGLE_NODE_DISK_TEMPLATES))

    if self.opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if self.opts.do_replace1 and \
           self.opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (self.opts.do_replace2 and len(self.nodes) > 2 and
          self.opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (self.opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if self.opts.do_failover and \
           self.opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if self.opts.do_migrate:
        if self.opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              self.opts.disk_template)
        elif not self.hv_can_migrate:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (self.opts.do_move and len(self.nodes) > 1 and
          self.opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (self.opts.do_importexport and
          self.opts.disk_template in _IMPEXP_DISK_TEMPLATES):
        self.BurnImportExport()

      if self.opts.do_reinstall:
        self.BurnReinstall()

      if self.opts.do_reboot:
        self.BurnReboot()

      if self.opts.do_renamesame:
        self.BurnRenameSame()

      if self.opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if self.opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if self.opts.do_activate_disks:
        self.BurnActivateDisks()

      if self.opts.rename:
        self.BurnRename()

      if self.opts.do_confd_tests:
        self.BurnConfd()

      if self.opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()