#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }
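# Indent levels 0, 1 and 2 map to the prefixes above; any deeper level gets a
# plain two-space prefix instead (see Log() below).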

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--reboot-types", dest="reboot_types",
                 help="Specify the reboot types", default=None),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain, sharedfile"
                 " or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
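
# The two decorators above are normally stacked with _DoCheckInstances on the
# outside, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...
#
# so that the queued batch is committed first and the instance liveness check
# runs against the resulting state (see the Burn* methods below).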


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
292
    """Possibly retry a given function execution.
293

    
294
    @type retry_count: int
295
    @param retry_count: retry counter:
296
        - 0: non-retryable action
297
        - 1: last retry for a retryable action
298
        - MAX_RETRIES: original try for a retryable action
299
    @type msg: str
300
    @param msg: the kind of the operation
301
    @type fn: callable
302
    @param fn: the function to be called
303

    
304
    """
305
    try:
306
      val = fn(*args)
307
      if retry_count > 0 and retry_count < MAX_RETRIES:
308
        Log("Idempotent %s succeeded after %d retries",
309
            msg, MAX_RETRIES - retry_count)
310
      return val
311
    except Exception, err: # pylint: disable-msg=W0703
312
      if retry_count == 0:
313
        Log("Non-idempotent %s failed, aborting", msg)
314
        raise
315
      elif retry_count == 1:
316
        Log("Idempotent %s repeated failure, aborting", msg)
317
        raise
318
      else:
319
        Log("Idempotent %s failed, retry #%d/%d: %s",
320
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
321
        self.MaybeRetry(retry_count - 1, msg, fn, *args)
322

    
  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    cli.SetGenericOpcodeOpts(ops, self.opts)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      cli.SetGenericOpcodeOpts(ops, self.opts)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel or not self.queued_ops:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_SHARED_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)
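    # disk_count is thus the number of comma-separated sizes given via
    # --disk-size; the growth list must match it one-to-one (checked above).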

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    if options.reboot_types is None:
      options.reboot_types = constants.REBOOT_TYPES
    else:
      options.reboot_types = options.reboot_types.split(",")
      rt_diff = set(options.reboot_types).difference(constants.REBOOT_TYPES)
      if rt_diff:
        Err("Invalid reboot types specified: %s" % utils.CommaJoin(rt_diff))

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpNodeQuery(output_fields=["name", "offline", "drained"],
                               names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpOsDiagnose(output_fields=["name",
                                                      "variants",
                                                      "hidden"],
                                       names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, variants, _) in result:
      if self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
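    # Walk the node list round-robin: islice(cycle(...), 1, None) is the same
    # list shifted by one, so each instance gets consecutive nodes as its
    # primary and secondary.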
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpInstanceCreate(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
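      # post_process must be a zero-argument callable; the outer lambda binds
      # the current instance name so each queued creation records the right
      # name in to_rem once it succeeds.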
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amounts, if any."""
    Log("Growing disks")
591
    for instance in self.instances:
592
      Log("instance %s", instance, indent=1)
593
      for idx, growth in enumerate(self.disk_growth):
594
        if growth > 0:
595
          op = opcodes.OpInstanceGrowDisk(instance_name=instance, disk=idx,
596
                                          amount=growth, wait_for_sync=True)
597
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
598
          self.ExecOrQueue(instance, [op])
599

    
600
  @_DoBatch(True)
601
  def BurnReplaceDisks1D8(self):
602
    """Replace disks on primary and secondary for drbd8."""
603
    Log("Replacing disks on the same nodes")
604
    early_release = self.opts.early_release
605
    for instance in self.instances:
606
      Log("instance %s", instance, indent=1)
607
      ops = []
608
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
609
        op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
610
                                            mode=mode,
611
                                            disks=list(range(self.disk_count)),
612
                                            early_release=early_release)
613
        Log("run %s", mode, indent=2)
614
        ops.append(op)
615
      self.ExecOrQueue(instance, ops)
616

    
617
  @_DoBatch(True)
618
  def BurnReplaceDisks2(self):
619
    """Replace secondary node."""
620
    Log("Changing the secondary node")
621
    mode = constants.REPLACE_DISK_CHG
622

    
623
    mytor = izip(islice(cycle(self.nodes), 2, None),
624
                 self.instances)
625
    for tnode, instance in mytor:
626
      Log("instance %s", instance, indent=1)
627
      if self.opts.iallocator:
628
        tnode = None
629
        msg = "with iallocator %s" % self.opts.iallocator
630
      else:
631
        msg = tnode
632
      op = opcodes.OpInstanceReplaceDisks(instance_name=instance,
633
                                          mode=mode,
634
                                          remote_node=tnode,
635
                                          iallocator=self.opts.iallocator,
636
                                          disks=[],
637
                                          early_release=self.opts.early_release)
638
      Log("run %s %s", mode, msg, indent=2)
639
      self.ExecOrQueue(instance, [op])
640

    
641
  @_DoCheckInstances
642
  @_DoBatch(False)
643
  def BurnFailover(self):
644
    """Failover the instances."""
645
    Log("Failing over instances")
646
    for instance in self.instances:
647
      Log("instance %s", instance, indent=1)
648
      op = opcodes.OpInstanceFailover(instance_name=instance,
649
                                      ignore_consistency=False)
650
      self.ExecOrQueue(instance, [op])
651

    
652
  @_DoCheckInstances
653
  @_DoBatch(False)
654
  def BurnMove(self):
655
    """Move the instances."""
656
    Log("Moving instances")
657
    mytor = izip(islice(cycle(self.nodes), 1, None),
658
                 self.instances)
659
    for tnode, instance in mytor:
660
      Log("instance %s", instance, indent=1)
661
      op = opcodes.OpInstanceMove(instance_name=instance,
662
                                  target_node=tnode)
663
      self.ExecOrQueue(instance, [op])
664

    
665
  @_DoBatch(False)
666
  def BurnMigrate(self):
667
    """Migrate the instances."""
668
    Log("Migrating instances")
669
    for instance in self.instances:
670
      Log("instance %s", instance, indent=1)
671
      op1 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
672
                                      cleanup=False)
673

    
674
      op2 = opcodes.OpInstanceMigrate(instance_name=instance, mode=None,
675
                                      cleanup=True)
676
      Log("migration and migration cleanup", indent=2)
677
      self.ExecOrQueue(instance, [op1, op2])
678

    
679
  @_DoCheckInstances
680
  @_DoBatch(False)
681
  def BurnImportExport(self):
682
    """Export the instance, delete it, and import it back.
683

    
684
    """
685
    Log("Exporting and re-importing instances")
686
    mytor = izip(cycle(self.nodes),
687
                 islice(cycle(self.nodes), 1, None),
688
                 islice(cycle(self.nodes), 2, None),
689
                 self.instances)
690

    
691
    for pnode, snode, enode, instance in mytor:
692
      Log("instance %s", instance, indent=1)
693
      # read the full name of the instance
694
      nam_op = opcodes.OpInstanceQuery(output_fields=["name"],
695
                                       names=[instance], use_locking=True)
696
      full_name = self.ExecOp(False, nam_op)[0][0]
697

    
698
      if self.opts.iallocator:
699
        pnode = snode = None
700
        import_log_msg = ("import from %s"
701
                          " with iallocator %s" %
702
                          (enode, self.opts.iallocator))
703
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
704
        snode = None
705
        import_log_msg = ("import from %s to %s" %
706
                          (enode, pnode))
707
      else:
708
        import_log_msg = ("import from %s to %s, %s" %
709
                          (enode, pnode, snode))
710

    
711
      exp_op = opcodes.OpBackupExport(instance_name=instance,
712
                                      target_node=enode,
713
                                      mode=constants.EXPORT_MODE_LOCAL,
714
                                      shutdown=True)
715
      rem_op = opcodes.OpInstanceRemove(instance_name=instance,
716
                                        ignore_failures=True)
717
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
718
      imp_op = opcodes.OpInstanceCreate(instance_name=instance,
719
                                        disks = [ {"size": size}
720
                                                  for size in self.disk_size],
721
                                        disk_template=self.opts.disk_template,
722
                                        nics=self.opts.nics,
723
                                        mode=constants.INSTANCE_IMPORT,
724
                                        src_node=enode,
725
                                        src_path=imp_dir,
726
                                        pnode=pnode,
727
                                        snode=snode,
728
                                        start=True,
729
                                        ip_check=self.opts.ip_check,
730
                                        name_check=self.opts.name_check,
731
                                        wait_for_sync=True,
732
                                        file_storage_dir=None,
733
                                        file_driver="loop",
734
                                        iallocator=self.opts.iallocator,
735
                                        beparams=self.bep,
736
                                        hvparams=self.hvp,
737
                                        osparams=self.opts.osparams,
738
                                        )
739

    
740
      erem_op = opcodes.OpBackupRemove(instance_name=instance)
741

    
742
      Log("export to node %s", enode, indent=2)
743
      Log("remove instance", indent=2)
744
      Log(import_log_msg, indent=2)
745
      Log("remove export", indent=2)
746
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])
747

    
748
  @staticmethod
749
  def StopInstanceOp(instance):
750
    """Stop given instance."""
751
    return opcodes.OpInstanceShutdown(instance_name=instance)
752

    
753
  @staticmethod
754
  def StartInstanceOp(instance):
755
    """Start given instance."""
756
    return opcodes.OpInstanceStartup(instance_name=instance, force=False)
757

    
758
  @staticmethod
759
  def RenameInstanceOp(instance, instance_new):
760
    """Rename instance."""
761
    return opcodes.OpInstanceRename(instance_name=instance,
762
                                    new_name=instance_new)
763

    
764
  @_DoCheckInstances
765
  @_DoBatch(True)
766
  def BurnStopStart(self):
767
    """Stop/start the instances."""
768
    Log("Stopping and starting instances")
769
    for instance in self.instances:
770
      Log("instance %s", instance, indent=1)
771
      op1 = self.StopInstanceOp(instance)
772
      op2 = self.StartInstanceOp(instance)
773
      self.ExecOrQueue(instance, [op1, op2])
774

    
775
  @_DoBatch(False)
776
  def BurnRemove(self):
777
    """Remove the instances."""
778
    Log("Removing instances")
779
    for instance in self.to_rem:
780
      Log("instance %s", instance, indent=1)
781
      op = opcodes.OpInstanceRemove(instance_name=instance,
782
                                    ignore_failures=True)
783
      self.ExecOrQueue(instance, [op])
784

    
785
  def BurnRename(self):
786
    """Rename the instances.
787

    
788
    Note that this function will not execute in parallel, since we
789
    only have one target for rename.
790

    
791
    """
792
    Log("Renaming instances")
793
    rename = self.opts.rename
794
    for instance in self.instances:
795
      Log("instance %s", instance, indent=1)
796
      op_stop1 = self.StopInstanceOp(instance)
797
      op_stop2 = self.StopInstanceOp(rename)
798
      op_rename1 = self.RenameInstanceOp(instance, rename)
799
      op_rename2 = self.RenameInstanceOp(rename, instance)
800
      op_start1 = self.StartInstanceOp(rename)
801
      op_start2 = self.StartInstanceOp(instance)
802
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
803
      self._CheckInstanceAlive(rename)
804
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
805
      self._CheckInstanceAlive(instance)
806

    
807
  @_DoCheckInstances
808
  @_DoBatch(True)
809
  def BurnReinstall(self):
810
    """Reinstall the instances."""
811
    Log("Reinstalling instances")
812
    for instance in self.instances:
813
      Log("instance %s", instance, indent=1)
814
      op1 = self.StopInstanceOp(instance)
815
      op2 = opcodes.OpInstanceReinstall(instance_name=instance)
816
      Log("reinstall without passing the OS", indent=2)
817
      op3 = opcodes.OpInstanceReinstall(instance_name=instance,
818
                                        os_type=self.opts.os)
819
      Log("reinstall specifying the OS", indent=2)
820
      op4 = self.StartInstanceOp(instance)
821
      self.ExecOrQueue(instance, [op1, op2, op3, op4])
822

    
823
  @_DoCheckInstances
824
  @_DoBatch(True)
825
  def BurnReboot(self):
826
    """Reboot the instances."""
827
    Log("Rebooting instances")
828
    for instance in self.instances:
829
      Log("instance %s", instance, indent=1)
830
      ops = []
831
      for reboot_type in self.opts.reboot_types:
832
        op = opcodes.OpInstanceReboot(instance_name=instance,
833
                                      reboot_type=reboot_type,
834
                                      ignore_secondaries=False)
835
        Log("reboot with type '%s'", reboot_type, indent=2)
836
        ops.append(op)
837
      self.ExecOrQueue(instance, ops)
838

    
839
  @_DoCheckInstances
840
  @_DoBatch(True)
841
  def BurnActivateDisks(self):
842
    """Activate and deactivate disks of the instances."""
843
    Log("Activating/deactivating disks")
844
    for instance in self.instances:
845
      Log("instance %s", instance, indent=1)
846
      op_start = self.StartInstanceOp(instance)
847
      op_act = opcodes.OpInstanceActivateDisks(instance_name=instance)
848
      op_deact = opcodes.OpInstanceDeactivateDisks(instance_name=instance)
849
      op_stop = self.StopInstanceOp(instance)
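      # The queued order is: activate while the instance is running, stop it,
      # activate again while it is offline, deactivate, then start it back up
      # again (matching the three log lines below).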
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpInstanceSetParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback
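    # Callback chain: replies delivered by the client go through the counting
    # callback (so DoConfdRequestReply can wait for all expected answers) and
    # then the filter callback before reaching ConfdCallback above.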

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL http://<instance>/hostname.txt and
    check that it contains the hostname of the instance. I/O errors
    (e.g. ECONNREFUSED) are retried for up to the network timeout; if the
    timeout expires or the hostname does not match, InstanceDown is raised.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE,
                                   constants.DT_SHARED_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file', 'sharedfile' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(n > 0 for n in self.disk_growth)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_MIRRORED:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template not in constants.DTS_MIRRORED:
          Log("Skipping migration (disk template %s does not support it)",
              opts.disk_template)
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_SHARED_FILE,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove NICs in routed mode, as we would need an IP address
      # to add them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # unexpected error
            raise

    return constants.EXIT_SUCCESS


def main():
  """Main function.

  """
  utils.SetupLogging(constants.LOG_BURNIN, sys.argv[0],
                     debug=False, stderr_logging=True)

  return Burner().BurninCluster()


if __name__ == "__main__":
  main()