Statistics
| Branch: | Tag: | Revision:

root / tools / burnin @ 7181fba0

History | View | Annotate | Download (40.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Burnin program
23

    
24
"""
25

    
26
import sys
import optparse
import time
import socket
import urllib
import functools
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client
43

    
44

    
45
# Usage line printed by Usage(); the leading tab lines it up under the
# "Usage:" header written before it
USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

# Number of attempts made for retryable ("idempotent") operations; see
# Burner.MaybeRetry
MAX_RETRIES = 3

# Per-indent-level line prefixes used by Log(); levels not listed here
# fall back to two spaces (see the LOG_HEADERS.get() call in Log)
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }
53

    
54

    
55
class InstanceDown(Exception):
  """Raised when an instance that should be up is found to be down."""
57

    
58

    
59
class BurninFailure(Exception):
  """Raised when a failure is detected during the burnin run."""
61

    
62

    
63
def Usage():
  """Print usage information on stderr and exit with code 2."""

  sys.stderr.write("Usage:\n")
  sys.stderr.write(USAGE + "\n")
  sys.exit(2)
69

    
70

    
71
def Log(msg, *args, **kwargs):
  """Print a (possibly %-formatted) message on stdout.

  The optional C{indent} keyword argument selects the indentation
  level and, via LOG_HEADERS, the bullet prefix for the line.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  # unknown indent levels get a plain two-space prefix
  prefix = LOG_HEADERS.get(indent, "  ")
  sys.stdout.write("%s%s%s\n" % (" " * (2 * indent), prefix, msg))
  sys.stdout.flush()
81

    
82

    
83
def Err(msg, exit_code=1):
  """Write an error message to stderr and terminate the program.

  @param msg: the message to print (a newline is appended)
  @param exit_code: process exit status, defaults to 1

  """
  sys.stderr.write("%s\n" % msg)
  sys.stderr.flush()
  sys.exit(exit_code)
90

    
91

    
92
class SimpleOpener(urllib.FancyURLopener):
  """URL opener that never prompts and maps HTTP errors to InstanceDown."""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # drain and close the response so the socket is not left in
    # CLOSE_WAIT (similar to BasicURLOpener, but raising our own
    # exception type)
    fp.read()
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))
110

    
111

    
112
# Command-line options accepted by burnin; handed to optparse in
# Burner.ParseOptions.  Boolean "--no-*" options default to True and
# use store_false, i.e. every test runs unless explicitly skipped.
OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  # one comma-separated entry per disk; the number of entries also
  # determines how many disks each instance gets
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  # --mem-size, when given, overrides both min and max memory (see
  # ParseOptions)
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=None, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--maxmem-size", dest="maxmem_size", help="Max Memory size",
                 default=256, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--minmem-size", dest="minmem_size", help="Min Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  # --no-nics replaces the default single-NIC spec with an empty list
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=constants.ENABLE_CONFD),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile"
                 " or drbd) [drbd]"),
  # -n/--nodes and -I/--iallocator are mutually exclusive (checked in
  # ParseOptions)
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
234

    
235

    
236
def _DoCheckInstances(fn):
237
  """Decorator for checking instances.
238

    
239
  """
240
  def wrapper(self, *args, **kwargs):
241
    val = fn(self, *args, **kwargs)
242
    for instance in self.instances:
243
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
244
    return val
245

    
246
  return wrapper
247

    
248

    
249
def _DoBatch(retry):
250
  """Decorator for possible batch operations.
251

    
252
  Must come after the _DoCheckInstances decorator (if any).
253

    
254
  @param retry: whether this is a retryable batch, will be
255
      passed to StartBatch
256

    
257
  """
258
  def wrap(fn):
259
    def batched(self, *args, **kwargs):
260
      self.StartBatch(retry)
261
      val = fn(self, *args, **kwargs)
262
      self.CommitQueue()
263
      return val
264
    return batched
265

    
266
  return wrap
267

    
268

    
269
class Burner(object):
270
  """Burner class."""
271

    
272
  def __init__(self):
    """Set up burner state, parse options and read the cluster state."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    # node/instance bookkeeping
    self.nodes = []
    self.instances = []
    self.to_rem = []
    # parallel-batch state
    self.queued_ops = []
    self.queue_retry = False
    self.opts = None
    # the following are filled in by ParseOptions
    self.disk_count = None
    self.disk_growth = None
    self.disk_size = None
    self.hvp = None
    self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()
287

    
288
  def ClearFeedbackBuf(self):
289
    """Clear the feedback buffer."""
290
    self._feed_buf.truncate(0)
291

    
292
  def GetFeedbackBuf(self):
293
    """Return the contents of the buffer."""
294
    return self._feed_buf.getvalue()
295

    
296
  def Feedback(self, msg):
    """Accumulate a job feedback message in our buffer.

    @param msg: a 3-element message tuple; msg[0] is the timestamp
        (merged via utils.MergeTime) and msg[2] the text — msg[1] is
        presumably the log level and is ignored here

    """
    when = time.ctime(utils.MergeTime(msg[0]))
    line = "%s %s" % (when, msg[2])
    self._feed_buf.write(line + "\n")
    if self.opts.verbose:
      Log(line, indent=3)
302

    
303
  def MaybeRetry(self, retry_count, msg, fn, *args):
304
    """Possibly retry a given function execution.
305

    
306
    @type retry_count: int
307
    @param retry_count: retry counter:
308
        - 0: non-retryable action
309
        - 1: last retry for a retryable action
310
        - MAX_RETRIES: original try for a retryable action
311
    @type msg: str
312
    @param msg: the kind of the operation
313
    @type fn: callable
314
    @param fn: the function to be called
315

    
316
    """
317
    try:
318
      val = fn(*args)
319
      if retry_count > 0 and retry_count < MAX_RETRIES:
320
        Log("Idempotent %s succeeded after %d retries",
321
            msg, MAX_RETRIES - retry_count)
322
      return val
323
    except Exception, err: # pylint: disable=W0703
324
      if retry_count == 0:
325
        Log("Non-idempotent %s failed, aborting", msg)
326
        raise
327
      elif retry_count == 1:
328
        Log("Idempotent %s repeated failure, aborting", msg)
329
        raise
330
      else:
331
        Log("Idempotent %s failed, retry #%d/%d: %s",
332
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
333
        self.MaybeRetry(retry_count - 1, msg, fn, *args)
334

    
335
  def _ExecOp(self, *ops):
    """Submit a job with the given opcodes and wait for completion.

    @return: if only one opcode has been passed, its result; otherwise
        the list of all results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    # unwrap single-opcode submissions for caller convenience
    if len(ops) == 1:
      return results[0]
    return results
348

    
349
  def ExecOp(self, retry, *ops):
    """Execute opcodes, optionally retrying idempotent failures.

    @param retry: whether the submission may be retried (up to
        MAX_RETRIES attempts)
    @return: if only one opcode has been passed, its result; otherwise
        the list of all results

    """
    attempts = MAX_RETRIES if retry else 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(attempts, "opcode", self._ExecOp, *ops)
362

    
363
  def ExecOrQueue(self, name, ops, post_process=None):
    """Run the opcodes now, or queue them for a parallel batch.

    In serial mode the opcodes run immediately and the result is
    returned; in parallel mode they are only remembered for
    CommitQueue and None is returned.

    """
    if not self.opts.parallel:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val
    # parallel mode: stash the job for CommitQueue/ExecJobSet
    cli.SetGenericOpcodeOpts(ops, self.opts)
    self.queued_ops.append((ops, name, post_process))
373

    
374
  def StartBatch(self, retry):
375
    """Start a new batch of jobs.
376

    
377
    @param retry: whether this is a retryable batch
378

    
379
    """
380
    self.queued_ops = []
381
    self.queue_retry = retry
382

    
383
  def CommitQueue(self):
384
    """Execute all submitted opcodes in case of parallel burnin"""
385
    if not self.opts.parallel or not self.queued_ops:
386
      return
387

    
388
    if self.queue_retry:
389
      rval = MAX_RETRIES
390
    else:
391
      rval = 0
392

    
393
    try:
394
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
395
                                self.queued_ops)
396
    finally:
397
      self.queued_ops = []
398
    return results
399

    
400
  def ExecJobSet(self, jobs):
401
    """Execute a set of jobs and return once all are done.
402

    
403
    The method will return the list of results, if all jobs are
404
    successful. Otherwise, OpExecError will be raised from within
405
    cli.py.
406

    
407
    """
408
    self.ClearFeedbackBuf()
409
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
410
    for ops, name, _ in jobs:
411
      jex.QueueJob(name, *ops) # pylint: disable=W0142
412
    try:
413
      results = jex.GetResults()
414
    except Exception, err: # pylint: disable=W0703
415
      Log("Jobs failed: %s", err)
416
      raise BurninFailure()
417

    
418
    fail = False
419
    val = []
420
    for (_, name, post_process), (success, result) in zip(jobs, results):
421
      if success:
422
        if post_process:
423
          try:
424
            post_process()
425
          except Exception, err: # pylint: disable=W0703
426
            Log("Post process call for job %s failed: %s", name, err)
427
            fail = True
428
        val.append(result)
429
      else:
430
        fail = True
431

    
432
    if fail:
433
      raise BurninFailure()
434

    
435
    return val
436

    
437
  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    # at least one instance name and an OS are mandatory
    if len(args) < 1 or options.os is None:
      Usage()

    # --mem-size overrides both memory bounds, so the min/max sanity
    # check is only needed when it was not given
    if options.mem_size:
      options.maxmem_size = options.mem_size
      options.minmem_size = options.mem_size
    elif options.minmem_size > options.maxmem_size:
      Err("Maximum memory lower than minimum memory")

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8,
                                constants.DT_RBD,
                                )
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      # diskless instances cannot exercise the disk add/remove tests
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      # the number of comma-separated sizes determines the disk count;
      # growth values must match the sizes one-to-one
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    # node list and iallocator are mutually exclusive placement methods
    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    # the HTTP check resolves /hostname.txt, which needs name checking
    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MINMEM: options.minmem_size,
      constants.BE_MAXMEM: options.maxmem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      # --hypervisor yields a (name, params) pair
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      # default: exercise all known reboot types
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    # applies to all sockets created afterwards, e.g. the HTTP checks
    socket.setdefaulttimeout(options.net_timeout)
514

    
515
  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      # empty list means "query all nodes"
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # keep only nodes that are neither offline nor drained
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    # verify that the requested OS (including variant) actually exists
    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    # cache cluster-wide defaults used by later burnin steps
    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      # no --hypervisor given: fall back to the cluster default
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)
558

    
559
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create all requested instances, rotating over the node list.

    """
    self.to_rem = []
    # pair each instance with a rotating (primary, secondary) node pair
    node_rotation = izip(cycle(self.nodes),
                         islice(cycle(self.nodes), 1, None),
                         self.instances)

    Log("Creating instances")
    for pnode, snode, instance in node_rotation:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        # let the iallocator pick both nodes
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        # non-mirrored templates need no secondary
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      # only record the instance for removal once creation succeeded;
      # the default argument binds the current instance name
      self.ExecOrQueue(instance, [op],
                       post_process=lambda name=instance:
                         self.to_rem.append(name))
607

    
608
  @_DoBatch(False)
  def BurnModifyRuntimeMemory(self):
    """Alter the runtime memory of every instance."""
    Log("Setting instance runtime memory")
    # target the configured minimum memory
    target = self.bep[constants.BE_MINMEM]
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceSetParams(instance_name=instance,
                                       runtime_mem=target)
      Log("Set memory to %s MB", target, indent=2)
      self.ExecOrQueue(instance, [op])
619

    
620
  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow every disk of every instance by its configured growth, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        # zero/negative growth means "leave this disk alone"
        if growth <= 0:
          continue
        op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                        amount=growth, wait_for_sync=True)
        Log("increase disk/%s by %s MB", idx, growth, indent=2)
        self.ExecOrQueue(instance, [op])
632

    
633
  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      # one replace-disks opcode per side, run as a single job
      batch = []
      for mode in (constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI):
        Log("run %s", mode, indent=2)
        batch.append(
          opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                         mode=mode,
                                         disks=list(range(self.disk_count)),
                                         early_release=early_release))
      self.ExecOrQueue(instance, batch)
649

    
650
  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace the secondary node of every instance."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    # rotate target nodes, two positions ahead of the primary rotation
    rotation = izip(islice(cycle(self.nodes), 2, None),
                    self.instances)
    for tnode, instance in rotation:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        # the iallocator picks the new secondary itself
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])
673

    
674
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Fail over every instance to its secondary node."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      self.ExecOrQueue(instance,
                       [opcodes.OpInstanceFailover(instance_name=instance,
                                                   ignore_consistency=False)])
684

    
685
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move every instance to the next node in the rotation."""
    Log("Moving instances")
    # target nodes are one position ahead in the node rotation
    targets = islice(cycle(self.nodes), 1, None)
    for tnode, instance in izip(targets, self.instances):
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])
697

    
698
  @_DoBatch(False)
  def BurnMigrate(self):
    """Live-migrate every instance, then run a migration cleanup."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      migrate_op = opcodes.OpInstanceMigrate(instance_name=instance,
                                             mode=None, cleanup=False)
      cleanup_op = opcodes.OpInstanceMigrate(instance_name=instance,
                                             mode=None, cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [migrate_op, cleanup_op])
711

    
712
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    # rotate three distinct nodes per instance: primary, secondary and
    # the export target
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        # the iallocator picks the import placement itself
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        # non-mirrored templates need no secondary on import
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      # export (with shutdown) to the export node ...
      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      # ... remove the original ...
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      # ... and re-create it from the exported data
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      # finally drop the export data itself
      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      # all four steps run as one job so they execute in order
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
780

    
781
  @staticmethod
  def StopInstanceOp(instance):
    """Return an opcode that shuts down the named instance."""
    shutdown_op = opcodes.OpInstanceShutdown(instance_name=instance)
    return shutdown_op
  @staticmethod
  def StartInstanceOp(instance):
    """Return an opcode that starts the named instance (non-forced)."""
    startup_op = opcodes.OpInstanceStartup(instance_name=instance, force=False)
    return startup_op
  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Return an opcode renaming *instance* to *instance_new*."""
    rename_op = opcodes.OpInstanceRename(instance_name=instance,
                                         new_name=instance_new)
    return rename_op
  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Shut down every instance and boot it again."""
    Log("Stopping and starting instances")
    for inst_name in self.instances:
      Log("instance %s", inst_name, indent=1)
      cycle_ops = [self.StopInstanceOp(inst_name),
                   self.StartInstanceOp(inst_name)]
      self.ExecOrQueue(inst_name, cycle_ops)
  @_DoBatch(False)
  def BurnRemove(self):
    """Queue removal opcodes for all instances slated for deletion."""
    Log("Removing instances")
    for inst_name in self.to_rem:
      Log("instance %s", inst_name, indent=1)
      # ignore_failures lets cleanup proceed even for broken instances
      remove_op = opcodes.OpInstanceRemove(instance_name=inst_name,
                                           ignore_failures=True)
      self.ExecOrQueue(inst_name, [remove_op])
  def BurnRename(self):
    """Rename each instance to the configured target name and back.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    tmp_name = self.opts.rename
    for inst_name in self.instances:
      Log("instance %s", inst_name, indent=1)
      # Forward pass: stop under the old name, rename, start under the new
      self.ExecOp(False,
                  self.StopInstanceOp(inst_name),
                  self.RenameInstanceOp(inst_name, tmp_name),
                  self.StartInstanceOp(tmp_name))
      self._CheckInstanceAlive(tmp_name)
      # Backward pass: restore the original name the same way
      self.ExecOp(False,
                  self.StopInstanceOp(tmp_name),
                  self.RenameInstanceOp(tmp_name, inst_name),
                  self.StartInstanceOp(inst_name))
      self._CheckInstanceAlive(inst_name)
  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall every instance, both with and without an explicit OS."""
    Log("Reinstalling instances")
    for inst_name in self.instances:
      Log("instance %s", inst_name, indent=1)
      stop_op = self.StopInstanceOp(inst_name)
      reinst_default = opcodes.OpInstanceReinstall(instance_name=inst_name)
      Log("reinstall without passing the OS", indent=2)
      reinst_explicit = opcodes.OpInstanceReinstall(instance_name=inst_name,
                                                    os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      start_op = self.StartInstanceOp(inst_name)
      self.ExecOrQueue(inst_name,
                       [stop_op, reinst_default, reinst_explicit, start_op])
  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot each instance once per configured reboot type."""
    Log("Rebooting instances")
    for inst_name in self.instances:
      Log("instance %s", inst_name, indent=1)
      reboot_ops = []
      for rtype in self.opts.reboot_types:
        Log("reboot with type '%s'", rtype, indent=2)
        reboot_ops.append(
          opcodes.OpInstanceReboot(instance_name=inst_name,
                                   reboot_type=rtype,
                                   ignore_secondaries=False))
      self.ExecOrQueue(inst_name, reboot_ops)
  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Exercise disk (de)activation while running and while stopped."""
    Log("Activating/deactivating disks")
    for inst_name in self.instances:
      Log("instance %s", inst_name, indent=1)
      start_op = self.StartInstanceOp(inst_name)
      stop_op = self.StopInstanceOp(inst_name)
      activate_op = opcodes.OpInstanceActivateDisks(instance_name=inst_name)
      deactivate_op = \
        opcodes.OpInstanceDeactivateDisks(instance_name=inst_name)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      # activate while up, stop, activate while down, deactivate, restart
      self.ExecOrQueue(inst_name, [activate_op, stop_op, activate_op,
                                   deactivate_op, start_op])
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Attach an extra disk to each instance, then detach it again."""
    Log("Adding and removing disks")
    for inst_name in self.instances:
      Log("instance %s", inst_name, indent=1)
      add_mod = [(constants.DDM_ADD, {"size": self.disk_size[0]})]
      rem_mod = [(constants.DDM_REMOVE, {})]
      op_add = opcodes.OpInstanceSetParams(instance_name=inst_name,
                                           disks=add_mod)
      op_rem = opcodes.OpInstanceSetParams(instance_name=inst_name,
                                           disks=rem_mod)
      op_stop = self.StopInstanceOp(inst_name)
      op_start = self.StartInstanceOp(inst_name)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(inst_name, [op_add, op_stop, op_rem, op_start])
  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Attach an extra NIC to each instance, then detach it again."""
    Log("Adding and removing NICs")
    for inst_name in self.instances:
      Log("instance %s", inst_name, indent=1)
      add_op = opcodes.OpInstanceSetParams(
        instance_name=inst_name, nics=[(constants.DDM_ADD, {})])
      rem_op = opcodes.OpInstanceSetParams(
        instance_name=inst_name, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(inst_name, [add_op, rem_op])
  def ConfdCallback(self, reply):
    """Per-reply callback for confd queries.

    Validates the reply status and, depending on the original request
    type, logs the result via Log() or reports a wrong answer via Err().
    Non-upcall replies are ignored.

    """
    if reply.type != confd_client.UPCALL_REPLY:
      return
    srv_reply = reply.server_reply
    if srv_reply.status != constants.CONFD_REPL_STATUS_OK:
      Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                  srv_reply.status,
                                                  srv_reply))
    req_type = reply.orig_request.type
    if req_type == constants.CONFD_REQ_PING:
      Log("Ping: OK", indent=1)
    elif req_type == constants.CONFD_REQ_CLUSTER_MASTER:
      if srv_reply.answer == self.cluster_info["master"]:
        Log("Master: OK", indent=1)
      else:
        Err("Master: wrong: %s" % srv_reply.answer)
    elif req_type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
      if srv_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
        Log("Node role for master: OK", indent=1)
      else:
        Err("Node role for master: wrong: %s" % srv_reply.answer)
  def DoConfdRequestReply(self, req):
941
    self.confd_counting_callback.RegisterQuery(req.rsalt)
942
    self.confd_client.SendRequest(req, async=False)
943
    while not self.confd_counting_callback.AllAnswered():
944
      if not self.confd_client.ReceiveReply():
945
        Err("Did not receive all expected confd replies")
946
        break
947

    
948
  def BurnConfd(self):
    """Exercise the confd daemon with a fixed set of queries.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    # Wrap the per-reply callback in confd's filter and counting
    # callbacks; the counting callback tracks outstanding queries
    filter_cb = confd_client.ConfdFilterCallback(self.ConfdCallback)
    self.confd_counting_callback = \
      confd_client.ConfdCountingCallback(filter_cb)
    self.confd_client = \
      confd_client.GetConfdClient(self.confd_counting_callback)

    self.DoConfdRequestReply(
      confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING))

    self.DoConfdRequestReply(
      confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_CLUSTER_MASTER))

    self.DoConfdRequestReply(
      confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"]))
  def _CheckInstanceAlive(self, instance):
    """Verify that an instance is up by fetching its hostname over HTTP.

    Retrieves http://<instance>/hostname.txt and compares the body to
    the instance name.  Any IOError while connecting (connection
    refused, no route to host, ...) is retried once per second until
    the configured net timeout expires; a no-op if HTTP checks are
    disabled.

    Raises InstanceDown if the instance cannot be contacted before the
    timeout, or if it reports an unexpected hostname.

    """
    if not self.opts.http_check:
      return
    deadline = time.time() + self.opts.net_timeout
    resp = None
    while resp is None and time.time() < deadline:
      try:
        resp = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # instance may still be booting; wait and retry until deadline
        time.sleep(1)
    if resp is None:
      raise InstanceDown(instance, "Cannot contact instance")
    reported = resp.read().strip()
    resp.close()
    if reported != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, reported)))
  def BurninCluster(self):
1005
    """Test a cluster intensively.
1006

    
1007
    This will create instances and then start/stop/failover them.
1008
    It is safe for existing instances but could impact performance.
1009

    
1010
    """
1011

    
1012
    opts = self.opts
1013

    
1014
    Log("Testing global parameters")
1015

    
1016
    if (len(self.nodes) == 1 and
1017
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
1018
                                   constants.DT_FILE,
1019
                                   constants.DT_SHARED_FILE)):
1020
      Err("When one node is available/selected the disk template must"
1021
          " be 'diskless', 'file' or 'plain'")
1022

    
1023
    if opts.do_confd_tests and not constants.ENABLE_CONFD:
1024
      Err("You selected confd tests but confd was disabled at configure time")
1025

    
1026
    has_err = True
1027
    try:
1028
      self.BurnCreateInstances()
1029

    
1030
      if self.bep[constants.BE_MINMEM] < self.bep[constants.BE_MAXMEM]:
1031
        self.BurnModifyRuntimeMemory()
1032

    
1033
      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
1034
        self.BurnReplaceDisks1D8()
1035
      if (opts.do_replace2 and len(self.nodes) > 2 and
1036
          opts.disk_template in constants.DTS_INT_MIRROR):
1037
        self.BurnReplaceDisks2()
1038

    
1039
      if (opts.disk_template in constants.DTS_GROWABLE and
1040
          compat.any(n > 0 for n in self.disk_growth)):
1041
        self.BurnGrowDisks()
1042

    
1043
      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
1044
        self.BurnFailover()
1045

    
1046
      if opts.do_migrate:
1047
        if opts.disk_template not in constants.DTS_MIRRORED:
1048
          Log("Skipping migration (disk template %s does not support it)",
1049
              opts.disk_template)
1050
        elif not self.hv_class.CAN_MIGRATE:
1051
          Log("Skipping migration (hypervisor %s does not support it)",
1052
              self.hypervisor)
1053
        else:
1054
          self.BurnMigrate()
1055

    
1056
      if (opts.do_move and len(self.nodes) > 1 and
1057
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1058
        self.BurnMove()
1059

    
1060
      if (opts.do_importexport and
1061
          opts.disk_template not in (constants.DT_DISKLESS,
1062
                                     constants.DT_SHARED_FILE,
1063
                                     constants.DT_FILE)):
1064
        self.BurnImportExport()
1065

    
1066
      if opts.do_reinstall:
1067
        self.BurnReinstall()
1068

    
1069
      if opts.do_reboot:
1070
        self.BurnReboot()
1071

    
1072
      if opts.do_addremove_disks:
1073
        self.BurnAddRemoveDisks()
1074

    
1075
      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1076
      # Don't add/remove nics in routed mode, as we would need an ip to add
1077
      # them with
1078
      if opts.do_addremove_nics:
1079
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
1080
          self.BurnAddRemoveNICs()
1081
        else:
1082
          Log("Skipping nic add/remove as the cluster is not in bridged mode")
1083

    
1084
      if opts.do_activate_disks:
1085
        self.BurnActivateDisks()
1086

    
1087
      if opts.rename:
1088
        self.BurnRename()
1089

    
1090
      if opts.do_confd_tests:
1091
        self.BurnConfd()
1092

    
1093
      if opts.do_startstop:
1094
        self.BurnStopStart()
1095

    
1096
      has_err = False
1097
    finally:
1098
      if has_err:
1099
        Log("Error detected: opcode buffer follows:\n\n")
1100
        Log(self.GetFeedbackBuf())
1101
        Log("\n\n")
1102
      if not self.opts.keep_instances:
1103
        try:
1104
          self.BurnRemove()
1105
        except Exception, err:  # pylint: disable=W0703
1106
          if has_err: # already detected errors, so errors in removal
1107
                      # are quite expected
1108
            Log("Note: error detected during instance remove: %s", err)
1109
          else: # non-expected error
1110
            raise
1111

    
1112
    return constants.EXIT_SUCCESS
1113

    
1114

    
1115
def main():
  """Program entry point: configure logging, then run the burnin.

  @return: the exit code produced by L{Burner.BurninCluster}

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)
  burner = Burner()
  return burner.BurninCluster()
# Script entry point when run directly (not imported)
if __name__ == "__main__":
  main()