#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
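# Example invocation (the OS, node and instance names below are only
# illustrative):
#   burnin -o debootstrap -t drbd -n node1,node2 instance1.example.com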

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }
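# For example, Log("hello", indent=1) prints "  * hello"; indent levels
# without an entry above fall back to a plain two-space prefix (see Log
# below).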


class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get("indent", 0)
  sys.stdout.write("%*s%s%s\n" % (2 * indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.cli_option("--vcpu-count", dest="vcpu_count", help="VCPU count",
                 default=3, type="unit", metavar="<count>",
                 completion_suggest=("1 2 3 4").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile"
                 " or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
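
# The Burn* methods below stack these decorators, e.g.:
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self): ...
# so the batch is committed first and the instances are checked afterwards.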


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results
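
  # In parallel mode (-p) the flow driven by _DoBatch is: StartBatch()
  # resets the queue, each ExecOrQueue() call only queues its opcodes,
  # and CommitQueue() submits them all at once via ExecJobSet() below.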

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)
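
    # For example (hypothetical values), "--disk-size 4G,1G
    # --disk-growth 2G,0" results in two disks of 4096 and 1024 MiB and
    # a growth list of [2048, 0] MiB, later applied by BurnGrowDisks.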

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: options.vcpu_count,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
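
    # With nodes [n1, n2, n3] (hypothetical names) this yields
    # (pnode, snode) pairs (n1, n2), (n2, n3), (n3, n1), (n1, n2), ...
    # so primaries and secondaries rotate over the whole node list.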

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks=[{"size": size}
                                           for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
                                          amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    early_release = self.opts.early_release
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                            mode=mode,
                                            disks=list(range(self.disk_count)),
                                            early_release=early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
                                          mode=mode,
                                          remote_node=tnode,
                                          iallocator=self.opts.iallocator,
                                          disks=[],
                                          early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceFailover(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceMove(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=False)

      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
                                       names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_INT_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpBackupExport(instance_name=instance,
                                      target_node=enode,
                                      mode=constants.EXPORT_MODE_LOCAL,
                                      shutdown=True)
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
                                        disks=[{"size": size}
                                               for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpBackupRemove(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpInstanceShutdown(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpInstanceRename(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpInstanceRemove(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)
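
      # Each instance is renamed to the spare --rename name, checked,
      # then renamed back, so every instance keeps its original name and
      # the spare name is free again for the next iteration.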

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in self.opts.reboot_types:
        op = opcodes.OpInstanceReboot(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL http://<instance>/hostname.txt and
    check that it contains the hostname of the instance. In case we get
    ECONNREFUSED, we retry for up to the network timeout; for any other
    error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))
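
    # Note: for the -C/--http-check option to be useful, the OS image
    # used for burnin must run a web server on each instance serving the
    # instance's own name at /hostname.txt.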

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file', 'sharedfile' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_INT_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_INT_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()