#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat
from ganeti import pathutils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
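# Prefix prepended by Log() for each indentation level (default "  ")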
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: "",
  }


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
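  # Pad with two spaces per indent level, then add the header for that level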
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # Make sure sockets are not left in CLOSE_WAIT; this is similar to
    # the BasicURLOpener class, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
196
                 choices=list(constants.DISK_TEMPLATES),
197
                 default=constants.DT_DRBD8,
198
                 help="Disk template (diskless, file, plain, sharedfile"
199
                 " or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Acumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode list directly, or queue it if running in parallel."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
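    # Pair each queued job with its (success, result) tuple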
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8,
                                constants.DT_RBD,
                                )
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

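    # The default socket timeout also governs the HTTP instance checks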
    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
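    # Keep only nodes that are neither offline nor drained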
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
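    # Pair each instance with two consecutive nodes as primary/secondary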
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
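      # Double lambda so each callback captures this iteration's instance name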
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory."""
    Log("Setting instance runtime memory")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      tgt_mem = self.bep[constants.BE_MINMEM]
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=tgt_mem)
      Log("Set memory to %s MB", tgt_mem, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow each of the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

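    # Offset the node cycle by two so the new secondary differs from both
    # nodes currently used by the instance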
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
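    # Walk three consecutive nodes per instance: primary, secondary and the
    # node used for the export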
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(pathutils.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
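      # Activate while running, stop, activate and deactivate while offline,
      # then restart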
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add, change and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_chg = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_MODIFY,
                                       -1, {"mac": constants.VALUE_GENERATE})])
      op_rem = opcodes.OpInstanceSetParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("changing a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_chg, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
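    """Send a confd request and wait until all expected replies arrive."""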
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries against the cluster.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL http://<instance>/hostname.txt
    and check that it contains the hostname of the instance. If we get
    an IOError (e.g. connection refused), we retry for up to the
    network timeout; any other error aborts the check.

    """
991
    if not self.opts.http_check:
992
      return
993
    end_time = time.time() + self.opts.net_timeout
994
    url = None
995
    while time.time() < end_time and url is None:
996
      try:
997
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
998
      except IOError:
999
        # here we can have connection refused, no route to host, etc.
1000
        time.sleep(1)
1001
    if url is None:
1002
      raise InstanceDown(instance, "Cannot contact instance")
1003
    hostname = url.read().strip()
1004
    url.close()
1005
    if hostname != instance:
1006
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
1007
                                    (instance, hostname)))
1008

    
  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
1026
          " be 'diskless', 'file' or 'plain'")

    if opts.do_confd_tests and not constants.ENABLE_CONFD:
      Err("You selected confd tests but confd was disabled at configure time")

    has_err = True
    try:
      self.BurnCreateInstances()

      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
        self.BurnModifyRuntimeMemory()

      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(pathutils.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()