#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import os
import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, indent=0):
  """Simple function that prints out its argument.

  """
  headers = {
    0: "- ",
    1: "* ",
    2: ""
    }
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  headers.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow the parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT; this is similar to
    # what BasicURLOpener does, but raises a different exception
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
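
# An illustrative invocation (hypothetical names, defined nowhere in this
# file; the OS, node and instance names are placeholders):
#
#   burnin -o debian-image -t drbd -n node1,node2 \
#       --disk-size 1G --mem-size 256M instance1.example.com
#
# This would create the instance with a single 1G DRBD disk mirrored
# across node1 and node2, then run every test phase that is not disabled
# by a --no-* option.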


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


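# A conceptual sketch (not part of the original file) of how the two
# decorators above compose on the Burner methods below:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...
#
# runs as StartBatch(False) -> BurnSomething() -> CommitQueue(), and
# afterwards _CheckInstanceAlive() is called for every instance.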
class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    # msg is a (timestamp, log_type, text) tuple as delivered to job
    # feedback functions
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries" %
            (msg, MAX_RETRIES - retry_count))
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting" % (msg, ))
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting" % (msg, ))
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s" %
            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
        # propagate the result of a successful retry to the caller
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

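  # Illustrative call path, not part of the original file: ExecOp with
  # retry=True enters MaybeRetry above with retry_count == MAX_RETRIES
  # (3); each transient failure recurses with retry_count - 1, and the
  # attempt at retry_count == 1 is the last one before the error is
  # re-raised.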
  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute opcodes directly, or queue them for a parallel batch."""
    if self.opts.parallel:
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin."""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method returns the list of results if all jobs were
    successful; if any job fails, BurninFailure is raised after the
    remaining jobs have been polled.

    """
    self.ClearFeedbackBuf()
    job_ids = [cli.SendJob(row[0], cl=self.cl) for row in jobs]
    Log("Submitted job ID(s) %s" % utils.CommaJoin(job_ids), indent=1)
    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      try:
        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
      except Exception, err: # pylint: disable-msg=W0703
        Log("Job for %s failed: %s" % (iname, err))
    if len(results) != len(jobs):
      raise BurninFailure()
    return results

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the config."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    # keep only nodes that are neither offline nor drained
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    # round-robin over the node list: each instance gets the next node
    # as primary and the one after it as secondary
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s" % mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s" % (mode, msg), indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s" % instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s" % enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

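  # BurnRename above round-trips every instance through the single spare
  # name given via --rename, so when the loop finishes all instances
  # carry their original names again and the spare name is free.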
  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'" % reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

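  # The queue built in BurnActivateDisks above runs: activate (online),
  # stop, activate (offline), deactivate, start, which is what the three
  # Log messages announce.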
  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the /hostname.txt URL on the instance
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds; for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

850
    """Test a cluster intensively.
851

    
852
    This will create instances and then start/stop/failover them.
853
    It is safe for existing instances but could impact performance.
854

    
855
    """
856

    
857
    opts = self.opts
858

    
859
    Log("Testing global parameters")
860

    
861
    if (len(self.nodes) == 1 and
862
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
863
                                   constants.DT_FILE)):
864
      Err("When one node is available/selected the disk template must"
865
          " be 'diskless', 'file' or 'plain'")
866

    
867
    has_err = True
868
    try:
869
      self.BurnCreateInstances()
870
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
871
        self.BurnReplaceDisks1D8()
872
      if (opts.do_replace2 and len(self.nodes) > 2 and
873
          opts.disk_template in constants.DTS_NET_MIRROR) :
874
        self.BurnReplaceDisks2()
875

    
876
      if (opts.disk_template != constants.DT_DISKLESS and
877
          utils.any(self.disk_growth, lambda n: n > 0)):
878
        self.BurnGrowDisks()
879

    
880
      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
881
        self.BurnFailover()
882

    
883
      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
884
        self.BurnMigrate()
885

    
886
      if (opts.do_move and len(self.nodes) > 1 and
887
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
888
        self.BurnMove()
889

    
890
      if (opts.do_importexport and
891
          opts.disk_template not in (constants.DT_DISKLESS,
892
                                     constants.DT_FILE)):
893
        self.BurnImportExport()
894

    
895
      if opts.do_reinstall:
896
        self.BurnReinstall()
897

    
898
      if opts.do_reboot:
899
        self.BurnReboot()
900

    
901
      if opts.do_addremove_disks:
902
        self.BurnAddRemoveDisks()
903

    
904
      if opts.do_addremove_nics:
905
        self.BurnAddRemoveNICs()
906

    
907
      if opts.do_activate_disks:
908
        self.BurnActivateDisks()
909

    
910
      if opts.rename:
911
        self.BurnRename()
912

    
913
      if opts.do_startstop:
914
        self.BurnStopStart()
915

    
916
      has_err = False
917
    finally:
918
      if has_err:
919
        Log("Error detected: opcode buffer follows:\n\n")
920
        Log(self.GetFeedbackBuf())
921
        Log("\n\n")
922
      if not self.opts.keep_instances:
923
        try:
924
          self.BurnRemove()
925
        except Exception, err:  # pylint: disable-msg=W0703
926
          if has_err: # already detected errors, so errors in removal
927
                      # are quite expected
928
            Log("Note: error detected during instance remove: %s" % str(err))
929
          else: # non-expected error
930
            raise
931

    
932
    return 0
933

    
934

    
935
def main():
936
  """Main function"""
937

    
938
  burner = Burner()
939
  return burner.BurninCluster()
940

    
941

    
942
if __name__ == "__main__":
943
  main()