#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import os
import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, indent=0):
  """Simple function that prints out its argument.

  """
  headers = {
    0: "- ",
    1: "* ",
    2: ""
    }
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  headers.get(indent, "  "), msg))
  sys.stdout.flush()
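
# Example output at the three indent levels (hypothetical instance and
# node names):
#   - Creating instances              (indent=0)
#     * instance inst1.example.com    (indent=1)
#       on node1, node2               (indent=2)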

def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow the parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to the
    # BasicURLOpener class, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma-separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
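
# Example invocation (hypothetical OS, node and instance names):
#   burnin -o debootstrap -t drbd --parallel \
#     -n node1.example.com,node2.example.com \
#     instance1.example.com instance2.example.com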


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
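
# These decorators stack on the Burn* methods below, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnCreateInstances(self): ...
#
# _DoBatch brackets the method with StartBatch()/CommitQueue(), and
# _DoCheckInstances then verifies that every instance is still alive.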


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    # msg is a (timestamp, log_type, text) tuple coming from job feedback
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries" %
            (msg, MAX_RETRIES - retry_count))
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting" % (msg, ))
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting" % (msg, ))
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s" %
            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
        # propagate the retried call's value; without the return, a
        # successful retry would make the caller see None
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
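
  # Retry accounting for a retryable action: the first call uses
  # retry_count == MAX_RETRIES (3); failures recurse with 2, then 1, and a
  # failure at 1 aborts, so at most MAX_RETRIES attempts are made in total.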

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute an opcode now or, in parallel mode, queue it for later."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results
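
  # Parallel-mode flow: @_DoBatch calls StartBatch(), each ExecOrQueue()
  # appends to queued_ops, and CommitQueue() submits everything via
  # ExecJobSet() and waits; in serial mode ExecOrQueue() runs ops directly.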

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results if all jobs are
    successful; if any job fails, BurninFailure will be raised at
    the end of the set.

    """
    self.ClearFeedbackBuf()
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s" % utils.CommaJoin(job_ids), indent=1)
    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      try:
        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
      except Exception, err: # pylint: disable-msg=W0703
        Log("Job for %s failed: %s" % (iname, err))
    if len(results) != len(jobs):
      # a partial result set means at least one job failed
      raise BurninFailure()
    return results

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # keep only nodes that are neither offline nor drained
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
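    # e.g. with nodes [n1, n2, n3] the rotation yields (pnode, snode) pairs
    # (n1, n2), (n2, n3), (n3, n1), ... one per instance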

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the OS and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count),
                                    early_release=self.opts.early_release)
        Log("run %s" % mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s" % (mode, msg), indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s" % enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'" % reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL http://<instance>/hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry for up to net_timeout seconds; on
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
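
  # The check only polls the URL; serving /hostname.txt (with the instance
  # name as content) is up to the OS image running inside the instance,
  # matching the -C/--http-check option's description above.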

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s" % str(err))
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()