#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

Tool that exercises a Ganeti cluster by creating instances and running a
configurable sequence of operations (replace-disks, failover, migration,
export/import, reboot, rename, ...) on them.

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple stdout logger, with optional %-formatting and indentation.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
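
# Illustrative note (not part of the original tool): the "indent" keyword
# selects a prefix from LOG_HEADERS, e.g.
#   Log("Creating instances")            -> "- Creating instances"
#   Log("instance %s", "foo", indent=1)  -> "  * instance foo"
#   Log("on node1, node2", indent=2)     -> "    on node1, node2"
# Deeper indent levels fall back to a plain two-space prefix.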


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
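
# Illustrative example (hypothetical OS and instance names, not part of the
# original tool): burn in two DRBD-backed instances in parallel, keeping them
# on the cluster afterwards:
#
#   burnin -o debian-etch -t drbd -p -K \
#       --disk-size=1G,512M --disk-growth=256M,128M \
#       instance1.example.com instance2.example.com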


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
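
# Illustrative sketch (not part of the original tool): the Burn* methods below
# combine these two decorators so that a whole step runs as one (possibly
# parallel) batch and the instances are verified afterwards, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     for instance in self.instances:
#       self.ExecOrQueue(instance, [some_opcode])   # some_opcode: placeholder
#
# _DoBatch is applied first (innermost) and executes or commits the queued
# opcodes; _DoCheckInstances then probes each instance via _CheckInstanceAlive.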


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        # propagate the result of the retried call instead of dropping it
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results
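
  # Illustrative note (not part of the original tool): in parallel mode
  # (-p/--parallel) a typical batch goes through
  #   StartBatch(retry) -> ExecOrQueue(...) x N -> CommitQueue()
  # where CommitQueue submits all queued opcode lists as one job set via
  # ExecJobSet, wrapped in MaybeRetry; in serial mode ExecOrQueue instead
  # executes each opcode list immediately.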

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a single confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
      query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
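
  # Illustrative note (not part of the original tool): the -C/--http-check
  # option assumes the instance's OS image runs a web server serving a
  # /hostname.txt file that contains the instance's own name, e.g. something
  # like the following inside the instance (paths hypothetical):
  #   hostname > /var/www/hostname.txt
  # Without such a setup the check times out and InstanceDown is raised.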

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()