#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
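# Example invocation (illustrative only; the OS name, disk template and node
# names below are placeholders for whatever exists on the local cluster):
#   burnin -o debootstrap -t drbd -n node1.example.com,node2.example.com \
#     instance1.example.com instance2.example.com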

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma-separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
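
# The two decorators above are typically stacked as in the sketch below
# (illustrative; it mirrors how the Burner methods later in this file use
# them): _DoCheckInstances must be outermost so the instance check runs only
# after the whole batch has been committed:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...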


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute the opcodes immediately or queue them for the parallel batch."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()
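
    # each entry in results is a (success, payload) pair as returned by the
    # job executor; a single failed job marks the whole job set as failed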

    if utils.any(results, lambda x: not x[0]):
      raise BurninFailure()

    return [i[1] for i in results]

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
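    # Pair each instance with a (primary, secondary) node tuple by walking
    # the node list round-robin; the secondary iterator is offset by one node.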
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
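    """Send a confd request and wait for the expected replies."""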
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
    - CONFD_REQ_PING: simple ping
    - CONFD_REQ_CLUSTER_MASTER: cluster master
    - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove NICs in routed mode, as we would need an IP address
      # to add them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping NIC add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()