Revision 87e25be1

b/Makefile.am
316 316
	lib/cmdlib/node.py \
317 317
	lib/cmdlib/instance.py \
318 318
	lib/cmdlib/instance_storage.py \
319
	lib/cmdlib/instance_migration.py \
319 320
	lib/cmdlib/instance_utils.py \
320 321
	lib/cmdlib/backup.py \
321 322
	lib/cmdlib/query.py \
b/lib/cmdlib/__init__.py
79 79
  LUInstanceReinstall, \
80 80
  LUInstanceReboot, \
81 81
  LUInstanceConsole, \
82
  LUInstanceFailover, \
83
  LUInstanceMigrate, \
84 82
  LUInstanceMultiAlloc, \
85 83
  LUInstanceSetParams, \
86 84
  LUInstanceChangeGroup
......
90 88
  LUInstanceReplaceDisks, \
91 89
  LUInstanceActivateDisks, \
92 90
  LUInstanceDeactivateDisks
91
from ganeti.cmdlib.instance_migration import \
92
  LUInstanceFailover, \
93
  LUInstanceMigrate
93 94
from ganeti.cmdlib.backup import \
94 95
  LUBackupQuery, \
95 96
  LUBackupPrepare, \
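The hunk above only changes where the two logical units come from: LUInstanceFailover and LUInstanceMigrate leave the ganeti.cmdlib.instance import list and are now pulled in from the new ganeti.cmdlib.instance_migration module, so the package keeps exporting them under the same names. A minimal editorial sketch (not part of this revision) of what that preserves for callers:

  # The package-level names are unchanged after the move; code that resolved
  # the logical units through ganeti.cmdlib keeps working as before.
  from ganeti import cmdlib

  assert cmdlib.LUInstanceFailover is not None
  assert cmdlib.LUInstanceMigrate is not None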
b/lib/cmdlib/instance.py
27 27
import logging
28 28
import operator
29 29
import os
30
import time
31 30

  
32 31
from ganeti import compat
33 32
from ganeti import constants
......
47 46
from ganeti import query
48 47

  
49 48
from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
50
  ResultWithJobs, Tasklet
49
  ResultWithJobs
51 50

  
52 51
from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
53 52
  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
......
58 57
  _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
59 58
  _CheckInstanceState, _ExpandNodeName
60 59
from ganeti.cmdlib.instance_storage import _CreateDisks, \
61
  _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
60
  _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, \
62 61
  _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
63 62
  _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
64 63
  _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
65
  _AssembleInstanceDisks, _ExpandCheckDisks
64
  _AssembleInstanceDisks
66 65
from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
67 66
  _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
68 67
  _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
69 68
  _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
70
  _GetInstanceInfoText, _RemoveDisks
69
  _GetInstanceInfoText, _RemoveDisks, _CheckNodeFreeMemory, \
70
  _CheckInstanceBridgesExist, _CheckNicsBridgesExist
71 71

  
72 72
import ganeti.masterd.instance
73 73

  
......
338 338
    _CheckOSVariant(result.payload, os_name)
339 339

  
340 340

  
341
def _CheckNicsBridgesExist(lu, target_nics, target_node):
342
  """Check that the brigdes needed by a list of nics exist.
343

  
344
  """
345
  cluster = lu.cfg.GetClusterInfo()
346
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
347
  brlist = [params[constants.NIC_LINK] for params in paramslist
348
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
349
  if brlist:
350
    result = lu.rpc.call_bridges_exist(target_node, brlist)
351
    result.Raise("Error checking bridges on destination node '%s'" %
352
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
353

  
354

  
355
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
356
  """Checks if a node has enough free memory.
357

  
358
  This function checks if a given node has the needed amount of free
359
  memory. In case the node has less memory or we cannot get the
360
  information from the node, this function raises an OpPrereqError
361
  exception.
362

  
363
  @type lu: C{LogicalUnit}
364
  @param lu: a logical unit from which we get configuration data
365
  @type node: C{str}
366
  @param node: the node to check
367
  @type reason: C{str}
368
  @param reason: string to use in the error message
369
  @type requested: C{int}
370
  @param requested: the amount of memory in MiB to check for
371
  @type hypervisor_name: C{str}
372
  @param hypervisor_name: the hypervisor to ask for memory stats
373
  @rtype: integer
374
  @return: node current free memory
375
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
376
      we cannot check the node
377

  
378
  """
379
  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
380
  nodeinfo[node].Raise("Can't get data from node %s" % node,
381
                       prereq=True, ecode=errors.ECODE_ENVIRON)
382
  (_, _, (hv_info, )) = nodeinfo[node].payload
383

  
384
  free_mem = hv_info.get("memory_free", None)
385
  if not isinstance(free_mem, int):
386
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
387
                               " was '%s'" % (node, free_mem),
388
                               errors.ECODE_ENVIRON)
389
  if requested > free_mem:
390
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
391
                               " needed %s MiB, available %s MiB" %
392
                               (node, reason, requested, free_mem),
393
                               errors.ECODE_NORES)
394
  return free_mem
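# Editorial note, not part of this revision: this helper is not deleted but
# relocated -- instance.py now imports _CheckNodeFreeMemory from
# ganeti.cmdlib.instance_utils (see the import hunk above). A minimal caller
# sketch, assuming the usual LogicalUnit context (self.lu, self.cfg, a
# resolved target_node and an instance object), mirroring the call made in
# TLMigrateInstance.CheckPrereq further down:
#
#   i_be = self.cfg.GetClusterInfo().FillBE(instance)
#   free_mem = _CheckNodeFreeMemory(self.lu, target_node,
#                                   "migrating instance %s" % instance.name,
#                                   i_be[constants.BE_MINMEM],
#                                   instance.hypervisor)
#   # Success means target_node has at least BE_MINMEM MiB free; otherwise
#   # errors.OpPrereqError (ECODE_NORES or ECODE_ENVIRON) has been raised.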
395

  
396

  
397 341
class LUInstanceCreate(LogicalUnit):
398 342
  """Create an instance.
399 343

  
......
1680 1624
    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
1681 1625

  
1682 1626

  
1683
def _CheckInstanceBridgesExist(lu, instance, node=None):
1684
  """Check that the brigdes needed by an instance exist.
1685

  
1686
  """
1687
  if node is None:
1688
    node = instance.primary_node
1689
  _CheckNicsBridgesExist(lu, instance.nics, node)
1690

  
1691

  
1692 1627
class LUInstanceMove(LogicalUnit):
1693 1628
  """Move an instance by data-copying.
1694 1629

  
......
2743 2678
    return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
2744 2679

  
2745 2680

  
2746
def _DeclareLocksForMigration(lu, level):
2747
  """Declares locks for L{TLMigrateInstance}.
2748

  
2749
  @type lu: L{LogicalUnit}
2750
  @param level: Lock level
2751

  
2752
  """
2753
  if level == locking.LEVEL_NODE_ALLOC:
2754
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2755

  
2756
    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
2757

  
2758
    # Node locks are already declared here rather than at LEVEL_NODE as we need
2759
    # the instance object anyway to declare the node allocation lock.
2760
    if instance.disk_template in constants.DTS_EXT_MIRROR:
2761
      if lu.op.target_node is None:
2762
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2763
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2764
      else:
2765
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
2766
                                               lu.op.target_node]
2767
      del lu.recalculate_locks[locking.LEVEL_NODE]
2768
    else:
2769
      lu._LockInstancesNodes() # pylint: disable=W0212
2770

  
2771
  elif level == locking.LEVEL_NODE:
2772
    # Node locks are declared together with the node allocation lock
2773
    assert (lu.needed_locks[locking.LEVEL_NODE] or
2774
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
2775

  
2776
  elif level == locking.LEVEL_NODE_RES:
2777
    # Copy node locks
2778
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
2779
      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
2780

  
2781

  
2782
def _ExpandNamesForMigration(lu):
2783
  """Expands names for use with L{TLMigrateInstance}.
2784

  
2785
  @type lu: L{LogicalUnit}
2786

  
2787
  """
2788
  if lu.op.target_node is not None:
2789
    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
2790

  
2791
  lu.needed_locks[locking.LEVEL_NODE] = []
2792
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2793

  
2794
  lu.needed_locks[locking.LEVEL_NODE_RES] = []
2795
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2796

  
2797
  # The node allocation lock is actually only needed for externally replicated
2798
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
2799
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
2800

  
2801

  
2802
class LUInstanceFailover(LogicalUnit):
2803
  """Failover an instance.
2804

  
2805
  """
2806
  HPATH = "instance-failover"
2807
  HTYPE = constants.HTYPE_INSTANCE
2808
  REQ_BGL = False
2809

  
2810
  def CheckArguments(self):
2811
    """Check the arguments.
2812

  
2813
    """
2814
    self.iallocator = getattr(self.op, "iallocator", None)
2815
    self.target_node = getattr(self.op, "target_node", None)
2816

  
2817
  def ExpandNames(self):
2818
    self._ExpandAndLockInstance()
2819
    _ExpandNamesForMigration(self)
2820

  
2821
    self._migrater = \
2822
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
2823
                        self.op.ignore_consistency, True,
2824
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
2825

  
2826
    self.tasklets = [self._migrater]
2827

  
2828
  def DeclareLocks(self, level):
2829
    _DeclareLocksForMigration(self, level)
2830

  
2831
  def BuildHooksEnv(self):
2832
    """Build hooks env.
2833

  
2834
    This runs on master, primary and secondary nodes of the instance.
2835

  
2836
    """
2837
    instance = self._migrater.instance
2838
    source_node = instance.primary_node
2839
    target_node = self.op.target_node
2840
    env = {
2841
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2842
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2843
      "OLD_PRIMARY": source_node,
2844
      "NEW_PRIMARY": target_node,
2845
      }
2846

  
2847
    if instance.disk_template in constants.DTS_INT_MIRROR:
2848
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
2849
      env["NEW_SECONDARY"] = source_node
2850
    else:
2851
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
2852

  
2853
    env.update(_BuildInstanceHookEnvByObject(self, instance))
2854

  
2855
    return env
2856

  
2857
  def BuildHooksNodes(self):
2858
    """Build hooks nodes.
2859

  
2860
    """
2861
    instance = self._migrater.instance
2862
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
2863
    return (nl, nl + [instance.primary_node])
2864

  
2865

  
2866
class LUInstanceMigrate(LogicalUnit):
2867
  """Migrate an instance.
2868

  
2869
  This is migration without shutting down, compared to the failover,
2870
  which is done with shutdown.
2871

  
2872
  """
2873
  HPATH = "instance-migrate"
2874
  HTYPE = constants.HTYPE_INSTANCE
2875
  REQ_BGL = False
2876

  
2877
  def ExpandNames(self):
2878
    self._ExpandAndLockInstance()
2879
    _ExpandNamesForMigration(self)
2880

  
2881
    self._migrater = \
2882
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
2883
                        False, self.op.allow_failover, False,
2884
                        self.op.allow_runtime_changes,
2885
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
2886
                        self.op.ignore_ipolicy)
2887

  
2888
    self.tasklets = [self._migrater]
2889

  
2890
  def DeclareLocks(self, level):
2891
    _DeclareLocksForMigration(self, level)
2892

  
2893
  def BuildHooksEnv(self):
2894
    """Build hooks env.
2895

  
2896
    This runs on master, primary and secondary nodes of the instance.
2897

  
2898
    """
2899
    instance = self._migrater.instance
2900
    source_node = instance.primary_node
2901
    target_node = self.op.target_node
2902
    env = _BuildInstanceHookEnvByObject(self, instance)
2903
    env.update({
2904
      "MIGRATE_LIVE": self._migrater.live,
2905
      "MIGRATE_CLEANUP": self.op.cleanup,
2906
      "OLD_PRIMARY": source_node,
2907
      "NEW_PRIMARY": target_node,
2908
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
2909
      })
2910

  
2911
    if instance.disk_template in constants.DTS_INT_MIRROR:
2912
      env["OLD_SECONDARY"] = target_node
2913
      env["NEW_SECONDARY"] = source_node
2914
    else:
2915
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
2916

  
2917
    return env
2918

  
2919
  def BuildHooksNodes(self):
2920
    """Build hooks nodes.
2921

  
2922
    """
2923
    instance = self._migrater.instance
2924
    snodes = list(instance.secondary_nodes)
2925
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
2926
    return (nl, nl)
2927

  
2928

  
2929 2681
class LUInstanceMultiAlloc(NoHooksLU):
2930 2682
  """Allocates multiple instances at the same time.
2931 2683

  
......
4592 4344
                 " instance '%s'", len(jobs), self.op.instance_name)
4593 4345

  
4594 4346
    return ResultWithJobs(jobs)
4595

  
4596

  
4597
class TLMigrateInstance(Tasklet):
4598
  """Tasklet class for instance migration.
4599

  
4600
  @type live: boolean
4601
  @ivar live: whether the migration will be done live or non-live;
4602
      this variable is initialized only after CheckPrereq has run
4603
  @type cleanup: boolean
4604
  @ivar cleanup: Whether we clean up from a failed migration
4605
  @type iallocator: string
4606
  @ivar iallocator: The iallocator used to determine target_node
4607
  @type target_node: string
4608
  @ivar target_node: If given, the target_node to reallocate the instance to
4609
  @type failover: boolean
4610
  @ivar failover: Whether operation results in failover or migration
4611
  @type fallback: boolean
4612
  @ivar fallback: Whether fallback to failover is allowed if migration not
4613
                  possible
4614
  @type ignore_consistency: boolean
4615
  @ivar ignore_consistency: Whether we should ignore consistency between source
4616
                            and target node
4617
  @type shutdown_timeout: int
4618
  @ivar shutdown_timeout: In case of failover timeout of the shutdown
4619
  @type ignore_ipolicy: bool
4620
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
4621

  
4622
  """
4623

  
4624
  # Constants
4625
  _MIGRATION_POLL_INTERVAL = 1      # seconds
4626
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
4627

  
4628
  def __init__(self, lu, instance_name, cleanup, failover, fallback,
4629
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
4630
               ignore_ipolicy):
4631
    """Initializes this class.
4632

  
4633
    """
4634
    Tasklet.__init__(self, lu)
4635

  
4636
    # Parameters
4637
    self.instance_name = instance_name
4638
    self.cleanup = cleanup
4639
    self.live = False # will be overridden later
4640
    self.failover = failover
4641
    self.fallback = fallback
4642
    self.ignore_consistency = ignore_consistency
4643
    self.shutdown_timeout = shutdown_timeout
4644
    self.ignore_ipolicy = ignore_ipolicy
4645
    self.allow_runtime_changes = allow_runtime_changes
4646

  
4647
  def CheckPrereq(self):
4648
    """Check prerequisites.
4649

  
4650
    This checks that the instance is in the cluster.
4651

  
4652
    """
4653
    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
4654
    instance = self.cfg.GetInstanceInfo(instance_name)
4655
    assert instance is not None
4656
    self.instance = instance
4657
    cluster = self.cfg.GetClusterInfo()
4658

  
4659
    if (not self.cleanup and
4660
        not instance.admin_state == constants.ADMINST_UP and
4661
        not self.failover and self.fallback):
4662
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
4663
                      " switching to failover")
4664
      self.failover = True
4665

  
4666
    if instance.disk_template not in constants.DTS_MIRRORED:
4667
      if self.failover:
4668
        text = "failovers"
4669
      else:
4670
        text = "migrations"
4671
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
4672
                                 " %s" % (instance.disk_template, text),
4673
                                 errors.ECODE_STATE)
4674

  
4675
    if instance.disk_template in constants.DTS_EXT_MIRROR:
4676
      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
4677

  
4678
      if self.lu.op.iallocator:
4679
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4680
        self._RunAllocator()
4681
      else:
4682
        # We set self.target_node as it is required by
4683
        # BuildHooksEnv
4684
        self.target_node = self.lu.op.target_node
4685

  
4686
      # Check that the target node is correct in terms of instance policy
4687
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
4688
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4689
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4690
                                                              group_info)
4691
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4692
                              ignore=self.ignore_ipolicy)
4693

  
4694
      # self.target_node is already populated, either directly or by the
4695
      # iallocator run
4696
      target_node = self.target_node
4697
      if self.target_node == instance.primary_node:
4698
        raise errors.OpPrereqError("Cannot migrate instance %s"
4699
                                   " to its primary (%s)" %
4700
                                   (instance.name, instance.primary_node),
4701
                                   errors.ECODE_STATE)
4702

  
4703
      if len(self.lu.tasklets) == 1:
4704
        # It is safe to release locks only when we're the only tasklet
4705
        # in the LU
4706
        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
4707
                      keep=[instance.primary_node, self.target_node])
4708
        _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
4709

  
4710
    else:
4711
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
4712

  
4713
      secondary_nodes = instance.secondary_nodes
4714
      if not secondary_nodes:
4715
        raise errors.ConfigurationError("No secondary node but using"
4716
                                        " %s disk template" %
4717
                                        instance.disk_template)
4718
      target_node = secondary_nodes[0]
4719
      if self.lu.op.iallocator or (self.lu.op.target_node and
4720
                                   self.lu.op.target_node != target_node):
4721
        if self.failover:
4722
          text = "failed over"
4723
        else:
4724
          text = "migrated"
4725
        raise errors.OpPrereqError("Instances with disk template %s cannot"
4726
                                   " be %s to arbitrary nodes"
4727
                                   " (neither an iallocator nor a target"
4728
                                   " node can be passed)" %
4729
                                   (instance.disk_template, text),
4730
                                   errors.ECODE_INVAL)
4731
      nodeinfo = self.cfg.GetNodeInfo(target_node)
4732
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4733
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4734
                                                              group_info)
4735
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4736
                              ignore=self.ignore_ipolicy)
4737

  
4738
    i_be = cluster.FillBE(instance)
4739

  
4740
    # check memory requirements on the secondary node
4741
    if (not self.cleanup and
4742
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
4743
      self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
4744
                                               "migrating instance %s" %
4745
                                               instance.name,
4746
                                               i_be[constants.BE_MINMEM],
4747
                                               instance.hypervisor)
4748
    else:
4749
      self.lu.LogInfo("Not checking memory on the secondary node as"
4750
                      " instance will not be started")
4751

  
4752
    # check if failover must be forced instead of migration
4753
    if (not self.cleanup and not self.failover and
4754
        i_be[constants.BE_ALWAYS_FAILOVER]):
4755
      self.lu.LogInfo("Instance configured to always failover; fallback"
4756
                      " to failover")
4757
      self.failover = True
4758

  
4759
    # check bridge existence
4760
    _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
4761

  
4762
    if not self.cleanup:
4763
      _CheckNodeNotDrained(self.lu, target_node)
4764
      if not self.failover:
4765
        result = self.rpc.call_instance_migratable(instance.primary_node,
4766
                                                   instance)
4767
        if result.fail_msg and self.fallback:
4768
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
4769
                          " failover")
4770
          self.failover = True
4771
        else:
4772
          result.Raise("Can't migrate, please use failover",
4773
                       prereq=True, ecode=errors.ECODE_STATE)
4774

  
4775
    assert not (self.failover and self.cleanup)
4776

  
4777
    if not self.failover:
4778
      if self.lu.op.live is not None and self.lu.op.mode is not None:
4779
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
4780
                                   " parameters are accepted",
4781
                                   errors.ECODE_INVAL)
4782
      if self.lu.op.live is not None:
4783
        if self.lu.op.live:
4784
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
4785
        else:
4786
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
4787
        # reset the 'live' parameter to None so that repeated
4788
        # invocations of CheckPrereq do not raise an exception
4789
        self.lu.op.live = None
4790
      elif self.lu.op.mode is None:
4791
        # read the default value from the hypervisor
4792
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
4793
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
4794

  
4795
      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
4796
    else:
4797
      # Failover is never live
4798
      self.live = False
4799

  
4800
    if not (self.failover or self.cleanup):
4801
      remote_info = self.rpc.call_instance_info(instance.primary_node,
4802
                                                instance.name,
4803
                                                instance.hypervisor)
4804
      remote_info.Raise("Error checking instance on node %s" %
4805
                        instance.primary_node)
4806
      instance_running = bool(remote_info.payload)
4807
      if instance_running:
4808
        self.current_mem = int(remote_info.payload["memory"])
4809

  
4810
  def _RunAllocator(self):
4811
    """Run the allocator based on input opcode.
4812

  
4813
    """
4814
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4815

  
4816
    # FIXME: add a self.ignore_ipolicy option
4817
    req = iallocator.IAReqRelocate(name=self.instance_name,
4818
                                   relocate_from=[self.instance.primary_node])
4819
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4820

  
4821
    ial.Run(self.lu.op.iallocator)
4822

  
4823
    if not ial.success:
4824
      raise errors.OpPrereqError("Can't compute nodes using"
4825
                                 " iallocator '%s': %s" %
4826
                                 (self.lu.op.iallocator, ial.info),
4827
                                 errors.ECODE_NORES)
4828
    self.target_node = ial.result[0]
4829
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4830
                    self.instance_name, self.lu.op.iallocator,
4831
                    utils.CommaJoin(ial.result))
4832

  
4833
  def _WaitUntilSync(self):
4834
    """Poll with custom rpc for disk sync.
4835

  
4836
    This uses our own step-based rpc call.
4837

  
4838
    """
4839
    self.feedback_fn("* wait until resync is done")
4840
    all_done = False
4841
    while not all_done:
4842
      all_done = True
4843
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4844
                                            self.nodes_ip,
4845
                                            (self.instance.disks,
4846
                                             self.instance))
4847
      min_percent = 100
4848
      for node, nres in result.items():
4849
        nres.Raise("Cannot resync disks on node %s" % node)
4850
        node_done, node_percent = nres.payload
4851
        all_done = all_done and node_done
4852
        if node_percent is not None:
4853
          min_percent = min(min_percent, node_percent)
4854
      if not all_done:
4855
        if min_percent < 100:
4856
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
4857
        time.sleep(2)
4858

  
4859
  def _EnsureSecondary(self, node):
4860
    """Demote a node to secondary.
4861

  
4862
    """
4863
    self.feedback_fn("* switching node %s to secondary mode" % node)
4864

  
4865
    for dev in self.instance.disks:
4866
      self.cfg.SetDiskID(dev, node)
4867

  
4868
    result = self.rpc.call_blockdev_close(node, self.instance.name,
4869
                                          self.instance.disks)
4870
    result.Raise("Cannot change disk to secondary on node %s" % node)
4871

  
4872
  def _GoStandalone(self):
4873
    """Disconnect from the network.
4874

  
4875
    """
4876
    self.feedback_fn("* changing into standalone mode")
4877
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4878
                                               self.instance.disks)
4879
    for node, nres in result.items():
4880
      nres.Raise("Cannot disconnect disks node %s" % node)
4881

  
4882
  def _GoReconnect(self, multimaster):
4883
    """Reconnect to the network.
4884

  
4885
    """
4886
    if multimaster:
4887
      msg = "dual-master"
4888
    else:
4889
      msg = "single-master"
4890
    self.feedback_fn("* changing disks into %s mode" % msg)
4891
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4892
                                           (self.instance.disks, self.instance),
4893
                                           self.instance.name, multimaster)
4894
    for node, nres in result.items():
4895
      nres.Raise("Cannot change disks config on node %s" % node)
4896

  
4897
  def _ExecCleanup(self):
4898
    """Try to cleanup after a failed migration.
4899

  
4900
    The cleanup is done by:
4901
      - check that the instance is running only on one node
4902
        (and update the config if needed)
4903
      - change disks on its secondary node to secondary
4904
      - wait until disks are fully synchronized
4905
      - disconnect from the network
4906
      - change disks into single-master mode
4907
      - wait again until disks are fully synchronized
4908

  
4909
    """
4910
    instance = self.instance
4911
    target_node = self.target_node
4912
    source_node = self.source_node
4913

  
4914
    # check running on only one node
4915
    self.feedback_fn("* checking where the instance actually runs"
4916
                     " (if this hangs, the hypervisor might be in"
4917
                     " a bad state)")
4918
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4919
    for node, result in ins_l.items():
4920
      result.Raise("Can't contact node %s" % node)
4921

  
4922
    runningon_source = instance.name in ins_l[source_node].payload
4923
    runningon_target = instance.name in ins_l[target_node].payload
4924

  
4925
    if runningon_source and runningon_target:
4926
      raise errors.OpExecError("Instance seems to be running on two nodes,"
4927
                               " or the hypervisor is confused; you will have"
4928
                               " to ensure manually that it runs only on one"
4929
                               " and restart this operation")
4930

  
4931
    if not (runningon_source or runningon_target):
4932
      raise errors.OpExecError("Instance does not seem to be running at all;"
4933
                               " in this case it's safer to repair by"
4934
                               " running 'gnt-instance stop' to ensure disk"
4935
                               " shutdown, and then restarting it")
4936

  
4937
    if runningon_target:
4938
      # the migration has actually succeeded, we need to update the config
4939
      self.feedback_fn("* instance running on secondary node (%s),"
4940
                       " updating config" % target_node)
4941
      instance.primary_node = target_node
4942
      self.cfg.Update(instance, self.feedback_fn)
4943
      demoted_node = source_node
4944
    else:
4945
      self.feedback_fn("* instance confirmed to be running on its"
4946
                       " primary node (%s)" % source_node)
4947
      demoted_node = target_node
4948

  
4949
    if instance.disk_template in constants.DTS_INT_MIRROR:
4950
      self._EnsureSecondary(demoted_node)
4951
      try:
4952
        self._WaitUntilSync()
4953
      except errors.OpExecError:
4954
        # we ignore here errors, since if the device is standalone, it
4955
        # won't be able to sync
4956
        pass
4957
      self._GoStandalone()
4958
      self._GoReconnect(False)
4959
      self._WaitUntilSync()
4960

  
4961
    self.feedback_fn("* done")
4962

  
4963
  def _RevertDiskStatus(self):
4964
    """Try to revert the disk status after a failed migration.
4965

  
4966
    """
4967
    target_node = self.target_node
4968
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
4969
      return
4970

  
4971
    try:
4972
      self._EnsureSecondary(target_node)
4973
      self._GoStandalone()
4974
      self._GoReconnect(False)
4975
      self._WaitUntilSync()
4976
    except errors.OpExecError, err:
4977
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
4978
                         " please try to recover the instance manually;"
4979
                         " error '%s'" % str(err))
4980

  
4981
  def _AbortMigration(self):
4982
    """Call the hypervisor code to abort a started migration.
4983

  
4984
    """
4985
    instance = self.instance
4986
    target_node = self.target_node
4987
    source_node = self.source_node
4988
    migration_info = self.migration_info
4989

  
4990
    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
4991
                                                                 instance,
4992
                                                                 migration_info,
4993
                                                                 False)
4994
    abort_msg = abort_result.fail_msg
4995
    if abort_msg:
4996
      logging.error("Aborting migration failed on target node %s: %s",
4997
                    target_node, abort_msg)
4998
      # Don't raise an exception here, as we still have to try to revert the
4999
      # disk status, even if this step failed.
5000

  
5001
    abort_result = self.rpc.call_instance_finalize_migration_src(
5002
      source_node, instance, False, self.live)
5003
    abort_msg = abort_result.fail_msg
5004
    if abort_msg:
5005
      logging.error("Aborting migration failed on source node %s: %s",
5006
                    source_node, abort_msg)
5007

  
5008
  def _ExecMigration(self):
5009
    """Migrate an instance.
5010

  
5011
    The migrate is done by:
5012
      - change the disks into dual-master mode
5013
      - wait until disks are fully synchronized again
5014
      - migrate the instance
5015
      - change disks on the new secondary node (the old primary) to secondary
5016
      - wait until disks are fully synchronized
5017
      - change disks into single-master mode
5018

  
5019
    """
5020
    instance = self.instance
5021
    target_node = self.target_node
5022
    source_node = self.source_node
5023

  
5024
    # Check for hypervisor version mismatch and warn the user.
5025
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
5026
                                       None, [self.instance.hypervisor], False)
5027
    for ninfo in nodeinfo.values():
5028
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
5029
                  ninfo.node)
5030
    (_, _, (src_info, )) = nodeinfo[source_node].payload
5031
    (_, _, (dst_info, )) = nodeinfo[target_node].payload
5032

  
5033
    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
5034
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
5035
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
5036
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
5037
      if src_version != dst_version:
5038
        self.feedback_fn("* warning: hypervisor version mismatch between"
5039
                         " source (%s) and target (%s) node" %
5040
                         (src_version, dst_version))
5041

  
5042
    self.feedback_fn("* checking disk consistency between source and target")
5043
    for (idx, dev) in enumerate(instance.disks):
5044
      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
5045
        raise errors.OpExecError("Disk %s is degraded or not fully"
5046
                                 " synchronized on target node,"
5047
                                 " aborting migration" % idx)
5048

  
5049
    if self.current_mem > self.tgt_free_mem:
5050
      if not self.allow_runtime_changes:
5051
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
5052
                                 " free memory to fit instance %s on target"
5053
                                 " node %s (have %dMB, need %dMB)" %
5054
                                 (instance.name, target_node,
5055
                                  self.tgt_free_mem, self.current_mem))
5056
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
5057
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
5058
                                                     instance,
5059
                                                     self.tgt_free_mem)
5060
      rpcres.Raise("Cannot modify instance runtime memory")
5061

  
5062
    # First get the migration information from the remote node
5063
    result = self.rpc.call_migration_info(source_node, instance)
5064
    msg = result.fail_msg
5065
    if msg:
5066
      log_err = ("Failed fetching source migration information from %s: %s" %
5067
                 (source_node, msg))
5068
      logging.error(log_err)
5069
      raise errors.OpExecError(log_err)
5070

  
5071
    self.migration_info = migration_info = result.payload
5072

  
5073
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
5074
      # Then switch the disks to master/master mode
5075
      self._EnsureSecondary(target_node)
5076
      self._GoStandalone()
5077
      self._GoReconnect(True)
5078
      self._WaitUntilSync()
5079

  
5080
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
5081
    result = self.rpc.call_accept_instance(target_node,
5082
                                           instance,
5083
                                           migration_info,
5084
                                           self.nodes_ip[target_node])
5085

  
5086
    msg = result.fail_msg
5087
    if msg:
5088
      logging.error("Instance pre-migration failed, trying to revert"
5089
                    " disk status: %s", msg)
5090
      self.feedback_fn("Pre-migration failed, aborting")
5091
      self._AbortMigration()
5092
      self._RevertDiskStatus()
5093
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5094
                               (instance.name, msg))
5095

  
5096
    self.feedback_fn("* migrating instance to %s" % target_node)
5097
    result = self.rpc.call_instance_migrate(source_node, instance,
5098
                                            self.nodes_ip[target_node],
5099
                                            self.live)
5100
    msg = result.fail_msg
5101
    if msg:
5102
      logging.error("Instance migration failed, trying to revert"
5103
                    " disk status: %s", msg)
5104
      self.feedback_fn("Migration failed, aborting")
5105
      self._AbortMigration()
5106
      self._RevertDiskStatus()
5107
      raise errors.OpExecError("Could not migrate instance %s: %s" %
5108
                               (instance.name, msg))
5109

  
5110
    self.feedback_fn("* starting memory transfer")
5111
    last_feedback = time.time()
5112
    while True:
5113
      result = self.rpc.call_instance_get_migration_status(source_node,
5114
                                                           instance)
5115
      msg = result.fail_msg
5116
      ms = result.payload   # MigrationStatus instance
5117
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
5118
        logging.error("Instance migration failed, trying to revert"
5119
                      " disk status: %s", msg)
5120
        self.feedback_fn("Migration failed, aborting")
5121
        self._AbortMigration()
5122
        self._RevertDiskStatus()
5123
        if not msg:
5124
          msg = "hypervisor returned failure"
5125
        raise errors.OpExecError("Could not migrate instance %s: %s" %
5126
                                 (instance.name, msg))
5127

  
5128
      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
5129
        self.feedback_fn("* memory transfer complete")
5130
        break
5131

  
5132
      if (utils.TimeoutExpired(last_feedback,
5133
                               self._MIGRATION_FEEDBACK_INTERVAL) and
5134
          ms.transferred_ram is not None):
5135
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
5136
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
5137
        last_feedback = time.time()
5138

  
5139
      time.sleep(self._MIGRATION_POLL_INTERVAL)
5140

  
5141
    result = self.rpc.call_instance_finalize_migration_src(source_node,
5142
                                                           instance,
5143
                                                           True,
5144
                                                           self.live)
5145
    msg = result.fail_msg
5146
    if msg:
5147
      logging.error("Instance migration succeeded, but finalization failed"
5148
                    " on the source node: %s", msg)
5149
      raise errors.OpExecError("Could not finalize instance migration: %s" %
5150
                               msg)
5151

  
5152
    instance.primary_node = target_node
5153

  
5154
    # distribute new instance config to the other nodes
5155
    self.cfg.Update(instance, self.feedback_fn)
5156

  
5157
    result = self.rpc.call_instance_finalize_migration_dst(target_node,
5158
                                                           instance,
5159
                                                           migration_info,
5160
                                                           True)
5161
    msg = result.fail_msg
5162
    if msg:
5163
      logging.error("Instance migration succeeded, but finalization failed"
5164
                    " on the target node: %s", msg)
5165
      raise errors.OpExecError("Could not finalize instance migration: %s" %
5166
                               msg)
5167

  
5168
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
5169
      self._EnsureSecondary(source_node)
5170
      self._WaitUntilSync()
5171
      self._GoStandalone()
5172
      self._GoReconnect(False)
5173
      self._WaitUntilSync()
5174

  
5175
    # If the instance's disk template is `rbd' or `ext' and there was a
5176
    # successful migration, unmap the device from the source node.
5177
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
5178
      disks = _ExpandCheckDisks(instance, instance.disks)
5179
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
5180
      for disk in disks:
5181
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
5182
        msg = result.fail_msg
5183
        if msg:
5184
          logging.error("Migration was successful, but couldn't unmap the"
5185
                        " block device %s on source node %s: %s",
5186
                        disk.iv_name, source_node, msg)
5187
          logging.error("You need to unmap the device %s manually on %s",
5188
                        disk.iv_name, source_node)
5189

  
5190
    self.feedback_fn("* done")
5191

  
5192
  def _ExecFailover(self):
5193
    """Failover an instance.
5194

  
5195
    The failover is done by shutting it down on its present node and
5196
    starting it on the secondary.
5197

  
5198
    """
5199
    instance = self.instance
5200
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
5201

  
5202
    source_node = instance.primary_node
5203
    target_node = self.target_node
5204

  
5205
    if instance.admin_state == constants.ADMINST_UP:
5206
      self.feedback_fn("* checking disk consistency between source and target")
5207
      for (idx, dev) in enumerate(instance.disks):
5208
        # for drbd, these are drbd over lvm
5209
        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
5210
                                     False):
5211
          if primary_node.offline:
5212
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
5213
                             " target node %s" %
5214
                             (primary_node.name, idx, target_node))
5215
          elif not self.ignore_consistency:
5216
            raise errors.OpExecError("Disk %s is degraded on target node,"
5217
                                     " aborting failover" % idx)
5218
    else:
5219
      self.feedback_fn("* not checking disk consistency as instance is not"
5220
                       " running")
5221

  
5222
    self.feedback_fn("* shutting down instance on source node")
5223
    logging.info("Shutting down instance %s on node %s",
5224
                 instance.name, source_node)
5225

  
5226
    result = self.rpc.call_instance_shutdown(source_node, instance,
5227
                                             self.shutdown_timeout,
5228
                                             self.lu.op.reason)
5229
    msg = result.fail_msg
5230
    if msg:
5231
      if self.ignore_consistency or primary_node.offline:
5232
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
5233
                           " proceeding anyway; please make sure node"
5234
                           " %s is down; error details: %s",
5235
                           instance.name, source_node, source_node, msg)
5236
      else:
5237
        raise errors.OpExecError("Could not shutdown instance %s on"
5238
                                 " node %s: %s" %
5239
                                 (instance.name, source_node, msg))
5240

  
5241
    self.feedback_fn("* deactivating the instance's disks on source node")
5242
    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
5243
      raise errors.OpExecError("Can't shut down the instance's disks")
5244

  
5245
    instance.primary_node = target_node
5246
    # distribute new instance config to the other nodes
5247
    self.cfg.Update(instance, self.feedback_fn)
5248

  
5249
    # Only start the instance if it's marked as up
5250
    if instance.admin_state == constants.ADMINST_UP:
5251
      self.feedback_fn("* activating the instance's disks on target node %s" %
5252
                       target_node)
5253
      logging.info("Starting instance %s on node %s",
5254
                   instance.name, target_node)
5255

  
5256
      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
5257
                                           ignore_secondaries=True)
5258
      if not disks_ok:
5259
        _ShutdownInstanceDisks(self.lu, instance)
5260
        raise errors.OpExecError("Can't activate the instance's disks")
5261

  
5262
      self.feedback_fn("* starting the instance on the target node %s" %
5263
                       target_node)
5264
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
5265
                                            False, self.lu.op.reason)
5266
      msg = result.fail_msg
5267
      if msg:
5268
        _ShutdownInstanceDisks(self.lu, instance)
5269
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
5270
                                 (instance.name, target_node, msg))
5271

  
5272
  def Exec(self, feedback_fn):
5273
    """Perform the migration.
5274

  
5275
    """
5276
    self.feedback_fn = feedback_fn
5277
    self.source_node = self.instance.primary_node
5278

  
5279
    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
5280
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
5281
      self.target_node = self.instance.secondary_nodes[0]
5282
      # Otherwise self.target_node has been populated either
5283
      # directly, or through an iallocator.
5284

  
5285
    self.all_nodes = [self.source_node, self.target_node]
5286
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
5287
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
5288

  
5289
    if self.failover:
5290
      feedback_fn("Failover instance %s" % self.instance.name)
5291
      self._ExecFailover()
5292
    else:
5293
      feedback_fn("Migrating instance %s" % self.instance.name)
5294

  
5295
      if self.cleanup:
5296
        return self._ExecCleanup()
5297
      else:
5298
        return self._ExecMigration()
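With that, the failover/migration logical units and the TLMigrateInstance tasklet are gone from instance.py; the new module below carries the same code. Nothing changes for clients either: the tasklet is still driven by the opcode fields read above (self.op.instance_name, self.op.cleanup, self.op.allow_failover, self.op.allow_runtime_changes, self.op.ignore_ipolicy, self.op.target_node, self.op.iallocator, self.op.live, self.op.mode, self.op.reason). A hedged editorial sketch of the migration opcode as a client would build it; the OpInstanceMigrate class is assumed from the standard ganeti.opcodes definitions, the field names are taken from the self.op.* accesses in this diff, and the field values are illustrative:

  from ganeti import opcodes

  # Fields correspond to the self.op.* attributes consumed by LUInstanceMigrate
  # and TLMigrateInstance above; unset fields keep their opcode defaults.
  op = opcodes.OpInstanceMigrate(instance_name="instance1.example.com",
                                 cleanup=False,
                                 allow_failover=False,
                                 ignore_ipolicy=False)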
b/lib/cmdlib/instance_migration.py
1
#
2
#
3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Logical units dealing with instance migration an failover."""
23

  
24
import logging
25
import time
26

  
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti.masterd import iallocator
31
from ganeti import utils
32
from ganeti.cmdlib.base import LogicalUnit, Tasklet
33
from ganeti.cmdlib.common import _ExpandInstanceName, \
34
  _CheckIAllocatorOrNode, _ExpandNodeName
35
from ganeti.cmdlib.instance_storage import _CheckDiskConsistency, \
36
  _ExpandCheckDisks, _ShutdownInstanceDisks, _AssembleInstanceDisks
37
from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
38
  _CheckTargetNodeIPolicy, _ReleaseLocks, _CheckNodeNotDrained, \
39
  _CopyLockList, _CheckNodeFreeMemory, _CheckInstanceBridgesExist
40

  
41
import ganeti.masterd.instance
42

  
43

  
44
def _ExpandNamesForMigration(lu):
45
  """Expands names for use with L{TLMigrateInstance}.
46

  
47
  @type lu: L{LogicalUnit}
48

  
49
  """
50
  if lu.op.target_node is not None:
51
    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
52

  
53
  lu.needed_locks[locking.LEVEL_NODE] = []
54
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
55

  
56
  lu.needed_locks[locking.LEVEL_NODE_RES] = []
57
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
58

  
59
  # The node allocation lock is actually only needed for externally replicated
60
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
61
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
62

  
63

  
64
def _DeclareLocksForMigration(lu, level):
65
  """Declares locks for L{TLMigrateInstance}.
66

  
67
  @type lu: L{LogicalUnit}
68
  @param level: Lock level
69

  
70
  """
71
  if level == locking.LEVEL_NODE_ALLOC:
72
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
73

  
74
    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
75

  
76
    # Node locks are already declared here rather than at LEVEL_NODE as we need
77
    # the instance object anyway to declare the node allocation lock.
78
    if instance.disk_template in constants.DTS_EXT_MIRROR:
79
      if lu.op.target_node is None:
80
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
81
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
82
      else:
83
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
84
                                               lu.op.target_node]
85
      del lu.recalculate_locks[locking.LEVEL_NODE]
86
    else:
87
      lu._LockInstancesNodes() # pylint: disable=W0212
88

  
89
  elif level == locking.LEVEL_NODE:
90
    # Node locks are declared together with the node allocation lock
91
    assert (lu.needed_locks[locking.LEVEL_NODE] or
92
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
93

  
94
  elif level == locking.LEVEL_NODE_RES:
95
    # Copy node locks
96
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
97
      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
98

  
99

  
100
class LUInstanceFailover(LogicalUnit):
101
  """Failover an instance.
102

  
103
  """
104
  HPATH = "instance-failover"
105
  HTYPE = constants.HTYPE_INSTANCE
106
  REQ_BGL = False
107

  
108
  def CheckArguments(self):
109
    """Check the arguments.
110

  
111
    """
112
    self.iallocator = getattr(self.op, "iallocator", None)
113
    self.target_node = getattr(self.op, "target_node", None)
114

  
115
  def ExpandNames(self):
116
    self._ExpandAndLockInstance()
117
    _ExpandNamesForMigration(self)
118

  
119
    self._migrater = \
120
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
121
                        self.op.ignore_consistency, True,
122
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
123

  
124
    self.tasklets = [self._migrater]
125

  
126
  def DeclareLocks(self, level):
127
    _DeclareLocksForMigration(self, level)
128

  
129
  def BuildHooksEnv(self):
130
    """Build hooks env.
131

  
132
    This runs on master, primary and secondary nodes of the instance.
133

  
134
    """
135
    instance = self._migrater.instance
136
    source_node = instance.primary_node
137
    target_node = self.op.target_node
138
    env = {
139
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
140
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
141
      "OLD_PRIMARY": source_node,
142
      "NEW_PRIMARY": target_node,
143
      }
144

  
145
    if instance.disk_template in constants.DTS_INT_MIRROR:
146
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
147
      env["NEW_SECONDARY"] = source_node
148
    else:
149
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
150

  
151
    env.update(_BuildInstanceHookEnvByObject(self, instance))
152

  
153
    return env
154

  
155
  def BuildHooksNodes(self):
156
    """Build hooks nodes.
157

  
158
    """
159
    instance = self._migrater.instance
160
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
161
    return (nl, nl + [instance.primary_node])
162

  
163

  
164
class LUInstanceMigrate(LogicalUnit):
165
  """Migrate an instance.
166

  
167
  This is migration without shutting down, compared to the failover,
168
  which is done with shutdown.
169

  
170
  """
171
  HPATH = "instance-migrate"
172
  HTYPE = constants.HTYPE_INSTANCE
173
  REQ_BGL = False
174

  
175
  def ExpandNames(self):
176
    self._ExpandAndLockInstance()
177
    _ExpandNamesForMigration(self)
178

  
179
    self._migrater = \
180
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
181
                        False, self.op.allow_failover, False,
182
                        self.op.allow_runtime_changes,
183
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
184
                        self.op.ignore_ipolicy)
185

  
186
    self.tasklets = [self._migrater]
187

  
188
  def DeclareLocks(self, level):
189
    _DeclareLocksForMigration(self, level)
190

  
191
  def BuildHooksEnv(self):
192
    """Build hooks env.
193

  
194
    This runs on master, primary and secondary nodes of the instance.
195

  
196
    """
197
    instance = self._migrater.instance
198
    source_node = instance.primary_node
199
    target_node = self.op.target_node
200
    env = _BuildInstanceHookEnvByObject(self, instance)
201
    env.update({
202
      "MIGRATE_LIVE": self._migrater.live,
203
      "MIGRATE_CLEANUP": self.op.cleanup,
204
      "OLD_PRIMARY": source_node,
205
      "NEW_PRIMARY": target_node,
206
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
207
      })
208

  
209
    if instance.disk_template in constants.DTS_INT_MIRROR:
210
      env["OLD_SECONDARY"] = target_node
211
      env["NEW_SECONDARY"] = source_node
212
    else:
213
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
214

  
215
    return env
216

  
217
  def BuildHooksNodes(self):
218
    """Build hooks nodes.
219

  
220
    """
221
    instance = self._migrater.instance
222
    snodes = list(instance.secondary_nodes)
223
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
224
    return (nl, nl)
225

  
226

  
227
class TLMigrateInstance(Tasklet):
228
  """Tasklet class for instance migration.
229

  
230
  @type live: boolean
231
  @ivar live: whether the migration will be done live or non-live;
232
      this variable is initialized only after CheckPrereq has run
233
  @type cleanup: boolean
234
  @ivar cleanup: Whether we clean up from a failed migration
235
  @type iallocator: string
236
  @ivar iallocator: The iallocator used to determine target_node
237
  @type target_node: string
238
  @ivar target_node: If given, the target_node to reallocate the instance to
239
  @type failover: boolean
240
  @ivar failover: Whether operation results in failover or migration
241
  @type fallback: boolean
242
  @ivar fallback: Whether fallback to failover is allowed if migration not
243
                  possible
244
  @type ignore_consistency: boolean
245
  @ivar ignore_consistency: Whether we should ignore consistency between source
246
                            and target node
247
  @type shutdown_timeout: int
248
  @ivar shutdown_timeout: In case of failover timeout of the shutdown
249
  @type ignore_ipolicy: bool
250
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
251

  
252
  """
253

  
254
  # Constants
255
  _MIGRATION_POLL_INTERVAL = 1      # seconds
256
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
257

  
258
  def __init__(self, lu, instance_name, cleanup, failover, fallback,
259
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
260
               ignore_ipolicy):
261
    """Initializes this class.
262

  
263
    """
264
    Tasklet.__init__(self, lu)
265

  
266
    # Parameters
267
    self.instance_name = instance_name
268
    self.cleanup = cleanup
269
    self.live = False # will be overridden later
270
    self.failover = failover
271
    self.fallback = fallback
272
    self.ignore_consistency = ignore_consistency
273
    self.shutdown_timeout = shutdown_timeout
274
    self.ignore_ipolicy = ignore_ipolicy
275
    self.allow_runtime_changes = allow_runtime_changes
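    # Editorial annotation, not part of this revision: the two logical units
    # above construct this tasklet with long positional argument lists; mapped
    # onto the parameter names of __init__ they amount to:
    #
    #   LUInstanceFailover.ExpandNames:
    #     TLMigrateInstance(self, self.op.instance_name,
    #                       False,                      # cleanup
    #                       True,                       # failover
    #                       False,                      # fallback
    #                       self.op.ignore_consistency, # ignore_consistency
    #                       True,                       # allow_runtime_changes
    #                       self.op.shutdown_timeout,   # shutdown_timeout
    #                       self.op.ignore_ipolicy)     # ignore_ipolicy
    #
    #   LUInstanceMigrate.ExpandNames:
    #     TLMigrateInstance(self, self.op.instance_name,
    #                       self.op.cleanup,                    # cleanup
    #                       False,                              # failover
    #                       self.op.allow_failover,             # fallback
    #                       False,                              # ignore_consistency
    #                       self.op.allow_runtime_changes,      # allow_runtime_changes
    #                       constants.DEFAULT_SHUTDOWN_TIMEOUT, # shutdown_timeout
    #                       self.op.ignore_ipolicy)             # ignore_ipolicy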
276

  
277
  def CheckPrereq(self):
278
    """Check prerequisites.
279

  
280
    This checks that the instance is in the cluster.
281

  
282
    """
283
    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
284
    instance = self.cfg.GetInstanceInfo(instance_name)
285
    assert instance is not None
286
    self.instance = instance
287
    cluster = self.cfg.GetClusterInfo()
288

  
289
    if (not self.cleanup and
290
        not instance.admin_state == constants.ADMINST_UP and
291
        not self.failover and self.fallback):
292
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
293
                      " switching to failover")
294
      self.failover = True
295

  
296
    if instance.disk_template not in constants.DTS_MIRRORED:
297
      if self.failover:
298
        text = "failovers"
299
      else:
300
        text = "migrations"
301
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
302
                                 " %s" % (instance.disk_template, text),
303
                                 errors.ECODE_STATE)
304

  
305
    if instance.disk_template in constants.DTS_EXT_MIRROR:
306
      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
307

  
308
      if self.lu.op.iallocator:
309
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
310
        self._RunAllocator()
311
      else:
312
        # We set self.target_node as it is required by
313
        # BuildHooksEnv
314
        self.target_node = self.lu.op.target_node
315

  
316
      # Check that the target node is correct in terms of instance policy
317
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
318
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
319
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
320
                                                              group_info)
321
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
322
                              ignore=self.ignore_ipolicy)
323

  
324
      # self.target_node is already populated, either directly or by the
325
      # iallocator run
326
      target_node = self.target_node
327
      if self.target_node == instance.primary_node:
328
        raise errors.OpPrereqError("Cannot migrate instance %s"
329
                                   " to its primary (%s)" %
330
                                   (instance.name, instance.primary_node),
331
                                   errors.ECODE_STATE)
332

  
333
      if len(self.lu.tasklets) == 1:
334
        # It is safe to release locks only when we're the only tasklet
335
        # in the LU
336
        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
337
                      keep=[instance.primary_node, self.target_node])
338
        _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
339

  
340
    else:
341
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
342

  
343
      secondary_nodes = instance.secondary_nodes
344
      if not secondary_nodes:
345
        raise errors.ConfigurationError("No secondary node but using"
346
                                        " %s disk template" %
347
                                        instance.disk_template)
348
      target_node = secondary_nodes[0]
349
      if self.lu.op.iallocator or (self.lu.op.target_node and
350
                                   self.lu.op.target_node != target_node):
351
        if self.failover:
352
          text = "failed over"
353
        else:
354
          text = "migrated"
355
        raise errors.OpPrereqError("Instances with disk template %s cannot"
356
                                   " be %s to arbitrary nodes"
357
                                   " (neither an iallocator nor a target"
358
                                   " node can be passed)" %
359
                                   (instance.disk_template, text),
360
                                   errors.ECODE_INVAL)
361
      nodeinfo = self.cfg.GetNodeInfo(target_node)
362
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
363
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
364
                                                              group_info)
365
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
366
                              ignore=self.ignore_ipolicy)
367

  
368
    i_be = cluster.FillBE(instance)
369

  
370
    # check memory requirements on the secondary node
371
    if (not self.cleanup and
372
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
373
      self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
374
                                               "migrating instance %s" %
375
                                               instance.name,
376
                                               i_be[constants.BE_MINMEM],
377
                                               instance.hypervisor)
378
    else:
379
      self.lu.LogInfo("Not checking memory on the secondary node as"
380
                      " instance will not be started")
381

  
382
    # check if failover must be forced instead of migration
383
    if (not self.cleanup and not self.failover and
384
        i_be[constants.BE_ALWAYS_FAILOVER]):
385
      self.lu.LogInfo("Instance configured to always failover; fallback"
386
                      " to failover")
387
      self.failover = True
388

  
389
    # check bridge existence
390
    _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
391

  
392
    if not self.cleanup:
393
      _CheckNodeNotDrained(self.lu, target_node)
394
      if not self.failover:
395
        result = self.rpc.call_instance_migratable(instance.primary_node,
396
                                                   instance)
397
        if result.fail_msg and self.fallback:
398
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
399
                          " failover")
400
          self.failover = True
401
        else:
402
          result.Raise("Can't migrate, please use failover",
403
                       prereq=True, ecode=errors.ECODE_STATE)
404

  
405
    assert not (self.failover and self.cleanup)
406

  
407
    if not self.failover:
408
      if self.lu.op.live is not None and self.lu.op.mode is not None:
409
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
410
                                   " parameters are accepted",
411
                                   errors.ECODE_INVAL)
412
      if self.lu.op.live is not None:
413
        if self.lu.op.live:
414
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
415
        else:
416
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
417
        # reset the 'live' parameter to None so that repeated
418
        # invocations of CheckPrereq do not raise an exception
419
        self.lu.op.live = None
420
      elif self.lu.op.mode is None:
421
        # read the default value from the hypervisor
422
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
423
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
424

  
425
      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
426
    else:
427
      # Failover is never live
428
      self.live = False
429

  
... This diff was truncated because it exceeds the maximum size that can be displayed.
