#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
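# Example invocation (illustrative only):
#   burnin -o your-os-name -p inst1.example.com inst2.example.com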

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the retried call's result instead of dropping it
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
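    # Walk the node list cyclically so each instance gets a primary node and
    # the following node as its secondary candidate.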
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
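      # The nested lambda binds the instance name now, so the post-process
      # callback appends the right name to self.to_rem once the job succeeds.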
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
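      # Swap the instance to the spare name and back, checking liveness
      # after each step.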
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    self.confd_counting_callback.RegisterQuery(req.rsalt)
912
    self.confd_client.SendRequest(req, async=False)
913
    while not self.confd_counting_callback.AllAnswered():
914
      if not self.confd_client.ReceiveReply():
915
        Err("Did not receive all expected confd replies")
916
        break
917

    
918
  def BurnConfd(self):
919
    """Run confd queries for our instances.
920

    
921
    The following confd queries are tested:
922
      - CONFD_REQ_PING: simple ping
923
      - CONFD_REQ_CLUSTER_MASTER: cluster master
924
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master
925

    
926
    """
927
    Log("Checking confd results")
928

    
929
    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
930
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
931
    self.confd_counting_callback = counting_callback
932

    
933
    self.confd_client = confd_client.GetConfdClient(counting_callback)
934

    
935
    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
936
    self.DoConfdRequestReply(req)
937

    
938
    req = confd_client.ConfdClientRequest(
939
      type=constants.CONFD_REQ_CLUSTER_MASTER)
940
    self.DoConfdRequestReply(req)
941

    
942
    req = confd_client.ConfdClientRequest(
943
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
944
        query=self.cluster_info["master"])
945
    self.DoConfdRequestReply(req)
946

    
947
  def _CheckInstanceAlive(self, instance):
948
    """Check if an instance is alive by doing http checks.
949

    
950
    This will try to retrieve the url on the instance /hostname.txt
951
    and check that it contains the hostname of the instance. In case
952
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
953
    any other error we abort.
954

    
955
    """
956
    if not self.opts.http_check:
957
      return
958
    end_time = time.time() + self.opts.net_timeout
959
    url = None
960
    while time.time() < end_time and url is None:
961
      try:
962
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
963
      except IOError:
964
        # here we can have connection refused, no route to host, etc.
965
        time.sleep(1)
966
    if url is None:
967
      raise InstanceDown(instance, "Cannot contact instance")
968
    hostname = url.read().strip()
969
    url.close()
970
    if hostname != instance:
971
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
972
                                    (instance, hostname)))
973

    
974
  def BurninCluster(self):
975
    """Test a cluster intensively.
976

    
977
    This will create instances and then start/stop/failover them.
978
    It is safe for existing instances but could impact performance.
979

    
980
    """
981

    
982
    opts = self.opts
983

    
984
    Log("Testing global parameters")
985

    
986
    if (len(self.nodes) == 1 and
987
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
988
                                   constants.DT_FILE)):
989
      Err("When one node is available/selected the disk template must"
990
          " be 'diskless', 'file' or 'plain'")
991

    
992
    has_err = True
993
    try:
994
      self.BurnCreateInstances()
995
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
996
        self.BurnReplaceDisks1D8()
997
      if (opts.do_replace2 and len(self.nodes) > 2 and
998
          opts.disk_template in constants.DTS_NET_MIRROR) :
999
        self.BurnReplaceDisks2()
1000

    
1001
      if (opts.disk_template in constants.DTS_GROWABLE and
1002
          compat.any(n > 0 for n in self.disk_growth)):
1003
        self.BurnGrowDisks()
1004

    
1005
      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
1006
        self.BurnFailover()
1007

    
1008
      if opts.do_migrate:
1009
        if opts.disk_template != constants.DT_DRBD8:
1010
          Log("Skipping migration (disk template not DRBD8)")
1011
        elif not self.hv_class.CAN_MIGRATE:
1012
          Log("Skipping migration (hypervisor %s does not support it)",
1013
              self.hypervisor)
1014
        else:
1015
          self.BurnMigrate()
1016

    
1017
      if (opts.do_move and len(self.nodes) > 1 and
1018
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
1019
        self.BurnMove()
1020

    
1021
      if (opts.do_importexport and
1022
          opts.disk_template not in (constants.DT_DISKLESS,
1023
                                     constants.DT_FILE)):
1024
        self.BurnImportExport()
1025

    
1026
      if opts.do_reinstall:
1027
        self.BurnReinstall()
1028

    
1029
      if opts.do_reboot:
1030
        self.BurnReboot()
1031

    
1032
      if opts.do_addremove_disks:
1033
        self.BurnAddRemoveDisks()
1034

    
1035
      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
1036
      # Don't add/remove nics in routed mode, as we would need an ip to add
1037
      # them with
1038
      if opts.do_addremove_nics:
1039
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
1040
          self.BurnAddRemoveNICs()
1041
        else:
1042
          Log("Skipping nic add/remove as the cluster is not in bridged mode")
1043

    
1044
      if opts.do_activate_disks:
1045
        self.BurnActivateDisks()
1046

    
1047
      if opts.rename:
1048
        self.BurnRename()
1049

    
1050
      if opts.do_confd_tests:
1051
        self.BurnConfd()
1052

    
1053
      if opts.do_startstop:
1054
        self.BurnStopStart()
1055

    
1056
      has_err = False
1057
    finally:
1058
      if has_err:
1059
        Log("Error detected: opcode buffer follows:\n\n")
1060
        Log(self.GetFeedbackBuf())
1061
        Log("\n\n")
1062
      if not self.opts.keep_instances:
1063
        try:
1064
          self.BurnRemove()
1065
        except Exception, err:  # pylint: disable-msg=W0703
1066
          if has_err: # already detected errors, so errors in removal
1067
                      # are quite expected
1068
            Log("Note: error detected during instance remove: %s", err)
1069
          else: # non-expected error
1070
            raise
1071

    
1072
    return 0
1073

    
1074

    
1075
def main():
1076
  """Main function"""
1077

    
1078
  burner = Burner()
1079
  return burner.BurninCluster()
1080

    
1081

    
1082
if __name__ == "__main__":
1083
  main()