#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
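# Example invocation (hypothetical OS, node and instance names), using the
# parallel and node-selection options defined below:
#   burnin -o debootstrap -p -n node1.example.com,node2.example.com \
#       instance1.example.com instance2.example.com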

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


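# For reference, Log output at each indent level should look roughly like
# this (the indent picks a prefix from LOG_HEADERS plus two spaces per level):
#   Log("msg")            -> "- msg"
#   Log("msg", indent=1)  -> "  * msg"
#   Log("msg", indent=2)  -> "    msg"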
def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
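
# The two decorators are typically stacked on the Burn* methods below, with
# _DoCheckInstances outermost so that the liveness check only runs after
# CommitQueue has flushed the batch, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(True)
#   def BurnSomething(self):
#     ...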


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val
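
  # Note: in serial mode ExecOrQueue executes the ops right away, while with
  # --parallel they are only queued here and submitted together by
  # CommitQueue at the end of the surrounding _DoBatch batch.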

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
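      # e.g. "--disk-size 128m,4G --disk-growth 128m,1G" creates two disks
      # per instance; each growth entry applies to the disk at the same index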
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
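    # keep only nodes that are neither offline nor drained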
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
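    # Each instance gets a (pnode, snode) pair from the cycled node list, the
    # secondary simply being the next node; e.g. nodes [A, B, C] yield
    # (A, B), (B, C), (C, A), ...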

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    )
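      # Remember the instance for later removal, but only once its creation
      # job has actually succeeded (ExecOrQueue runs post_process on success).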
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                           target_node=enode,
                                           shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
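    # Each instance is renamed to the spare name and back again, so only one
    # name from the original set is ever unavailable at a time.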
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
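    """Send a confd request and wait for the expected replies."""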
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)
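    # Callback chain: the filter callback is meant to weed out duplicate or
    # stale replies before ConfdCallback sees them, and the counting callback
    # lets DoConfdRequestReply wait until every registered query was answered.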

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()