#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import ssconf

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
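# Example invocation (illustrative only; "debian-image" stands for any OS
# definition installed on the cluster, and the instance names are
# placeholders):
#   burnin -o debian-image -p instance1.example.com instance2.example.com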

MAX_RETRIES = 3
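# Prefixes prepended by Log() to a message at each indentation level;
# levels not listed here fall back to two spaces (see Log() below)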
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.ss = ssconf.SimpleStore()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # return the retried call's result, otherwise the caller gets None
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, *ops):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name))
    else:
      return self.ExecOp(self.queue_retry, *ops)

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, BurninFailure will be raised.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

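    # each result is a (success, payload) tuple, as returned by
    # cli.JobExecutor.GetResults()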
    if utils.any(results, lambda x: not x[0]):
      raise BurninFailure()

    return [i[1] for i in results]

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
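    # keep only the nodes that are neither offline nor drained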
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
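    # pair each instance with a primary node and the next node in the
    # list as secondary, wrapping around the node list as needed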
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, op)
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, op)

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=range(self.disk_count),
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
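    # the replacement secondary is taken two steps ahead in the node
    # list, so it differs from the pair chosen at creation time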
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, op)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, op)

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

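      # the second opcode exercises the migration cleanup path right
      # after a successful live migration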
      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, op1, op2)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)
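    # walk the node list three times in parallel, offset by 0/1/2, to
    # get a primary, secondary and export node for each instance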

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2)

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, op)

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
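    # rename each instance to the spare name and back again, checking
    # that it is alive after each half of the round trip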
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, op1, op2, op3, op4)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, *ops) # pylint: disable-msg=W0142

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, op_add, op_rem)

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
    - CONFD_REQ_PING: simple ping
    - CONFD_REQ_CLUSTER_MASTER: cluster master
    - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
    mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
    mc_list = utils.ReadFile(mc_file).splitlines()
    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.ConfdClient(hmac_key, mc_list,
                                                 counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the /hostname.txt url on the instance and
    check that it contains the hostname of the instance. On IOError
    (connection refused, no route to host, etc.) we retry for up to
    net_timeout seconds; any other error aborts the check.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
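    # "has_err" is cleared only if the whole try block below runs to
    # completion, letting the finally clause tell failure from success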
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # unexpected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  sys.exit(main())