#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

This tool stress-tests a Ganeti cluster: it creates a set of instances
and runs a configurable sequence of operations against them (disk
replacement, failover, migration, export/import, reboot, rename and so
on), removing them at the end unless told otherwise.

"""

import os
import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, indent=0):
  """Simple function that prints out its argument.

  """
  headers = {
    0: "- ",
    1: "* ",
    2: ""
    }
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                   headers.get(indent, "  "), msg))
  sys.stdout.flush()

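# Example output (illustrative): the indent level selects a prefix plus
# 2*indent spaces of padding, so
#
#   Log("Creating instances")         prints "- Creating instances"
#   Log("instance inst1", indent=1)   prints "  * instance inst1"
#   Log("adding a disk", indent=2)    prints "    adding a disk"
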
def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.VERBOSE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


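# A typical invocation, sketched from the options above (the OS name and
# instance names are placeholders for whatever exists on your cluster):
#
#   ./burnin -o debootstrap -p --disk-size=1G,512M \
#       instance1.example.com instance2.example.com
#
# This would create two two-disk DRBD instances in parallel and run the
# full test sequence against them; individual phases can be skipped via
# the --no-* options above.

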
class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
                                      msg[2]))
    if self.opts.verbose:
      Log(msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries" %
            (msg, MAX_RETRIES - retry_count))
      return val
    except Exception, err:
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting" % (msg, ))
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting" % (msg, ))
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s" %
            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
        # return the retried call's value, otherwise a successful
        # retry would hand None back to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

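  # Retry flow, for reference: a retryable call starts with
  # retry_count == MAX_RETRIES. Each failure logs "retry #1/3", "#2/3",
  # ... and recurses with retry_count - 1; the failure at
  # retry_count == 1 re-raises, so an action is attempted at most
  # MAX_RETRIES times in total.
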
  def _ExecOp(self, *ops):
270
    """Execute one or more opcodes and manage the exec buffer.
271

    
272
    @result: if only opcode has been passed, we return its result;
273
        otherwise we return the list of results
274

    
275
    """
276
    job_id = cli.SendJob(ops, cl=self.cl)
277
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
278
    if len(ops) == 1:
279
      return results[0]
280
    else:
281
      return results
282

    
283
  def ExecOp(self, retry, *ops):
284
    """Execute one or more opcodes and manage the exec buffer.
285

    
286
    @result: if only opcode has been passed, we return its result;
287
        otherwise we return the list of results
288

    
289
    """
290
    if retry:
291
      rval = MAX_RETRIES
292
    else:
293
      rval = 0
294
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
295

    
296
  def ExecOrQueue(self, name, *ops):
297
    """Execute an opcode and manage the exec buffer."""
298
    if self.opts.parallel:
299
      self.queued_ops.append((ops, name))
300
    else:
301
      return self.ExecOp(self.queue_retry, *ops)
302

    
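  # Note the two dispatch modes: serially, ExecOrQueue executes each
  # opcode group immediately and returns its result; in parallel mode
  # (-p) it only queues the group, and nothing runs until CommitQueue()
  # submits the whole batch below.
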
  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)
    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      try:
        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
      except Exception, err:
        Log("Job for %s failed: %s" % (iname, err))
    if len(results) != len(jobs):
      raise BurninFailure()
    return results

  def _DoCheckInstances(fn):
    """Decorator for checking instances.

    """
    def wrapper(self, *args, **kwargs):
      val = fn(self, *args, **kwargs)
      for instance in self.instances:
        self._CheckInstanceAlive(instance)
      return val

    return wrapper

  def _DoBatch(retry):
    """Decorator for possible batch operations.

    Must come after the _DoCheckInstances decorator (if any).

    @param retry: whether this is a retryable batch, will be
        passed to StartBatch

    """
    def wrap(fn):
      def batched(self, *args, **kwargs):
        self.StartBatch(retry)
        val = fn(self, *args, **kwargs)
        self.CommitQueue()
        return val
      return batched

    return wrap

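  # How the two decorators combine on the Burn* methods below: with
  #
  #   @_DoCheckInstances
  #   @_DoBatch(False)
  #   def BurnX(self): ...
  #
  # a call first resets the queue via StartBatch(False), runs the method
  # body (which queues opcodes in parallel mode), lets CommitQueue()
  # submit and wait for the batch, and only then polls every instance
  # for liveness ("BurnX" being a stand-in for any of the methods).
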
  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
    result = self.ExecOp(True, op_diagos)

    if not result:
      Err("Can't get the OS list")

    # filter out the invalid OSes
    os_set = [val[0] for val in result if val[1]]

    if self.opts.os not in os_set:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=True,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

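  # The mytor iterator above rotates primary/secondary pairs through the
  # node list: e.g. with nodes [n1, n2, n3] and three instances it
  # yields (n1, n2, inst1), (n2, n3, inst2), (n3, n1, inst3), spreading
  # instances evenly when no iallocator is used.
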
  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)])
        Log("run %s" % mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[])
      Log("run %s %s" % (mode, msg), indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=True,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s" % enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  def StopInstanceOp(self, instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  def StartInstanceOp(self, instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  def RenameInstanceOp(self, instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

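  # Illustration: with --rename spare1 (a placeholder name), each
  # instance inst is stopped, renamed to spare1, started and checked,
  # then renamed back to inst and checked again, leaving the spare name
  # free for the next instance in the loop.
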
  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'" % reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the /hostname.txt URL on the instance and
    check that it contains the instance's hostname. On connection
    errors (e.g. ECONNREFUSED) we retry for up to net_timeout seconds;
    on any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

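  # Note the interplay with SimpleOpener above: a TCP-level failure
  # surfaces as IOError and is retried until the deadline, while an HTTP
  # error response is converted to InstanceDown by http_error_default
  # and aborts the check immediately.
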
  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if opts.do_move and opts.disk_template in [constants.DT_PLAIN,
                                                 constants.DT_FILE]:
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s" % str(err))
          else: # unexpected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(main())