#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
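
# For reference, the indentation convention produced by Log():
#   Log("Creating instances")           -> "- Creating instances"
#   Log("instance %s", name, indent=1)  -> "  * instance <name>"
#   Log("on node1, node2", indent=2)    -> "    on node1, node2"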


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile"
                 " or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
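
# Example invocation (OS, node and instance names below are placeholders):
#   burnin -o debian-image -t drbd -n node1,node2 instance1.example.com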


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
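
# These two decorators are stacked on the Burn* methods below as
#   @_DoCheckInstances
#   @_DoBatch(False)
# so that the queued batch is committed first and the instance liveness
# checks run only afterwards.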


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
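
  # Typical call, as done by ExecOp below:
  #   self.MaybeRetry(MAX_RETRIES, "opcode", self._ExecOp, op)
  # which keeps retrying a failing opcode until the retry counter runs out.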

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode or queue it, depending on the parallel option."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results
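
  # In parallel mode (-p) the Burn* methods only queue their jobs via
  # ExecOrQueue; the _DoBatch decorator then calls CommitQueue, which
  # submits the whole queue as one job set through ExecJobSet.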

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
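    # mytor pairs each instance with two consecutive cluster nodes (via
    # cycle/islice), round-robin, so primary and secondary differ whenever
    # more than one node is available.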

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
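
  # Note: for -C/--http-check to pass, each instance's OS image must run an
  # HTTP server that serves the instance's own hostname at /hostname.txt.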

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file', 'sharedfile' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()