root / tools / burnin @ 376631d1

#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
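
# A typical invocation might look like the following (hypothetical OS, node
# and instance names; the -o/-t/-n options are defined in OPTIONS below):
#   burnin -o debootstrap -t drbd -n node1,node2 inst1.example.com inst2.example.com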

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-renamesame", dest="do_renamesame",
                 help="Skip instance rename to same name", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile"
                 " or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
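
# These two decorators are stacked on the Burn* methods below, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnCreateInstances(self):
#     ...
#
# so that the method's opcodes are queued/executed as one batch first and the
# instances are then checked to still be alive.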


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8,
                                constants.DT_RBD,
                                constants.DT_EXT,
                                )
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
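    # Pair each instance with a (pnode, snode) tuple taken from two cycles over
    # the node list offset by one, so primaries and secondaries rotate evenly.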
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnRenameSame(self):
    """Rename the instances to their own name."""
    Log("Renaming the instances to their own name")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.RenameInstanceOp(instance, instance)
      Log("rename to the same name", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL /hostname.txt on the instance
    and check that it contains the hostname of the instance. If we
    get ECONNREFUSED, we retry for up to the network timeout (in
    seconds); on any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
1043
          " be 'diskless', 'file' or 'plain'")

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_renamesame:
        self.BurnRenameSame()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()