KVM: configure bridged NICs at migration start
[ganeti-local] / lib / cmdlib.py
index 4381949..c5dbcd5 100644 (file)
@@ -1397,6 +1397,13 @@ class LUClusterVerify(LogicalUnit):
         _ErrorIf(test, self.ENODEHV, node,
                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
 
+    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
+    if ninfo.vm_capable and isinstance(hvp_result, list):
+      for item, hv_name, hv_result in hvp_result:
+        _ErrorIf(True, self.ENODEHV, node,
+                 "hypervisor %s parameter verify failure (source %s): %s",
+                 hv_name, item, hv_result)
+
     test = nresult.get(constants.NV_NODESETUP,
                            ["Missing NODESETUP results"])
     _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
@@ -1437,7 +1444,7 @@ class LUClusterVerify(LogicalUnit):
              ntime_diff)
 
   def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
-    """Check the node time.
+    """Check the node LVM results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1473,8 +1480,31 @@ class LUClusterVerify(LogicalUnit):
         _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                  " '%s' of VG '%s'", pvname, owner_vg)
 
+  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
+    """Check that the node has all the expected bridges.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param bridges: the expected bridges; nothing is checked when empty
+
+    """
+    if not bridges:
+      return
+
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+    # NV_BRIDGES is read as the list of missing bridges; a non-list is invalid
+    missing = nresult.get(constants.NV_BRIDGES, None)
+    test = not isinstance(missing, list)
+    _ErrorIf(test, self.ENODENET, node,
+             "did not return valid bridge information")
+    if not test:
+      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
+               utils.CommaJoin(sorted(missing)))
+
   def _VerifyNodeNetwork(self, ninfo, nresult):
-    """Check the node time.
+    """Check the node network connectivity results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1547,7 +1577,7 @@ class LUClusterVerify(LogicalUnit):
                node_current)
 
     for node, n_img in node_image.items():
-      if (not node == node_current):
+      if node != node_current:
         test = instance in n_img.instances
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
@@ -1557,7 +1587,11 @@ class LUClusterVerify(LogicalUnit):
                 for idx, (success, status) in enumerate(disks)]
 
     for nname, success, bdev_status, idx in diskdata:
-      _ErrorIf(instanceconfig.admin_up and not success,
+      # the 'ghost node' construction in Exec() ensures that we have a
+      # node here
+      snode = node_image[nname]
+      bad_snode = snode.ghost or snode.offline
+      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
                self.EINSTANCEFAULTYDISK, instance,
                "couldn't retrieve status for disk/%s on %s: %s",
                idx, nname, bdev_status)
@@ -1615,6 +1649,12 @@ class LUClusterVerify(LogicalUnit):
       # WARNING: we currently take into account down instances as well
       # as up ones, considering that even if they're down someone
       # might want to start them even in the event of a node failure.
+      if n_img.offline:
+        # we're skipping offline nodes from the N+1 warning, since
+        # most likely we don't have good memory information from them;
+        # we already list instances living on such nodes, and that's
+        # enough warning
+        continue
       for prinode, instances in n_img.sbp.items():
         needed_mem = 0
         for instance in instances:
@@ -1624,7 +1664,8 @@ class LUClusterVerify(LogicalUnit):
         test = n_img.mfree < needed_mem
         self._ErrorIf(test, self.ENODEN1, node,
                       "not enough memory to accomodate instance failovers"
-                      " should node %s fail", prinode)
+                      " should node %s fail (%dMiB needed, %dMiB available)",
+                      prinode, needed_mem, n_img.mfree)
 
   def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                        master_files):
@@ -1788,6 +1829,7 @@ class LUClusterVerify(LogicalUnit):
 
     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
 
+    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
     for os_name, os_data in nimg.oslist.items():
       assert os_data, "Empty OS status for OS %s?!" % os_name
       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
@@ -1815,11 +1857,12 @@ class LUClusterVerify(LogicalUnit):
         continue
       for kind, a, b in [("API version", f_api, b_api),
                          ("variants list", f_var, b_var),
-                         ("parameters", f_param, b_param)]:
+                         ("parameters", beautify_params(f_param),
+                          beautify_params(b_param))]:
         _ErrorIf(a != b, self.ENODEOS, node,
-                 "OS %s %s differs from reference node %s: %s vs. %s",
+                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                  kind, os_name, base.name,
-                 utils.CommaJoin(a), utils.CommaJoin(b))
+                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
 
     # check any missing OSes
     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
@@ -2029,6 +2072,21 @@ class LUClusterVerify(LogicalUnit):
 
     return instdisk
 
+  def _VerifyHVP(self, hvp_data):
+    """Verifies locally the syntax of the hypervisor parameters.
+
+    @param hvp_data: list of (source, hypervisor name, parameters) tuples
+    """
+    for item, hv_name, hv_params in hvp_data:
+      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
+             (hv_name, item))
+      try:
+        hv_class = hypervisor.GetHypervisor(hv_name)
+        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+        hv_class.CheckParameterSyntax(hv_params)
+      except errors.GenericError, err:
+        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -2067,12 +2125,11 @@ class LUClusterVerify(LogicalUnit):
     drbd_helper = self.cfg.GetDRBDHelper()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     cluster = self.cfg.GetClusterInfo()
-    nodelist = utils.NiceSort(self.cfg.GetNodeList())
-    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
-    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
-    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
-    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
-                        for iname in instancelist)
+    nodeinfo_byname = self.cfg.GetAllNodesInfo()
+    nodelist = utils.NiceSort(nodeinfo_byname.keys())
+    nodeinfo = [nodeinfo_byname[nname] for nname in nodelist]
+    instanceinfo = self.cfg.GetAllInstancesInfo()
+    instancelist = utils.NiceSort(instanceinfo.keys())
     groupinfo = self.cfg.GetAllNodeGroupsInfo()
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
@@ -2094,12 +2151,32 @@ class LUClusterVerify(LogicalUnit):
 
     local_checksums = utils.FingerprintFiles(file_names)
 
+    # Compute the set of hypervisor parameters
+    hvp_data = []
+    for hv_name in hypervisors:
+      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+    for os_name, os_hvp in cluster.os_hvp.items():
+      for hv_name, hv_params in os_hvp.items():
+        if not hv_params:
+          continue
+        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+        hvp_data.append(("os %s" % os_name, hv_name, full_params))
+    # TODO: collapse identical parameter values in a single one
+    for instance in instanceinfo.values():
+      if not instance.hvparams:
+        continue
+      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+                       cluster.FillHV(instance)))
+    # and verify them locally
+    self._VerifyHVP(hvp_data)
+
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     node_verify_param = {
       constants.NV_FILELIST: file_names,
       constants.NV_NODELIST: [node.name for node in nodeinfo
                               if not node.offline],
       constants.NV_HYPERVISOR: hypervisors,
+      constants.NV_HVPARAMS: hvp_data,
       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                   node.secondary_ip) for node in nodeinfo
                                  if not node.offline],
@@ -2122,6 +2199,21 @@ class LUClusterVerify(LogicalUnit):
     if drbd_helper:
       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
 
+    # bridge checks
+    # FIXME: this needs to be changed per node-group, not cluster-wide
+    bridges = set()
+    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+      bridges.add(default_nicpp[constants.NIC_LINK])
+    for instance in instanceinfo.values():
+      for nic in instance.nics:
+        full_nic = cluster.SimpleFillNIC(nic.nicparams)
+        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+          bridges.add(full_nic[constants.NIC_LINK])
+
+    if bridges:
+      node_verify_param[constants.NV_BRIDGES] = list(bridges)
+
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                  name=node.name,
@@ -2232,6 +2324,7 @@ class LUClusterVerify(LogicalUnit):
           if refos_img is None:
             refos_img = nimg
           self._VerifyNodeOS(node_i, nimg, refos_img)
+        self._VerifyNodeBridges(node_i, nresult, bridges)
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
@@ -2248,8 +2341,8 @@ class LUClusterVerify(LogicalUnit):
                self.ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
 
-      if pnode_img.offline:
-        inst_nodes_offline.append(pnode)
+      _ErrorIf(pnode_img.offline, self.EINSTANCEBADNODE, instance,
+               "instance lives on offline node %s", inst_config.primary_node)
 
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
@@ -2298,7 +2391,7 @@ class LUClusterVerify(LogicalUnit):
 
       # warn that the instance lives on offline nodes
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
-               "instance lives on offline node(s) %s",
+               "instance has offline secondary node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
       # ... or ghost/non-vm_capable nodes
       for node in inst_config.all_nodes:
@@ -2406,14 +2499,12 @@ class LUClusterVerifyDisks(NoHooksLU):
     result = res_nodes, res_instances, res_missing = {}, [], {}
 
     nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
-    instances = [self.cfg.GetInstanceInfo(name)
-                 for name in self.cfg.GetInstanceList()]
+    instances = self.cfg.GetAllInstancesInfo().values()
 
     nv_dict = {}
     for inst in instances:
       inst_lvs = {}
-      if (not inst.admin_up or
-          inst.disk_template not in constants.DTS_NET_MIRROR):
+      if not inst.admin_up:
         continue
       inst.MapLVsByNode(inst_lvs)
       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
@@ -2424,14 +2515,8 @@ class LUClusterVerifyDisks(NoHooksLU):
     if not nv_dict:
       return result
 
-    vg_names = self.rpc.call_vg_list(nodes)
-    for node in nodes:
-      vg_names[node].Raise("Cannot get list of VGs")
-
-    for node in nodes:
-      # node_volume
-      node_res = self.rpc.call_lv_list([node],
-                                       vg_names[node].payload.keys())[node]
+    node_lvs = self.rpc.call_lv_list(nodes, [])
+    for node, node_res in node_lvs.items():
       if node_res.offline:
         continue
       msg = node_res.fail_msg
@@ -2540,16 +2625,18 @@ class LUClusterRepairDiskSizes(NoHooksLU):
       newl = [v[2].Copy() for v in dskl]
       for dsk in newl:
         self.cfg.SetDiskID(dsk, node)
-      result = self.rpc.call_blockdev_getsizes(node, newl)
+      result = self.rpc.call_blockdev_getsize(node, newl)
       if result.fail_msg:
-        self.LogWarning("Failure in blockdev_getsizes call to node"
+        self.LogWarning("Failure in blockdev_getsize call to node"
                         " %s, ignoring", node)
         continue
-      if len(result.data) != len(dskl):
+      if len(result.payload) != len(dskl):
+        logging.warning("Invalid result from node %s: len(dksl)=%d,"
+                        " result.payload=%s", node, len(dskl), result.payload)
         self.LogWarning("Invalid result from node %s, ignoring node results",
                         node)
         continue
-      for ((instance, idx, disk), size) in zip(dskl, result.data):
+      for ((instance, idx, disk), size) in zip(dskl, result.payload):
         if size is None:
           self.LogWarning("Disk %d of instance %s did not return size"
                           " information, ignoring", idx, instance.name)
@@ -2755,6 +2842,12 @@ class LUClusterSetParams(LogicalUnit):
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
 
+      # TODO: we need a more general way to handle resetting
+      # cluster-level parameters to default values
+      if self.new_ndparams["oob_program"] == "":
+        self.new_ndparams["oob_program"] = \
+            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
@@ -2777,8 +2870,8 @@ class LUClusterSetParams(LogicalUnit):
           # if we're moving instances to routed, check that they have an ip
           target_mode = params_filled[constants.NIC_MODE]
           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
-            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
-                              (instance.name, nic_idx))
+            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
+                              " address" % (instance.name, nic_idx))
       if nic_errors:
         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                    "\n".join(nic_errors))
@@ -3192,31 +3285,33 @@ class LUOobCommand(NoHooksLU):
     Any errors are signaled by raising errors.OpPrereqError.
 
     """
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-    node = self.cfg.GetNodeInfo(self.op.node_name)
-
-    if node is None:
-      raise errors.OpPrereqError("Node %s not found" % self.op.node_name)
-
-    self.oob_program = _SupportsOob(self.cfg, node)
+    self.nodes = []
+    for node_name in self.op.node_names:
+      node = self.cfg.GetNodeInfo(node_name)
 
-    if not self.oob_program:
-      raise errors.OpPrereqError("OOB is not supported for node %s" %
-                                 self.op.node_name)
-
-    if self.op.command == constants.OOB_POWER_OFF and not node.offline:
-      raise errors.OpPrereqError(("Cannot power off node %s because it is"
-                                  " not marked offline") % self.op.node_name)
+      if node is None:
+        raise errors.OpPrereqError("Node %s not found" % node_name,
+                                   errors.ECODE_NOENT)
+      else:
+        self.nodes.append(node)
 
-    self.node = node
+      if (self.op.command == constants.OOB_POWER_OFF and not node.offline):
+        raise errors.OpPrereqError(("Cannot power off node %s because it is"
+                                    " not marked offline") % node_name,
+                                   errors.ECODE_STATE)
 
   def ExpandNames(self):
     """Gather locks we need.
 
     """
-    node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    if self.op.node_names:
+      self.op.node_names = [_ExpandNodeName(self.cfg, name)
+                            for name in self.op.node_names]
+    else:
+      self.op.node_names = self.cfg.GetNodeList()
+
     self.needed_locks = {
-      locking.LEVEL_NODE: [node_name],
+      locking.LEVEL_NODE: self.op.node_names,
       }
 
   def Exec(self, feedback_fn):
@@ -3224,40 +3319,63 @@ class LUOobCommand(NoHooksLU):
 
     """
     master_node = self.cfg.GetMasterNode()
-    node = self.node
-
-    logging.info("Executing out-of-band command '%s' using '%s' on %s",
-                 self.op.command, self.oob_program, self.op.node_name)
-    result = self.rpc.call_run_oob(master_node, self.oob_program,
-                                   self.op.command, self.op.node_name,
-                                   self.op.timeout)
+    ret = []
 
-    result.Raise("An error occurred on execution of OOB helper")
+    for node in self.nodes:
+      node_entry = [(constants.RS_NORMAL, node.name)]
+      ret.append(node_entry)
 
-    self._CheckPayload(result)
+      oob_program = _SupportsOob(self.cfg, node)
 
-    if self.op.command == constants.OOB_HEALTH:
-      # For health we should log important events
-      for item, status in result.payload:
-        if status in [constants.OOB_STATUS_WARNING,
-                      constants.OOB_STATUS_CRITICAL]:
-          logging.warning("On node '%s' item '%s' has status '%s'",
-                          self.op.node_name, item, status)
-
-    if self.op.command == constants.OOB_POWER_ON:
-      node.powered = True
-    elif self.op.command == constants.OOB_POWER_OFF:
-      node.powered = False
-    elif self.op.command == constants.OOB_POWER_STATUS:
-      powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
-      if powered != self.node.powered:
-        logging.warning(("Recorded power state (%s) of node '%s' does not match"
-                         " actual power state (%s)"), node.powered,
-                        self.op.node_name, powered)
+      if not oob_program:
+        node_entry.append((constants.RS_UNAVAIL, None))
+        continue
 
-    self.cfg.Update(node, feedback_fn)
+      logging.info("Executing out-of-band command '%s' using '%s' on %s",
+                   self.op.command, oob_program, node.name)
+      result = self.rpc.call_run_oob(master_node, oob_program,
+                                     self.op.command, node.name,
+                                     self.op.timeout)
 
-    return result.payload
+      if result.fail_msg:
+        self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+                        node.name, result.fail_msg)
+        node_entry.append((constants.RS_NODATA, None))
+      else:
+        try:
+          self._CheckPayload(result)
+        except errors.OpExecError, err:
+          self.LogWarning("The payload returned by '%s' is not valid: %s",
+                          node.name, err)
+          node_entry.append((constants.RS_NODATA, None))
+        else:
+          if self.op.command == constants.OOB_HEALTH:
+            # For health we should log important events
+            for item, status in result.payload:
+              if status in [constants.OOB_STATUS_WARNING,
+                            constants.OOB_STATUS_CRITICAL]:
+                self.LogWarning("On node '%s' item '%s' has status '%s'",
+                                node.name, item, status)
+
+          if self.op.command == constants.OOB_POWER_ON:
+            node.powered = True
+          elif self.op.command == constants.OOB_POWER_OFF:
+            node.powered = False
+          elif self.op.command == constants.OOB_POWER_STATUS:
+            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
+            if powered != node.powered:
+              logging.warning(("Recorded power state (%s) of node '%s' does not"
+                               " match actual power state (%s)"), node.powered,
+                              node.name, powered)
+
+          # For configuration changing commands we should update the node
+          if self.op.command in (constants.OOB_POWER_ON,
+                                 constants.OOB_POWER_OFF):
+            self.cfg.Update(node, feedback_fn)
+
+          node_entry.append((constants.RS_NORMAL, result.payload))
+
+    return ret
 
   def _CheckPayload(self, result):
     """Checks if the payload is valid.
@@ -3271,10 +3389,11 @@ class LUOobCommand(NoHooksLU):
       if not isinstance(result.payload, list):
         errs.append("command 'health' is expected to return a list but got %s" %
                     type(result.payload))
-      for item, status in result.payload:
-        if status not in constants.OOB_STATUSES:
-          errs.append("health item '%s' has invalid status '%s'" %
-                      (item, status))
+      else:
+        for item, status in result.payload:
+          if status not in constants.OOB_STATUSES:
+            errs.append("health item '%s' has invalid status '%s'" %
+                        (item, status))
 
     if self.op.command == constants.OOB_POWER_STATUS:
       if not isinstance(result.payload, dict):
@@ -3369,7 +3488,9 @@ class LUOsDiagnose(NoHooksLU):
     """Compute the list of OSes.
 
     """
-    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
+    valid_nodes = [node.name
+                   for node in self.cfg.GetAllNodesInfo().values()
+                   if not node.offline and node.vm_capable]
     node_data = self.rpc.call_os_diagnose(valid_nodes)
     pol = self._DiagnoseByOS(node_data)
     output = []
@@ -3558,7 +3679,10 @@ class _NodeQuery(_QueryBase):
 
     # Gather data as requested
     if query.NQ_LIVE in self.requested_data:
-      node_data = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
+      # filter out non-vm_capable nodes
+      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
+
+      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
                                         lu.cfg.GetHypervisorType())
       live_data = dict((name, nresult.payload)
                        for (name, nresult) in node_data.items()
@@ -3806,18 +3930,21 @@ class _InstanceQuery(_QueryBase):
     """Computes the list of instances and their attributes.
 
     """
+    cluster = lu.cfg.GetClusterInfo()
     all_info = lu.cfg.GetAllInstancesInfo()
 
     instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
 
     instance_list = [all_info[name] for name in instance_names]
-    nodes = frozenset([inst.primary_node for inst in instance_list])
+    nodes = frozenset(itertools.chain(*(inst.all_nodes
+                                        for inst in instance_list)))
     hv_list = list(set([inst.hypervisor for inst in instance_list]))
     bad_nodes = []
     offline_nodes = []
+    wrongnode_inst = set()
 
     # Gather data as requested
-    if query.IQ_LIVE in self.requested_data:
+    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
       live_data = {}
       node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
       for name in nodes:
@@ -3829,7 +3956,17 @@ class _InstanceQuery(_QueryBase):
         if result.fail_msg:
           bad_nodes.append(name)
         elif result.payload:
-          live_data.update(result.payload)
+          for inst in result.payload:
+            if inst in all_info:
+              if all_info[inst].primary_node == name:
+                live_data.update(result.payload)
+              else:
+                wrongnode_inst.add(inst)
+            else:
+              # orphan instance; we don't list it here as we don't
+              # handle this case yet in the output of instance listing
+              logging.warning("Orphan instance '%s' found on node %s",
+                              inst, name)
         # else no instance is alive
     else:
       live_data = {}
@@ -3843,9 +3980,21 @@ class _InstanceQuery(_QueryBase):
     else:
       disk_usage = None
 
+    if query.IQ_CONSOLE in self.requested_data:
+      consinfo = {}
+      for inst in instance_list:
+        if inst.name in live_data:
+          # Instance is running
+          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
+        else:
+          consinfo[inst.name] = None
+      assert set(consinfo.keys()) == set(instance_names)
+    else:
+      consinfo = None
+
     return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
                                    disk_usage, offline_nodes, bad_nodes,
-                                   live_data)
+                                   live_data, wrongnode_inst, consinfo)
 
 
 class LUQuery(NoHooksLU):
@@ -3944,6 +4093,11 @@ class LUNodeAdd(LogicalUnit):
     self.hostname = netutils.GetHostname(name=self.op.node_name,
                                          family=self.primary_ip_family)
     self.op.node_name = self.hostname.name
+
+    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+      raise errors.OpPrereqError("Cannot readd the master node",
+                                 errors.ECODE_STATE)
+
     if self.op.readd and self.op.group:
       raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                  " being readded", errors.ECODE_INVAL)
@@ -4172,7 +4326,7 @@ class LUNodeAdd(LogicalUnit):
           feedback_fn("ssh/hostname verification failed"
                       " (checking from %s): %s" %
                       (verifier, nl_payload[failed]))
-        raise errors.OpExecError("ssh/hostname verification failed.")
+        raise errors.OpExecError("ssh/hostname verification failed")
 
     if self.op.readd:
       _RedistributeAncillaryFiles(self)
@@ -4319,15 +4473,15 @@ class LUNodeSetParams(LogicalUnit):
                                    errors.ECODE_STATE)
 
     if node.master_candidate and self.might_demote and not self.lock_all:
-      assert not self.op.auto_promote, "auto-promote set but lock_all not"
+      assert not self.op.auto_promote, "auto_promote set but lock_all not"
       # check if after removing the current node, we're missing master
       # candidates
       (mc_remaining, mc_should, _) = \
           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
       if mc_remaining < mc_should:
         raise errors.OpPrereqError("Not enough master candidates, please"
-                                   " pass auto_promote to allow promotion",
-                                   errors.ECODE_STATE)
+                                   " pass auto promote option to allow"
+                                   " promotion", errors.ECODE_STATE)
 
     self.old_flags = old_flags = (node.master_candidate,
                                   node.drained, node.offline)
@@ -4578,6 +4732,8 @@ class LUClusterQuery(NoHooksLU):
       "reserved_lvs": cluster.reserved_lvs,
       "primary_ip_version": primary_ip_version,
       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
+      "hidden_os": cluster.hidden_os,
+      "blacklisted_os": cluster.blacklisted_os,
       }
 
     return result
@@ -4700,13 +4856,13 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   # SyncSource, etc.)
 
   # 1st pass, assemble on all nodes in secondary mode
-  for inst_disk in disks:
+  for idx, inst_disk in enumerate(disks):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       if ignore_size:
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -4718,7 +4874,7 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   # FIXME: race condition on drbd migration to primary
 
   # 2nd pass, do only the primary node
-  for inst_disk in disks:
+  for idx, inst_disk in enumerate(disks):
     dev_path = None
 
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
@@ -4728,7 +4884,7 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -4794,7 +4950,10 @@ class LUInstanceDeactivateDisks(NoHooksLU):
 
     """
     instance = self.instance
-    _SafeShutdownInstanceDisks(self, instance)
+    if self.op.force:
+      _ShutdownInstanceDisks(self, instance)
+    else:
+      _SafeShutdownInstanceDisks(self, instance)
 
 
 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
@@ -5032,7 +5191,8 @@ class LUInstanceStartup(LogicalUnit):
     instance = self.instance
     force = self.op.force
 
-    self.cfg.MarkInstanceUp(instance.name)
+    if not self.op.no_remember:
+      self.cfg.MarkInstanceUp(instance.name)
 
     if self.primary_offline:
       assert self.op.ignore_offline_nodes
@@ -5173,7 +5333,8 @@ class LUInstanceShutdown(LogicalUnit):
     node_current = instance.primary_node
     timeout = self.op.timeout
 
-    self.cfg.MarkInstanceDown(instance.name)
+    if not self.op.no_remember:
+      self.cfg.MarkInstanceDown(instance.name)
 
     if self.primary_offline:
       assert self.op.ignore_offline_nodes
@@ -5281,8 +5442,25 @@ class LUInstanceRecreateDisks(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   REQ_BGL = False
 
+  def CheckArguments(self):
+    # normalise the disk list
+    self.op.disks = sorted(frozenset(self.op.disks))
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    if self.op.nodes:
+      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      # if we replace the nodes, we only need to lock the old primary,
+      # otherwise we need to lock all nodes for disk re-creation
+      primary_only = bool(self.op.nodes)
+      self._LockInstancesNodes(primary_only=primary_only)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5303,12 +5481,31 @@ class LUInstanceRecreateDisks(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    if self.op.nodes:
+      if len(self.op.nodes) != len(instance.all_nodes):
+        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+                                   " %d replacement nodes were specified" %
+                                   (instance.name, len(instance.all_nodes),
+                                    len(self.op.nodes)),
+                                   errors.ECODE_INVAL)
+      assert instance.disk_template != constants.DT_DRBD8 or \
+          len(self.op.nodes) == 2
+      assert instance.disk_template != constants.DT_PLAIN or \
+          len(self.op.nodes) == 1
+      primary_node = self.op.nodes[0]
+    else:
+      primary_node = instance.primary_node
+    _CheckNodeOnline(self, primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
-    _CheckInstanceDown(self, instance, "cannot recreate disks")
+    # if we replace nodes *and* the old primary is offline, we don't
+    # check
+    assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+    if not (self.op.nodes and old_pnode.offline):
+      _CheckInstanceDown(self, instance, "cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -5317,18 +5514,39 @@ class LUInstanceRecreateDisks(LogicalUnit):
         if idx >= len(instance.disks):
           raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
                                      errors.ECODE_INVAL)
-
+    if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+      raise errors.OpPrereqError("Can't recreate disks partially and"
+                                 " change the nodes at the same time",
+                                 errors.ECODE_INVAL)
     self.instance = instance
 
   def Exec(self, feedback_fn):
     """Recreate the disks.
 
     """
+    # change primary node, if needed
+    if self.op.nodes:
+      self.instance.primary_node = self.op.nodes[0]
+      self.LogWarning("Changing the instance's nodes, you will have to"
+                      " remove any disks left on the older nodes manually")
+
     to_skip = []
-    for idx, _ in enumerate(self.instance.disks):
+    for idx, disk in enumerate(self.instance.disks):
       if idx not in self.op.disks: # disk idx has not been passed in
         to_skip.append(idx)
         continue
+      # update secondaries for disks, if needed
+      if self.op.nodes:
+        if disk.dev_type == constants.LD_DRBD8:
+          # need to update the nodes
+          assert len(self.op.nodes) == 2
+          logical_id = list(disk.logical_id)
+          logical_id[0] = self.op.nodes[0]
+          logical_id[1] = self.op.nodes[1]
+          disk.logical_id = tuple(logical_id)
+
+    if self.op.nodes:
+      self.cfg.Update(self.instance, feedback_fn)
 
     _CreateDisks(self, self.instance, to_skip=to_skip)
 
@@ -5377,8 +5595,9 @@ class LUInstanceRename(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
       hostname = netutils.GetHostname(name=new_name)
-      self.LogInfo("Resolved given name '%s' to '%s'", new_name,
-                   hostname.name)
+      if hostname.name != new_name:
+        self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+                     hostname.name)
       new_name = self.op.new_name = hostname.name
       if (self.op.ip_check and
           netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
@@ -5863,7 +6082,7 @@ class LUInstanceMove(LogicalUnit):
     for idx, disk in enumerate(instance.disks):
       self.LogInfo("Copying data for disk %d", idx)
       result = self.rpc.call_blockdev_assemble(target_node, disk,
-                                               instance.name, True)
+                                               instance.name, True, idx)
       if result.fail_msg:
         self.LogWarning("Can't assemble newly created disk %d: %s",
                         idx, result.fail_msg)
@@ -6406,17 +6625,18 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
-                         p_minor, s_minor):
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+                         iv_name, p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
+  assert len(vgnames) == len(names) == 2
   port = lu.cfg.AllocatePort()
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgname, names[0]))
+                          logical_id=(vgnames[0], names[0]))
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                          logical_id=(vgname, names[1]))
+                          logical_id=(vgnames[1], names[1]))
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
                                       p_minor, s_minor,
@@ -6470,9 +6690,11 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      vg = disk.get("vg", vgname)
+      data_vg = disk.get("vg", vgname)
+      meta_vg = disk.get("metavg", data_vg)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk["size"], vg, names[idx*2:idx*2+2],
+                                      disk["size"], [data_vg, meta_vg],
+                                      names[idx*2:idx*2+2],
                                       "disk/%d" % disk_index,
                                       minors[idx*2], minors[idx*2+1])
       disk_dev.mode = disk["mode"]
@@ -6528,6 +6750,10 @@ def _WipeDisks(lu, instance):
 
   """
   node = instance.primary_node
+
+  for device in instance.disks:
+    lu.cfg.SetDiskID(device, node)
+
   logging.info("Pause sync of instance %s disks", instance.name)
   result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
 
@@ -6538,13 +6764,17 @@ def _WipeDisks(lu, instance):
 
   try:
     for idx, device in enumerate(instance.disks):
-      lu.LogInfo("* Wiping disk %d", idx)
-      logging.info("Wiping disk %d for instance %s", idx, instance.name)
-
       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
       # MAX_WIPE_CHUNK at max
       wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
                             constants.MIN_WIPE_CHUNK_PERCENT)
+      # we _must_ make this an int, otherwise rounding errors will
+      # occur
+      wipe_chunk_size = int(wipe_chunk_size)
+
+      lu.LogInfo("* Wiping disk %d", idx)
+      logging.info("Wiping disk %d for instance %s, node %s using"
+                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
 
       offset = 0
       size = device.size
@@ -6553,6 +6783,8 @@ def _WipeDisks(lu, instance):
 
       while offset < size:
         wipe_size = min(wipe_chunk_size, size - offset)
+        logging.debug("Wiping disk %d, offset %s, chunk %s",
+                      idx, offset, wipe_size)
         result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
@@ -6720,6 +6952,21 @@ def _ComputeDiskSize(disk_template, disks):
   return req_size_dict[disk_template]
 
 
+def _FilterVmNodes(lu, nodenames):
+  """Returns only the vm_capable nodes of a node list.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit whose configuration is consulted
+  @type nodenames: list
+  @param nodenames: the node names to filter
+  @rtype: list
+  @return: the given names, minus those the config lists as non-vm_capable
+
+  """
+  non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
+  return [name for name in nodenames if name not in non_vm_nodes]
+
+
 def _CheckHVParams(lu, nodenames, hvname, hvparams):
   """Hypervisor parameter validation.
 
@@ -6737,6 +6984,7 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
+  nodenames = _FilterVmNodes(lu, nodenames)
   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                   hvname,
                                                   hvparams)
@@ -6764,6 +7012,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
+  nodenames = _FilterVmNodes(lu, nodenames)
   result = lu.rpc.call_os_validate(required, nodenames, osname,
                                    [constants.OS_VALIDATE_PARAMETERS],
                                    osparams)
@@ -6847,9 +7096,8 @@ class LUInstanceCreate(LogicalUnit):
       raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                  self.op.file_driver, errors.ECODE_INVAL)
 
-    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
-      raise errors.OpPrereqError("File storage directory path not absolute",
-                                 errors.ECODE_INVAL)
+    if self.op.disk_template == constants.DT_FILE:
+      opcodes.RequireFileStorage()
 
     ### Node/iallocator related checks
     _CheckIAllocatorOrNode(self, "iallocator", "pnode")
@@ -7197,10 +7445,35 @@ class LUInstanceCreate(LogicalUnit):
       if name in os_defs and os_defs[name] == self.op.osparams[name]:
         del self.op.osparams[name]
 
+  def _CalculateFileStorageDir(self):
+    """Calculate final instance file storage dir.
+
+    """
+    # file storage dir calculation/check
+    self.instance_file_storage_dir = None
+    if self.op.disk_template == constants.DT_FILE:
+      # build the full file storage dir path
+      joinargs = []
+
+      cfg_storagedir = self.cfg.GetFileStorageDir()
+      if not cfg_storagedir:
+        raise errors.OpPrereqError("Cluster file storage dir not defined")
+      joinargs.append(cfg_storagedir)
+
+      if self.op.file_storage_dir is not None:
+        joinargs.append(self.op.file_storage_dir)
+
+      joinargs.append(self.op.instance_name)
+
+      # pylint: disable-msg=W0142
+      self.instance_file_storage_dir = utils.PathJoin(*joinargs)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    self._CalculateFileStorageDir()
+
     if self.op.mode == constants.INSTANCE_IMPORT:
       export_info = self._ReadExportInfo()
       self._ReadExportParams(export_info)
@@ -7327,8 +7600,9 @@ class LUInstanceCreate(LogicalUnit):
       except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
-      vg = disk.get("vg", self.cfg.GetVGName())
-      new_disk = {"size": size, "mode": mode, "vg": vg}
+      data_vg = disk.get("vg", self.cfg.GetVGName())
+      meta_vg = disk.get("metavg", data_vg)
+      new_disk = {"size": size, "mode": mode, "vg": data_vg, "metavg": meta_vg}
       if "adopt" in disk:
         new_disk["adopt"] = disk["adopt"]
       self.disks.append(new_disk)
@@ -7497,25 +7771,12 @@ class LUInstanceCreate(LogicalUnit):
     else:
       network_port = None
 
-    if constants.ENABLE_FILE_STORAGE:
-      # this is needed because os.path.join does not accept None arguments
-      if self.op.file_storage_dir is None:
-        string_file_storage_dir = ""
-      else:
-        string_file_storage_dir = self.op.file_storage_dir
-
-      # build the full file storage dir path
-      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
-                                        string_file_storage_dir, instance)
-    else:
-      file_storage_dir = ""
-
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
                                   instance, pnode_name,
                                   self.secondaries,
                                   self.disks,
-                                  file_storage_dir,
+                                  self.instance_file_storage_dir,
                                   self.op.file_driver,
                                   0,
                                   feedback_fn)
@@ -7556,18 +7817,6 @@ class LUInstanceCreate(LogicalUnit):
           self.cfg.ReleaseDRBDMinors(instance)
           raise
 
-      if self.cfg.GetClusterInfo().prealloc_wipe_disks:
-        feedback_fn("* wiping instance disks...")
-        try:
-          _WipeDisks(self, iobj)
-        except errors.OpExecError:
-          self.LogWarning("Device wiping failed, reverting...")
-          try:
-            _RemoveDisks(self, iobj)
-          finally:
-            self.cfg.ReleaseDRBDMinors(instance)
-            raise
-
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj, self.proc.GetECId())
@@ -7586,7 +7835,20 @@ class LUInstanceCreate(LogicalUnit):
       self.context.glm.release(locking.LEVEL_NODE)
       del self.acquired_locks[locking.LEVEL_NODE]
 
-    if self.op.wait_for_sync:
+    disk_abort = False
+    if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
+      feedback_fn("* wiping instance disks...")
+      try:
+        _WipeDisks(self, iobj)
+      except errors.OpExecError, err:
+        logging.exception("Wiping disks failed")
+        self.LogWarning("Wiping instance disks failed (%s)", err)
+        disk_abort = True
+
+    if disk_abort:
+      # Something is already wrong with the disks, don't do anything else
+      pass
+    elif self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, iobj)
     elif iobj.disk_template in constants.DTS_NET_MIRROR:
       # make sure the disks are not degraded (still sync-ing is ok)
@@ -7731,18 +7993,28 @@ class LUInstanceConsole(NoHooksLU):
 
     logging.debug("Connecting to console of %s on %s", instance.name, node)
 
-    hyper = hypervisor.GetHypervisor(instance.hypervisor)
-    cluster = self.cfg.GetClusterInfo()
-    # beparams and hvparams are passed separately, to avoid editing the
-    # instance and then saving the defaults in the instance itself.
-    hvparams = cluster.FillHV(instance)
-    beparams = cluster.FillBE(instance)
-    console = hyper.GetInstanceConsole(instance, hvparams, beparams)
+    return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
 
-    assert console.instance == instance.name
-    assert console.Validate()
 
-    return console.ToDict()
+def _GetInstanceConsole(cluster, instance):
+  """Returns console information for an instance.
+
+  @type cluster: L{objects.Cluster}
+  @type instance: L{objects.Instance}
+  @rtype: dict
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  # beparams and hvparams are passed separately, to avoid editing the
+  # instance and then saving the defaults in the instance itself.
+  hvparams = cluster.FillHV(instance)
+  beparams = cluster.FillBE(instance)
+  console = hyper.GetInstanceConsole(instance, hvparams, beparams)
+
+  assert console.instance == instance.name
+  assert console.Validate()
+
+  return console.ToDict()
 
 
 class LUInstanceReplaceDisks(LogicalUnit):
@@ -8026,18 +8298,24 @@ class TLReplaceDisks(Tasklet):
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
+    touched_nodes = frozenset([self.new_node, self.other_node,
+                               self.target_node])
+
+    if self.lu.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET:
+      # Release unneeded node locks
+      for name in self.lu.acquired_locks[locking.LEVEL_NODE]:
+        if name not in touched_nodes:
+          self._ReleaseNodeLock(name)
+
     # Check whether disks are valid
     for disk_idx in self.disks:
       instance.FindDisk(disk_idx)
 
     # Get secondary node IP addresses
-    node_2nd_ip = {}
-
-    for node_name in [self.target_node, self.other_node, self.new_node]:
-      if node_name is not None:
-        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
-
-    self.node_secondary_ip = node_2nd_ip
+    self.node_secondary_ip = \
+      dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
+           for node_name in touched_nodes
+           if node_name is not None)
 
   def Exec(self, feedback_fn):
     """Execute disk replacement.
@@ -8048,6 +8326,13 @@ class TLReplaceDisks(Tasklet):
     if self.delay_iallocator:
       self._CheckPrereq2()
 
+    if (self.lu.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET and
+        __debug__):
+      # Verify owned locks before starting operation
+      owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+      assert set(owned_locks) == set(self.node_secondary_ip), \
+          "Not owning the correct locks: %s" % (owned_locks, )
+
     if not self.disks:
       feedback_fn("No disks need replacement")
       return
@@ -8068,14 +8353,24 @@ class TLReplaceDisks(Tasklet):
       else:
         fn = self._ExecDrbd8DiskOnly
 
-      return fn(feedback_fn)
-
+      result = fn(feedback_fn)
     finally:
       # Deactivate the instance disks if we're replacing them on a
       # down instance
       if activate_disks:
         _SafeShutdownInstanceDisks(self.lu, self.instance)
 
+    if __debug__:
+      # Verify owned locks
+      owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+      assert ((self.early_release and not owned_locks) or
+              (not self.early_release and
+               set(owned_locks) == set(self.node_secondary_ip))), \
+        ("Not owning the correct locks, early_release=%s, owned=%r" %
+         (self.early_release, owned_locks))
+
+    return result
+
   def _CheckVolumeGroup(self, nodes):
     self.lu.LogInfo("Checking volume groups")
 
@@ -8127,7 +8422,12 @@ class TLReplaceDisks(Tasklet):
                                  (node_name, self.instance.name))
 
   def _CreateNewStorage(self, node_name):
-    vgname = self.cfg.GetVGName()
+    """Create new storage on the primary or secondary node.
+
+    This is only used for same-node replaces, not for changing the
+    secondary node, hence we don't want to modify the existing disk.
+
+    """
     iv_names = {}
 
     for idx, dev in enumerate(self.instance.disks):
@@ -8141,13 +8441,15 @@ class TLReplaceDisks(Tasklet):
       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
       names = _GenerateUniqueNames(self.lu, lv_names)
 
+      vg_data = dev.children[0].logical_id[0]
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vgname, names[0]))
+                             logical_id=(vg_data, names[0]))
+      vg_meta = dev.children[1].logical_id[0]
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                             logical_id=(vgname, names[1]))
+                             logical_id=(vg_meta, names[1]))
 
       new_lvs = [lv_data, lv_meta]
-      old_lvs = dev.children
+      old_lvs = [child.Copy() for child in dev.children]
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
 
       # we pass force_create=True to force the LVM creation
@@ -8272,10 +8574,14 @@ class TLReplaceDisks(Tasklet):
                                              rename_new_to_old)
       result.Raise("Can't rename new LVs on node %s" % self.target_node)
 
+      # Intermediate steps of in memory modifications
       for old, new in zip(old_lvs, new_lvs):
         new.logical_id = old.logical_id
         self.cfg.SetDiskID(new, self.target_node)
 
+      # We need to modify old_lvs so that removal later removes the
+      # right LVs, not the newly added ones; note that old_lvs is a
+      # copy here
       for disk in old_lvs:
         disk.logical_id = ren_fn(disk, temp_suffix)
         self.cfg.SetDiskID(disk, self.target_node)
@@ -8295,10 +8601,6 @@ class TLReplaceDisks(Tasklet):
                                      "volumes"))
         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
 
-      dev.children = new_lvs
-
-      self.cfg.Update(self.instance, feedback_fn)
-
     cstep = 5
     if self.early_release:
       self.lu.LogStep(cstep, steps_total, "Removing old storage")
@@ -8564,10 +8866,13 @@ class LUNodeEvacStrategy(NoHooksLU):
       locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
 
   def Exec(self, feedback_fn):
+    instances = []
+    for node in self.op.nodes:
+      instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
+    if not instances:
+      return []
+
     if self.op.remote_node is not None:
-      instances = []
-      for node in self.op.nodes:
-        instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
       result = []
       for i in instances:
         if i.primary_node == self.op.remote_node:
@@ -8693,23 +8998,33 @@ class LUInstanceQueryData(NoHooksLU):
 
   def ExpandNames(self):
     self.needed_locks = {}
-    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
-    if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+    # Use locking if requested or when non-static information is wanted
+    if not (self.op.static or self.op.use_locking):
+      self.LogWarning("Non-static data requested, locks need to be acquired")
+      self.op.use_locking = True
+
+    if self.op.instances or not self.op.use_locking:
+      # Expand instance names right here
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
     else:
+      # Will use acquired locks
       self.wanted_names = None
-      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
 
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    if self.op.use_locking:
+      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+      if self.wanted_names is None:
+        self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+      else:
+        self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
+    if self.op.use_locking and level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
 
   def CheckPrereq(self):
@@ -8719,10 +9034,11 @@ class LUInstanceQueryData(NoHooksLU):
 
     """
     if self.wanted_names is None:
+      assert self.op.use_locking, "Locking was not used"
       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
 
-    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
-                             in self.wanted_names]
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name)
+                             for name in self.wanted_names]
 
   def _ComputeBlockdevStatus(self, node, instance_name, dev):
     """Returns the status of a block device
@@ -8768,7 +9084,7 @@ class LUInstanceQueryData(NoHooksLU):
     else:
       dev_children = []
 
-    data = {
+    return {
       "iv_name": dev.iv_name,
       "dev_type": dev.dev_type,
       "logical_id": dev.logical_id,
@@ -8780,8 +9096,6 @@ class LUInstanceQueryData(NoHooksLU):
       "size": dev.size,
       }
 
-    return data
-
   def Exec(self, feedback_fn):
     """Gather and return data"""
     result = {}
@@ -8809,7 +9123,7 @@ class LUInstanceQueryData(NoHooksLU):
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
 
-      idict = {
+      result[instance.name] = {
         "name": instance.name,
         "config_state": config_state,
         "run_state": remote_state,
@@ -8834,8 +9148,6 @@ class LUInstanceQueryData(NoHooksLU):
         "uuid": instance.uuid,
         }
 
-      result[instance.name] = idict
-
     return result
 
 
@@ -9101,6 +9413,7 @@ class LUInstanceSetParams(LogicalUnit):
       self.be_inst = i_bedict # the new dict (without defaults)
     else:
       self.be_new = self.be_inst = {}
+    be_old = cluster.FillBE(instance)
 
     # osparams processing
     if self.op.osparams:
@@ -9112,7 +9425,8 @@ class LUInstanceSetParams(LogicalUnit):
 
     self.warn = []
 
-    if constants.BE_MEMORY in self.op.beparams and not self.op.force:
+    if (constants.BE_MEMORY in self.op.beparams and not self.op.force and
+        be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]):
       mem_check_list = [pnode]
       if be_new[constants.BE_AUTO_BALANCE]:
         # either we changed auto_balance to yes or it was from before
@@ -9153,16 +9467,17 @@ class LUInstanceSetParams(LogicalUnit):
         for node, nres in nodeinfo.items():
           if node not in instance.secondary_nodes:
             continue
-          msg = nres.fail_msg
-          if msg:
-            self.warn.append("Can't get info from secondary node %s: %s" %
-                             (node, msg))
-          elif not isinstance(nres.payload.get('memory_free', None), int):
-            self.warn.append("Secondary node %s didn't return free"
-                             " memory information" % node)
+          nres.Raise("Can't get info from secondary node %s" % node,
+                     prereq=True, ecode=errors.ECODE_STATE)
+          if not isinstance(nres.payload.get('memory_free', None), int):
+            raise errors.OpPrereqError("Secondary node %s didn't return free"
+                                       " memory information" % node,
+                                       errors.ECODE_STATE)
           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
-            self.warn.append("Not enough memory to failover instance to"
-                             " secondary node %s" % node)
+            raise errors.OpPrereqError("This change will prevent the instance"
+                                       " from failover to its secondary node"
+                                       " %s, due to not enough memory" % node,
+                                       errors.ECODE_STATE)
 
     # NIC processing
     self.nic_pnew = {}
@@ -9277,7 +9592,8 @@ class LUInstanceSetParams(LogicalUnit):
     snode = self.op.remote_node
 
     # create a fake disk info for _GenerateDiskTemplate
-    disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+    disk_info = [{"size": d.size, "mode": d.mode,
+                  "vg": d.logical_id[0]} for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
                                       disk_info, None, None, 0, feedback_fn)
@@ -9311,7 +9627,8 @@ class LUInstanceSetParams(LogicalUnit):
     self.cfg.Update(instance, feedback_fn)
 
     # disks are created, waiting for sync
-    disk_abort = not _WaitForSync(self, instance)
+    disk_abort = not _WaitForSync(self, instance,
+                                  oneshot=not self.op.wait_for_sync)
     if disk_abort:
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance, please cleanup manually")
@@ -9988,20 +10305,40 @@ class LUGroupAssignNodes(NoHooksLU):
 
     # We want to lock all the affected nodes and groups. We have readily
     # available the list of nodes, and the *destination* group. To gather the
-    # list of "source" groups, we need to fetch node information.
-    self.node_data = self.cfg.GetAllNodesInfo()
-    affected_groups = set(self.node_data[node].group for node in self.op.nodes)
-    affected_groups.add(self.group_uuid)
-
+    # list of "source" groups, we need to fetch node information later on.
     self.needed_locks = {
-      locking.LEVEL_NODEGROUP: list(affected_groups),
+      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
       locking.LEVEL_NODE: self.op.nodes,
       }
 
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODEGROUP:
+      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
+
+      # Try to get all affected nodes' groups without having the group or node
+      # lock yet. Needs verification later in the code flow.
+      groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)
+
+      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    assert self.needed_locks[locking.LEVEL_NODEGROUP]
+    assert (frozenset(self.acquired_locks[locking.LEVEL_NODE]) ==
+            frozenset(self.op.nodes))
+
+    expected_locks = (set([self.group_uuid]) |
+                      self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
+    actual_locks = self.acquired_locks[locking.LEVEL_NODEGROUP]
+    if actual_locks != expected_locks:
+      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
+                               " current groups are '%s', used to be '%s'" %
+                               (utils.CommaJoin(expected_locks),
+                                utils.CommaJoin(actual_locks)))
+
+    self.node_data = self.cfg.GetAllNodesInfo()
     self.group = self.cfg.GetNodeGroup(self.group_uuid)
     instance_data = self.cfg.GetAllInstancesInfo()
 
@@ -10027,7 +10364,7 @@ class LUGroupAssignNodes(NoHooksLU):
 
         if previous_splits:
           self.LogWarning("In addition, these already-split instances continue"
-                          " to be spit across groups: %s",
+                          " to be split across groups: %s",
                           utils.CommaJoin(utils.NiceSort(previous_splits)))
 
   def Exec(self, feedback_fn):
@@ -10037,6 +10374,9 @@ class LUGroupAssignNodes(NoHooksLU):
     for node in self.op.nodes:
       self.node_data[node].group = self.group_uuid
 
+    # FIXME: Depends on side-effects of modifying the result of
+    # C{cfg.GetAllNodesInfo}
+
     self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
 
   @staticmethod
@@ -10117,7 +10457,8 @@ class _GroupQuery(_QueryBase):
           missing.append(name)
 
       if missing:
-        raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+        raise errors.OpPrereqError("Some groups do not exist: %s" %
+                                   utils.CommaJoin(missing),
                                    errors.ECODE_NOENT)
 
   def DeclareLocks(self, lu, level):
@@ -10286,9 +10627,9 @@ class LUGroupRemove(LogicalUnit):
 
     # Verify the cluster would not be left group-less.
     if len(self.cfg.GetNodeGroupList()) == 1:
-      raise errors.OpPrereqError("Group '%s' is the last group in the cluster,"
-                                 " which cannot be left without at least one"
-                                 " group" % self.op.group_name,
+      raise errors.OpPrereqError("Group '%s' is the only group,"
+                                 " cannot be removed" %
+                                 self.op.group_name,
                                  errors.ECODE_STATE)
 
   def BuildHooksEnv(self):
@@ -10929,8 +11270,7 @@ class IAllocator(object):
           "i_pri_up_memory": i_p_up_mem,
           }
         pnr_dyn.update(node_results[nname])
-
-      node_results[nname] = pnr_dyn
+        node_results[nname] = pnr_dyn
 
     return node_results