#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
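# Example invocation (illustrative only; the OS and instance names are
# placeholders for whatever exists on your cluster):
#   burnin -o debootstrap -p instance1.example.com instance2.example.com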

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }
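# Log() prefixes a message according to its indent level: "- " at level 0,
# "* " at level 1, no prefix at level 2 and two spaces at any deeper level.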

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed up burnin or test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
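# The burn methods below typically stack these decorators as
#   @_DoCheckInstances
#   @_DoBatch(False)
# so the whole batch is committed before the instance liveness checks run,
# as required by the _DoBatch docstring above.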


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful; otherwise BurninFailure is raised.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
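    # keep only nodes that are neither offline nor drained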
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    )
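      # remove_instance(name) builds a callback that records the instance for
      # later removal; the extra lambda level freezes the current name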
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instances' disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
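      # the import source is EXPORT_DIR/<full instance name>, hence the
      # full_name lookup above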
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
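    # each instance is renamed to the spare name and back again, and it is
    # checked to be alive after each of the two rename/start steps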
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
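    """Send a confd request and wait for all expected replies."""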
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL http://<instance>/hostname.txt and
    check that it contains the hostname of the instance. Connection
    errors (refused, no route to host, etc.) are retried until the
    network timeout expires; any other error aborts the check.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

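    # assume failure until the whole sequence below completes; the finally
    # clause uses this flag to decide whether to dump the feedback buffer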
    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()