#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
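
# For reference, the indent convention above yields output such as:
#   Log("Creating instances")            -> "- Creating instances"
#   Log("instance %s", name, indent=1)   -> "  * instance <name>"
#   Log("some detail", indent=2)         -> "    some detail"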


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed up burnin or to test granular"
                       " locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
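
# The two decorators above are meant to be stacked, as on the Burner
# methods below:
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
# so that the whole batch is submitted and committed first, and only then
# are the instances checked for liveness.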


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the retried call's result back to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
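
  # Note: ExecOp(retry=True, ...) below enters MaybeRetry with
  # retry_count=MAX_RETRIES, so a retryable opcode is attempted at most
  # MAX_RETRIES times in total before the final failure is re-raised.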

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s", utils.CommaJoin(job_ids), indent=1)
    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s", jid, iname, indent=2)
      try:
        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
      except Exception, err: # pylint: disable-msg=W0703
        Log("Job for %s failed: %s", iname, err)
    if len(results) != len(jobs):
      raise BurninFailure()
    return results

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)
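
    # e.g. "--disk-size 1G,256M --disk-growth 128M,128M" (a sample
    # combination) yields two disks per instance (disk_count == 2),
    # with the sizes parsed by utils.ParseUnit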

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
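
    # e.g. with nodes [n1, n2, n3] the (pnode, snode) pairs rotate as
    # (n1, n2), (n2, n3), (n3, n1), (n1, n2), ... across the instances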

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the /hostname.txt URL on the instance and
    check that it contains the hostname of the instance. In case we get
    ECONNREFUSED, we retry up to net_timeout seconds; for any other
    error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template != constants.DT_DISKLESS and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      if opts.do_addremove_nics:
        self.BurnAddRemoveNICs()

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # unexpected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()