Use the new dry-run mode in cmdlib
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 3ba5248..7ba8630 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -129,17 +129,16 @@ class LogicalUnit(object):
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
+    self.glm = context.glm
     self.context = context
     self.rpc = rpc
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
-    self.acquired_locks = {}
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
     self.add_locks = {}
     self.remove_locks = {}
     # Used to force good behavior when calling helper functions
     self.recalculate_locks = {}
-    self.__ssh = None
     # logging
     self.Log = processor.Log # pylint: disable-msg=C0103
     self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
@@ -160,16 +159,6 @@ class LogicalUnit(object):
 
     self.CheckArguments()
 
-  def __GetSSH(self):
-    """Returns the SshRunner object
-
-    """
-    if not self.__ssh:
-      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
-    return self.__ssh
-
-  ssh = property(fget=__GetSSH)
-
   def CheckArguments(self):
     """Check syntactic validity for the opcode arguments.
 
@@ -396,7 +385,7 @@ class LogicalUnit(object):
     # future we might want to have different behaviors depending on the value
     # of self.recalculate_locks[locking.LEVEL_NODE]
     wanted_nodes = []
-    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
       instance = self.context.cfg.GetInstanceInfo(instance_name)
       wanted_nodes.append(instance.primary_node)
       if not primary_only:
@@ -510,7 +499,7 @@ class _QueryBase:
 
     """
     if self.do_locking:
-      names = lu.acquired_locks[lock_level]
+      names = lu.glm.list_owned(lock_level)
     else:
       names = all_names
 
@@ -521,7 +510,7 @@ class _QueryBase:
 
     # caller specified names and we must keep the same order
     assert self.names
-    assert not self.do_locking or lu.acquired_locks[lock_level]
+    assert not self.do_locking or lu.glm.is_owned(lock_level)
 
     missing = set(self.wanted).difference(names)
     if missing:
@@ -641,6 +630,51 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
+def _ReleaseLocks(lu, level, names=None, keep=None):
+  """Releases locks owned by an LU.
+
+  @type lu: L{LogicalUnit}
+  @param level: Lock level
+  @type names: list or None
+  @param names: Names of locks to release
+  @type keep: list or None
+  @param keep: Names of locks to retain
+
+  """
+  assert not (keep is not None and names is not None), \
+         "Only one of the 'names' and the 'keep' parameters can be given"
+
+  if names is not None:
+    should_release = names.__contains__
+  elif keep:
+    should_release = lambda name: name not in keep
+  else:
+    should_release = None
+
+  if should_release:
+    retain = []
+    release = []
+
+    # Determine which locks to release
+    for name in lu.glm.list_owned(level):
+      if should_release(name):
+        release.append(name)
+      else:
+        retain.append(name)
+
+    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
+
+    # Release just some locks
+    lu.glm.release(level, names=release)
+
+    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
+  else:
+    # Release everything
+    lu.glm.release(level)
+
+    assert not lu.glm.is_owned(level), "No locks should be owned"
+
+
 def _RunPostHook(lu, node_name):
   """Runs the post-hook for an opcode on a single node.
 
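
The new _ReleaseLocks helper accepts either an explicit list of lock names
to release or a list of names to keep, never both. A minimal usage sketch
(the LU, instance and node names here are hypothetical):

    # Keep only the locks still needed, release the rest
    _ReleaseLocks(self, locking.LEVEL_NODE,
                  keep=[instance.primary_node, self.op.target_node])

    # Release two specific node locks
    _ReleaseLocks(self, locking.LEVEL_NODE, names=["node3", "node4"])

    # Release every lock held at this level
    _ReleaseLocks(self, locking.LEVEL_NODE)
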
@@ -1265,6 +1299,7 @@ class LUClusterVerify(LogicalUnit):
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
   ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
+  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@@ -1457,7 +1492,7 @@ class LUClusterVerify(LogicalUnit):
                  hv_name, item, hv_result)
 
     test = nresult.get(constants.NV_NODESETUP,
-                           ["Missing NODESETUP results"])
+                       ["Missing NODESETUP results"])
     _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
              "; ".join(test))
 
@@ -1694,51 +1729,94 @@ class LUClusterVerify(LogicalUnit):
         test = n_img.mfree < needed_mem
         self._ErrorIf(test, self.ENODEN1, node,
                       "not enough memory to accomodate instance failovers"
-                      " should node %s fail", prinode)
+                      " should node %s fail (%dMiB needed, %dMiB available)",
+                      prinode, needed_mem, n_img.mfree)
 
-  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
-                       master_files):
-    """Verifies and computes the node required file checksums.
+  @classmethod
+  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
+                   (files_all, files_all_opt, files_mc, files_vm)):
+    """Verifies file checksums collected from all nodes.
 
-    @type ninfo: L{objects.Node}
-    @param ninfo: the node to check
-    @param nresult: the remote results for the node
-    @param file_list: required list of files
-    @param local_cksum: dictionary of local files and their checksums
-    @param master_files: list of files that only masters should have
+    @param errorif: Callback for reporting errors
+    @param nodeinfo: List of L{objects.Node} objects
+    @param master_node: Name of master node
+    @param all_nvinfo: RPC results
 
     """
-    node = ninfo.name
-    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+    node_names = frozenset(node.name for node in nodeinfo)
 
-    remote_cksum = nresult.get(constants.NV_FILELIST, None)
-    test = not isinstance(remote_cksum, dict)
-    _ErrorIf(test, self.ENODEFILECHECK, node,
-             "node hasn't returned file checksum data")
-    if test:
-      return
+    assert master_node in node_names
+    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+           "Found file listed in more than one file list"
 
-    for file_name in file_list:
-      node_is_mc = ninfo.master_candidate
-      must_have = (file_name not in master_files) or node_is_mc
-      # missing
-      test1 = file_name not in remote_cksum
-      # invalid checksum
-      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
-      # existing and good
-      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
-      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
-               "file '%s' missing", file_name)
-      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
-               "file '%s' has wrong checksum", file_name)
-      # not candidate and this is not a must-have file
-      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
-               "file '%s' should not exist on non master"
-               " candidates (and the file is outdated)", file_name)
-      # all good, except non-master/non-must have combination
-      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
-               "file '%s' should not exist"
-               " on non master candidates", file_name)
+    # Define functions determining which nodes to consider for a file
+    file2nodefn = dict([(filename, fn)
+      for (files, fn) in [(files_all, None),
+                          (files_all_opt, None),
+                          (files_mc, lambda node: (node.master_candidate or
+                                                   node.name == master_node)),
+                          (files_vm, lambda node: node.vm_capable)]
+      for filename in files])
+
+    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
+
+    for node in nodeinfo:
+      nresult = all_nvinfo[node.name]
+
+      if nresult.fail_msg or not nresult.payload:
+        node_files = None
+      else:
+        node_files = nresult.payload.get(constants.NV_FILELIST, None)
+
+      test = not (node_files and isinstance(node_files, dict))
+      errorif(test, cls.ENODEFILECHECK, node.name,
+              "Node did not return file checksum data")
+      if test:
+        continue
+
+      for (filename, checksum) in node_files.items():
+        # Check if the file should be considered for a node
+        fn = file2nodefn[filename]
+        if fn is None or fn(node):
+          fileinfo[filename].setdefault(checksum, set()).add(node.name)
+
+    for (filename, checksums) in fileinfo.items():
+      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
+
+      # Nodes having the file
+      with_file = frozenset(node_name
+                            for nodes in fileinfo[filename].values()
+                            for node_name in nodes)
+
+      # Nodes missing file
+      missing_file = node_names - with_file
+
+      if filename in files_all_opt:
+        # All or no nodes
+        errorif(missing_file and missing_file != node_names,
+                cls.ECLUSTERFILECHECK, None,
+                "File %s is optional, but it must exist on all or no nodes (not"
+                " found on %s)",
+                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
+      else:
+        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
+                "File %s is missing from node(s) %s", filename,
+                utils.CommaJoin(utils.NiceSort(missing_file)))
+
+      # See if there are multiple versions of the file
+      test = len(checksums) > 1
+      if test:
+        variants = ["variant %s on %s" %
+                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
+                    for (idx, (checksum, nodes)) in
+                      enumerate(sorted(checksums.items()))]
+      else:
+        variants = []
+
+      errorif(test, cls.ECLUSTERFILECHECK, None,
+              "File %s found with %s different checksums (%s)",
+              filename, len(checksums), "; ".join(variants))
 
   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                       drbd_map):
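
The rewritten check inverts the per-node checksum dictionaries into a
per-file mapping from checksum to the set of nodes reporting it; more than
one key for a file means divergent copies. A hypothetical intermediate
value of fileinfo for one inconsistent file (names and sums invented):

    # Sketch of the state built by _VerifyFiles
    fileinfo = {
      "/etc/hosts": {
        "1df52dc4ab76...": set(["node1.example.com", "node2.example.com"]),
        "90c37bc44a5d...": set(["node3.example.com"]),
      },
    }
    # len(checksums) == 2 for this file, so one ECLUSTERFILECHECK error
    # is reported, listing variant 1 on node1/node2 and variant 2 on node3.
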
@@ -1858,6 +1936,7 @@ class LUClusterVerify(LogicalUnit):
 
     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
 
+    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
     for os_name, os_data in nimg.oslist.items():
       assert os_data, "Empty OS status for OS %s?!" % os_name
       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
@@ -1885,11 +1964,12 @@ class LUClusterVerify(LogicalUnit):
         continue
       for kind, a, b in [("API version", f_api, b_api),
                          ("variants list", f_var, b_var),
-                         ("parameters", f_param, b_param)]:
+                         ("parameters", beautify_params(f_param),
+                          beautify_params(b_param))]:
         _ErrorIf(a != b, self.ENODEOS, node,
-                 "OS %s %s differs from reference node %s: %s vs. %s",
+                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                  kind, os_name, base.name,
-                 utils.CommaJoin(a), utils.CommaJoin(b))
+                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
 
     # check any missing OSes
     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
@@ -2173,19 +2253,14 @@ class LUClusterVerify(LogicalUnit):
     node_vol_should = {}
 
     # FIXME: verify OS list
+
+    # File verification
+    filemap = _ComputeAncillaryFiles(cluster, False)
+
     # do local checksums
-    master_files = [constants.CLUSTER_CONF_FILE]
     master_node = self.master_node = self.cfg.GetMasterNode()
     master_ip = self.cfg.GetMasterIP()
 
-    file_names = ssconf.SimpleStore().GetFileList()
-    file_names.extend(constants.ALL_CERT_FILES)
-    file_names.extend(master_files)
-    if cluster.modify_etc_hosts:
-      file_names.append(constants.ETC_HOSTS)
-
-    local_checksums = utils.FingerprintFiles(file_names)
-
     # Compute the set of hypervisor parameters
     hvp_data = []
     for hv_name in hypervisors:
@@ -2207,7 +2282,10 @@ class LUClusterVerify(LogicalUnit):
 
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     node_verify_param = {
-      constants.NV_FILELIST: file_names,
+      constants.NV_FILELIST:
+        utils.UniqueSequence(filename
+                             for files in filemap
+                             for filename in files),
       constants.NV_NODELIST: [node.name for node in nodeinfo
                               if not node.offline],
       constants.NV_HYPERVISOR: hypervisors,
@@ -2289,6 +2367,9 @@ class LUClusterVerify(LogicalUnit):
     feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
     instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
 
     feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
     instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
 
+    feedback_fn("* Verifying configuration file consistency")
+    self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
+
     feedback_fn("* Verifying node status")
 
     refos_img = None
     feedback_fn("* Verifying node status")
 
     refos_img = None
@@ -2326,9 +2407,6 @@ class LUClusterVerify(LogicalUnit):
       nimg.call_ok = self._VerifyNode(node_i, nresult)
       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
-      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
-                            master_files)
-
       self._VerifyOob(node_i, nresult)
 
       if nimg.vm_capable:
@@ -2571,10 +2649,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
   def ExpandNames(self):
     if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
@@ -2586,7 +2661,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
         locking.LEVEL_NODE: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
-    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE and self.wanted_names is not None:
@@ -2599,7 +2674,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
     """
     if self.wanted_names is None:
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                              in self.wanted_names]
@@ -2824,7 +2899,7 @@ class LUClusterSetParams(LogicalUnit):
                                    " drbd-based instances exist",
                                    errors.ECODE_INVAL)
 
                                    " drbd-based instances exist",
                                    errors.ECODE_INVAL)
 
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
+    node_list = self.glm.list_owned(locking.LEVEL_NODE)
 
     # if vg_name not None, checks given volume group on all nodes
     if self.op.vg_name:
@@ -2871,6 +2946,12 @@ class LUClusterSetParams(LogicalUnit):
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
 
+      # TODO: we need a more general way to handle resetting
+      # cluster-level parameters to default values
+      if self.new_ndparams["oob_program"] == "":
+        self.new_ndparams["oob_program"] = \
+            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
@@ -3112,6 +3193,50 @@ def _UploadHelper(lu, nodes, fname):
         lu.proc.LogWarning(msg)
 
 
+def _ComputeAncillaryFiles(cluster, redist):
+  """Compute files external to Ganeti which need to be consistent.
+
+  @type redist: boolean
+  @param redist: Whether to include files which need to be redistributed
+
+  """
+  # Compute files for all nodes
+  files_all = set([
+    constants.SSH_KNOWN_HOSTS_FILE,
+    constants.CONFD_HMAC_KEY,
+    constants.CLUSTER_DOMAIN_SECRET_FILE,
+    ])
+
+  if not redist:
+    files_all.update(constants.ALL_CERT_FILES)
+    files_all.update(ssconf.SimpleStore().GetFileList())
+
+  if cluster.modify_etc_hosts:
+    files_all.add(constants.ETC_HOSTS)
+
+  # Files which must either exist on all nodes or on none
+  files_all_opt = set([
+    constants.RAPI_USERS_FILE,
+    ])
+
+  # Files which should only be on master candidates
+  files_mc = set()
+  if not redist:
+    files_mc.add(constants.CLUSTER_CONF_FILE)
+
+  # Files which should only be on VM-capable nodes
+  files_vm = set(filename
+    for hv_name in cluster.enabled_hypervisors
+    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+
+  # Filenames must be unique
+  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+         "Found file listed in more than one file list"
+
+  return (files_all, files_all_opt, files_mc, files_vm)
+
+
 def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   """Distribute additional files which are part of the cluster configuration.
 
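
_ComputeAncillaryFiles returns four disjoint sets; with redist=False the
certificate, ssconf and configuration files are included for verification,
while redist=True restricts the result to files Ganeti may copy around
itself. A sketch of both call styles, mirroring the callers in this patch:

    cluster = lu.cfg.GetClusterInfo()

    # Cluster verification: also check non-redistributable files
    (files_all, files_all_opt, files_mc, files_vm) = \
      _ComputeAncillaryFiles(cluster, False)

    # Redistribution: only files this code may upload to other nodes
    redist_filemap = _ComputeAncillaryFiles(cluster, True)
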
@@ -3125,40 +3250,42 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   @param additional_vm: whether the additional nodes are vm-capable or not
 
   """
-  # 1. Gather target nodes
-  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
-  dist_nodes = lu.cfg.GetOnlineNodeList()
-  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
-  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
+  # Gather target nodes
+  cluster = lu.cfg.GetClusterInfo()
+  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
+
+  online_nodes = lu.cfg.GetOnlineNodeList()
+  vm_nodes = lu.cfg.GetVmCapableNodeList()
+
   if additional_nodes is not None:
-    dist_nodes.extend(additional_nodes)
+    online_nodes.extend(additional_nodes)
     if additional_vm:
       vm_nodes.extend(additional_nodes)
-  if myself.name in dist_nodes:
-    dist_nodes.remove(myself.name)
-  if myself.name in vm_nodes:
-    vm_nodes.remove(myself.name)
-
-  # 2. Gather files to distribute
-  dist_files = set([constants.ETC_HOSTS,
-                    constants.SSH_KNOWN_HOSTS_FILE,
-                    constants.RAPI_CERT_FILE,
-                    constants.RAPI_USERS_FILE,
-                    constants.CONFD_HMAC_KEY,
-                    constants.CLUSTER_DOMAIN_SECRET_FILE,
-                   ])
-
-  vm_files = set()
-  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
-  for hv_name in enabled_hypervisors:
-    hv_class = hypervisor.GetHypervisor(hv_name)
-    vm_files.update(hv_class.GetAncillaryFiles())
-
-  # 3. Perform the files upload
-  for fname in dist_files:
-    _UploadHelper(lu, dist_nodes, fname)
-  for fname in vm_files:
-    _UploadHelper(lu, vm_nodes, fname)
+
+  # Never distribute to master node
+  for nodelist in [online_nodes, vm_nodes]:
+    if master_info.name in nodelist:
+      nodelist.remove(master_info.name)
+
+  # Gather file lists
+  (files_all, files_all_opt, files_mc, files_vm) = \
+    _ComputeAncillaryFiles(cluster, True)
+
+  # Never re-distribute configuration file from here
+  assert not (constants.CLUSTER_CONF_FILE in files_all or
+              constants.CLUSTER_CONF_FILE in files_vm)
+  assert not files_mc, "Master candidates not handled in this function"
+
+  filemap = [
+    (online_nodes, files_all),
+    (online_nodes, files_all_opt),
+    (vm_nodes, files_vm),
+    ]
+
+  # Upload the files
+  for (node_list, files) in filemap:
+    for fname in files:
+      _UploadHelper(lu, node_list, fname)
 
 
 class LUClusterRedistConf(NoHooksLU):
@@ -3299,6 +3426,20 @@ class LUOobCommand(NoHooksLU):
   REG_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
   def CheckPrereq(self):
     """Check prerequisites.
 
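
ExpandNames now precedes CheckPrereq, matching the order in which the LU
methods run. Depending on the opcode, the declaration resolves to either a
fixed list or the whole level; a sketch of the two resulting shapes (node
names hypothetical):

    # Specific nodes requested
    self.needed_locks = {locking.LEVEL_NODE: ["node1", "node2"]}

    # No node names given: lock the entire level
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
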
@@ -3315,23 +3456,23 @@ class LUOobCommand(NoHooksLU):
     assert self.op.power_delay >= 0.0
 
     if self.op.node_names:
-      if self.op.command in self._SKIP_MASTER:
-        if self.master_node in self.op.node_names:
-          master_node_obj = self.cfg.GetNodeInfo(self.master_node)
-          master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
-
-          if master_oob_handler:
-            additional_text = ("Run '%s %s %s' if you want to operate on the"
-                               " master regardless") % (master_oob_handler,
-                                                        self.op.command,
-                                                        self.master_node)
-          else:
-            additional_text = "The master node does not support out-of-band"
+      if (self.op.command in self._SKIP_MASTER and
+          self.master_node in self.op.node_names):
+        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
+
+        if master_oob_handler:
+          additional_text = ("run '%s %s %s' if you want to operate on the"
+                             " master regardless") % (master_oob_handler,
+                                                      self.op.command,
+                                                      self.master_node)
+        else:
+          additional_text = "it does not support out-of-band operations"
 
-          raise errors.OpPrereqError(("Operating on the master node %s is not"
-                                      " allowed for %s\n%s") %
-                                     (self.master_node, self.op.command,
-                                      additional_text), errors.ECODE_INVAL)
+        raise errors.OpPrereqError(("Operating on the master node %s is not"
+                                    " allowed for %s; %s") %
+                                   (self.master_node, self.op.command,
+                                    additional_text), errors.ECODE_INVAL)
     else:
       self.op.node_names = self.cfg.GetNodeList()
       if self.op.command in self._SKIP_MASTER:
@@ -3355,21 +3496,6 @@ class LUOobCommand(NoHooksLU):
                                     " not marked offline") % node_name,
                                    errors.ECODE_STATE)
 
                                     " not marked offline") % node_name,
                                    errors.ECODE_STATE)
 
-  def ExpandNames(self):
-    """Gather locks we need.
-
-    """
-    if self.op.node_names:
-      self.op.node_names = [_ExpandNodeName(self.cfg, name)
-                            for name in self.op.node_names]
-      lock_names = self.op.node_names
-    else:
-      lock_names = locking.ALL_SET
-
-    self.needed_locks = {
-      locking.LEVEL_NODE: lock_names,
-      }
-
   def Exec(self, feedback_fn):
     """Execute OOB and return result if we expect any.
 
@@ -3377,7 +3503,8 @@ class LUOobCommand(NoHooksLU):
     master_node = self.master_node
     ret = []
 
-    for idx, node in enumerate(self.nodes):
+    for idx, node in enumerate(utils.NiceSort(self.nodes,
+                                              key=lambda node: node.name)):
       node_entry = [(constants.RS_NORMAL, node.name)]
       ret.append(node_entry)
 
@@ -3394,14 +3521,14 @@ class LUOobCommand(NoHooksLU):
                                      self.op.timeout)
 
       if result.fail_msg:
-        self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                         node.name, result.fail_msg)
         node_entry.append((constants.RS_NODATA, None))
       else:
         try:
           self._CheckPayload(result)
         except errors.OpExecError, err:
-          self.LogWarning("The payload returned by '%s' is not valid: %s",
+          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                           node.name, err)
           node_entry.append((constants.RS_NODATA, None))
         else:
@@ -3410,8 +3537,8 @@ class LUOobCommand(NoHooksLU):
             for item, status in result.payload:
               if status in [constants.OOB_STATUS_WARNING,
                             constants.OOB_STATUS_CRITICAL]:
-                self.LogWarning("On node '%s' item '%s' has status '%s'",
-                                node.name, item, status)
+                self.LogWarning("Item '%s' on node '%s' has status '%s'",
+                                item, node.name, status)
 
           if self.op.command == constants.OOB_POWER_ON:
             node.powered = True
@@ -3540,7 +3667,10 @@ class _OsQuery(_QueryBase):
 
     """
     # Locking is not used
-    assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
 
     valid_nodes = [node.name
                    for node in lu.cfg.GetAllNodesInfo().values()
@@ -3681,15 +3811,14 @@ class LUNodeRemove(LogicalUnit):
 
     masternode = self.cfg.GetMasterNode()
     if node.name == masternode:
-      raise errors.OpPrereqError("Node is the master node,"
-                                 " you need to failover first.",
-                                 errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Node is the master node, failover to another"
+                                 " node is required", errors.ECODE_INVAL)
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
       if node.name in instance.all_nodes:
         raise errors.OpPrereqError("Instance %s is still running on the node,"
-                                   " please remove first." % instance_name,
+                                   " please remove first" % instance_name,
                                    errors.ECODE_INVAL)
     self.op.node_name = node.name
     self.node = node
@@ -3847,7 +3976,7 @@ class LUNodeQueryvols(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
     """Computes the list of nodes and their attributes.
 
     """
-    nodenames = self.acquired_locks[locking.LEVEL_NODE]
+    nodenames = self.glm.list_owned(locking.LEVEL_NODE)
     volumes = self.rpc.call_node_volumes(nodenames)
 
     ilist = [self.cfg.GetInstanceInfo(iname) for iname
@@ -3925,7 +4054,7 @@ class LUNodeQueryStorage(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
     """Computes the list of nodes and their attributes.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
 
     # Always get name to sort by
     if constants.SF_NAME in self.op.output_fields:
@@ -4037,10 +4166,16 @@ class _InstanceQuery(_QueryBase):
           bad_nodes.append(name)
         elif result.payload:
           for inst in result.payload:
-            if all_info[inst].primary_node == name:
-              live_data.update(result.payload)
+            if inst in all_info:
+              if all_info[inst].primary_node == name:
+                live_data.update(result.payload)
+              else:
+                wrongnode_inst.add(inst)
             else:
-              wrongnode_inst.add(inst)
+              # orphan instance; we don't list it here as we don't
+              # handle this case yet in the output of instance listing
+              logging.warning("Orphan instance '%s' found on node %s",
+                              inst, name)
         # else no instance is alive
     else:
       live_data = {}
@@ -4166,6 +4301,11 @@ class LUNodeAdd(LogicalUnit):
     self.hostname = netutils.GetHostname(name=self.op.node_name,
                                          family=self.primary_ip_family)
     self.op.node_name = self.hostname.name
+
+    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+      raise errors.OpPrereqError("Cannot readd the master node",
+                                 errors.ECODE_STATE)
+
     if self.op.readd and self.op.group:
       raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                  " being readded", errors.ECODE_INVAL)
@@ -4401,7 +4541,7 @@ class LUNodeAdd(LogicalUnit):
           feedback_fn("ssh/hostname verification failed"
                       " (checking from %s): %s" %
                       (verifier, nl_payload[failed]))
           feedback_fn("ssh/hostname verification failed"
                       " (checking from %s): %s" %
                       (verifier, nl_payload[failed]))
-        raise errors.OpExecError("ssh/hostname verification failed.")
+        raise errors.OpExecError("ssh/hostname verification failed")
 
     if self.op.readd:
       _RedistributeAncillaryFiles(self)
@@ -4484,21 +4624,22 @@ class LUNodeSetParams(LogicalUnit):
     # If we have locked all instances, before waiting to lock nodes, release
     # all the ones living on nodes unrelated to the current operation.
     if level == locking.LEVEL_NODE and self.lock_instances:
-      instances_release = []
-      instances_keep = []
       self.affected_instances = []
       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
-        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+        instances_keep = []
+
+        # Build list of instances to release
+        for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
           instance = self.context.cfg.GetInstanceInfo(instance_name)
-          i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR
-          if i_mirrored and self.op.node_name in instance.all_nodes:
+          if (instance.disk_template in constants.DTS_INT_MIRROR and
+              self.op.node_name in instance.all_nodes):
             instances_keep.append(instance_name)
             self.affected_instances.append(instance)
-          else:
-            instances_release.append(instance_name)
-        if instances_release:
-          self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
-          self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
+        _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
+
+        assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
+                set(instances_keep))
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4564,7 +4705,7 @@ class LUNodeSetParams(LogicalUnit):
 
     self.old_flags = old_flags = (node.master_candidate,
                                   node.drained, node.offline)
-    assert old_flags in self._F2R, "Un-handled old flags  %s" % str(old_flags)
+    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
     self.old_role = old_role = self._F2R[old_flags]
 
     # Check for ineffective changes
@@ -4580,12 +4721,12 @@ class LUNodeSetParams(LogicalUnit):
     if _SupportsOob(self.cfg, node):
       if self.op.offline is False and not (node.powered or
                                            self.op.powered == True):
-        raise errors.OpPrereqError(("Please power on node %s first before you"
-                                    " can reset offline state") %
+        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
+                                    " offline status can be reset") %
                                    self.op.node_name)
     elif self.op.powered is not None:
       raise errors.OpPrereqError(("Unable to change powered state for node %s"
-                                  " which does not support out-of-band"
+                                  " as it does not support out-of-band"
                                   " handling") % self.op.node_name)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
                                   " handling") % self.op.node_name)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
@@ -5596,7 +5737,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     else:
       for idx in self.op.disks:
         if idx >= len(instance.disks):
-          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
+          raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
                                      errors.ECODE_INVAL)
 
     self.instance = instance
@@ -5627,7 +5768,7 @@ class LUInstanceRename(LogicalUnit):
     """
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
     """
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
-      raise errors.OpPrereqError("Cannot do ip check without a name check",
+      raise errors.OpPrereqError("IP address check requires a name check",
                                  errors.ECODE_INVAL)
 
   def BuildHooksEnv(self):
@@ -5697,9 +5838,11 @@ class LUInstanceRename(LogicalUnit):
       rename_file_storage = True
 
     self.cfg.RenameInstance(inst.name, self.op.new_name)
-    # Change the instance lock. This is definitely safe while we hold the BGL
-    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
-    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+    # Change the instance lock. This is definitely safe while we hold the BGL.
+    # Otherwise the new lock would have to be added in acquired mode.
+    assert self.REQ_BGL
+    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
@@ -5864,6 +6007,15 @@ class LUInstanceFailover(LogicalUnit):
     self.needed_locks[locking.LEVEL_NODE] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+    ignore_consistency = self.op.ignore_consistency
+    shutdown_timeout = self.op.shutdown_timeout
+    self._migrater = TLMigrateInstance(self, self.op.instance_name,
+                                       cleanup=False,
+                                       failover=True,
+                                       ignore_consistency=ignore_consistency,
+                                       shutdown_timeout=shutdown_timeout)
+    self.tasklets = [self._migrater]
+
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
@@ -5883,13 +6035,14 @@ class LUInstanceFailover(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    instance = self.instance
+    instance = self._migrater.instance
     source_node = instance.primary_node
+    target_node = self.op.target_node
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
       "OLD_PRIMARY": source_node,
-      "NEW_PRIMARY": self.op.target_node,
+      "NEW_PRIMARY": target_node,
       }
 
     if instance.disk_template in constants.DTS_INT_MIRROR:
@@ -5906,171 +6059,9 @@ class LUInstanceFailover(LogicalUnit):
     """Build hooks nodes.
 
     """
     """Build hooks nodes.
 
     """
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
-    return (nl, nl + [self.instance.primary_node])
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert self.instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
-
-    bep = self.cfg.GetClusterInfo().FillBE(instance)
-    if instance.disk_template not in constants.DTS_MIRRORED:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " mirrored, cannot failover.",
-                                 errors.ECODE_STATE)
-
-    if instance.disk_template in constants.DTS_EXT_MIRROR:
-      _CheckIAllocatorOrNode(self, "iallocator", "target_node")
-      if self.op.iallocator:
-        self._RunAllocator()
-        # Release all unnecessary node locks
-        nodes_keep = [instance.primary_node, self.op.target_node]
-        nodes_rel = [node for node in self.acquired_locks[locking.LEVEL_NODE]
-                     if node not in nodes_keep]
-        self.context.glm.release(locking.LEVEL_NODE, nodes_rel)
-        self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
-
-      # self.op.target_node is already populated, either directly or by the
-      # iallocator run
-      target_node = self.op.target_node
-
-    else:
-      secondary_nodes = instance.secondary_nodes
-      if not secondary_nodes:
-        raise errors.ConfigurationError("No secondary node but using"
-                                        " %s disk template" %
-                                        instance.disk_template)
-      target_node = secondary_nodes[0]
-
-      if self.op.iallocator or (self.op.target_node and
-                                self.op.target_node != target_node):
-        raise errors.OpPrereqError("Instances with disk template %s cannot"
-                                   " be failed over to arbitrary nodes"
-                                   " (neither an iallocator nor a target"
-                                   " node can be passed)" %
-                                   instance.disk_template, errors.ECODE_INVAL)
-    _CheckNodeOnline(self, target_node)
-    _CheckNodeNotDrained(self, target_node)
-
-    # Save target_node so that we can use it in BuildHooksEnv
-    self.op.target_node = target_node
-
-    if instance.admin_up:
-      # check memory requirements on the secondary node
-      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
-                           instance.name, bep[constants.BE_MEMORY],
-                           instance.hypervisor)
-    else:
-      self.LogInfo("Not checking memory on the secondary node as"
-                   " instance will not be started")
-
-    # check bridge existance
-    _CheckInstanceBridgesExist(self, instance, node=target_node)
-
-  def Exec(self, feedback_fn):
-    """Failover an instance.
-
-    The failover is done by shutting it down on its present node and
-    starting it on the secondary.
-
-    """
-    instance = self.instance
-    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
-
-    source_node = instance.primary_node
-    target_node = self.op.target_node
-
-    if instance.admin_up:
-      feedback_fn("* checking disk consistency between source and target")
-      for dev in instance.disks:
-        # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self, dev, target_node, False):
-          if not self.op.ignore_consistency:
-            raise errors.OpExecError("Disk %s is degraded on target node,"
-                                     " aborting failover." % dev.iv_name)
-    else:
-      feedback_fn("* not checking disk consistency as instance is not running")
-
-    feedback_fn("* shutting down instance on source node")
-    logging.info("Shutting down instance %s on node %s",
-                 instance.name, source_node)
-
-    result = self.rpc.call_instance_shutdown(source_node, instance,
-                                             self.op.shutdown_timeout)
-    msg = result.fail_msg
-    if msg:
-      if self.op.ignore_consistency or primary_node.offline:
-        self.proc.LogWarning("Could not shutdown instance %s on node %s."
-                             " Proceeding anyway. Please make sure node"
-                             " %s is down. Error details: %s",
-                             instance.name, source_node, source_node, msg)
-      else:
-        raise errors.OpExecError("Could not shutdown instance %s on"
-                                 " node %s: %s" %
-                                 (instance.name, source_node, msg))
-
-    feedback_fn("* deactivating the instance's disks on source node")
-    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
-      raise errors.OpExecError("Can't shut down the instance's disks.")
-
-    instance.primary_node = target_node
-    # distribute new instance config to the other nodes
-    self.cfg.Update(instance, feedback_fn)
-
-    # Only start the instance if it's marked as up
-    if instance.admin_up:
-      feedback_fn("* activating the instance's disks on target node")
-      logging.info("Starting instance %s on node %s",
-                   instance.name, target_node)
-
-      disks_ok, _ = _AssembleInstanceDisks(self, instance,
-                                           ignore_secondaries=True)
-      if not disks_ok:
-        _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Can't activate the instance's disks")
-
-      feedback_fn("* starting the instance on the target node")
-      result = self.rpc.call_instance_start(target_node, instance, None, None)
-      msg = result.fail_msg
-      if msg:
-        _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
-                                 (instance.name, target_node, msg))
-
-  def _RunAllocator(self):
-    """Run the allocator based on input opcode.
-
-    """
-    ial = IAllocator(self.cfg, self.rpc,
-                     mode=constants.IALLOCATOR_MODE_RELOC,
-                     name=self.instance.name,
-                     # TODO See why hail breaks with a single node below
-                     relocate_from=[self.instance.primary_node,
-                                    self.instance.primary_node],
-                     )
-
-    ial.Run(self.op.iallocator)
-
-    if not ial.success:
-      raise errors.OpPrereqError("Can't compute nodes using"
-                                 " iallocator '%s': %s" %
-                                 (self.op.iallocator, ial.info),
-                                 errors.ECODE_NORES)
-    if len(ial.result) != ial.required_nodes:
-      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
-                                 " of nodes (%s), required %s" %
-                                 (self.op.iallocator, len(ial.result),
-                                  ial.required_nodes), errors.ECODE_FAULT)
-    self.op.target_node = ial.result[0]
-    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance.name, self.op.iallocator,
-                 utils.CommaJoin(ial.result))
+    instance = self._migrater.instance
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+    return (nl, nl + [instance.primary_node])
 
 
 class LUInstanceMigrate(LogicalUnit):
@@ -6094,8 +6085,9 @@ class LUInstanceMigrate(LogicalUnit):
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
-                                       self.op.cleanup, self.op.iallocator,
-                                       self.op.target_node)
+                                       cleanup=self.op.cleanup,
+                                       failover=False,
+                                       fallback=self.op.allow_failover)
     self.tasklets = [self._migrater]
 
   def DeclareLocks(self, level):
@@ -6119,7 +6111,7 @@ class LUInstanceMigrate(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = _BuildInstanceHookEnvByObject(self, instance)
     env.update({
       "MIGRATE_LIVE": self._migrater.live,
@@ -6355,8 +6347,7 @@ class LUNodeMigrate(LogicalUnit):
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, False,
-                                        self.op.iallocator, None))
+      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
 
       if inst.disk_template in constants.DTS_EXT_MIRROR:
         # We need to lock all nodes, as the iallocator will choose the
@@ -6403,10 +6394,28 @@ class TLMigrateInstance(Tasklet):
   @type live: boolean
   @ivar live: whether the migration will be done live or non-live;
       this variable is initalized only after CheckPrereq has run
+  @type cleanup: boolean
+  @ivar cleanup: Whether we are cleaning up from a failed migration
+  @type iallocator: string
+  @ivar iallocator: The iallocator used to determine target_node
+  @type target_node: string
+  @ivar target_node: If given, the target_node to reallocate the instance to
+  @type failover: boolean
+  @ivar failover: Whether operation results in failover or migration
+  @type fallback: boolean
+  @ivar fallback: Whether fallback to failover is allowed if migration is not
+                  possible
+  @type ignore_consistency: boolean
+  @ivar ignore_consistency: Whether we should ignore consistency between the
+                            source and the target node
+  @type shutdown_timeout: int
+  @ivar shutdown_timeout: In case of failover, the timeout for the shutdown
 
   """
 
   """
-  def __init__(self, lu, instance_name, cleanup,
-               iallocator=None, target_node=None):
+  def __init__(self, lu, instance_name, cleanup=False,
+               failover=False, fallback=False,
+               ignore_consistency=False,
+               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
     """Initializes this class.
 
     """
     """Initializes this class.
 
     """
@@ -6416,8 +6425,10 @@ class TLMigrateInstance(Tasklet):
     self.instance_name = instance_name
     self.cleanup = cleanup
     self.live = False # will be overridden later
-    self.iallocator = iallocator
-    self.target_node = target_node
+    self.failover = failover
+    self.fallback = fallback
+    self.ignore_consistency = ignore_consistency
+    self.shutdown_timeout = shutdown_timeout
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -6430,28 +6441,40 @@ class TLMigrateInstance(Tasklet):
     assert instance is not None
     self.instance = instance
 
+    if (not self.cleanup and not instance.admin_up and not self.failover and
+        self.fallback):
+      self.lu.LogInfo("Instance is marked down, fallback allowed, switching"
+                      " to failover")
+      self.failover = True
+
     if instance.disk_template not in constants.DTS_MIRRORED:
+      if self.failover:
+        text = "failovers"
+      else:
+        text = "migrations"
       raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
-                                 " migrations" % instance.disk_template,
+                                 " %s" % (instance.disk_template, text),
                                  errors.ECODE_STATE)
 
     if instance.disk_template in constants.DTS_EXT_MIRROR:
       _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
 
-      if self.iallocator:
+      if self.lu.op.iallocator:
         self._RunAllocator()
+      else:
+        # We set self.target_node as it is required by
+        # BuildHooksEnv
+        self.target_node = self.lu.op.target_node
 
       # self.target_node is already populated, either directly or by the
       # iallocator run
       target_node = self.target_node
 
-        # It is safe to remove locks only when we're the only tasklet in the LU
-        nodes_keep = [instance.primary_node, self.target_node]
-        nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE]
-                     if node not in nodes_keep]
-        self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel)
-        self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+        # It is safe to release locks only when we're the only tasklet
+        # in the LU
+        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                      keep=[instance.primary_node, self.target_node])
 
     else:
       secondary_nodes = instance.secondary_nodes
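
Several hunks in this patch now funnel through the _ReleaseLocks helper added
near the top of the file. Its body is not shown in this excerpt, but a minimal
sketch of the semantics these callers rely on, assuming the glm interface used
throughout cmdlib, would be:

    def _ReleaseLocks(lu, level, names=None, keep=None):
      """Release locks at one level: an explicit list, or all but those kept."""
      assert not (names is not None and keep is not None), \
        "Only one of the 'names' and 'keep' parameters can be given"
      if names is not None:
        release = names
      elif keep is not None:
        release = [name for name in lu.glm.list_owned(level)
                   if name not in keep]
      else:
        release = None  # release everything owned at this level
      lu.glm.release(level, names=release)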
@@ -6462,29 +6485,69 @@ class TLMigrateInstance(Tasklet):
       target_node = secondary_nodes[0]
       if self.lu.op.iallocator or (self.lu.op.target_node and
                                    self.lu.op.target_node != target_node):
+        if self.failover:
+          text = "failed over"
+        else:
+          text = "migrated"
         raise errors.OpPrereqError("Instances with disk template %s cannot"
-                                   " be migrated over to arbitrary nodes"
+                                   " be %s to arbitrary nodes"
                                    " (neither an iallocator nor a target"
                                    " node can be passed)" %
                                    " (neither an iallocator nor a target"
                                    " node can be passed)" %
-                                   instance.disk_template, errors.ECODE_INVAL)
+                                   (instance.disk_template, text),
+                                   errors.ECODE_INVAL)
 
     i_be = self.cfg.GetClusterInfo().FillBE(instance)
 
     # check memory requirements on the secondary node
-    _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
-                         instance.name, i_be[constants.BE_MEMORY],
-                         instance.hypervisor)
+    if not self.failover or instance.admin_up:
+      _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
+                           instance.name, i_be[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.lu.LogInfo("Not checking memory on the secondary node as"
+                      " instance will not be started")
 
     # check bridge existance
     _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
 
     if not self.cleanup:
       _CheckNodeNotDrained(self.lu, target_node)
-      result = self.rpc.call_instance_migratable(instance.primary_node,
-                                                 instance)
-      result.Raise("Can't migrate, please use failover",
-                   prereq=True, ecode=errors.ECODE_STATE)
+      if not self.failover:
+        result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                   instance)
+        if result.fail_msg and self.fallback:
+          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
+                          " failover")
+          self.failover = True
+        else:
+          result.Raise("Can't migrate, please use failover",
+                       prereq=True, ecode=errors.ECODE_STATE)
 
+    assert not (self.failover and self.cleanup)
+
+    if not self.failover:
+      if self.lu.op.live is not None and self.lu.op.mode is not None:
+        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+                                   " parameters is accepted",
+                                   errors.ECODE_INVAL)
+      if self.lu.op.live is not None:
+        if self.lu.op.live:
+          self.lu.op.mode = constants.HT_MIGRATION_LIVE
+        else:
+          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+        # reset the 'live' parameter to None so that repeated
+        # invocations of CheckPrereq do not raise an exception
+        self.lu.op.live = None
+      elif self.lu.op.mode is None:
+        # read the default value from the hypervisor
+        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+                                                skip_globals=False)
+        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+    else:
+      # Failover is never live
+      self.live = False
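
The live/non-live decision, moved here from _RunAllocator, follows a fixed
precedence; as a standalone sketch (the helper name is invented):

    def _ResolveLiveFlag(live, mode, hv_default, failover):
      if failover:
        return False  # failover is never live
      if live is not None and mode is not None:
        raise ValueError("only one of 'live' and 'mode' may be given")
      if live is not None:
        if live:
          mode = constants.HT_MIGRATION_LIVE
        else:
          mode = constants.HT_MIGRATION_NONLIVE
      elif mode is None:
        mode = hv_default  # the hypervisor's HV_MIGRATION_MODE default
      return mode == constants.HT_MIGRATION_LIVE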
 
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
@@ -6498,42 +6561,23 @@ class TLMigrateInstance(Tasklet):
                                     self.instance.primary_node],
                      )
 
-    ial.Run(self.iallocator)
+    ial.Run(self.lu.op.iallocator)
 
     if not ial.success:
       raise errors.OpPrereqError("Can't compute nodes using"
                                  " iallocator '%s': %s" %
-                                 (self.iallocator, ial.info),
+                                 (self.lu.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
     if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (self.iallocator, len(ial.result),
+                                 (self.lu.op.iallocator, len(ial.result),
                                   ial.required_nodes), errors.ECODE_FAULT)
     self.target_node = ial.result[0]
     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance_name, self.iallocator,
+                 self.instance_name, self.lu.op.iallocator,
                  utils.CommaJoin(ial.result))
 
-    if self.lu.op.live is not None and self.lu.op.mode is not None:
-      raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
-                                 " parameters are accepted",
-                                 errors.ECODE_INVAL)
-    if self.lu.op.live is not None:
-      if self.lu.op.live:
-        self.lu.op.mode = constants.HT_MIGRATION_LIVE
-      else:
-        self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
-      # reset the 'live' parameter to None so that repeated
-      # invocations of CheckPrereq do not raise an exception
-      self.lu.op.live = None
-    elif self.lu.op.mode is None:
-      # read the default value from the hypervisor
-      i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, skip_globals=False)
-      self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
-
-    self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
-
   def _WaitUntilSync(self):
     """Poll with custom rpc for disk sync.
 
@@ -6627,15 +6671,15 @@ class TLMigrateInstance(Tasklet):
 
     if runningon_source and runningon_target:
       raise errors.OpExecError("Instance seems to be running on two nodes,"
-                               " or the hypervisor is confused. You will have"
+                               " or the hypervisor is confused; you will have"
                                " to ensure manually that it runs only on one"
                                " to ensure manually that it runs only on one"
-                               " and restart this operation.")
+                               " and restart this operation")
 
     if not (runningon_source or runningon_target):
-      raise errors.OpExecError("Instance does not seem to be running at all."
-                               " In this case, it's safer to repair by"
+      raise errors.OpExecError("Instance does not seem to be running at all;"
+                               " in this case it's safer to repair by"
                                " running 'gnt-instance stop' to ensure disk"
                                " running 'gnt-instance stop' to ensure disk"
-                               " shutdown, and then restarting it.")
+                               " shutdown, and then restarting it")
 
     if runningon_target:
       # the migration has actually succeeded, we need to update the config
@@ -6677,10 +6721,9 @@ class TLMigrateInstance(Tasklet):
       self._GoReconnect(False)
       self._WaitUntilSync()
     except errors.OpExecError, err:
-      self.lu.LogWarning("Migration failed and I can't reconnect the"
-                         " drives: error '%s'\n"
-                         "Please look and recover the instance status" %
-                         str(err))
+      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
+                         " please try to recover the instance manually;"
+                         " error '%s'" % str(err))
 
   def _AbortMigration(self):
     """Call the hypervisor code to abort a started migration.
@@ -6722,7 +6765,7 @@ class TLMigrateInstance(Tasklet):
       if not _CheckDiskConsistency(self.lu, dev, target_node, False):
         raise errors.OpExecError("Disk %s is degraded or not fully"
                                  " synchronized on target node,"
-                                 " aborting migrate." % dev.iv_name)
+                                 " aborting migration" % dev.iv_name)
 
     # First get the migration information from the remote node
     result = self.rpc.call_migration_info(source_node, instance)
@@ -6759,7 +6802,6 @@ class TLMigrateInstance(Tasklet):
                                (instance.name, msg))
 
     self.feedback_fn("* migrating instance to %s" % target_node)
-    time.sleep(10)
     result = self.rpc.call_instance_migrate(source_node, instance,
                                             self.nodes_ip[target_node],
                                             self.live)
@@ -6772,7 +6814,6 @@ class TLMigrateInstance(Tasklet):
       self._RevertDiskStatus()
       raise errors.OpExecError("Could not migrate instance %s: %s" %
                                (instance.name, msg))
-    time.sleep(10)
 
     instance.primary_node = target_node
     # distribute new instance config to the other nodes
@@ -6798,14 +6839,82 @@ class TLMigrateInstance(Tasklet):
 
     self.feedback_fn("* done")
 
+  def _ExecFailover(self):
+    """Failover an instance.
+
+    The failover is done by shutting it down on its present node and
+    starting it on the secondary.
+
+    """
+    instance = self.instance
+    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
+
+    source_node = instance.primary_node
+    target_node = self.target_node
+
+    if instance.admin_up:
+      self.feedback_fn("* checking disk consistency between source and target")
+      for dev in instance.disks:
+        # for drbd, these are drbd over lvm
+        if not _CheckDiskConsistency(self, dev, target_node, False):
+          if not self.ignore_consistency:
+            raise errors.OpExecError("Disk %s is degraded on target node,"
+                                     " aborting failover" % dev.iv_name)
+    else:
+      self.feedback_fn("* not checking disk consistency as instance is not"
+                       " running")
+
+    self.feedback_fn("* shutting down instance on source node")
+    logging.info("Shutting down instance %s on node %s",
+                 instance.name, source_node)
+
+    result = self.rpc.call_instance_shutdown(source_node, instance,
+                                             self.shutdown_timeout)
+    msg = result.fail_msg
+    if msg:
+      if self.ignore_consistency or primary_node.offline:
+        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
+                           " proceeding anyway; please make sure node"
+                           " %s is down; error details: %s",
+                           instance.name, source_node, source_node, msg)
+      else:
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, source_node, msg))
+
+    self.feedback_fn("* deactivating the instance's disks on source node")
+    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
+      raise errors.OpExecError("Can't shut down the instance's disks")
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance, self.feedback_fn)
+
+    # Only start the instance if it's marked as up
+    if instance.admin_up:
+      self.feedback_fn("* activating the instance's disks on target node")
+      logging.info("Starting instance %s on node %s",
+                   instance.name, target_node)
+
+      disks_ok, _ = _AssembleInstanceDisks(self, instance,
+                                           ignore_secondaries=True)
+      if not disks_ok:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Can't activate the instance's disks")
+
+      self.feedback_fn("* starting the instance on the target node")
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
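
LUInstanceFailover, outside this excerpt, is expected to drive this code path
by constructing the same tasklet with failover=True, plausibly along these
lines:

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name,
                        cleanup=False, failover=True,
                        ignore_consistency=self.op.ignore_consistency,
                        shutdown_timeout=self.op.shutdown_timeout)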
+
   def Exec(self, feedback_fn):
     """Perform the migration.
 
     """
-    feedback_fn("Migrating instance %s" % self.instance.name)
-
     self.feedback_fn = feedback_fn
-
     self.source_node = self.instance.primary_node
 
     # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
@@ -6820,10 +6929,16 @@ class TLMigrateInstance(Tasklet):
       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
       }
 
-    if self.cleanup:
-      return self._ExecCleanup()
+    if self.failover:
+      feedback_fn("Failover instance %s" % self.instance.name)
+      self._ExecFailover()
     else:
-      return self._ExecMigration()
+      feedback_fn("Migrating instance %s" % self.instance.name)
+
+      if self.cleanup:
+        return self._ExecCleanup()
+      else:
+        return self._ExecMigration()
 
 
 def _CreateBlockDev(lu, node, instance, device, force_create,
@@ -6911,17 +7026,18 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
-                         p_minor, s_minor):
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+                         iv_name, p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
   """Generate a drbd8 device complete with its children.
 
   """
+  assert len(vgnames) == len(names) == 2
   port = lu.cfg.AllocatePort()
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgname, names[0]))
+                          logical_id=(vgnames[0], names[0]))
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                          logical_id=(vgname, names[1]))
+                          logical_id=(vgnames[1], names[1]))
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
                                       p_minor, s_minor,
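
A hypothetical call with distinct data and metadata volume groups (all names
and numbers invented for illustration):

    disk = _GenerateDRBD8Branch(lu, "node1.example.com", "node2.example.com",
                                1024, ["xenvg", "metavg"],
                                ["inst1-disk0_data", "inst1-disk0_meta"],
                                "disk/0", 0, 1)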
@@ -6976,9 +7092,11 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      vg = disk.get(constants.IDISK_VG, vgname)
+      data_vg = disk.get(constants.IDISK_VG, vgname)
+      meta_vg = disk.get(constants.IDISK_METAVG, data_vg)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk[constants.IDISK_SIZE], vg,
+                                      disk[constants.IDISK_SIZE],
+                                      [data_vg, meta_vg],
                                       names[idx * 2:idx * 2 + 2],
                                       "disk/%d" % disk_index,
                                       minors[idx * 2], minors[idx * 2 + 1])
@@ -7080,14 +7198,17 @@ def _WipeDisks(lu, instance):
 
   try:
     for idx, device in enumerate(instance.disks):
-      lu.LogInfo("* Wiping disk %d", idx)
-      logging.info("Wiping disk %d for instance %s, node %s",
-                   idx, instance.name, node)
-
       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
       # MAX_WIPE_CHUNK at max
       wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
                             constants.MIN_WIPE_CHUNK_PERCENT)
+      # we _must_ make this an int, otherwise rounding errors will
+      # occur
+      wipe_chunk_size = int(wipe_chunk_size)
+
+      lu.LogInfo("* Wiping disk %d", idx)
+      logging.info("Wiping disk %d for instance %s, node %s using"
+                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
 
       offset = 0
       size = device.size
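
The cast matters because the percentage arithmetic yields a float, which would
otherwise propagate into every offset and size passed over RPC. A toy
demonstration in plain Python:

    size = 333                      # pretend device size
    chunk = size / 100.0 * 25       # 83.25: a float, not a usable chunk size
    print isinstance(chunk, float)  # True
    print int(chunk)                # 83, an integral chunk size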
@@ -7096,6 +7217,8 @@ def _WipeDisks(lu, instance):
 
       while offset < size:
         wipe_size = min(wipe_chunk_size, size - offset)
+        logging.debug("Wiping disk %d, offset %s, chunk %s",
+                      idx, offset, wipe_size)
         result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
@@ -7113,8 +7236,8 @@ def _WipeDisks(lu, instance):
 
     for idx, success in enumerate(result.payload):
       if not success:
-        lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
-                      " look at the status and troubleshoot the issue.", idx)
+        lu.LogWarning("Resume sync of disk %d failed, please have a"
+                      " look at the status and troubleshoot the issue", idx)
         logging.warn("resume-sync of instance %s for disks %d failed",
                      instance.name, idx)
 
@@ -7363,8 +7486,8 @@ class LUInstanceCreate(LogicalUnit):
 
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
-      raise errors.OpPrereqError("Cannot do ip check without a name check",
-                                 errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Cannot do IP address check without a name"
+                                 " check", errors.ECODE_INVAL)
 
     # check nics' parameter names
     for nic in self.op.nics:
@@ -7542,7 +7665,7 @@ class LUInstanceCreate(LogicalUnit):
         self.op.src_node = None
         if os.path.isabs(src_path):
           raise errors.OpPrereqError("Importing an instance from an absolute"
-                                     " path requires a source node option.",
+                                     " path requires a source node option",
                                      errors.ECODE_INVAL)
       else:
         self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
@@ -7644,7 +7767,7 @@ class LUInstanceCreate(LogicalUnit):
     src_path = self.op.src_path
 
     if src_node is None:
-      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
       exp_list = self.rpc.call_export_list(locked_nodes)
       found = False
       for node in exp_list:
@@ -7894,10 +8017,13 @@ class LUInstanceCreate(LogicalUnit):
       except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
+
+      data_vg = disk.get(constants.IDISK_VG, default_vg)
       new_disk = {
         constants.IDISK_SIZE: size,
         constants.IDISK_MODE: mode,
-        constants.IDISK_VG: disk.get(constants.IDISK_VG, default_vg),
+        constants.IDISK_VG: data_vg,
+        constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg),
         }
       if constants.IDISK_ADOPT in disk:
         new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
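
With the new key a disk specification can pin its DRBD metadata LV to a
different volume group than the data LV; for example (values invented):

    disks = [
      # metadata defaults to the data LV's volume group:
      {constants.IDISK_SIZE: 10240, constants.IDISK_MODE: "rw",
       constants.IDISK_VG: "xenvg"},
      # ... or goes to a dedicated (e.g. faster) volume group:
      {constants.IDISK_SIZE: 10240, constants.IDISK_MODE: "rw",
       constants.IDISK_VG: "xenvg", constants.IDISK_METAVG: "metavg"},
    ]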
@@ -7988,7 +8114,7 @@ class LUInstanceCreate(LogicalUnit):
     if self.op.disk_template in constants.DTS_INT_MIRROR:
       if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be the"
-                                   " primary node.", errors.ECODE_INVAL)
+                                   " primary node", errors.ECODE_INVAL)
       _CheckNodeOnline(self, self.op.snode)
       _CheckNodeNotDrained(self, self.op.snode)
       _CheckNodeVmCapable(self, self.op.snode)
@@ -8166,18 +8292,6 @@ class LUInstanceCreate(LogicalUnit):
           self.cfg.ReleaseDRBDMinors(instance)
           raise
 
-      if self.cfg.GetClusterInfo().prealloc_wipe_disks:
-        feedback_fn("* wiping instance disks...")
-        try:
-          _WipeDisks(self, iobj)
-        except errors.OpExecError:
-          self.LogWarning("Device wiping failed, reverting...")
-          try:
-            _RemoveDisks(self, iobj)
-          finally:
-            self.cfg.ReleaseDRBDMinors(instance)
-            raise
-
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj, self.proc.GetECId())
@@ -8185,18 +8299,28 @@ class LUInstanceCreate(LogicalUnit):
     # Declare that we don't want to remove the instance lock anymore, as we've
     # added the instance to the config
     del self.remove_locks[locking.LEVEL_INSTANCE]
-    # Unlock all the nodes
+
     if self.op.mode == constants.INSTANCE_IMPORT:
-      nodes_keep = [self.op.src_node]
-      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
-                       if node != self.op.src_node]
-      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
-      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+      # Release unused nodes
+      _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
     else:
-      self.context.glm.release(locking.LEVEL_NODE)
-      del self.acquired_locks[locking.LEVEL_NODE]
+      # Release all nodes
+      _ReleaseLocks(self, locking.LEVEL_NODE)
 
-    if self.op.wait_for_sync:
+    disk_abort = False
+    if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
+      feedback_fn("* wiping instance disks...")
+      try:
+        _WipeDisks(self, iobj)
+      except errors.OpExecError, err:
+        logging.exception("Wiping disks failed")
+        self.LogWarning("Wiping instance disks failed (%s)", err)
+        disk_abort = True
+
+    if disk_abort:
+      # Something is already wrong with the disks, don't do anything else
+      pass
+    elif self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, iobj)
     elif iobj.disk_template in constants.DTS_INT_MIRROR:
       # make sure the disks are not degraded (still sync-ing is ok)
@@ -8380,24 +8504,29 @@ class LUInstanceReplaceDisks(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
-    if self.op.iallocator is not None:
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    assert locking.LEVEL_NODE not in self.needed_locks
+    assert locking.LEVEL_NODEGROUP not in self.needed_locks
+
+    assert self.op.iallocator is None or self.op.remote_node is None, \
+      "Conflicting options"
 
-    elif self.op.remote_node is not None:
-      remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-      self.op.remote_node = remote_node
+    if self.op.remote_node is not None:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
 
       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
       # currently it doesn't since parallel invocations of
       # FindUnusedMinor will conflict
-      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+      if self.op.iallocator is not None:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
+
     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                    self.op.iallocator, self.op.remote_node,
                                    self.op.disks, False, self.op.early_release)
@@ -8405,11 +8534,26 @@ class LUInstanceReplaceDisks(LogicalUnit):
     self.tasklets = [self.replacer]
 
   def DeclareLocks(self, level):
-    # If we're not already locking all nodes in the set we have to declare the
-    # instance's primary/secondary nodes.
-    if (level == locking.LEVEL_NODE and
-        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
-      self._LockInstancesNodes()
+    if level == locking.LEVEL_NODEGROUP:
+      assert self.op.remote_node is None
+      assert self.op.iallocator is not None
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+    elif level == locking.LEVEL_NODE:
+      if self.op.iallocator is not None:
+        assert self.op.remote_node is None
+        assert not self.needed_locks[locking.LEVEL_NODE]
+
+        # Lock member nodes of all locked groups
+        self.needed_locks[locking.LEVEL_NODE] = [node_name
+          for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+          for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+      else:
+        self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -8439,6 +8583,26 @@ class LUInstanceReplaceDisks(LogicalUnit):
       nl.append(self.op.remote_node)
     return nl, nl
 
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
+            self.op.iallocator is None)
+
+    owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if owned_groups:
+      groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+      if owned_groups != groups:
+        raise errors.OpExecError("Node groups used by instance '%s' changed"
+                                 " since lock was acquired, current list is %r,"
+                                 " used to be '%s'" %
+                                 (self.op.instance_name,
+                                  utils.CommaJoin(groups),
+                                  utils.CommaJoin(owned_groups)))
+
+    return LogicalUnit.CheckPrereq(self)
+
 
 class TLReplaceDisks(Tasklet):
   """Replaces disks for an instance.
@@ -8550,7 +8714,6 @@ class TLReplaceDisks(Tasklet):
 
     return True
 
-
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -8592,20 +8755,23 @@ class TLReplaceDisks(Tasklet):
       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                        instance.name, instance.secondary_nodes)
 
-    if remote_node is not None:
+    if remote_node is None:
+      self.remote_node_info = None
+    else:
+      assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
+             "Remote node '%s' is not locked" % remote_node
+
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
       assert self.remote_node_info is not None, \
         "Cannot retrieve locked node %s" % remote_node
-    else:
-      self.remote_node_info = None
 
     if remote_node == self.instance.primary_node:
       raise errors.OpPrereqError("The specified node is the primary node of"
-                                 " the instance.", errors.ECODE_INVAL)
+                                 " the instance", errors.ECODE_INVAL)
 
     if remote_node == secondary_node:
       raise errors.OpPrereqError("The specified node is already the"
-                                 " secondary node of the instance.",
+                                 " secondary node of the instance",
                                  errors.ECODE_INVAL)
 
     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
@@ -8681,18 +8847,26 @@ class TLReplaceDisks(Tasklet):
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
+    touched_nodes = frozenset(node_name for node_name in [self.new_node,
+                                                          self.other_node,
+                                                          self.target_node]
+                              if node_name is not None)
+
+    # Release unneeded node locks
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+
+    # Release any owned node group
+    if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
+      _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+
     # Check whether disks are valid
     for disk_idx in self.disks:
       instance.FindDisk(disk_idx)
 
     # Get secondary node IP addresses
     # Check whether disks are valid
     for disk_idx in self.disks:
       instance.FindDisk(disk_idx)
 
     # Get secondary node IP addresses
-    node_2nd_ip = {}
-
-    for node_name in [self.target_node, self.other_node, self.new_node]:
-      if node_name is not None:
-        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
-
-    self.node_secondary_ip = node_2nd_ip
+    self.node_secondary_ip = \
+      dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
+           for node_name in touched_nodes)
 
   def Exec(self, feedback_fn):
     """Execute disk replacement.
@@ -8703,6 +8877,20 @@ class TLReplaceDisks(Tasklet):
     if self.delay_iallocator:
       self._CheckPrereq2()
 
+    if __debug__:
+      # Verify owned locks before starting operation
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+      assert set(owned_locks) == set(self.node_secondary_ip), \
+          ("Incorrect node locks, owning %s, expected %s" %
+           (owned_locks, self.node_secondary_ip.keys()))
+
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
+      assert list(owned_locks) == [self.instance_name], \
+          "Instance '%s' not locked" % self.instance_name
+
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+          "Should not own any node group lock at this point"
+
     if not self.disks:
       feedback_fn("No disks need replacement")
       return
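
Since asserts vanish under "python -O", these checks cost nothing in
production; the post-run invariant they encode boils down to (sketch):

    def _LocksConsistent(early_release, owned, touched):
      if early_release:
        return not owned                      # everything released early
      return not (set(owned) - set(touched))  # nothing beyond touched nodes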
@@ -8723,14 +8911,24 @@ class TLReplaceDisks(Tasklet):
       else:
         fn = self._ExecDrbd8DiskOnly
 
-      return fn(feedback_fn)
-
+      result = fn(feedback_fn)
     finally:
       # Deactivate the instance disks if we're replacing them on a
       # down instance
       if activate_disks:
         _SafeShutdownInstanceDisks(self.lu, self.instance)
 
+    if __debug__:
+      # Verify owned locks
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+      nodes = frozenset(self.node_secondary_ip)
+      assert ((self.early_release and not owned_locks) or
+              (not self.early_release and not (set(owned_locks) - nodes))), \
+        ("Not owning the correct locks, early_release=%s, owned=%r,"
+         " nodes=%r" % (self.early_release, owned_locks, nodes))
+
+    return result
+
   def _CheckVolumeGroup(self, nodes):
     self.lu.LogInfo("Checking volume groups")
 
@@ -8782,7 +8980,6 @@ class TLReplaceDisks(Tasklet):
                                  (node_name, self.instance.name))
 
   def _CreateNewStorage(self, node_name):
-    vgname = self.cfg.GetVGName()
     iv_names = {}
 
     for idx, dev in enumerate(self.instance.disks):
@@ -8796,10 +8993,12 @@ class TLReplaceDisks(Tasklet):
       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
       names = _GenerateUniqueNames(self.lu, lv_names)
 
+      vg_data = dev.children[0].logical_id[0]
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vgname, names[0]))
+                             logical_id=(vg_data, names[0]))
+      vg_meta = dev.children[1].logical_id[0]
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                             logical_id=(vgname, names[1]))
+                             logical_id=(vg_meta, names[1]))
 
       new_lvs = [lv_data, lv_meta]
       old_lvs = dev.children
@@ -8840,10 +9039,6 @@ class TLReplaceDisks(Tasklet):
           self.lu.LogWarning("Can't remove old LV: %s" % msg,
                              hint="remove unused LVs manually")
 
-  def _ReleaseNodeLock(self, node_name):
-    """Releases the lock for a given node."""
-    self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
-
   def _ExecDrbd8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for DRBD 8.
 
@@ -8961,7 +9156,8 @@ class TLReplaceDisks(Tasklet):
       self._RemoveOldStorage(self.target_node, iv_names)
       # WARNING: we release both node locks here, do not do other RPCs
       # than WaitForSync to the primary node
-      self._ReleaseNodeLock([self.target_node, self.other_node])
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                    names=[self.target_node, self.other_node])
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
@@ -9118,9 +9314,10 @@ class TLReplaceDisks(Tasklet):
       self._RemoveOldStorage(self.target_node, iv_names)
       # WARNING: we release all node locks here, do not do other RPCs
       # than WaitForSync to the primary node
-      self._ReleaseNodeLock([self.instance.primary_node,
-                             self.target_node,
-                             self.new_node])
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                    names=[self.instance.primary_node,
+                           self.target_node,
+                           self.new_node])
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
@@ -9298,7 +9495,7 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     if instance.disk_template not in constants.DTS_GROWABLE:
       raise errors.OpPrereqError("Instance's disk layout does not support"
-                                 " growing.", errors.ECODE_INVAL)
+                                 " growing", errors.ECODE_INVAL)
 
     self.disk = instance.FindDisk(self.op.disk)
 
@@ -9320,9 +9517,17 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    # First run all grow ops in dry-run mode
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result.Raise("Grow request failed to node %s" % node)
+
+    # We know that (as far as we can test) operations across different
+    # nodes will succeed, time to run it for real
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
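
This is the dry-run mode from the commit subject: every node first validates
the grow request without changing any data, and the real grow is only issued
once all nodes have agreed. The core pattern, as a sketch:

    def _GrowOnAllNodes(lu, instance, disk, amount):
      for dryrun in (True, False):
        for node in instance.all_nodes:
          lu.cfg.SetDiskID(disk, node)
          result = lu.rpc.call_blockdev_grow(node, disk, amount, dryrun)
          result.Raise("Grow request failed to node %s" % node)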
@@ -9337,14 +9542,14 @@ class LUInstanceGrowDisk(LogicalUnit):
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance, disks=[disk])
       if disk_abort:
-        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
-                             " status.\nPlease check the instance.")
+        self.proc.LogWarning("Disk sync-ing has not returned a good"
+                             " status; please check the instance")
       if not instance.admin_up:
         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
     elif not instance.admin_up:
       self.proc.LogWarning("Not shutting down the disk even if the instance is"
                            " not supposed to be running because no wait for"
-                           " sync mode was requested.")
+                           " sync mode was requested")
 
 
 class LUInstanceQueryData(NoHooksLU):
@@ -9355,23 +9560,33 @@ class LUInstanceQueryData(NoHooksLU):
 
   def ExpandNames(self):
     self.needed_locks = {}
-    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
-    if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+    # Use locking if requested or when non-static information is wanted
+    if not (self.op.static or self.op.use_locking):
+      self.LogWarning("Non-static data requested, locks need to be acquired")
+      self.op.use_locking = True
+
+    if self.op.instances or not self.op.use_locking:
+      # Expand instance names right here
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
     else:
+      # Will use acquired locks
       self.wanted_names = None
-      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
 
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    if self.op.use_locking:
+      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+      if self.wanted_names is None:
+        self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+      else:
+        self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
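
The effective behaviour can be summed up in one predicate (a sketch; static
and use_locking are the opcode flags handled above):

    def _EffectiveUseLocking(static, use_locking):
      # static data can be served from the config without locks; any
      # request for runtime data forces locking
      return use_locking or not static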
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
+    if self.op.use_locking and level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
 
   def CheckPrereq(self):
@@ -9381,10 +9596,11 @@ class LUInstanceQueryData(NoHooksLU):
 
     """
     if self.wanted_names is None:
 
     """
     if self.wanted_names is None:
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      assert self.op.use_locking, "Locking was not used"
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
-    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
-                             in self.wanted_names]
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name)
+                             for name in self.wanted_names]
 
   def _ComputeBlockdevStatus(self, node, instance_name, dev):
     """Returns the status of a block device
@@ -9430,7 +9646,7 @@ class LUInstanceQueryData(NoHooksLU):
     else:
       dev_children = []
 
-    data = {
+    return {
       "iv_name": dev.iv_name,
       "dev_type": dev.dev_type,
       "logical_id": dev.logical_id,
       "iv_name": dev.iv_name,
       "dev_type": dev.dev_type,
       "logical_id": dev.logical_id,
@@ -9442,8 +9658,6 @@ class LUInstanceQueryData(NoHooksLU):
       "size": dev.size,
       }
 
       "size": dev.size,
       }
 
-    return data
-
   def Exec(self, feedback_fn):
     """Gather and return data"""
     result = {}
@@ -9471,7 +9685,7 @@ class LUInstanceQueryData(NoHooksLU):
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
 
-      idict = {
+      result[instance.name] = {
         "name": instance.name,
         "config_state": config_state,
         "run_state": remote_state,
         "name": instance.name,
         "config_state": config_state,
         "run_state": remote_state,
@@ -9496,8 +9710,6 @@ class LUInstanceQueryData(NoHooksLU):
         "uuid": instance.uuid,
         }
 
         "uuid": instance.uuid,
         }
 
-      result[instance.name] = idict
-
     return result
 
 
@@ -9948,7 +10160,8 @@ class LUInstanceSetParams(LogicalUnit):
     snode = self.op.remote_node
 
     # create a fake disk info for _GenerateDiskTemplate
-    disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode}
+    disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
+                  constants.IDISK_VG: d.logical_id[0]}
                  for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
@@ -10192,7 +10405,7 @@ class LUBackupQuery(NoHooksLU):
         that node.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
     rpcresult = self.rpc.call_export_list(self.nodes)
     result = {}
     for node in rpcresult:
@@ -10574,7 +10787,7 @@ class LUBackupRemove(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+    locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
     exportlist = self.rpc.call_export_list(locked_nodes)
     found = False
     for node in exportlist:
@@ -10712,7 +10925,7 @@ class LUGroupAssignNodes(NoHooksLU):
 
        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
-                          " to be spit across groups: %s",
+                          " to be split across groups: %s",
                           utils.CommaJoin(utils.NiceSort(previous_splits)))
 
   def Exec(self, feedback_fn):
@@ -10801,7 +11014,8 @@ class _GroupQuery(_QueryBase):
           missing.append(name)
 
       if missing:
-        raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+        raise errors.OpPrereqError("Some groups do not exist: %s" %
+                                   utils.CommaJoin(missing),
                                    errors.ECODE_NOENT)
 
   def DeclareLocks(self, lu, level):
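utils.CommaJoin turns the list into a readable "group1, group2" instead of interpolating its repr, which would render as "['group1', 'group2']":

    utils.CommaJoin(["group1", "group2"])   # -> "group1, group2"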
@@ -11084,8 +11298,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
   This is an abstract class which is the parent of all the other tags LUs.
 
   """
-
   def ExpandNames(self):
+    self.group_uuid = None
     self.needed_locks = {}
     if self.op.kind == constants.TAG_NODE:
       self.op.name = _ExpandNodeName(self.cfg, self.op.name)
@@ -11093,6 +11307,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
     elif self.op.kind == constants.TAG_INSTANCE:
       self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+    elif self.op.kind == constants.TAG_NODEGROUP:
+      self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
 
    # FIXME: Acquire BGL for cluster tag operations (as of this writing it's
    # not possible to acquire the BGL based on opcode parameters)
@@ -11107,6 +11323,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
       self.target = self.cfg.GetNodeInfo(self.op.name)
     elif self.op.kind == constants.TAG_INSTANCE:
       self.target = self.cfg.GetInstanceInfo(self.op.name)
+    elif self.op.kind == constants.TAG_NODEGROUP:
+      self.target = self.cfg.GetNodeGroup(self.group_uuid)
     else:
       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                  str(self.op.kind), errors.ECODE_INVAL)
@@ -11162,6 +11380,8 @@ class LUTagsSearch(NoHooksLU):
     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
     nlist = cfg.GetAllNodesInfo().values()
     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
     nlist = cfg.GetAllNodesInfo().values()
     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
+    tgts.extend(("/nodegroup/%s" % n.name, n)
+                for n in cfg.GetAllNodeGroupsInfo().values())
     results = []
     for path, target in tgts:
       for tag in target.GetTags():
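The matching step itself lies outside this hunk; presumably the LU compiles self.op.pattern and collects (path, tag) pairs over the extended target list, along these lines (a sketch, not the verbatim source):

    rex = re.compile(self.op.pattern)
    results = [(path, tag)
               for (path, target) in tgts
               for tag in target.GetTags()
               if rex.search(tag)]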
@@ -11799,8 +12019,61 @@ class IAllocator(object):
     if not isinstance(rdict["result"], list):
       raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                                " is not a list")
     if not isinstance(rdict["result"], list):
       raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                                " is not a list")
+
+    if self.mode == constants.IALLOCATOR_MODE_RELOC:
+      assert self.relocate_from is not None
+      assert self.required_nodes == 1
+
+      node2group = dict((name, ndata["group"])
+                        for (name, ndata) in self.in_data["nodes"].items())
+
+      fn = compat.partial(self._NodesToGroups, node2group,
+                          self.in_data["nodegroups"])
+
+      request_groups = fn(self.relocate_from)
+      result_groups = fn(rdict["result"])
+
+      if result_groups != request_groups:
+        raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
+                                 " differ from original groups (%s)" %
+                                 (utils.CommaJoin(result_groups),
+                                  utils.CommaJoin(request_groups)))
+
     self.out_data = rdict
 
+  @staticmethod
+  def _NodesToGroups(node2group, groups, nodes):
+    """Returns a list of unique group names for a list of nodes.
+
+    @type node2group: dict
+    @param node2group: Map from node name to group UUID
+    @type groups: dict
+    @param groups: Group information
+    @type nodes: list
+    @param nodes: Node names
+
+    """
+    result = set()
+
+    for node in nodes:
+      try:
+        group_uuid = node2group[node]
+      except KeyError:
+        # Ignore unknown node
+        pass
+      else:
+        try:
+          group = groups[group_uuid]
+        except KeyError:
+          # Can't find group, let's use UUID
+          group_name = group_uuid
+        else:
+          group_name = group["name"]
+
+        result.add(group_name)
+
+    return sorted(result)
+
 
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.