#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client

USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }
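
# Log() below indents messages two spaces per "indent" level and prefixes
# them with the matching LOG_HEADERS entry, e.g.:
#   Log("Creating instances")           ->  "- Creating instances"
#   Log("instance %s", name, indent=1)  ->  "  * instance NAME"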


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
199
                 choices=list(constants.DISK_TEMPLATES),
200
                 default=constants.DT_DRBD8,
201
                 help="Disk template (diskless, file, plain, sharedfile"
202
                 " or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
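
# How the two decorators above compose on the Burn* methods below:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# _DoBatch wraps the method so that the opcodes it generates via
# ExecOrQueue are started as one batch and committed afterwards, and
# _DoCheckInstances (the outer wrapper) then verifies that every burnin
# instance is still alive once the batch has finished.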


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the result of the retried call, otherwise a successful
        # retry would still make this call chain return None
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode immediately or, in parallel mode, queue it."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
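    # jex.GetResults() is assumed to return one (success, result) pair per
    # queued job, in submission order, so pairing them back with the input
    # jobs via zip() below lets us run each job's post_process hook only
    # when that particular job succeeded.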
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8,
                                constants.DT_RBD,
                                )
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
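      # ParseUnit normalizes each entry to MiB; e.g. --disk-size "128m,1G"
      # should yield disk_size == [128, 1024] and hence two disks per
      # instance.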
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
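    # The izip/cycle/islice construct above walks the node list as a ring,
    # pairing each instance with consecutive nodes; e.g. with nodes
    # [n1, n2, n3] the (pnode, snode) pairs are (n1, n2), (n2, n3),
    # (n3, n1), ... so primaries and secondaries are spread evenly.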

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
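      # The nested lambda freezes the current instance name: the outer call
      # binds it as "name" now, and the inner zero-argument lambda is what
      # ExecOrQueue later runs as the post_process hook, marking the
      # instance for removal only once its creation job has succeeded.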
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
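    # Offsetting the node ring by two skips each instance's current primary
    # and secondary (which BurnCreateInstances assigned from offsets 0 and
    # 1), so the replacement target is a fresh node whenever the cluster
    # has more than two nodes.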
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)
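    # Same ring-walking trick as in BurnCreateInstances, with a third
    # stream offset by two: each instance gets a primary, a secondary and
    # a distinct export node (enode) from consecutive positions in the
    # node list.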

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op3 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
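      # The batch below exercises both code paths: activate while the
      # instance is online, stop it, activate and deactivate while it is
      # offline, then start it again so the alive-check decorator can pass.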
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a single confd request and wait until it is answered."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file', 'sharedfile' or 'plain'")

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  # propagate the burnin result as the process exit code
  sys.exit(main())