#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import os
import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, indent=0):
  """Simple function that prints out its argument.

  """
  headers = {
    0: "- ",
    1: "* ",
    2: ""
    }
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  headers.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)
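
# Example (illustrative): Log("Creating instances") prints "- Creating
# instances", Log("instance foo", indent=1) prints "  * instance foo", and
# indents beyond 2 fall back to a plain two-space header.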


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to
    # what BasicURLOpener does, but we raise a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed up the burnin or to test granular"
                       " locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
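
# Example invocation (illustrative; node, OS and instance names are made
# up): "burnin -o my-os -t drbd -n node1,node2 inst1 inst2" burns two DRBD
# instances spread over node1 and node2 using the my-os OS definition.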


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if 0 < retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries" %
            (msg, MAX_RETRIES - retry_count))
      return val
    except Exception, err:
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting" % (msg, ))
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting" % (msg, ))
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s" %
            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
        # pass the result of the successful retry back to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
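
  # Example (illustrative): MaybeRetry(MAX_RETRIES, "opcode", fn) calls fn
  # up to MAX_RETRIES times, logging every failure in between, while
  # MaybeRetry(0, "opcode", fn) calls it once and re-raises on failure.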

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results if all jobs were
    successful; otherwise BurninFailure is raised, after all jobs have
    been polled and the individual failures logged.

    """
    self.ClearFeedbackBuf()
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s" % utils.CommaJoin(job_ids), indent=1)
    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      try:
        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
      except Exception, err:
        Log("Job for %s failed: %s" % (iname, err))
    if len(results) != len(jobs):
      raise BurninFailure()
    return results

  def _DoCheckInstances(fn):
    """Decorator for checking instances.

    """
    def wrapper(self, *args, **kwargs):
      val = fn(self, *args, **kwargs)
      for instance in self.instances:
        self._CheckInstanceAlive(instance)
      return val

    return wrapper

  def _DoBatch(retry):
    """Decorator for possible batch operations.

    Must come after the _DoCheckInstances decorator (if any).

    @param retry: whether this is a retryable batch, will be
        passed to StartBatch

    """
    def wrap(fn):
      def batched(self, *args, **kwargs):
        self.StartBatch(retry)
        val = fn(self, *args, **kwargs)
        self.CommitQueue()
        return val
      return batched

    return wrap
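
  # Example (illustrative): a burn step declared as
  #   @_DoCheckInstances
  #   @_DoBatch(False)
  #   def BurnFoo(self): ...
  # commits its queued opcodes as one batch and then verifies that every
  # instance still answers the liveness check.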

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)
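
  # Example (illustrative): "--disk-size 1G,256M --disk-growth 128M,128M"
  # parses to disk_size [1024, 256] and disk_growth [128, 128] (both in
  # MiB), giving a disk_count of 2.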

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # keep only the nodes that are neither offline nor drained
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    
471
  @_DoCheckInstances
472
  @_DoBatch(False)
473
  def BurnCreateInstances(self):
474
    """Create the given instances.
475

    
476
    """
477
    self.to_rem = []
478
    mytor = izip(cycle(self.nodes),
479
                 islice(cycle(self.nodes), 1, None),
480
                 self.instances)
481

    
482
    Log("Creating instances")
483
    for pnode, snode, instance in mytor:
484
      Log("instance %s" % instance, indent=1)
485
      if self.opts.iallocator:
486
        pnode = snode = None
487
        msg = "with iallocator %s" % self.opts.iallocator
488
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
489
        snode = None
490
        msg = "on %s" % pnode
491
      else:
492
        msg = "on %s, %s" % (pnode, snode)
493

    
494
      Log(msg, indent=2)
495

    
496
      op = opcodes.OpCreateInstance(instance_name=instance,
497
                                    disks = [ {"size": size}
498
                                              for size in self.disk_size],
499
                                    disk_template=self.opts.disk_template,
500
                                    nics=self.opts.nics,
501
                                    mode=constants.INSTANCE_CREATE,
502
                                    os_type=self.opts.os,
503
                                    pnode=pnode,
504
                                    snode=snode,
505
                                    start=True,
506
                                    ip_check=self.opts.ip_check,
507
                                    name_check=self.opts.name_check,
508
                                    wait_for_sync=True,
509
                                    file_driver="loop",
510
                                    file_storage_dir=None,
511
                                    iallocator=self.opts.iallocator,
512
                                    beparams=self.bep,
513
                                    hvparams=self.hvp,
514
                                    )
515

    
516
      self.ExecOrQueue(instance, op)
517
      self.to_rem.append(instance)
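
  # Example (illustrative): with nodes [n1, n2, n3], the izip/cycle pairing
  # above yields (pnode, snode) = (n1, n2), (n2, n3), (n3, n1), ... so the
  # created instances are spread evenly over the cluster.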

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amounts, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count))
        Log("run %s" % mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[])
      Log("run %s %s" % (mode, msg), indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)
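
  # Note (illustrative): the second opcode runs with cleanup=True, i.e. it
  # exercises the "verify/finish a previous migration" path rather than
  # performing a second live migration.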

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s" % enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  def StopInstanceOp(self, instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  def StartInstanceOp(self, instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  def RenameInstanceOp(self, instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'" % reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the /hostname.txt URL on the instance and
    check that it contains the hostname of the instance. Connection
    errors (ECONNREFUSED, no route to host, etc.) are retried until the
    network timeout expires; if the instance cannot be contacted in time
    or answers with the wrong hostname, InstanceDown is raised.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
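
  # Example (illustrative): with -C/--http-check, an instance named "inst1"
  # must serve http://inst1/hostname.txt containing exactly "inst1" for the
  # liveness check to pass.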

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:
          # if we already detected errors, errors in the removal are
          # quite expected; otherwise re-raise
          if has_err:
            Log("Note: error detected during instance remove: %s" % str(err))
          else:
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(main())