#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Burnin program

"""

import sys
import optparse
import time
import socket
import urllib
from itertools import izip, islice, cycle
from cStringIO import StringIO

from ganeti import opcodes
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils

from ganeti.confd import client as confd_client


USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")

MAX_RETRIES = 3
LOG_HEADERS = {
  0: "- ",
  1: "* ",
  2: ""
  }

class InstanceDown(Exception):
  """The checked instance was not up"""


class BurninFailure(Exception):
  """Failure detected during burning"""


def Usage():
  """Shows program usage information and exits the program."""

  print >> sys.stderr, "Usage:"
  print >> sys.stderr, USAGE
  sys.exit(2)


def Log(msg, *args, **kwargs):
  """Simple function that prints out its argument.

  """
  if args:
    msg = msg % args
  indent = kwargs.get('indent', 0)
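  # Indent two spaces per level, then prefix the per-level header
  # ("- " at level 0, "* " at level 1, "" at level 2, two spaces deeper).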
  sys.stdout.write("%*s%s%s\n" % (2*indent, "",
                                  LOG_HEADERS.get(indent, "  "), msg))
  sys.stdout.flush()


def Err(msg, exit_code=1):
  """Simple error logging that prints to stderr.

  """
  sys.stderr.write(msg + "\n")
  sys.stderr.flush()
  sys.exit(exit_code)


class SimpleOpener(urllib.FancyURLopener):
  """A simple url opener"""
  # pylint: disable-msg=W0221

  def prompt_user_passwd(self, host, realm, clear_cache=0):
    """No-interaction version of prompt_user_passwd."""
    # we follow parent class' API
    # pylint: disable-msg=W0613
    return None, None

  def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Custom error handling"""
    # make sure sockets are not left in CLOSE_WAIT, this is similar
    # but with a different exception to the BasicURLOpener class
    _ = fp.read() # throw away data
    fp.close()
    raise InstanceDown("HTTP error returned: code %s, msg %s" %
                       (errcode, errmsg))


OPTIONS = [
  cli.cli_option("-o", "--os", dest="os", default=None,
                 help="OS to use during burnin",
                 metavar="<OS>",
                 completion_suggest=cli.OPT_COMPL_ONE_OS),
  cli.cli_option("--disk-size", dest="disk_size",
                 help="Disk size (determines disk count)",
                 default="128m", type="string", metavar="<size,size,...>",
                 completion_suggest=("128M 512M 1G 4G 1G,256M"
                                     " 4G,1G,1G 10G").split()),
  cli.cli_option("--disk-growth", dest="disk_growth", help="Disk growth",
                 default="128m", type="string", metavar="<size,size,...>"),
  cli.cli_option("--mem-size", dest="mem_size", help="Memory size",
                 default=128, type="unit", metavar="<size>",
                 completion_suggest=("128M 256M 512M 1G 4G 8G"
                                     " 12G 16G").split()),
  cli.DEBUG_OPT,
  cli.VERBOSE_OPT,
  cli.NOIPCHECK_OPT,
  cli.NONAMECHECK_OPT,
  cli.EARLY_RELEASE_OPT,
  cli.cli_option("--no-replace1", dest="do_replace1",
                 help="Skip disk replacement with the same secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-replace2", dest="do_replace2",
                 help="Skip disk replacement with a different secondary",
                 action="store_false", default=True),
  cli.cli_option("--no-failover", dest="do_failover",
                 help="Skip instance failovers", action="store_false",
                 default=True),
  cli.cli_option("--no-migrate", dest="do_migrate",
                 help="Skip instance live migration",
                 action="store_false", default=True),
  cli.cli_option("--no-move", dest="do_move",
                 help="Skip instance moves", action="store_false",
                 default=True),
  cli.cli_option("--no-importexport", dest="do_importexport",
                 help="Skip instance export/import", action="store_false",
                 default=True),
  cli.cli_option("--no-startstop", dest="do_startstop",
                 help="Skip instance stop/start", action="store_false",
                 default=True),
  cli.cli_option("--no-reinstall", dest="do_reinstall",
                 help="Skip instance reinstall", action="store_false",
                 default=True),
  cli.cli_option("--no-reboot", dest="do_reboot",
                 help="Skip instance reboot", action="store_false",
                 default=True),
  cli.cli_option("--no-activate-disks", dest="do_activate_disks",
                 help="Skip disk activation/deactivation",
                 action="store_false", default=True),
  cli.cli_option("--no-add-disks", dest="do_addremove_disks",
                 help="Skip disk addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-add-nics", dest="do_addremove_nics",
                 help="Skip NIC addition/removal",
                 action="store_false", default=True),
  cli.cli_option("--no-nics", dest="nics",
                 help="No network interfaces", action="store_const",
                 const=[], default=[{}]),
  cli.cli_option("--no-confd", dest="do_confd_tests",
                 help="Skip confd queries",
                 action="store_false", default=True),
  cli.cli_option("--rename", dest="rename", default=None,
                 help=("Give one unused instance name which is taken"
                       " to start the renaming sequence"),
                 metavar="<instance_name>"),
  cli.cli_option("-t", "--disk-template", dest="disk_template",
                 choices=list(constants.DISK_TEMPLATES),
                 default=constants.DT_DRBD8,
                 help="Disk template (diskless, file, plain or drbd) [drbd]"),
  cli.cli_option("-n", "--nodes", dest="nodes", default="",
                 help=("Comma separated list of nodes to perform"
                       " the burnin on (defaults to all nodes)"),
                 completion_suggest=cli.OPT_COMPL_MANY_NODES),
  cli.cli_option("-I", "--iallocator", dest="iallocator",
                 default=None, type="string",
                 help=("Perform the allocation using an iallocator"
                       " instead of fixed node spread (node restrictions no"
                       " longer apply, therefore -n/--nodes must not be"
                       " used"),
                 completion_suggest=cli.OPT_COMPL_ONE_IALLOCATOR),
  cli.cli_option("-p", "--parallel", default=False, action="store_true",
                 dest="parallel",
                 help=("Enable parallelization of some operations in"
                       " order to speed burnin or to test granular locking")),
  cli.cli_option("--net-timeout", default=15, type="int",
                 dest="net_timeout",
                 help=("The instance check network timeout in seconds"
                       " (defaults to 15 seconds)"),
                 completion_suggest="15 60 300 900".split()),
  cli.cli_option("-C", "--http-check", default=False, action="store_true",
                 dest="http_check",
                 help=("Enable checking of instance status via http,"
                       " looking for /hostname.txt that should contain the"
                       " name of the instance")),
  cli.cli_option("-K", "--keep-instances", default=False,
                 action="store_true",
                 dest="keep_instances",
                 help=("Leave instances on the cluster after burnin,"
                       " for investigation in case of errors or simply"
                       " to use them")),
  ]

# Mainly used for bash completion
ARGUMENTS = [cli.ArgInstance(min=1)]


def _DoCheckInstances(fn):
  """Decorator for checking instances.

  """
  def wrapper(self, *args, **kwargs):
    val = fn(self, *args, **kwargs)
    for instance in self.instances:
      self._CheckInstanceAlive(instance) # pylint: disable-msg=W0212
    return val

  return wrapper


def _DoBatch(retry):
  """Decorator for possible batch operations.

  Must come after the _DoCheckInstances decorator (if any).

  @param retry: whether this is a retryable batch, will be
      passed to StartBatch

  """
  def wrap(fn):
    def batched(self, *args, **kwargs):
      self.StartBatch(retry)
      val = fn(self, *args, **kwargs)
      self.CommitQueue()
      return val
    return batched

  return wrap


class Burner(object):
  """Burner class."""

  def __init__(self):
    """Constructor."""
    utils.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    self.url_opener = SimpleOpener()
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    self.to_rem = []
    self.queued_ops = []
    self.opts = None
    self.queue_retry = False
    self.disk_count = self.disk_growth = self.disk_size = None
    self.hvp = self.bep = None
    self.ParseOptions()
    self.cl = cli.GetClient()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Acumulate feedback in our buffer."""
280
    formatted_msg = "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
281
    self._feed_buf.write(formatted_msg + "\n")
282
    if self.opts.verbose:
283
      Log(formatted_msg, indent=3)
284

    
285
  def MaybeRetry(self, retry_count, msg, fn, *args):
286
    """Possibly retry a given function execution.
287

    
288
    @type retry_count: int
289
    @param retry_count: retry counter:
290
        - 0: non-retryable action
291
        - 1: last retry for a retryable action
292
        - MAX_RETRIES: original try for a retryable action
293
    @type msg: str
294
    @param msg: the kind of the operation
295
    @type fn: callable
296
    @param fn: the function to be called
297

    
298
    """
299
    try:
300
      val = fn(*args)
301
      if retry_count > 0 and retry_count < MAX_RETRIES:
302
        Log("Idempotent %s succeeded after %d retries",
303
            msg, MAX_RETRIES - retry_count)
304
      return val
305
    except Exception, err: # pylint: disable-msg=W0703
306
      if retry_count == 0:
307
        Log("Non-idempotent %s failed, aborting", msg)
308
        raise
309
      elif retry_count == 1:
310
        Log("Idempotent %s repeated failure, aborting", msg)
311
        raise
312
      else:
313
        Log("Idempotent %s failed, retry #%d/%d: %s",
314
            msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err)
315
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)

  def _SetDebug(self, ops):
    """Set the debug value on the given opcodes"""
    for op in ops:
      op.debug_level = self.opts.debug

  def _ExecOp(self, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    else:
      return results

  def ExecOp(self, retry, *ops):
    """Execute one or more opcodes and manage the exec buffer.

    @return: if only one opcode has been passed, we return its result;
        otherwise we return the list of results

    """
    if retry:
      rval = MAX_RETRIES
    else:
      rval = 0
    self._SetDebug(ops)
    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)

  def ExecOrQueue(self, name, ops, post_process=None):
    """Execute the given opcodes directly, or queue them for the batch."""
    if self.opts.parallel:
      self._SetDebug(ops)
      self.queued_ops.append((ops, name, post_process))
    else:
      val = self.ExecOp(self.queue_retry, *ops) # pylint: disable-msg=W0142
      if post_process is not None:
        post_process()
      return val

  def StartBatch(self, retry):
    """Start a new batch of jobs.

    @param retry: whether this is a retryable batch

    """
    self.queued_ops = []
    self.queue_retry = retry

  def CommitQueue(self):
    """Execute all submitted opcodes in case of parallel burnin"""
    if not self.opts.parallel:
      return

    if self.queue_retry:
      rval = MAX_RETRIES
    else:
      rval = 0

    try:
      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
                                self.queued_ops)
    finally:
      self.queued_ops = []
    return results

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    jex = cli.JobExecutor(cl=self.cl, feedback_fn=self.Feedback)
    for ops, name, _ in jobs:
      jex.QueueJob(name, *ops) # pylint: disable-msg=W0142
    try:
      results = jex.GetResults()
    except Exception, err: # pylint: disable-msg=W0703
      Log("Jobs failed: %s", err)
      raise BurninFailure()

    fail = False
    val = []
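    # GetResults() is expected to return one (success, result) pair per queued
    # job, in submission order, so it can be zipped against the job list.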
    for (_, name, post_process), (success, result) in zip(jobs, results):
      if success:
        if post_process:
          try:
            post_process()
          except Exception, err: # pylint: disable-msg=W0703
            Log("Post process call for job %s failed: %s", name, err)
            fail = True
        val.append(result)
      else:
        fail = True

    if fail:
      raise BurninFailure()

    return val

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version=("%%prog (ganeti) %s" %
                                            constants.RELEASE_VERSION),
                                   option_list=OPTIONS)

    options, args = parser.parse_args()
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Err("Unknown disk template '%s'" % options.disk_template)

    if options.disk_template == constants.DT_DISKLESS:
      disk_size = disk_growth = []
      options.do_addremove_disks = False
    else:
      disk_size = [utils.ParseUnit(v) for v in options.disk_size.split(",")]
      disk_growth = [utils.ParseUnit(v)
                     for v in options.disk_growth.split(",")]
      if len(disk_growth) != len(disk_size):
        Err("Wrong disk sizes/growth combination")
    if ((disk_size and options.disk_template == constants.DT_DISKLESS) or
        (not disk_size and options.disk_template != constants.DT_DISKLESS)):
      Err("Wrong disk count/disk template combination")

    self.disk_size = disk_size
    self.disk_growth = disk_growth
    self.disk_count = len(disk_size)

    if options.nodes and options.iallocator:
      Err("Give either the nodes option or the iallocator option, not both")

    if options.http_check and not options.name_check:
      Err("Can't enable HTTP checks without name checks")

    self.opts = options
    self.instances = args
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}
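
    # setdefaulttimeout() applies to every socket created from here on,
    # including the HTTP liveness checks done by _CheckInstanceAlive.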
    socket.setdefaulttimeout(options.net_timeout)

  def GetState(self):
    """Read the cluster state from the master daemon."""
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                names=names, use_locking=True)
      result = self.ExecOp(True, op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Err(msg, exit_code=err_code)
    self.nodes = [data[0] for data in result if not (data[1] or data[2])]

    op_diagnose = opcodes.OpDiagnoseOS(output_fields=["name", "valid",
                                                      "variants"], names=[])
    result = self.ExecOp(True, op_diagnose)

    if not result:
      Err("Can't get the OS list")

    found = False
    for (name, valid, variants) in result:
      if valid and self.opts.os in cli.CalculateOSNames(name, variants):
        found = True
        break

    if not found:
      Err("OS '%s' not found" % self.opts.os)

    cluster_info = self.cl.QueryClusterInfo()
    self.cluster_info = cluster_info
    if not self.cluster_info:
      Err("Can't get cluster info")

    default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT]
    self.cluster_default_nicparams = default_nic_params

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnCreateInstances(self):
    """Create the given instances.

    """
    self.to_rem = []
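    # Round-robin over the node list: each instance gets the next node as
    # primary and the node after it as secondary; izip stops when the
    # instance list is exhausted.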
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)

    Log("Creating instances")
    for pnode, snode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        pnode = snode = None
        msg = "with iallocator %s" % self.opts.iallocator
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        msg = "on %s" % pnode
      else:
        msg = "on %s, %s" % (pnode, snode)

      Log(msg, indent=2)

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disks = [ {"size": size}
                                              for size in self.disk_size],
                                    disk_template=self.opts.disk_template,
                                    nics=self.opts.nics,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=self.opts.ip_check,
                                    name_check=self.opts.name_check,
                                    wait_for_sync=True,
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )
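      # The nested lambda binds the instance name now, so the queued
      # post-process callback appends exactly this instance to self.to_rem.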
      remove_instance = lambda name: lambda: self.to_rem.append(name)
      self.ExecOrQueue(instance, [op], post_process=remove_instance(instance))

  @_DoBatch(False)
  def BurnGrowDisks(self):
    """Grow the instance disks by the requested amounts, if any."""
    Log("Growing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      for idx, growth in enumerate(self.disk_growth):
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
                                  amount=growth, wait_for_sync=True)
          Log("increase disk/%s by %s MB", idx, growth, indent=2)
          self.ExecOrQueue(instance, [op])

  @_DoBatch(True)
  def BurnReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8."""
    Log("Replacing disks on the same nodes")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=[i for i in range(self.disk_count)],
                                    early_release=self.opts.early_release)
        Log("run %s", mode, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoBatch(True)
  def BurnReplaceDisks2(self):
    """Replace secondary node."""
    Log("Changing the secondary node")
    mode = constants.REPLACE_DISK_CHG
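
    # The replacement secondary is taken two positions ahead in the node
    # cycle, so on a cluster with three or more nodes it differs from the
    # nodes the instance was created on.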
    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      if self.opts.iallocator:
        tnode = None
        msg = "with iallocator %s" % self.opts.iallocator
      else:
        msg = tnode
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=[],
                                  early_release=self.opts.early_release)
      Log("run %s %s", mode, msg, indent=2)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnFailover(self):
    """Failover the instances."""
    Log("Failing over instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)
      self.ExecOrQueue(instance, [op])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnMove(self):
    """Move the instances."""
    Log("Moving instances")
    mytor = izip(islice(cycle(self.nodes), 1, None),
                 self.instances)
    for tnode, instance in mytor:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpMoveInstance(instance_name=instance,
                                  target_node=tnode)
      self.ExecOrQueue(instance, [op])

  @_DoBatch(False)
  def BurnMigrate(self):
    """Migrate the instances."""
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=False)

      op2 = opcodes.OpMigrateInstance(instance_name=instance, live=True,
                                      cleanup=True)
      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnImportExport(self):
    """Export the instance, delete it, and import it back.

    """
    Log("Exporting and re-importing instances")
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:
      Log("instance %s", instance, indent=1)
      # read the full name of the instance
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                        names=[instance], use_locking=True)
      full_name = self.ExecOp(False, nam_op)[0][0]

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("import from %s"
                          " with iallocator %s" %
                          (enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("import from %s to %s" %
                          (enode, pnode))
      else:
        import_log_msg = ("import from %s to %s, %s" %
                          (enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                        target_node=enode,
                                        shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      imp_dir = utils.PathJoin(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disks = [ {"size": size}
                                                  for size in self.disk_size],
                                        disk_template=self.opts.disk_template,
                                        nics=self.opts.nics,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=self.opts.ip_check,
                                        name_check=self.opts.name_check,
                                        wait_for_sync=True,
                                        file_storage_dir=None,
                                        file_driver="loop",
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("export to node %s", enode, indent=2)
      Log("remove instance", indent=2)
      Log(import_log_msg, indent=2)
      Log("remove export", indent=2)
      self.ExecOrQueue(instance, [exp_op, rem_op, imp_op, erem_op])

  @staticmethod
  def StopInstanceOp(instance):
    """Stop given instance."""
    return opcodes.OpShutdownInstance(instance_name=instance)

  @staticmethod
  def StartInstanceOp(instance):
    """Start given instance."""
    return opcodes.OpStartupInstance(instance_name=instance, force=False)

  @staticmethod
  def RenameInstanceOp(instance, instance_new):
    """Rename instance."""
    return opcodes.OpRenameInstance(instance_name=instance,
                                    new_name=instance_new)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnStopStart(self):
    """Stop/start the instances."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2])

  @_DoBatch(False)
  def BurnRemove(self):
    """Remove the instances."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s", instance, indent=1)
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      self.ExecOrQueue(instance, [op])

  def BurnRename(self):
    """Rename the instances.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(False, op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(False, op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReinstall(self):
    """Reinstall the instances."""
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op1 = self.StopInstanceOp(instance)
      op2 = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      op3 = opcodes.OpReinstallInstance(instance_name=instance,
                                        os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      op4 = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, [op1, op2, op3, op4])

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnReboot(self):
    """Reboot the instances."""
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      ops = []
      for reboot_type in constants.REBOOT_TYPES:
        op = opcodes.OpRebootInstance(instance_name=instance,
                                      reboot_type=reboot_type,
                                      ignore_secondaries=False)
        Log("reboot with type '%s'", reboot_type, indent=2)
        ops.append(op)
      self.ExecOrQueue(instance, ops)

  @_DoCheckInstances
  @_DoBatch(True)
  def BurnActivateDisks(self):
    """Activate and deactivate disks of the instances."""
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      Log("activate disks when online", indent=2)
      Log("activate disks when offline", indent=2)
      Log("deactivate disks (when offline)", indent=2)
      self.ExecOrQueue(instance, [op_act, op_stop, op_act, op_deact, op_start])

  @_DoCheckInstances
  @_DoBatch(False)
  def BurnAddRemoveDisks(self):
    """Add and remove an extra disk for the instances."""
    Log("Adding and removing disks")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance,
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
      op_stop = self.StopInstanceOp(instance)
      op_start = self.StartInstanceOp(instance)
      Log("adding a disk", indent=2)
      Log("removing last disk", indent=2)
      self.ExecOrQueue(instance, [op_add, op_stop, op_rem, op_start])

  @_DoBatch(False)
  def BurnAddRemoveNICs(self):
    """Add and remove an extra NIC for the instances."""
    Log("Adding and removing NICs")
    for instance in self.instances:
      Log("instance %s", instance, indent=1)
      op_add = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_ADD, {})])
      op_rem = opcodes.OpSetInstanceParams(\
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
      Log("adding a NIC", indent=2)
      Log("removing last NIC", indent=2)
      self.ExecOrQueue(instance, [op_add, op_rem])

  def ConfdCallback(self, reply):
    """Callback for confd queries"""
    if reply.type == confd_client.UPCALL_REPLY:
      if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
        Err("Query %s gave non-ok status %s: %s" % (reply.orig_request,
                                                    reply.server_reply.status,
                                                    reply.server_reply))
      if reply.orig_request.type == constants.CONFD_REQ_PING:
        Log("Ping: OK", indent=1)
      elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER:
        if reply.server_reply.answer == self.cluster_info["master"]:
          Log("Master: OK", indent=1)
        else:
          Err("Master: wrong: %s" % reply.server_reply.answer)
      elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME:
        if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER:
          Log("Node role for master: OK", indent=1)
        else:
          Err("Node role for master: wrong: %s" % reply.server_reply.answer)

  def DoConfdRequestReply(self, req):
    """Send a confd request and wait until all expected replies arrive."""
    self.confd_counting_callback.RegisterQuery(req.rsalt)
    self.confd_client.SendRequest(req, async=False)
    while not self.confd_counting_callback.AllAnswered():
      if not self.confd_client.ReceiveReply():
        Err("Did not receive all expected confd replies")
        break

  def BurnConfd(self):
    """Run confd queries for our instances.

    The following confd queries are tested:
      - CONFD_REQ_PING: simple ping
      - CONFD_REQ_CLUSTER_MASTER: cluster master
      - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master

    """
    Log("Checking confd results")
    filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback)
    counting_callback = confd_client.ConfdCountingCallback(filter_callback)
    self.confd_counting_callback = counting_callback

    self.confd_client = confd_client.GetConfdClient(counting_callback)

    req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
      type=constants.CONFD_REQ_CLUSTER_MASTER)
    self.DoConfdRequestReply(req)

    req = confd_client.ConfdClientRequest(
        type=constants.CONFD_REQ_NODE_ROLE_BYNAME,
        query=self.cluster_info["master"])
    self.DoConfdRequestReply(req)

  def _CheckInstanceAlive(self, instance):
    """Check if an instance is alive by doing http checks.

    This will try to retrieve the URL /hostname.txt on the instance
    and check that it contains the hostname of the instance. In case
    we get ECONNREFUSED, we retry for up to the configured network
    timeout; on any other error we abort.

    """
    if not self.opts.http_check:
      return
    end_time = time.time() + self.opts.net_timeout
    url = None
    while time.time() < end_time and url is None:
      try:
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
      except IOError:
        # here we can have connection refused, no route to host, etc.
        time.sleep(1)
    if url is None:
      raise InstanceDown(instance, "Cannot contact instance")
    hostname = url.read().strip()
    url.close()
    if hostname != instance:
      raise InstanceDown(instance, ("Hostname mismatch, expected %s, got %s" %
                                    (instance, hostname)))

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Err("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")

    has_err = True
    try:
      self.BurnCreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR):
        self.BurnReplaceDisks2()

      if (opts.disk_template in constants.DTS_GROWABLE and
          utils.any(self.disk_growth, lambda n: n > 0)):
        self.BurnGrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.BurnFailover()

      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
        self.BurnMigrate()

      if (opts.do_move and len(self.nodes) > 1 and
          opts.disk_template in [constants.DT_PLAIN, constants.DT_FILE]):
        self.BurnMove()

      if (opts.do_importexport and
          opts.disk_template not in (constants.DT_DISKLESS,
                                     constants.DT_FILE)):
        self.BurnImportExport()

      if opts.do_reinstall:
        self.BurnReinstall()

      if opts.do_reboot:
        self.BurnReboot()

      if opts.do_addremove_disks:
        self.BurnAddRemoveDisks()

      default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE]
      # Don't add/remove nics in routed mode, as we would need an ip to add
      # them with
      if opts.do_addremove_nics:
        if default_nic_mode == constants.NIC_MODE_BRIDGED:
          self.BurnAddRemoveNICs()
        else:
          Log("Skipping nic add/remove as the cluster is not in bridged mode")

      if opts.do_activate_disks:
        self.BurnActivateDisks()

      if opts.rename:
        self.BurnRename()

      if opts.do_confd_tests:
        self.BurnConfd()

      if opts.do_startstop:
        self.BurnStopStart()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      if not self.opts.keep_instances:
        try:
          self.BurnRemove()
        except Exception, err:  # pylint: disable-msg=W0703
          if has_err: # already detected errors, so errors in removal
                      # are quite expected
            Log("Note: error detected during instance remove: %s", err)
          else: # non-expected error
            raise

    return 0


def main():
  """Main function"""

  burner = Burner()
  return burner.BurninCluster()


if __name__ == "__main__":
  main()