#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import os
import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, indent=0):
  """Simple function that prints out its argument.

  """
  headers = {
    0: "- ",
    1: "* ",
    2: ""
    }
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                   headers.get(indent, "  "), msg))
  sys.stdout.flush()

def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
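
# The burnin steps below stack these two decorators, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# Stacked this way, the queued batch is committed first (_DoBatch) and the
# instance liveness check (_DoCheckInstances) runs on the outcome.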


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries" %
            (msg, MAX_RETRIES - retry_count))
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting" % (msg, ))
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting" % (msg, ))
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s" %
            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
        # propagate the result of the retried call back to our caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute an opcode directly or queue it for the current batch."""
    if self.opts.parallel:
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
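    # Submit all jobs up front so they can run concurrently, then poll
    # them one by one for completion.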
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s" % utils.CommaJoin(job_ids), indent=1)
    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      try:
        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
      except Exception, err: # pylint: disable-msg=W0703
        Log("Job for %s failed: %s" % (iname, err))
    if len(results) != len(jobs):
      raise BurninFailure()
    return results

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}
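
    # This timeout governs the HTTP instance checks performed through
    # SimpleOpener in _CheckInstanceAlive.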
    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # keep only nodes that are neither offline (data[1]) nor drained (data[2])
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
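    # Spread instances over the node list round-robin: each instance gets
    # the next node in the cycle as primary and the following one as
    # secondary.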
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count))
        Log("run %s" % mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG
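
    # Pick the new secondary two steps ahead in the node cycle, so that it
    # differs from the primary/secondary pair chosen at creation time.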
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[])
      Log("run %s %s" % (mode, msg), indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)
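
      # A second migration with cleanup=True exercises the cleanup path;
      # after a successful live migration it should find nothing to undo.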
      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s" % enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  def StopInstanceOp(self, instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  def StartInstanceOp(self, instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  def RenameInstanceOp(self, instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'" % reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s" % str(err))
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(main())