#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
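# A couple of illustrative invocations; the OS, node and instance names
# below are only placeholders, not values shipped with this tool:
#   burnin -o debian-image instance1.example.com instance2.example.com
#   burnin -o debian-image -t drbd -n node1,node2 --disk-size 1G,256M -p \
#       instance1.example.com instance2.example.com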

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()
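
# For reference, a call such as Log("instance %s", "inst1", indent=1) prints
# "  * instance inst1": two spaces per indent level plus the matching
# LOG_HEADERS prefix ("inst1" is only an example value).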


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used)"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @result: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @result: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute an opcode and manage the exec buffer."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val
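
  # Note on the batching machinery above: with --parallel, ExecOrQueue only
  # queues (ops, name, post_process) tuples; the _DoBatch decorator brackets
  # each Burn* step with StartBatch()/CommitQueue(), and CommitQueue submits
  # the queued jobs together through ExecJobSet via cli.JobExecutor.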

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      self.ExecOrQueue(instance, [op])
      self.to_rem.append(instance)

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a single confd request and wait for all expected replies."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
    - CONFD_REQ_PING: simple ping
    - CONFD_REQ_CLUSTER_MASTER: cluster master
    - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()