Revision c723c163

b/tools/burnin
29 29
import time
30 30
import socket
31 31
import urllib
32
import errno
33 32
from itertools import izip, islice, cycle
34 33
from cStringIO import StringIO
35 34

  
36 35
from ganeti import opcodes
37
from ganeti import mcpu
38 36
from ganeti import constants
39 37
from ganeti import cli
40 38
from ganeti import errors
......
106 104
    self.nodes = []
107 105
    self.instances = []
108 106
    self.to_rem = []
107
    self.queued_ops = []
109 108
    self.opts = None
110 109
    self.ParseOptions()
111 110
    self.cl = cli.GetClient()
......
126 125
    if self.opts.verbose:
127 126
      Log(msg, indent=3)
128 127

  
129
  def ExecOp(self, op):
128
  def ExecOp(self, *ops):
    """Execute one or more opcodes as a single job and wait for it.

    The feedback buffer is reset before the job is submitted, the same
    way ExecJobSet does it.

    @param ops: the opcodes to submit together, in the given order
    @return: if only one opcode has been passed, its result; otherwise
        the list of results as returned by cli.PollJob

    """
    self.ClearFeedbackBuf()
    job_id = cli.SendJob(ops, cl=self.cl)
    results = cli.PollJob(job_id, cl=self.cl, feedback_fn=self.Feedback)
    if len(ops) == 1:
      return results[0]
    return results
141

  
142
  def ExecOrQueue(self, name, *ops):
    """Run the given opcodes now, or queue them for CommitQueue.

    @param name: label stored next to the queued opcodes (the callers in
        this file pass the instance name)
    @param ops: the opcodes that make up one job
    @return: the ExecOp result when not in parallel mode; None when the
        opcodes were only queued

    """
    if not self.opts.parallel:
      return self.ExecOp(*ops)
    self.queued_ops.append((ops, name))
148

  
149
  def CommitQueue(self):
    """Submit everything queued by ExecOrQueue (parallel burnin only).

    The queue is emptied whether or not ExecJobSet raises.

    @return: the ExecJobSet results in parallel mode, None otherwise

    """
    if self.opts.parallel:
      try:
        return self.ExecJobSet(self.queued_ops)
      finally:
        # always start the next burnin step with an empty queue
        self.queued_ops = []
133 159

  
134 160
  def ExecJobSet(self, jobs):
    """Submit a batch of jobs and return once all of them are done.

    If every job is successful the list of results is returned;
    otherwise OpExecError will be raised from within cli.py.

    @param jobs: sequence of (opcodes, name) pairs, in the form built by
        ExecOrQueue

    """
    self.ClearFeedbackBuf()

    # first submit everything, then poll the jobs one by one
    job_ids = []
    for row in jobs:
      job_ids.append(cli.SendJob(row[0], cl=self.cl))
    Log("Submitted job ID(s) %s" % ", ".join(job_ids), indent=1)

    results = []
    for jid, (_, iname) in zip(job_ids, jobs):
      Log("waiting for job %s for %s" % (jid, iname), indent=2)
      outcome = cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback)
      results.append(outcome)

    return results
......
316 342
    if self.opts.os not in os_set:
317 343
      Err("OS '%s' not found" % self.opts.os)
318 344

  
319
  def CreateInstances(self):
345
  def BurnCreateInstances(self):
320 346
    """Create the given instances.
321 347

  
322 348
    """
......
324 350
    mytor = izip(cycle(self.nodes),
325 351
                 islice(cycle(self.nodes), 1, None),
326 352
                 self.instances)
327
    jobset = []
328 353

  
329 354
    Log("Creating instances")
330 355
    for pnode, snode, instance in mytor:
......
359 384
                                    hvparams=self.hvp,
360 385
                                    )
361 386

  
362
      if self.opts.parallel:
363
        jobset.append([op])
364
        # FIXME: here we should not append to to_rem uncoditionally,
365
        # but only when the job is successful
366
        self.to_rem.append(instance)
367
      else:
368
        self.ExecOp(op)
369
        self.to_rem.append(instance)
370
    if self.opts.parallel:
371
      self.ExecJobSet(jobset)
387
      self.ExecOrQueue(instance, op)
388
      self.to_rem.append(instance)
389

  
390
    self.CommitQueue()
372 391

  
373 392
    for instance in self.instances:
374 393
      self._CheckInstanceAlive(instance)
375 394

  
376
  def GrowDisks(self):
395
  def BurnGrowDisks(self):
377 396
    """Grow both the os and the swap disks by the requested amount, if any."""
378 397
    Log("Growing disks")
379 398
    for instance in self.instances:
......
383 402
          op = opcodes.OpGrowDisk(instance_name=instance, disk=idx,
384 403
                                  amount=growth, wait_for_sync=True)
385 404
          Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
386
          self.ExecOp(op)
405
          self.ExecOrQueue(instance, op)
406
    self.CommitQueue()
387 407

  
388
  def ReplaceDisks1D8(self):
408
  def BurnReplaceDisks1D8(self):
    """Replace the disks of each instance in place (drbd8).

    For every instance one job is built that replaces all disks first
    with mode REPLACE_DISK_SEC and then with mode REPLACE_DISK_PRI.

    """
    Log("Replacing disks on the same nodes")
    modes = (constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI)
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      replace_ops = []
      for mode in modes:
        replace_ops.append(
          opcodes.OpReplaceDisks(instance_name=instance,
                                 mode=mode,
                                 disks=list(range(self.disk_count))))
        Log("run %s" % mode, indent=2)
      self.ExecOrQueue(instance, *replace_ops)
    self.CommitQueue()
399 422

  
400
  def ReplaceDisks2(self):
423
  def BurnReplaceDisks2(self):
401 424
    """Replace secondary node."""
402 425
    Log("Changing the secondary node")
403 426
    mode = constants.REPLACE_DISK_CHG
......
417 440
                                  iallocator=self.opts.iallocator,
418 441
                                  disks=[i for i in range(self.disk_count)])
419 442
      Log("run %s %s" % (mode, msg), indent=2)
420
      self.ExecOp(op)
443
      self.ExecOrQueue(instance, op)
444
    self.CommitQueue()
421 445

  
422
  def Failover(self):
446
  def BurnFailover(self):
423 447
    """Failover the instances."""
424 448
    Log("Failing over instances")
425 449
    for instance in self.instances:
......
427 451
      op = opcodes.OpFailoverInstance(instance_name=instance,
428 452
                                      ignore_consistency=False)
429 453

  
430
      self.ExecOp(op)
454
      self.ExecOrQueue(instance, op)
455
    self.CommitQueue()
431 456
    for instance in self.instances:
432 457
      self._CheckInstanceAlive(instance)
433 458

  
434
  def Migrate(self):
459
  def BurnMigrate(self):
    """Migrate every instance, then run a migration cleanup on it.

    Both opcodes of an instance are submitted together as one job.

    """
    Log("Migrating instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      # first the real migration, then the same opcode in cleanup mode
      migrate_ops = [opcodes.OpMigrateInstance(instance_name=instance,
                                               live=True,
                                               cleanup=do_cleanup)
                     for do_cleanup in (False, True)]

      Log("migration and migration cleanup", indent=2)
      self.ExecOrQueue(instance, *migrate_ops)
    self.CommitQueue()
448 472

  
449
  def ImportExport(self):
473
  def BurnImportExport(self):
450 474
    """Export the instance, delete it, and import it back.
451 475

  
452 476
    """
......
458 482

  
459 483
    for pnode, snode, enode, instance in mytor:
460 484
      Log("instance %s" % instance, indent=1)
485
      # read the full name of the instance
486
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
487
                                           names=[instance])
488
      full_name = self.ExecOp(nam_op)[0][0]
489

  
461 490
      if self.opts.iallocator:
462 491
        pnode = snode = None
463 492
        import_log_msg = ("import from %s"
......
476 505
                                           shutdown=True)
477 506
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
478 507
                                        ignore_failures=True)
479
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
480
                                           names=[instance])
481
      full_name = self.ExecOp(nam_op)[0][0]
482 508
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
483 509
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
484 510
                                        disks = [ {"size": size}
......
503 529
      erem_op = opcodes.OpRemoveExport(instance_name=instance)
504 530

  
505 531
      Log("export to node %s" % enode, indent=2)
506
      self.ExecOp(exp_op)
507 532
      Log("remove instance", indent=2)
508
      self.ExecOp(rem_op)
509
      self.to_rem.remove(instance)
510 533
      Log(import_log_msg, indent=2)
511
      self.ExecOp(imp_op)
512 534
      Log("remove export", indent=2)
513
      self.ExecOp(erem_op)
514

  
515
      self.to_rem.append(instance)
535
      self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
516 536

  
537
    self.CommitQueue()
517 538
    for instance in self.instances:
518 539
      self._CheckInstanceAlive(instance)
519 540

  
520
  def StopInstance(self, instance):
541
  def StopInstanceOp(self, instance):
    """Return an OpShutdownInstance opcode for the given instance.

    The opcode is only built here; running it is left to the caller.

    """
    stop_op = opcodes.OpShutdownInstance(instance_name=instance)
    return stop_op
525 544

  
526
  def StartInstance(self, instance):
545
  def StartInstanceOp(self, instance):
    """Return an OpStartupInstance opcode (force=False) for the instance.

    The opcode is only built here; running it is left to the caller.

    """
    start_op = opcodes.OpStartupInstance(instance_name=instance, force=False)
    return start_op
531 548

  
532
  def RenameInstance(self, instance, instance_new):
549
  def RenameInstanceOp(self, instance, instance_new):
    """Return an OpRenameInstance opcode.

    @param instance: the current name of the instance
    @param instance_new: the name to rename it to

    The opcode is only built here; running it is left to the caller.

    """
    rename_op = opcodes.OpRenameInstance(instance_name=instance,
                                         new_name=instance_new)
    return rename_op
538 553

  
539
  def StopStart(self):
554
  def BurnStopStart(self):
    """Shut down and start up every instance, then check it is alive."""
    Log("Stopping and starting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      # one job per instance: shutdown followed by startup
      self.ExecOrQueue(instance,
                       self.StopInstanceOp(instance),
                       self.StartInstanceOp(instance))

    self.CommitQueue()

    for instance in self.instances:
      self._CheckInstanceAlive(instance)
549 567

  
550
  def Remove(self):
568
  def BurnRemove(self):
    """Remove every instance listed in self.to_rem."""
    Log("Removing instances")
    for instance in self.to_rem:
      Log("instance %s" % instance, indent=1)
      remove_op = opcodes.OpRemoveInstance(instance_name=instance,
                                           ignore_failures=True)
      self.ExecOrQueue(instance, remove_op)

    self.CommitQueue()
578

  
579
  def BurnRename(self):
    """Rename each instance to the spare name and back again.

    For every instance two jobs are run: stop/rename/start towards the
    name given in the rename option, and then the same sequence back to
    the original name. The instance is checked for liveness after each
    job.

    Note that this function will not execute in parallel, since we
    only have one target for rename.

    """
    Log("Renaming instances")
    rename = self.opts.rename
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      # Each shutdown must address the instance by the name it carries
      # at that point: the original name in the first job, the spare
      # name in the job that renames it back.
      op_stop1 = self.StopInstanceOp(instance)
      op_stop2 = self.StopInstanceOp(rename)
      op_rename1 = self.RenameInstanceOp(instance, rename)
      op_rename2 = self.RenameInstanceOp(rename, instance)
      op_start1 = self.StartInstanceOp(rename)
      op_start2 = self.StartInstanceOp(instance)
      self.ExecOp(op_stop1, op_rename1, op_start1)
      self._CheckInstanceAlive(rename)
      self.ExecOp(op_stop2, op_rename2, op_start2)
      self._CheckInstanceAlive(instance)
575 599

  
576
  def Reinstall(self):
600
  def BurnReinstall(self):
    """Reinstall every instance twice, then check it is alive.

    The job built per instance is: shutdown, reinstall without an OS
    argument, reinstall with the OS from the options, startup.

    """
    Log("Reinstalling instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      stop_op = self.StopInstanceOp(instance)
      plain_op = opcodes.OpReinstallInstance(instance_name=instance)
      Log("reinstall without passing the OS", indent=2)
      with_os_op = opcodes.OpReinstallInstance(instance_name=instance,
                                               os_type=self.opts.os)
      Log("reinstall specifying the OS", indent=2)
      start_op = self.StartInstanceOp(instance)
      self.ExecOrQueue(instance, stop_op, plain_op, with_os_op, start_op)

    self.CommitQueue()

    for instance in self.instances:
      self._CheckInstanceAlive(instance)
592 618

  
593
  def Reboot(self):
619
  def BurnReboot(self):
    """Reboot every instance once per reboot type, then check it is alive.

    All reboot opcodes of one instance (one per entry of
    constants.REBOOT_TYPES) are submitted together as a single job.

    """
    Log("Rebooting instances")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      reboot_ops = []
      for reboot_type in constants.REBOOT_TYPES:
        reboot_ops.append(
          opcodes.OpRebootInstance(instance_name=instance,
                                   reboot_type=reboot_type,
                                   ignore_secondaries=False))
        Log("reboot with type '%s'" % reboot_type, indent=2)
      self.ExecOrQueue(instance, *reboot_ops)

    self.CommitQueue()

    for instance in self.instances:
      self._CheckInstanceAlive(instance)
605 637

  
606
  def ActivateDisks(self):
638
  def BurnActivateDisks(self):
    """Activate and deactivate the disks of every instance.

    The job built per instance is: activate disks, shutdown, activate
    disks again, deactivate disks, startup. Afterwards every instance
    is checked for liveness.

    """
    Log("Activating/deactivating disks")
    for instance in self.instances:
      Log("instance %s" % instance, indent=1)
      op_start = self.StartInstanceOp(instance)
      op_act = opcodes.OpActivateInstanceDisks(instance_name=instance)
      op_deact = opcodes.OpDeactivateInstanceDisks(instance_name=instance)
      op_stop = self.StopInstanceOp(instance)
      for step in ("activate disks when online",
                   "activate disks when offline",
                   "deactivate disks (when offline)"):
        Log(step, indent=2)
      # the activate opcode is deliberately used twice in the same job
      sequence = (op_act, op_stop, op_act, op_deact, op_start)
      self.ExecOrQueue(instance, *sequence)
    self.CommitQueue()
    for instance in self.instances:
      self._CheckInstanceAlive(instance)
623 654

  
624
  def AddRemoveDisks(self):
655
  def BurnAddRemoveDisks(self):
625 656
    """Add and remove an extra disk for the instances."""
626 657
    Log("Adding and removing disks")
627 658
    for instance in self.instances:
......
631 662
        disks=[(constants.DDM_ADD, {"size": self.disk_size[0]})])
632 663
      op_rem = opcodes.OpSetInstanceParams(\
633 664
        instance_name=instance, disks=[(constants.DDM_REMOVE, {})])
665
      op_stop = self.StopInstanceOp(instance)
666
      op_start = self.StartInstanceOp(instance)
634 667
      Log("adding a disk", indent=2)
635
      self.ExecOp(op_add)
636
      self.StopInstance(instance)
637 668
      Log("removing last disk", indent=2)
638
      self.ExecOp(op_rem)
639
      self.StartInstance(instance)
669
      self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
670
    self.CommitQueue()
640 671
    for instance in self.instances:
641 672
      self._CheckInstanceAlive(instance)
642 673

  
643
  def AddRemoveNICs(self):
674
  def BurnAddRemoveNICs(self):
644 675
    """Add and remove an extra NIC for the instances."""
645 676
    Log("Adding and removing NICs")
646 677
    for instance in self.instances:
......
650 681
      op_rem = opcodes.OpSetInstanceParams(\
651 682
        instance_name=instance, nics=[(constants.DDM_REMOVE, {})])
652 683
      Log("adding a NIC", indent=2)
653
      self.ExecOp(op_add)
654 684
      Log("removing last NIC", indent=2)
655
      self.ExecOp(op_rem)
685
      self.ExecOrQueue(instance, op_add, op_rem)
686
    self.CommitQueue()
656 687

  
657 688
  def _CheckInstanceAlive(self, instance):
658 689
    """Check if an instance is alive by doing http checks.
......
670 701
    while time.time() < end_time and url is None:
671 702
      try:
672 703
        url = self.url_opener.open("http://%s/hostname.txt" % instance)
673
      except IOError, err:
704
      except IOError:
674 705
        # here we can have connection refused, no route to host, etc.
675 706
        time.sleep(1)
676 707
    if url is None:
......
701 732

  
702 733
    has_err = True
703 734
    try:
704
      self.CreateInstances()
735
      self.BurnCreateInstances()
705 736
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
706
        self.ReplaceDisks1D8()
737
        self.BurnReplaceDisks1D8()
707 738
      if (opts.do_replace2 and len(self.nodes) > 2 and
708 739
          opts.disk_template in constants.DTS_NET_MIRROR) :
709
        self.ReplaceDisks2()
740
        self.BurnReplaceDisks2()
710 741

  
711 742
      if (opts.disk_template != constants.DT_DISKLESS and
712 743
          utils.any(self.disk_growth, lambda n: n > 0)):
713
        self.GrowDisks()
744
        self.BurnGrowDisks()
714 745

  
715 746
      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
716
        self.Failover()
747
        self.BurnFailover()
717 748

  
718 749
      if opts.do_migrate and opts.disk_template == constants.DT_DRBD8:
719
        self.Migrate()
750
        self.BurnMigrate()
720 751

  
721 752
      if (opts.do_importexport and
722 753
          opts.disk_template not in (constants.DT_DISKLESS,
723 754
                                     constants.DT_FILE)):
724
        self.ImportExport()
755
        self.BurnImportExport()
725 756

  
726 757
      if opts.do_reinstall:
727
        self.Reinstall()
758
        self.BurnReinstall()
728 759

  
729 760
      if opts.do_reboot:
730
        self.Reboot()
761
        self.BurnReboot()
731 762

  
732 763
      if opts.do_addremove_disks:
733
        self.AddRemoveDisks()
764
        self.BurnAddRemoveDisks()
734 765

  
735 766
      if opts.do_addremove_nics:
736
        self.AddRemoveNICs()
767
        self.BurnAddRemoveNICs()
737 768

  
738 769
      if opts.do_activate_disks:
739
        self.ActivateDisks()
770
        self.BurnActivateDisks()
740 771

  
741 772
      if opts.rename:
742
        self.Rename()
773
        self.BurnRename()
743 774

  
744 775
      if opts.do_startstop:
745
        self.StopStart()
776
        self.BurnStopStart()
746 777

  
747 778
      has_err = False
748 779
    finally:
......
751 782
        Log(self.GetFeedbackBuf())
752 783
        Log("\n\n")
753 784
      if not self.opts.keep_instances:
754
        self.Remove()
785
        self.BurnRemove()
755 786

  
756 787
    return 0
757 788

  

Also available in: Unified diff