#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
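
# Example invocation (illustrative only; the OS name, node names and
# instance names below are placeholders and must match your cluster):
#   burnin -o debootstrap -t drbd -n node1,node2 instance1 instance2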

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
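
# Examples of the indent convention implemented by Log() above
# (illustrative output):
#   Log("Creating instances")             -> "- Creating instances"
#   Log("instance %s", "inst1", indent=1) -> "  * instance inst1"
#   Log("on node1, node2", indent=2)      -> "    on node1, node2"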


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
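
# The two decorators above are meant to be stacked on the Burner methods
# below, e.g. (illustrative):
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...
# i.e. the method body runs between StartBatch() and CommitQueue(), and
# afterwards every instance is checked to still be alive.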


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the result of the retried call to our caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results if all jobs are
    successful; otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
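    # Note: the izip above yields (pnode, snode, instance) triples; the
    # secondary candidates are the node list rotated by one, so each
    # instance gets a distinct primary/secondary pair.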

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL /hostname.txt on the instance
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry for up to the configured network
    timeout; on any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
993
          opts.disk_template in constants.DTS_NET_MIRROR) :
994
        self.BurnReplaceDisks2()
995

    
996
      if (opts.disk_template in constants.DTS_GROWABLE and
997
          compat.any(n > 0 for n in self.disk_growth)):
998
        self.BurnGrowDisks()
999

    
1000
      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
1001
        self.BurnFailover()
1002

    
1003
      if opts.do_migrate:
1004
        if opts.disk_template != constants.DT_DRBD8:
1005
          Log("Skipping migration (disk template not DRBD8)")
1006
        elif not self.hv_class.CAN_MIGRATE:
1007
          Log("Skipping migration (hypervisor %s does not support it)",
1008
              self.hypervisor)
1009
        else:
1010
          self.BurnMigrate()
1011

    
1012
      if (opts.do_move and len(self.nodes) > 1 and
1013
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1014
        self.BurnMove()
1015

    
1016
      if (opts.do_importexport and
1017
          opts.disk_template not in (constants.DT_DISKLESS,
1018
                                     constants.DT_FILE)):
1019
        self.BurnImportExport()
1020

    
1021
      if opts.do_reinstall:
1022
        self.BurnReinstall()
1023

    
1024
      if opts.do_reboot:
1025
        self.BurnReboot()
1026

    
1027
      if opts.do_addremove_disks:
1028
        self.BurnAddRemoveDisks()
1029

    
1030
      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1031
      # Don't add/remove nics in routed mode, as we would need an ip to add
1032
      # them with
1033
      if opts.do_addremove_nics:
1034
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
1035
          self.BurnAddRemoveNICs()
1036
        else:
1037
          Log("Skipping nic add/remove as the cluster is not in bridged mode")
1038

    
1039
      if opts.do_activate_disks:
1040
        self.BurnActivateDisks()
1041

    
1042
      if opts.rename:
1043
        self.BurnRename()
1044

    
1045
      if opts.do_confd_tests:
1046
        self.BurnConfd()
1047

    
1048
      if opts.do_startstop:
1049
        self.BurnStopStart()
1050

    
1051
      has_err = False
1052
    finally:
1053
      if has_err:
1054
        Log("Error detected: opcode buffer follows:\n\n")
1055
        Log(self.GetFeedbackBuf())
1056
        Log("\n\n")
1057
      if not self.opts.keep_instances:
1058
        try:
1059
          self.BurnRemove()
1060
        except Exception, err:  # pylint: disable-msg=W0703
1061
          if has_err: # already detected errors, so errors in removal
1062
                      # are quite expected
1063
            Log("Note: error detected during instance remove: %s", err)
1064
          else: # non-expected error
1065
            raise
1066

    
1067
    return constants.EXIT_SUCCESS
1068

    
1069

    
1070
def main():
1071
  """Main function.
1072

    
1073
  """
1074
  utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
1075

    
1076
  return Burner().BurninCluster()
1077

    
1078

    
1079
if __name__ == "__main__":
1080
  main()