#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
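# Log() prefix for each indentation level; other levels fall back to
# plain two-space padding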
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Acumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
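    # Pair each queued job with its (success, result) tuple from the
    # executor; post-process callbacks run only for successful jobs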
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
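    # Keep only the nodes that are neither offline nor drained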
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
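    # Pair each instance with a primary node and the next node in the
    # (cycled) node list as its secondary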
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
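      # The nested lambda binds 'instance' at definition time, so each
      # post-process callback records the right name in self.to_rem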
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

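    # Pick the new secondary two positions ahead in the node cycle so that,
    # with more than two nodes, it differs from the nodes used at creation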
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
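      # The submitted sequence activates the disks while the instance runs,
      # again after stopping it, deactivates them while stopped and then
      # restarts the instance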
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
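    """Send a confd request and wait until all expected replies arrive."""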
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()