#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

This tool stress-tests a Ganeti cluster: it creates a number of instances
and then runs a configurable sequence of operations on them (disk
replacement, failover, migration, export/import, reboot, reinstall, ...).

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import hypervisor
from ganeti import compat

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
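# Prefixes prepended by Log() to a message, keyed by its indent level;
# indent levels not listed here fall back to plain two-space padding.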
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.HYPERVISOR_OPT,
  cli.OSPARAMS_OPT,
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]
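
# Example invocation (illustrative only -- the OS, node and instance names
# below are placeholders for whatever exists on the local cluster):
#
#   burnin -o debian-image -n node1,node2 --disk-size 1G --mem-size 512M \
#     instance1.example.com instance2.example.com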


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap
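
# The two decorators above are meant to be stacked on the Burn* methods
# below: _DoBatch opens a batch before the method runs and commits it
# afterwards, while _DoCheckInstances (applied outermost) verifies that all
# instances are still alive once the batch has been committed, e.g.:
#
#   @_DoCheckInstances
#   @_DoBatch(False)
#   def BurnSomething(self):
#     ...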


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer."""
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
    self._feed_buf.write(formatted_msg + "\n")
    if self.opts.verbose:
      Log(formatted_msg, indent=3)

  def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries",
            msg, MAX_RETRIES - retry_count)
      return val
    except Exception, err: # pylint: disable-msg=W0703
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting", msg)
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting", msg)
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s",
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute a list of opcodes, or queue it if running in parallel."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }

    self.hypervisor = None
    self.hvp = {}
    if options.hypervisor:
      self.hypervisor, self.hvp = options.hypervisor

    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
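    # Burnin only uses nodes that are neither offline nor drained (the
    # second and third fields of the node query above).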
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params
    if self.hypervisor is None:
      self.hypervisor = self.cluster_info["default_hypervisor"]
    self.hv_class = hypervisor.GetHypervisorClass(self.hypervisor)

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
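    # Walk the node list cyclically and pair every instance with a
    # (pnode, snode) tuple; the secondary iterator is offset by one so that,
    # on clusters with more than one node, the secondary differs from the
    # primary.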
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    hypervisor=self.hypervisor,
                                    osparams=self.opts.osparams,
                                    )
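      # The nested lambda binds the instance name immediately, so the
      # deferred post_process callback registers the correct instance for
      # later removal even though it only runs after the job has finished.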
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amount, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG
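    # The replacement secondary is taken from the node list below with an
    # offset of two, so that (for the initial placement done in
    # BurnCreateInstances) it differs from both the current primary and the
    # current secondary.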

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        mode=constants.EXPORT_MODE_LOCAL,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        osparams=self.opts.osparams,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")

    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the url on the instance /hostname.txt
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry up to the net timeout seconds, for
    any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          compat.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate:
        if opts.disk_template != constants.DT_DRBD8:
          Log("Skipping migration (disk template not DRBD8)")
        elif not self.hv_class.CAN_MIGRATE:
          Log("Skipping migration (hypervisor %s does not support it)",
              self.hypervisor)
        else:
          self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()