#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
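# Message prefix per indent level; Log() falls back to plain two-space
# indentation for levels deeper than those listed here.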
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
182
                       " instead of fixed node spread (node restrictions no"
183
                       " longer apply, therefore -n/--nodes must not be"
184
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]

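# After the wrapped burnin step completes, verify that every instance
# still answers its HTTP liveness check (a no-op unless --http-check was
# given; see Burner._CheckInstanceAlive).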
def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Acumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the retried call's return value to our caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @param retry: whether to retry the opcode execution up to MAX_RETRIES times
    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results if all jobs are
    successful; otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

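    # each entry in results is a (success, payload) pair as returned by
    # JobExecutor.GetResults; a single failed job fails the whole set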
    if utils.any(results, lambda x: not x[0]):
      raise BurninFailure()

    return [i[1] for i in results]

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

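    # the default timeout applies to every socket created from here on,
    # including the urllib connections used for the HTTP liveness checks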
    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
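    # walk the node list as a ring: instance i gets node i as primary
    # and node i+1 as secondary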
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instances' disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

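    # pick the node two steps ahead in the ring, so that (with at least
    # three nodes) the new secondary differs from both the current
    # primary and the current secondary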
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
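    # ring walk again, now pairing each instance with three nodes:
    # primary, secondary and a distinct export (source) node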
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
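      # round trip via the spare name: stop, rename, start and verify,
      # then the same sequence back to the original name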
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
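      # op_act is queued twice: once while the instance is still running
      # and again after op_stop, covering the online and offline cases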
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL /hostname.txt on the instance and
    check that it contains the instance's hostname. On connection errors
    (connection refused, no route to host, etc.) we retry for up to
    net_timeout seconds; any other error aborts the check.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

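    # assume failure until the whole sequence completes; the finally
    # block below uses this flag to decide whether to dump the feedback
    # buffer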
    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()