#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

This tool stress-tests a Ganeti cluster by creating a set of instances and
running a sequence of operations on them (disk replacement and growth,
failover, migration, move, export/import, reinstall, reboot, ...); unless
told otherwise it removes the instances again at the end.

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
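# Prefix that Log() prepends to a message for each indentation level; indent
# levels not listed here fall back to two spaces (see Log() below).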
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
191
                 default=None, type="string",
192
                 help=("Perform the allocation using an iallocator"
193
                       " instead of fixed node spread (node restrictions no"
194
                       " longer apply, therefore -n/--nodes must not be"
195
                       " used"),
196
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
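
# A typical invocation looks like the following (illustrative only: the OS
# name and instance names are placeholders for values valid on your cluster):
#   burnin -o debian-image -t drbd -p instance1.example.com instance2.example.com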


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
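
# When both decorators are used on a Burner method, _DoCheckInstances is the
# outermost one (see e.g. BurnCreateInstances below):
#   @_DoCheckInstances
#   @_DoBatch(False)
# so that the queued batch is committed before the instances are checked.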


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the result of a successful retry back to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode or queue it, depending on the parallel setting."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
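      # Each comma-separated value becomes one disk, so e.g. "--disk-size
      # 4G,1G" yields two disks; sizes are parsed with utils.ParseUnit.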
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "variants"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
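    # Pair each instance round-robin with a (pnode, snode) tuple taken from
    # the node list; the secondary is simply the node following the primary.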
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
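      # The outer lambda binds the current value of "instance" right away, so
      # the queued post_process callback records the right name in to_rem.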
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

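    # Walk the node list with an offset of two, so that (given at least three
    # nodes) the proposed new secondary differs from the primary/secondary
    # pair that BurnCreateInstances picked for each instance.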
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
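    # Walk the node list round-robin as in BurnCreateInstances; each instance
    # gets a primary node, a secondary (the next node) and an export node
    # (the one after that).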
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        mode=constants.EXPORT_MODE_LOCAL,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

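  # BurninCluster below drives the whole run: the individual Burn* steps above
  # are invoked in sequence, each one guarded by the corresponding do_* option
  # and by the disk template / node count checks.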
  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()