LUClusterSetParams: When ipolicy is updated warn for new violations
[ganeti-local] / lib / cmdlib.py
index d1fe084..f867a4d 100644
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -67,6 +67,13 @@ import ganeti.masterd.instance # pylint: disable=W0611
 #: Size of DRBD meta block device
 DRBD_META_SIZE = 128
 
+# Groups of instance administrative states
+INSTANCE_UP = [constants.ADMINST_UP]
+INSTANCE_DOWN = [constants.ADMINST_DOWN]
+INSTANCE_OFFLINE = [constants.ADMINST_OFFLINE]
+INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
+INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
+
 
 class ResultWithJobs:
   """Data container for LU results with jobs.
@@ -566,6 +573,20 @@ def _ShareAll():
   return dict.fromkeys(locking.LEVELS, 1)
 
 
+def _MakeLegacyNodeInfo(data):
+  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
+
+  Converts the data into a single dictionary. This is fine for most use cases,
+  but some require information from more than one volume group or hypervisor.
+
+  """
+  (bootid, (vg_info, ), (hv_info, )) = data
+
+  return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
+    "bootid": bootid,
+    })
+
+
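For illustration only (not part of the patch), a minimal standalone sketch of the flattening that _MakeLegacyNodeInfo performs, using made-up values in the new (bootid, vg_info_list, hv_info_list) payload shape of call_node_info:

  # Hypothetical payload in the new format: bootid plus one-element lists
  # of per-VG and per-hypervisor dicts.
  data = ("7d1a3f52-boot",
          [{"vg_size": 409600, "vg_free": 102400}],
          [{"memory_total": 32768, "memory_free": 2048, "cpu_total": 8}])

  (bootid, (vg_info, ), (hv_info, )) = data

  legacy = dict(vg_info)       # equivalent to the chained JoinDisjointDicts
  legacy.update(hv_info)       # (the key sets are disjoint)
  legacy["bootid"] = bootid

  assert legacy["vg_free"] == 102400 and legacy["cpu_total"] == 8
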
 def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
   """Checks if the owned node groups are still correct for an instance.
 
@@ -700,6 +721,109 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
+def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
+  """Return the new version of an instance policy.
+
+  @param group_policy: whether this policy applies to a group and thus
+    we should support removal of policy entries
+
+  """
+  use_none = use_default = group_policy
+  ipolicy = copy.deepcopy(old_ipolicy)
+  for key, value in new_ipolicy.items():
+    if key not in constants.IPOLICY_ALL_KEYS:
+      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
+                                 errors.ECODE_INVAL)
+    if key in constants.IPOLICY_PARAMETERS:
+      utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
+      ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
+                                       use_none=use_none,
+                                       use_default=use_default)
+    else:
+      # FIXME: we assume all others are lists; this should be redone
+      # in a nicer way
+      if not value or value == [constants.VALUE_DEFAULT]:
+        if group_policy:
+          del ipolicy[key]
+        else:
+          raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
+                                     " on the cluster" % key,
+                                     errors.ECODE_INVAL)
+      else:
+        ipolicy[key] = list(value)
+  try:
+    objects.InstancePolicy.CheckParameterSyntax(ipolicy)
+  except errors.ConfigurationError, err:
+    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
+                               errors.ECODE_INVAL)
+  return ipolicy
+
+
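For illustration only, a rough sketch of the merge semantics with hypothetical keys and values (the real code validates keys against constants.IPOLICY_ALL_KEYS and merges sub-dicts via _GetUpdatedParams): a partial new_ipolicy only overrides what it names, and on a group policy the value [constants.VALUE_DEFAULT] drops the group-level override entirely:

  import copy

  old_ipolicy = {
    "min": {"memory-size": 128, "disk-size": 1024},  # hypothetical spec keys
    "disk-templates": ["drbd", "plain"],
    }
  new_ipolicy = {
    "min": {"memory-size": 256},       # only this sub-key changes
    "disk-templates": ["default"],     # "unset" request on a group policy
    }

  ipolicy = copy.deepcopy(old_ipolicy)
  ipolicy["min"].update(new_ipolicy["min"])
  del ipolicy["disk-templates"]        # the group_policy=True branch

  assert ipolicy == {"min": {"memory-size": 256, "disk-size": 1024}}
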
+def _UpdateAndVerifySubDict(base, updates, type_check):
+  """Updates and verifies a dict with sub dicts of the same type.
+
+  @param base: The dict with the old data
+  @param updates: The dict with the new data
+  @param type_check: Dict suitable for ForceDictType to verify correct types
+  @return: A new dict with updated and verified values
+
+  """
+  def fn(old, value):
+    new = _GetUpdatedParams(old, value)
+    utils.ForceDictType(new, type_check)
+    return new
+
+  ret = copy.deepcopy(base)
+  ret.update(dict((key, fn(base.get(key, {}), value))
+                  for key, value in updates.items()))
+  return ret
+
+
+def _MergeAndVerifyHvState(op_input, obj_input):
+  """Combines the hv state from an opcode with that of the object.
+
+  @param op_input: The input dict from the opcode
+  @param obj_input: The input dict from the objects
+  @return: The verified and updated dict
+
+  """
+  if op_input:
+    invalid_hvs = set(op_input) - constants.HYPER_TYPES
+    if invalid_hvs:
+      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
+                                 " %s" % utils.CommaJoin(invalid_hvs),
+                                 errors.ECODE_INVAL)
+    if obj_input is None:
+      obj_input = {}
+    type_check = constants.HVSTS_PARAMETER_TYPES
+    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
+
+  return None
+
+
+def _MergeAndVerifyDiskState(op_input, obj_input):
+  """Combines the disk state from an opcode with that of the object.
+
+  @param op_input: The input dict from the opcode
+  @param obj_input: The input dict from the objects
+  @return: The verified and updated dict
+
+  """
+  if op_input:
+    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
+    if invalid_dst:
+      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
+                                 utils.CommaJoin(invalid_dst),
+                                 errors.ECODE_INVAL)
+    type_check = constants.DSS_PARAMETER_TYPES
+    if obj_input is None:
+      obj_input = {}
+    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
+                                              type_check))
+                for key, value in op_input.items())
+
+  return None
+
+
 def _ReleaseLocks(lu, level, names=None, keep=None):
   """Releases locks owned by an LU.
 
@@ -721,12 +845,17 @@ def _ReleaseLocks(lu, level, names=None, keep=None):
   else:
     should_release = None
 
-  if should_release:
+  owned = lu.owned_locks(level)
+  if not owned:
+    # Not owning any lock at this level, do nothing
+    pass
+
+  elif should_release:
     retain = []
     release = []
 
     # Determine which locks to release
-    for name in lu.owned_locks(level):
+    for name in owned:
       if should_release(name):
         release.append(name)
       else:
@@ -898,20 +1027,186 @@ def _GetClusterDomainSecret():
                                strict=True)
 
 
-def _CheckInstanceDown(lu, instance, reason):
-  """Ensure that an instance is not running."""
-  if instance.admin_up:
-    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
-                               (instance.name, reason), errors.ECODE_STATE)
+def _CheckInstanceState(lu, instance, req_states, msg=None):
+  """Ensure that an instance is in one of the required states.
+
+  @param lu: the LU on behalf of which we make the check
+  @param instance: the instance to check
+  @param req_states: the admin states the instance is required to be in
+  @param msg: if passed, should be a message to replace the default one
+  @raise errors.OpPrereqError: if the instance is not in the required state
+
+  """
+  if msg is None:
+    msg = "can't use instance from outside %s states" % ", ".join(req_states)
+  if instance.admin_state not in req_states:
+    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
+                               (instance.name, instance.admin_state, msg),
+                               errors.ECODE_STATE)
+
+  if constants.ADMINST_UP not in req_states:
+    pnode = instance.primary_node
+    ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+    ins_l.Raise("Can't contact node %s for instance information" % pnode,
+                prereq=True, ecode=errors.ECODE_ENVIRON)
+
+    if instance.name in ins_l.payload:
+      raise errors.OpPrereqError("Instance %s is running, %s" %
+                                 (instance.name, msg), errors.ECODE_STATE)
+
+
+def _ComputeMinMaxSpec(name, ipolicy, value):
+  """Computes if value is in the desired range.
+
+  @param name: name of the parameter for which we perform the check
+  @param ipolicy: dictionary containing min, max and std values
+  @param value: actual value that we want to use
+  @return: None, or a message describing why C{value} is out of range
+
+  """
+  if value in [None, constants.VALUE_AUTO]:
+    return None
+  max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
+  min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
+  if value > max_v or min_v > value:
+    return ("%s value %s is not in range [%s, %s]" %
+            (name, value, min_v, max_v))
+  return None
+
+
+def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
+                                 nic_count, disk_sizes,
+                                 _compute_fn=_ComputeMinMaxSpec):
+  """Verifies ipolicy against provided specs.
+
+  @type ipolicy: dict
+  @param ipolicy: The ipolicy
+  @type mem_size: int
+  @param mem_size: The memory size
+  @type cpu_count: int
+  @param cpu_count: Used cpu cores
+  @type disk_count: int
+  @param disk_count: Number of disks used
+  @type nic_count: int
+  @param nic_count: Number of nics used
+  @type disk_sizes: list of ints
+  @param disk_sizes: Disk sizes of the used disks (len must match
+    C{disk_count})
+  @param _compute_fn: The compute function (unittest only)
+  @return: A list of violations, or an empty list if no violations are found
+
+  """
+  assert disk_count == len(disk_sizes)
+
+  test_settings = [
+    (constants.ISPEC_MEM_SIZE, mem_size),
+    (constants.ISPEC_CPU_COUNT, cpu_count),
+    (constants.ISPEC_DISK_COUNT, disk_count),
+    (constants.ISPEC_NIC_COUNT, nic_count),
+    ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes)
+
+  return filter(None,
+                (_compute_fn(name, ipolicy, value)
+                 for (name, value) in test_settings))
+
+
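For illustration only, a standalone sketch of the range check with hypothetical limits (the real keys come from constants.ISPECS_MIN/ISPECS_MAX and the per-spec names): every parameter outside its [min, max] interval yields one violation message, and disk sizes are checked once per disk:

  ipolicy = {
    "min": {"memory-size": 128, "cpu-count": 1, "disk-size": 1024},
    "max": {"memory-size": 32768, "cpu-count": 8, "disk-size": 1048576},
    }

  def check(name, value):
    # Mirrors _ComputeMinMaxSpec: None or "auto" is never a violation.
    if value in (None, "auto"):
      return None
    max_v = ipolicy["max"].get(name, value)
    min_v = ipolicy["min"].get(name, value)
    if value > max_v or min_v > value:
      return "%s value %s is not in range [%s, %s]" % (name, value,
                                                       min_v, max_v)
    return None

  specs = [("memory-size", 64), ("cpu-count", 4),
           ("disk-size", 512), ("disk-size", 2048)]
  violations = [v for v in (check(n, val) for n, val in specs) if v]
  assert len(violations) == 2   # memory-size 64 and the 512 MiB disk
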
+def _ComputeIPolicyInstanceViolation(ipolicy, instance,
+                                     _compute_fn=_ComputeIPolicySpecViolation):
+  """Compute if instance meets the specs of ipolicy.
+
+  @type ipolicy: dict
+  @param ipolicy: The ipolicy to verify against
+  @type instance: L{objects.Instance}
+  @param instance: The instance to verify
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
+  cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
+  disk_count = len(instance.disks)
+  disk_sizes = [disk.size for disk in instance.disks]
+  nic_count = len(instance.nics)
+
+  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+                     disk_sizes)
+
+
+def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
+    _compute_fn=_ComputeIPolicySpecViolation):
+  """Compute if instance specs meet the specs of ipolicy.
+
+  @type ipolicy: dict
+  @param ipolicy: The ipolicy to verify against
+  @type instance_spec: dict
+  @param instance_spec: The instance spec to verify
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
+  cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
+  disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
+  disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
+  nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
+
+  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+                     disk_sizes)
 
-  pnode = instance.primary_node
-  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-  ins_l.Raise("Can't contact node %s for instance information" % pnode,
-              prereq=True, ecode=errors.ECODE_ENVIRON)
 
-  if instance.name in ins_l.payload:
-    raise errors.OpPrereqError("Instance %s is running, %s" %
-                               (instance.name, reason), errors.ECODE_STATE)
+def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
+                                 target_group,
+                                 _compute_fn=_ComputeIPolicyInstanceViolation):
+  """Compute if instance meets the specs of the new target group.
+
+  @param ipolicy: The ipolicy to verify
+  @param instance: The instance object to verify
+  @param current_group: The current group of the instance
+  @param target_group: The new group of the instance
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  if current_group == target_group:
+    return []
+  else:
+    return _compute_fn(ipolicy, instance)
+
+
+def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False,
+                            _compute_fn=_ComputeIPolicyNodeViolation):
+  """Checks that the target node is correct in terms of instance policy.
+
+  @param ipolicy: The ipolicy to verify
+  @param instance: The instance object to verify
+  @param node: The new node the instance will be relocated to
+  @param ignore: Ignore violations of the ipolicy
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
+  res = _compute_fn(ipolicy, instance, primary_node.group, node.group)
+
+  if res:
+    msg = ("Instance does not meet target node group's (%s) instance"
+           " policy: %s") % (node.group, utils.CommaJoin(res))
+    if ignore:
+      lu.LogWarning(msg)
+    else:
+      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+
+def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances):
+  """Computes the set of instances that would start violating the new ipolicy.
+
+  @param old_ipolicy: The current (still in-place) ipolicy
+  @param new_ipolicy: The new (to become) ipolicy
+  @param instances: List of instances to verify
+  @return: A set of instance names that violate the new ipolicy but did not
+    violate the old one
+
+  """
+  return (_ComputeViolatingInstances(new_ipolicy, instances) -
+          _ComputeViolatingInstances(old_ipolicy, instances))
 
 
 def _ExpandItemName(fn, name, kind):
@@ -942,7 +1237,7 @@ def _ExpandInstanceName(cfg, name):
 
 
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
-                          memory, vcpus, nics, disk_template, disks,
+                          minmem, maxmem, vcpus, nics, disk_template, disks,
                           bep, hvp, hypervisor_name, tags):
   """Builds instance related env variables for hooks
 
@@ -956,10 +1251,12 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @param secondary_nodes: list of secondary nodes as strings
   @type os_type: string
   @param os_type: the name of the instance's OS
-  @type status: boolean
-  @param status: the should_run status of the instance
-  @type memory: string
-  @param memory: the memory size of the instance
+  @type status: string
+  @param status: the desired status of the instance
+  @type minmem: string
+  @param minmem: the minimum memory size of the instance
+  @type maxmem: string
+  @param maxmem: the maximum memory size of the instance
   @type vcpus: string
   @param vcpus: the count of VCPUs the instance has
   @type nics: list
@@ -981,23 +1278,21 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @return: the hook environment for this instance
 
   """
-  if status:
-    str_status = "up"
-  else:
-    str_status = "down"
   env = {
     "OP_TARGET": name,
     "INSTANCE_NAME": name,
     "INSTANCE_PRIMARY": primary_node,
     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
     "INSTANCE_OS_TYPE": os_type,
-    "INSTANCE_STATUS": str_status,
-    "INSTANCE_MEMORY": memory,
+    "INSTANCE_STATUS": status,
+    "INSTANCE_MINMEM": minmem,
+    "INSTANCE_MAXMEM": maxmem,
+    # TODO(2.7) remove deprecated "memory" value
+    "INSTANCE_MEMORY": maxmem,
     "INSTANCE_VCPUS": vcpus,
     "INSTANCE_DISK_TEMPLATE": disk_template,
     "INSTANCE_HYPERVISOR": hypervisor_name,
   }
-
   if nics:
     nic_count = len(nics)
     for idx, (ip, mac, mode, link) in enumerate(nics):
@@ -1083,8 +1378,9 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
     "primary_node": instance.primary_node,
     "secondary_nodes": instance.secondary_nodes,
     "os_type": instance.os,
-    "status": instance.admin_up,
-    "memory": bep[constants.BE_MEMORY],
+    "status": instance.admin_state,
+    "maxmem": bep[constants.BE_MAXMEM],
+    "minmem": bep[constants.BE_MINMEM],
     "vcpus": bep[constants.BE_VCPUS],
     "nics": _NICListToTuple(lu, instance.nics),
     "disk_template": instance.disk_template,
@@ -1126,6 +1422,26 @@ def _DecideSelfPromotion(lu, exceptions=None):
   return mc_now < mc_should
 
 
+def _CalculateGroupIPolicy(cluster, group):
+  """Calculate instance policy for group.
+
+  """
+  return cluster.SimpleFillIPolicy(group.ipolicy)
+
+
+def _ComputeViolatingInstances(ipolicy, instances):
+  """Computes the set of instances that violate the given ipolicy.
+
+  @param ipolicy: The ipolicy to verify
+  @type instances: list of L{objects.Instance}
+  @param instances: List of instances to verify
+  @return: A frozenset of instance names violating the ipolicy
+
+  """
+  return frozenset([inst.name for inst in instances
+                    if _ComputeIPolicyInstanceViolation(ipolicy, inst)])
+
+
 def _CheckNicsBridgesExist(lu, target_nics, target_node):
   """Check that the bridges needed by a list of nics exist.
 
@@ -1367,7 +1683,9 @@ class LUClusterDestroy(LogicalUnit):
     ems = self.cfg.GetUseExternalMipScript()
     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                      master_params, ems)
-    result.Raise("Could not disable the master role")
+    if result.fail_msg:
+      self.LogWarning("Error disabling the master IP address: %s",
+                      result.fail_msg)
 
     return master_params.name
 
@@ -2018,6 +2336,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     node_vol_should = {}
     instanceconfig.MapLVsByNode(node_vol_should)
 
+    ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
+    err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
+    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err)
+
     for node in node_vol_should:
       n_img = node_image[node]
       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
@@ -2028,7 +2350,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
         _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                  "volume %s missing on node %s", volume, node)
 
-    if instanceconfig.admin_up:
+    if instanceconfig.admin_state == constants.ADMINST_UP:
       pri_img = node_image[node_current]
       test = instance not in pri_img.instances and not pri_img.offline
       _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
@@ -2044,12 +2366,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       # node here
       snode = node_image[nname]
       bad_snode = snode.ghost or snode.offline
-      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
+      _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and
+               not success and not bad_snode,
                constants.CV_EINSTANCEFAULTYDISK, instance,
                "couldn't retrieve status for disk/%s on %s: %s",
                idx, nname, bdev_status)
-      _ErrorIf((instanceconfig.admin_up and success and
-                bdev_status.ldisk_status == constants.LDS_FAULTY),
+      _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and
+                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
                constants.CV_EINSTANCEFAULTYDISK, instance,
                "disk/%s on %s is faulty", idx, nname)
 
@@ -2097,12 +2420,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
         # we already list instances living on such nodes, and that's
         # enough warning
         continue
+      #TODO(dynmem): use MINMEM for checking
+      #TODO(dynmem): also consider ballooning out other instances
       for prinode, instances in n_img.sbp.items():
         needed_mem = 0
         for instance in instances:
           bep = cluster_info.FillBE(instance_cfg[instance])
           if bep[constants.BE_AUTO_BALANCE]:
-            needed_mem += bep[constants.BE_MEMORY]
+            needed_mem += bep[constants.BE_MAXMEM]
         test = n_img.mfree < needed_mem
         self._ErrorIf(test, constants.CV_ENODEN1, node,
                       "not enough memory to accommodate instance failovers"
@@ -2257,7 +2582,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
         node_drbd[minor] = (instance, False)
       else:
         instance = instanceinfo[instance]
-        node_drbd[minor] = (instance.name, instance.admin_up)
+        node_drbd[minor] = (instance.name,
+                            instance.admin_state == constants.ADMINST_UP)
 
     # and now check them
     used_minors = nresult.get(constants.NV_DRBDLIST, [])
@@ -2655,6 +2981,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
+    i_offline = 0 # Count of offline instances
     n_offline = 0 # Count of offline nodes
     n_drained = 0 # Count of nodes being drained
     node_vol_should = {}
@@ -2880,6 +3207,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
 
         for inst in non_primary_inst:
+          # FIXME: investigate best way to handle offline insts
+          if inst.admin_state == constants.ADMINST_OFFLINE:
+            if verbose:
+              feedback_fn("* Skipping offline instance %s" % inst.name)
+            i_offline += 1
+            continue
           test = inst in self.all_inst_info
           _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                    "instance should not run on node %s", node_i.name)
@@ -2905,7 +3238,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
                constants.CV_ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
 
-      _ErrorIf(inst_config.admin_up and pnode_img.offline,
+      _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
+               pnode_img.offline,
                constants.CV_EINSTANCEBADNODE, instance,
                "instance is marked as running and lives on offline node %s",
                inst_config.primary_node)
@@ -2997,6 +3331,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                   % len(i_non_a_balanced))
 
+    if i_offline:
+      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
+
     if n_offline:
       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
 
@@ -3159,8 +3496,8 @@ class LUGroupVerifyDisks(NoHooksLU):
     res_missing = {}
 
     nv_dict = _MapInstanceDisksToNodes([inst
-                                        for inst in self.instances.values()
-                                        if inst.admin_up])
+            for inst in self.instances.values()
+            if inst.admin_state == constants.ADMINST_UP])
 
     if nv_dict:
       nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
@@ -3211,7 +3548,10 @@ class LUClusterRepairDiskSizes(NoHooksLU):
         locking.LEVEL_NODE_RES: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
-    self.share_locks = _ShareAll()
+    self.share_locks = {
+      locking.LEVEL_NODE_RES: 1,
+      locking.LEVEL_INSTANCE: 0,
+      }
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
@@ -3440,13 +3780,23 @@ class LUClusterSetParams(LogicalUnit):
     if self.op.master_netmask is not None:
       _ValidateNetmask(self.cfg, self.op.master_netmask)
 
+    if self.op.diskparams:
+      for dt_params in self.op.diskparams.values():
+        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
     # all nodes to be modified.
     self.needed_locks = {
       locking.LEVEL_NODE: locking.ALL_SET,
+      locking.LEVEL_INSTANCE: locking.ALL_SET,
+      locking.LEVEL_NODEGROUP: locking.ALL_SET,
+    }
+    self.share_locks = {
+        locking.LEVEL_NODE: 1,
+        locking.LEVEL_INSTANCE: 1,
+        locking.LEVEL_NODEGROUP: 1,
     }
-    self.share_locks[locking.LEVEL_NODE] = 1
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3521,6 +3871,7 @@ class LUClusterSetParams(LogicalUnit):
     self.cluster = cluster = self.cfg.GetClusterInfo()
     # validate params changes
     if self.op.beparams:
+      objects.UpgradeBeParams(self.op.beparams)
       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
 
@@ -3534,6 +3885,42 @@ class LUClusterSetParams(LogicalUnit):
         self.new_ndparams["oob_program"] = \
             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
 
+    if self.op.hv_state:
+      new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+                                            self.cluster.hv_state_static)
+      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
+                               for hv, values in new_hv_state.items())
+
+    if self.op.disk_state:
+      new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state,
+                                                self.cluster.disk_state_static)
+      self.new_disk_state = \
+        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
+                            for name, values in svalues.items()))
+             for storage, svalues in new_disk_state.items())
+
+    if self.op.ipolicy:
+      self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
+                                            group_policy=False)
+
+      all_instances = self.cfg.GetAllInstancesInfo().values()
+      violations = set()
+      for group in self.cfg.GetAllNodeGroupsInfo().values():
+        instances = frozenset([inst for inst in all_instances
+                               if compat.any(node in group.members
+                                             for node in inst.all_nodes)])
+        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
+        new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
+                                                                   group),
+                                            new_ipolicy, instances)
+        if new:
+          violations.update(new)
+
+      if violations:
+        self.LogWarning("After the ipolicy change the following instances"
+                        " violate them: %s",
+                        utils.CommaJoin(violations))
+
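The warning above boils down to a set difference over instance names; for illustration only, a sketch with hypothetical names (mirroring what _ComputeNewInstanceViolations does with the frozensets returned by _ComputeViolatingInstances):

  violating_old = frozenset(["inst1", "inst2"])   # violate the current policy
  violating_new = frozenset(["inst2", "inst3"])   # would violate the new one

  newly_violating = violating_new - violating_old
  assert newly_violating == frozenset(["inst3"])  # only inst3 is warned about
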
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
@@ -3571,6 +3958,15 @@ class LUClusterSetParams(LogicalUnit):
         else:
           self.new_hvparams[hv_name].update(hv_dict)
 
+    # disk template parameters
+    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
+    if self.op.diskparams:
+      for dt_name, dt_params in self.op.diskparams.items():
+        if dt_name not in self.new_diskparams:
+          self.new_diskparams[dt_name] = dt_params
+        else:
+          self.new_diskparams[dt_name].update(dt_params)
+
     # os hypervisor parameters
     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
     if self.op.os_hvp:
@@ -3685,10 +4081,18 @@ class LUClusterSetParams(LogicalUnit):
       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
     if self.op.nicparams:
       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
+    if self.op.ipolicy:
+      self.cluster.ipolicy = self.new_ipolicy
     if self.op.osparams:
       self.cluster.osparams = self.new_osp
     if self.op.ndparams:
       self.cluster.ndparams = self.new_ndparams
+    if self.op.diskparams:
+      self.cluster.diskparams = self.new_diskparams
+    if self.op.hv_state:
+      self.cluster.hv_state_static = self.new_hv_state
+    if self.op.disk_state:
+      self.cluster.disk_state_static = self.new_disk_state
 
     if self.op.candidate_pool_size is not None:
       self.cluster.candidate_pool_size = self.op.candidate_pool_size
@@ -3696,6 +4100,9 @@ class LUClusterSetParams(LogicalUnit):
       _AdjustCandidatePool(self, [])
 
     if self.op.maintain_node_health is not None:
+      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
+        feedback_fn("Note: CONFD was disabled at build time, node health"
+                    " maintenance is not useful (still enabling it)")
       self.cluster.maintain_node_health = self.op.maintain_node_health
 
     if self.op.prealloc_wipe_disks is not None:
@@ -3945,8 +4352,9 @@ class LUClusterActivateMasterIp(NoHooksLU):
     """
     master_params = self.cfg.GetMasterNetworkParameters()
     ems = self.cfg.GetUseExternalMipScript()
-    self.rpc.call_node_activate_master_ip(master_params.name,
-                                          master_params, ems)
+    result = self.rpc.call_node_activate_master_ip(master_params.name,
+                                                   master_params, ems)
+    result.Raise("Could not activate the master IP")
 
 
 class LUClusterDeactivateMasterIp(NoHooksLU):
@@ -3959,8 +4367,9 @@ class LUClusterDeactivateMasterIp(NoHooksLU):
     """
     master_params = self.cfg.GetMasterNetworkParameters()
     ems = self.cfg.GetUseExternalMipScript()
-    self.rpc.call_node_deactivate_master_ip(master_params.name, master_params,
-                                            ems)
+    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                     master_params, ems)
+    result.Raise("Could not deactivate the master IP")
 
 
 def _WaitForSync(lu, instance, disks=None, oneshot=False):
@@ -4464,7 +4873,7 @@ class LUNodeRemove(LogicalUnit):
       raise errors.OpPrereqError("Node is the master node, failover to another"
                                  " node is required", errors.ECODE_INVAL)
 
-    for instance_name, instance in self.cfg.GetAllInstancesInfo():
+    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
       if node.name in instance.all_nodes:
         raise errors.OpPrereqError("Instance %s is still running on the node,"
                                    " please remove first" % instance_name,
@@ -4543,9 +4952,9 @@ class _NodeQuery(_QueryBase):
       # filter out non-vm_capable nodes
       toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
 
-      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
-                                        lu.cfg.GetHypervisorType())
-      live_data = dict((name, nresult.payload)
+      node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
+                                        [lu.cfg.GetHypervisorType()])
+      live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload))
                        for (name, nresult) in node_data.items()
                        if not nresult.fail_msg and nresult.payload)
     else:
@@ -5148,6 +5557,12 @@ class LUNodeAdd(LogicalUnit):
     if self.op.ndparams:
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
 
+    if self.op.hv_state:
+      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+
+    if self.op.disk_state:
+      self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
 
@@ -5186,6 +5601,12 @@ class LUNodeAdd(LogicalUnit):
     else:
       new_node.ndparams = {}
 
+    if self.op.hv_state:
+      new_node.hv_state_static = self.new_hv_state
+
+    if self.op.disk_state:
+      new_node.disk_state_static = self.new_disk_state
+
     # check connectivity
     result = self.rpc.call_version([node])[node]
     result.Raise("Can't get version information from node %s" % node)
@@ -5272,7 +5693,8 @@ class LUNodeSetParams(LogicalUnit):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                 self.op.master_capable, self.op.vm_capable,
-                self.op.secondary_ip, self.op.ndparams]
+                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
+                self.op.disk_state]
     if all_mods.count(None) == len(all_mods):
       raise errors.OpPrereqError("Please pass at least one modification",
                                  errors.ECODE_INVAL)
@@ -5309,6 +5731,16 @@ class LUNodeSetParams(LogicalUnit):
     else:
       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
+    # Since modifying a node can have severe effects on currently running
+    # operations the resource lock is at least acquired in shared mode
+    self.needed_locks[locking.LEVEL_NODE_RES] = \
+      self.needed_locks[locking.LEVEL_NODE]
+
+    # Get node resource and instance locks in shared mode; they are not used
+    # for anything but read-only access
+    self.share_locks[locking.LEVEL_NODE_RES] = 1
+    self.share_locks[locking.LEVEL_INSTANCE] = 1
+
     if self.lock_instances:
       self.needed_locks[locking.LEVEL_INSTANCE] = \
         frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
@@ -5485,7 +5917,8 @@ class LUNodeSetParams(LogicalUnit):
         # On online nodes, check that no instances are running, and that
         # the node has the new ip and we can reach it.
         for instance in affected_instances.values():
-          _CheckInstanceDown(self, instance, "cannot change secondary ip")
+          _CheckInstanceState(self, instance, INSTANCE_DOWN,
+                              msg="cannot change secondary ip")
 
         _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
         if master.name != node.name:
@@ -5502,6 +5935,15 @@ class LUNodeSetParams(LogicalUnit):
       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = new_ndparams
 
+    if self.op.hv_state:
+      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+                                                 self.node.hv_state_static)
+
+    if self.op.disk_state:
+      self.new_disk_state = \
+        _MergeAndVerifyDiskState(self.op.disk_state,
+                                 self.node.disk_state_static)
+
   def Exec(self, feedback_fn):
     """Modifies a node.
 
@@ -5518,6 +5960,12 @@ class LUNodeSetParams(LogicalUnit):
     if self.op.powered is not None:
       node.powered = self.op.powered
 
+    if self.op.hv_state:
+      node.hv_state_static = self.new_hv_state
+
+    if self.op.disk_state:
+      node.disk_state_static = self.new_disk_state
+
     for attr in ["master_capable", "vm_capable"]:
       val = getattr(self.op, attr)
       if val is not None:
@@ -5625,13 +6073,14 @@ class LUClusterQuery(NoHooksLU):
       "architecture": (platform.architecture()[0], platform.machine()),
       "name": cluster.cluster_name,
       "master": cluster.master_node,
-      "default_hypervisor": cluster.enabled_hypervisors[0],
+      "default_hypervisor": cluster.primary_hypervisor,
       "enabled_hypervisors": cluster.enabled_hypervisors,
       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                         for hypervisor_name in cluster.enabled_hypervisors]),
       "os_hvp": os_hvp,
       "beparams": cluster.beparams,
       "osparams": cluster.osparams,
+      "ipolicy": cluster.ipolicy,
       "nicparams": cluster.nicparams,
       "ndparams": cluster.ndparams,
       "candidate_pool_size": cluster.candidate_pool_size,
@@ -5883,7 +6332,7 @@ def _SafeShutdownInstanceDisks(lu, instance, disks=None):
   _ShutdownInstanceDisks.
 
   """
-  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
+  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
   _ShutdownInstanceDisks(lu, instance, disks=disks)
 
 
@@ -5953,10 +6402,12 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
       we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name)
+  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name])
   nodeinfo[node].Raise("Can't get data from node %s" % node,
                        prereq=True, ecode=errors.ECODE_ENVIRON)
-  free_mem = nodeinfo[node].payload.get("memory_free", None)
+  (_, _, (hv_info, )) = nodeinfo[node].payload
+
+  free_mem = hv_info.get("memory_free", None)
   if not isinstance(free_mem, int):
     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                                " was '%s'" % (node, free_mem),
@@ -6011,12 +6462,13 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
       or we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info(nodenames, vg, None)
+  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None)
   for node in nodenames:
     info = nodeinfo[node]
     info.Raise("Cannot get current information from node %s" % node,
                prereq=True, ecode=errors.ECODE_ENVIRON)
-    vg_free = info.payload.get("vg_free", None)
+    (_, (vg_info, ), _) = info.payload
+    vg_free = vg_info.get("vg_free", None)
     if not isinstance(vg_free, int):
       raise errors.OpPrereqError("Can't compute free disk space on node"
                                  " %s for vg %s, result was '%s'" %
@@ -6046,12 +6498,13 @@ def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
       or we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info(nodenames, None, hypervisor_name)
+  nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name])
   for node in nodenames:
     info = nodeinfo[node]
     info.Raise("Cannot get current information from node %s" % node,
                prereq=True, ecode=errors.ECODE_ENVIRON)
-    num_cpus = info.payload.get("cpu_total", None)
+    (_, _, (hv_info, )) = info.payload
+    num_cpus = hv_info.get("cpu_total", None)
     if not isinstance(num_cpus, int):
       raise errors.OpPrereqError("Can't compute the number of physical CPUs"
                                  " on node %s, result was '%s'" %
@@ -6074,10 +6527,16 @@ class LUInstanceStartup(LogicalUnit):
     # extra beparams
     if self.op.beparams:
       # fill the beparams dict
+      objects.UpgradeBeParams(self.op.beparams)
       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE_RES:
+      self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6121,6 +6580,8 @@ class LUInstanceStartup(LogicalUnit):
       hv_type.CheckParameterSyntax(filled_hvp)
       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
 
+    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
+
     self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
 
     if self.primary_offline and self.op.ignore_offline_nodes:
@@ -6132,6 +6593,7 @@ class LUInstanceStartup(LogicalUnit):
       _CheckNodeOnline(self, instance.primary_node)
 
       bep = self.cfg.GetClusterInfo().FillBE(instance)
+      bep.update(self.op.beparams)
 
       # check bridges existence
       _CheckInstanceBridgesExist(self, instance)
@@ -6144,7 +6606,7 @@ class LUInstanceStartup(LogicalUnit):
       if not remote_info.payload: # not running already
         _CheckNodeFreeMemory(self, instance.primary_node,
                              "starting instance %s" % instance.name,
-                             bep[constants.BE_MEMORY], instance.hypervisor)
+                             bep[constants.BE_MINMEM], instance.hypervisor)
 
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -6218,7 +6680,7 @@ class LUInstanceReboot(LogicalUnit):
     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-
+    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
     _CheckNodeOnline(self, instance.primary_node)
 
     # check bridges existence
@@ -6307,6 +6769,8 @@ class LUInstanceShutdown(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
+
     self.primary_offline = \
       self.cfg.GetNodeInfo(self.instance.primary_node).offline
 
@@ -6383,7 +6847,7 @@ class LUInstanceReinstall(LogicalUnit):
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name,
                                  errors.ECODE_INVAL)
-    _CheckInstanceDown(self, instance, "cannot reinstall")
+    _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
 
     if self.op.os_type is not None:
       # OS verification
@@ -6449,6 +6913,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
@@ -6510,7 +6975,8 @@ class LUInstanceRecreateDisks(LogicalUnit):
     assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
     if not (self.op.nodes and old_pnode.offline):
-      _CheckInstanceDown(self, instance, "cannot recreate disks")
+      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+                          msg="cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -6616,13 +7082,14 @@ class LUInstanceRename(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None
     _CheckNodeOnline(self, instance.primary_node)
-    _CheckInstanceDown(self, instance, "cannot rename")
+    _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+                        msg="cannot rename")
     self.instance = instance
 
     new_name = self.op.new_name
     if self.op.name_check:
       hostname = netutils.GetHostname(name=new_name)
-      if hostname != new_name:
+      if hostname.name != new_name:
         self.LogInfo("Resolved given name '%s' to '%s'", new_name,
                      hostname.name)
       if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
@@ -6702,11 +7169,16 @@ class LUInstanceRemove(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
     self.needed_locks[locking.LEVEL_NODE] = []
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6755,6 +7227,12 @@ class LUInstanceRemove(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+    assert not (set(instance.all_nodes) -
+                self.owned_locks(locking.LEVEL_NODE)), \
+      "Not owning correct locks"
+
     _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
 
@@ -6831,7 +7309,8 @@ class LUInstanceFailover(LogicalUnit):
                                        cleanup=False,
                                        failover=True,
                                        ignore_consistency=ignore_consistency,
-                                       shutdown_timeout=shutdown_timeout)
+                                       shutdown_timeout=shutdown_timeout,
+                                       ignore_ipolicy=self.op.ignore_ipolicy)
     self.tasklets = [self._migrater]
 
   def DeclareLocks(self, level):
@@ -6905,7 +7384,8 @@ class LUInstanceMigrate(LogicalUnit):
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                        cleanup=self.op.cleanup,
                                        failover=False,
-                                       fallback=self.op.allow_failover)
+                                       fallback=self.op.allow_failover,
+                                       ignore_ipolicy=self.op.ignore_ipolicy)
     self.tasklets = [self._migrater]
 
   def DeclareLocks(self, level):
@@ -6968,11 +7448,16 @@ class LUInstanceMove(LogicalUnit):
     target_node = _ExpandNodeName(self.cfg, self.op.target_node)
     self.op.target_node = target_node
     self.needed_locks[locking.LEVEL_NODE] = [target_node]
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes(primary_only=True)
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7029,11 +7514,15 @@ class LUInstanceMove(LogicalUnit):
     _CheckNodeOnline(self, target_node)
     _CheckNodeNotDrained(self, target_node)
     _CheckNodeVmCapable(self, target_node)
+    ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
+                                     self.cfg.GetNodeGroup(node.group))
+    _CheckTargetNodeIPolicy(self, ipolicy, instance, node,
+                            ignore=self.op.ignore_ipolicy)
 
-    if instance.admin_up:
+    if instance.admin_state == constants.ADMINST_UP:
       # check memory requirements on the secondary node
       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
-                           instance.name, bep[constants.BE_MEMORY],
+                           instance.name, bep[constants.BE_MAXMEM],
                            instance.hypervisor)
     else:
       self.LogInfo("Not checking memory on the secondary node as"
@@ -7057,6 +7546,9 @@ class LUInstanceMove(LogicalUnit):
     self.LogInfo("Shutting down instance %s on source node %s",
                  instance.name, source_node)
 
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
     result = self.rpc.call_instance_shutdown(source_node, instance,
                                              self.op.shutdown_timeout)
     msg = result.fail_msg
@@ -7121,7 +7613,7 @@ class LUInstanceMove(LogicalUnit):
     _RemoveDisks(self, instance, target_node=source_node)
 
     # Only start the instance if it's marked as up
-    if instance.admin_up:
+    if instance.admin_state == constants.ADMINST_UP:
       self.LogInfo("Starting instance %s on node %s",
                    instance.name, target_node)
 
@@ -7186,7 +7678,8 @@ class LUNodeMigrate(LogicalUnit):
                                  mode=self.op.mode,
                                  live=self.op.live,
                                  iallocator=self.op.iallocator,
-                                 target_node=self.op.target_node)]
+                                 target_node=self.op.target_node,
+                                 ignore_ipolicy=self.op.ignore_ipolicy)]
       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)
       ]
 
@@ -7223,6 +7716,8 @@ class TLMigrateInstance(Tasklet):
                             and target node
   @type shutdown_timeout: int
   @ivar shutdown_timeout: In case of failover timeout of the shutdown
+  @type ignore_ipolicy: bool
+  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
 
   """
 
@@ -7233,7 +7728,8 @@ class TLMigrateInstance(Tasklet):
   def __init__(self, lu, instance_name, cleanup=False,
                failover=False, fallback=False,
                ignore_consistency=False,
-               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
+               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT,
+               ignore_ipolicy=False):
     """Initializes this class.
 
     """
@@ -7247,6 +7743,7 @@ class TLMigrateInstance(Tasklet):
     self.fallback = fallback
     self.ignore_consistency = ignore_consistency
     self.shutdown_timeout = shutdown_timeout
+    self.ignore_ipolicy = ignore_ipolicy
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -7258,11 +7755,13 @@ class TLMigrateInstance(Tasklet):
     instance = self.cfg.GetInstanceInfo(instance_name)
     assert instance is not None
     self.instance = instance
+    cluster = self.cfg.GetClusterInfo()
 
-    if (not self.cleanup and not instance.admin_up and not self.failover and
-        self.fallback):
-      self.lu.LogInfo("Instance is marked down, fallback allowed, switching"
-                      " to failover")
+    if (not self.cleanup and
+        not instance.admin_state == constants.ADMINST_UP and
+        not self.failover and self.fallback):
+      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
+                      " switching to failover")
       self.failover = True
 
     if instance.disk_template not in constants.DTS_MIRRORED:
@@ -7284,6 +7783,13 @@ class TLMigrateInstance(Tasklet):
         # BuildHooksEnv
         self.target_node = self.lu.op.target_node
 
+      # Check that the target node is correct in terms of instance policy
+      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
+      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+      ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
+                              ignore=self.ignore_ipolicy)
+
       # self.target_node is already populated, either directly or by the
       # iallocator run
       target_node = self.target_node
@@ -7317,18 +7823,36 @@ class TLMigrateInstance(Tasklet):
                                    " node can be passed)" %
                                    (instance.disk_template, text),
                                    errors.ECODE_INVAL)
+      nodeinfo = self.cfg.GetNodeInfo(target_node)
+      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+      ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
+                              ignore=self.ignore_ipolicy)
 
-    i_be = self.cfg.GetClusterInfo().FillBE(instance)
+    i_be = cluster.FillBE(instance)
 
     # check memory requirements on the secondary node
-    if not self.failover or instance.admin_up:
+    if (not self.cleanup and
+         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
       _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
-                           instance.name, i_be[constants.BE_MEMORY],
+                           instance.name, i_be[constants.BE_MAXMEM],
                            instance.hypervisor)
     else:
       self.lu.LogInfo("Not checking memory on the secondary node as"
                       " instance will not be started")
 
+    # check if failover must be forced instead of migration
+    if (not self.cleanup and not self.failover and
+        i_be[constants.BE_ALWAYS_FAILOVER]):
+      if self.fallback:
+        self.lu.LogInfo("Instance configured to always failover; fallback"
+                        " to failover")
+        self.failover = True
+      else:
+        raise errors.OpPrereqError("This instance has been configured to"
+                                   " always failover, please allow failover",
+                                   errors.ECODE_STATE)
+
     # check bridge existance
     _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
 
@@ -7362,8 +7886,7 @@ class TLMigrateInstance(Tasklet):
         self.lu.op.live = None
       elif self.lu.op.mode is None:
         # read the default value from the hypervisor
-        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
-                                                skip_globals=False)
+        i_hv = cluster.FillHV(self.instance, skip_globals=False)
         self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
 
       self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
@@ -7375,6 +7898,7 @@ class TLMigrateInstance(Tasklet):
     """Run the allocator based on input opcode.
 
     """
+    # FIXME: add a self.ignore_ipolicy option
     ial = IAllocator(self.cfg, self.rpc,
                      mode=constants.IALLOCATOR_MODE_RELOC,
                      name=self.instance_name,
@@ -7592,14 +8116,17 @@ class TLMigrateInstance(Tasklet):
 
     # Check for hypervisor version mismatch and warn the user.
     nodeinfo = self.rpc.call_node_info([source_node, target_node],
-                                       None, self.instance.hypervisor)
-    src_info = nodeinfo[source_node]
-    dst_info = nodeinfo[target_node]
-
-    if ((constants.HV_NODEINFO_KEY_VERSION in src_info.payload) and
-        (constants.HV_NODEINFO_KEY_VERSION in dst_info.payload)):
-      src_version = src_info.payload[constants.HV_NODEINFO_KEY_VERSION]
-      dst_version = dst_info.payload[constants.HV_NODEINFO_KEY_VERSION]
+                                       None, [self.instance.hypervisor])
+    for ninfo in nodeinfo.values():
+      ninfo.Raise("Unable to retrieve node information from node '%s'" %
+                  ninfo.node)
+    (_, _, (src_info, )) = nodeinfo[source_node].payload
+    (_, _, (dst_info, )) = nodeinfo[target_node].payload
+
+    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
+        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
+      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
+      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
       if src_version != dst_version:
         self.feedback_fn("* warning: hypervisor version mismatch between"
                          " source (%s) and target (%s) node" %
@@ -7723,6 +8250,21 @@ class TLMigrateInstance(Tasklet):
       self._GoReconnect(False)
       self._WaitUntilSync()
 
+    # If the instance's disk template is `rbd' and there was a successful
+    # migration, unmap the device from the source node.
+    if self.instance.disk_template == constants.DT_RBD:
+      disks = _ExpandCheckDisks(instance, instance.disks)
+      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
+      for disk in disks:
+        result = self.rpc.call_blockdev_shutdown(source_node, disk)
+        msg = result.fail_msg
+        if msg:
+          logging.error("Migration was successful, but couldn't unmap the"
+                        " block device %s on source node %s: %s",
+                        disk.iv_name, source_node, msg)
+          logging.error("You need to unmap the device %s manually on %s",
+                        disk.iv_name, source_node)
+
     self.feedback_fn("* done")
 
   def _ExecFailover(self):
@@ -7738,7 +8280,7 @@ class TLMigrateInstance(Tasklet):
     source_node = instance.primary_node
     target_node = self.target_node
 
-    if instance.admin_up:
+    if instance.admin_state == constants.ADMINST_UP:
       self.feedback_fn("* checking disk consistency between source and target")
       for dev in instance.disks:
         # for drbd, these are drbd over lvm
@@ -7781,7 +8323,7 @@ class TLMigrateInstance(Tasklet):
     self.cfg.Update(instance, self.feedback_fn)
 
     # Only start the instance if it's marked as up
-    if instance.admin_up:
+    if instance.admin_state == constants.ADMINST_UP:
       self.feedback_fn("* activating the instance's disks on target node %s" %
                        target_node)
       logging.info("Starting instance %s on node %s",
@@ -7917,24 +8459,113 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
+def _ComputeLDParams(disk_template, disk_params):
+  """Computes Logical Disk parameters from Disk Template parameters.
+
+  @type disk_template: string
+  @param disk_template: disk template, one of L{constants.DISK_TEMPLATES}
+  @type disk_params: dict
+  @param disk_params: disk template parameters; dict(template_name -> parameters)
+  @rtype: list(dict)
+  @return: a list of dicts, one for each node of the disk hierarchy. Each dict
+    contains the LD parameters of the node. The tree is flattened in-order.
+
+  """
+  if disk_template not in constants.DISK_TEMPLATES:
+    raise errors.ProgrammerError("Unknown disk template %s" % disk_template)
+
+  result = list()
+  dt_params = disk_params[disk_template]
+  if disk_template == constants.DT_DRBD8:
+    drbd_params = {
+      constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE],
+      constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS],
+      constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS],
+      constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG],
+      constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM],
+      constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM],
+      constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC],
+      constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD],
+      constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET],
+      constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET],
+      constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE],
+      constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE],
+      }
+
+    drbd_params = \
+      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8],
+                       drbd_params)
+
+    result.append(drbd_params)
+
+    # data LV
+    data_params = {
+      constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES],
+      }
+    data_params = \
+      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
+                       data_params)
+    result.append(data_params)
+
+    # metadata LV
+    meta_params = {
+      constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES],
+      }
+    meta_params = \
+      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
+                       meta_params)
+    result.append(meta_params)
+
+  elif (disk_template == constants.DT_FILE or
+        disk_template == constants.DT_SHARED_FILE):
+    result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE])
+
+  elif disk_template == constants.DT_PLAIN:
+    params = {
+      constants.LDP_STRIPES: dt_params[constants.LV_STRIPES],
+      }
+    params = \
+      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
+                       params)
+    result.append(params)
+
+  elif disk_template == constants.DT_BLOCK:
+    result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV])
+
+  elif disk_template == constants.DT_RBD:
+    params = {
+      constants.LDP_POOL: dt_params[constants.RBD_POOL]
+      }
+    params = \
+      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD],
+                       params)
+    result.append(params)
+
+  return result
+
+
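
The in-order flattening described in the docstring means the DRBD entry comes first, followed by the data and metadata LV entries, which is exactly the order _GenerateDiskTemplate unpacks later (`drbd_params, data_params, meta_params = ld_params`). Below is a standalone sketch of that ordering using stand-in dictionaries; the keys are not the real LDP_* constants and no defaults are filled in.

# Stand-in for the DRBD8 branch of _ComputeLDParams: the returned list is
# [drbd device params, data LV params, meta LV params], i.e. the disk tree
# flattened in-order.
def compute_ld_params_drbd8(dt_params):
    drbd = {"resync-rate": dt_params["resync-rate"]}
    data_lv = {"stripes": dt_params["data-stripes"]}
    meta_lv = {"stripes": dt_params["meta-stripes"]}
    return [drbd, data_lv, meta_lv]

ld_params = compute_ld_params_drbd8({"resync-rate": 1024,
                                     "data-stripes": 2,
                                     "meta-stripes": 1})
drbd_params, data_params, meta_params = ld_params
print(drbd_params)
print(data_params)
print(meta_params)
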
 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
-                         iv_name, p_minor, s_minor):
+                         iv_name, p_minor, s_minor, drbd_params, data_params,
+                         meta_params):
   """Generate a drbd8 device complete with its children.
 
   """
   assert len(vgnames) == len(names) == 2
   port = lu.cfg.AllocatePort()
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
+
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgnames[0], names[0]))
+                          logical_id=(vgnames[0], names[0]),
+                          params=data_params)
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
-                          logical_id=(vgnames[1], names[1]))
+                          logical_id=(vgnames[1], names[1]),
+                          params=meta_params)
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
                                       p_minor, s_minor,
                                       shared_secret),
                           children=[dev_data, dev_meta],
-                          iv_name=iv_name)
+                          iv_name=iv_name, params=drbd_params)
   return drbd_dev
 
 
@@ -7942,7 +8573,7 @@ def _GenerateDiskTemplate(lu, template_name,
                           instance_name, primary_node,
                           secondary_nodes, disk_info,
                           file_storage_dir, file_driver,
-                          base_index, feedback_fn):
+                          base_index, feedback_fn, disk_params):
   """Generate the entire disk layout for a given template type.
 
   """
@@ -7951,10 +8582,11 @@ def _GenerateDiskTemplate(lu, template_name,
   vgname = lu.cfg.GetVGName()
   disk_count = len(disk_info)
   disks = []
+  ld_params = _ComputeLDParams(template_name, disk_params)
   if template_name == constants.DT_DISKLESS:
     pass
   elif template_name == constants.DT_PLAIN:
-    if len(secondary_nodes) != 0:
+    if secondary_nodes:
       raise errors.ProgrammerError("Wrong template configuration")
 
     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
@@ -7967,9 +8599,11 @@ def _GenerateDiskTemplate(lu, template_name,
                               size=disk[constants.IDISK_SIZE],
                               logical_id=(vg, names[idx]),
                               iv_name="disk/%d" % disk_index,
-                              mode=disk[constants.IDISK_MODE])
+                              mode=disk[constants.IDISK_MODE],
+                              params=ld_params[0])
       disks.append(disk_dev)
   elif template_name == constants.DT_DRBD8:
+    drbd_params, data_params, meta_params = ld_params
     if len(secondary_nodes) != 1:
       raise errors.ProgrammerError("Wrong template configuration")
     remote_node = secondary_nodes[0]
@@ -7983,18 +8617,20 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
+      drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
       data_vg = disk.get(constants.IDISK_VG, vgname)
-      meta_vg = disk.get(constants.IDISK_METAVG, data_vg)
+      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                       disk[constants.IDISK_SIZE],
                                       [data_vg, meta_vg],
                                       names[idx * 2:idx * 2 + 2],
                                       "disk/%d" % disk_index,
-                                      minors[idx * 2], minors[idx * 2 + 1])
+                                      minors[idx * 2], minors[idx * 2 + 1],
+                                      drbd_params, data_params, meta_params)
       disk_dev.mode = disk[constants.IDISK_MODE]
       disks.append(disk_dev)
   elif template_name == constants.DT_FILE:
-    if len(secondary_nodes) != 0:
+    if secondary_nodes:
       raise errors.ProgrammerError("Wrong template configuration")
 
     opcodes.RequireFileStorage()
@@ -8007,10 +8643,11 @@ def _GenerateDiskTemplate(lu, template_name,
                               logical_id=(file_driver,
                                           "%s/disk%d" % (file_storage_dir,
                                                          disk_index)),
-                              mode=disk[constants.IDISK_MODE])
+                              mode=disk[constants.IDISK_MODE],
+                              params=ld_params[0])
       disks.append(disk_dev)
   elif template_name == constants.DT_SHARED_FILE:
-    if len(secondary_nodes) != 0:
+    if secondary_nodes:
       raise errors.ProgrammerError("Wrong template configuration")
 
     opcodes.RequireSharedFileStorage()
@@ -8023,10 +8660,11 @@ def _GenerateDiskTemplate(lu, template_name,
                               logical_id=(file_driver,
                                           "%s/disk%d" % (file_storage_dir,
                                                          disk_index)),
-                              mode=disk[constants.IDISK_MODE])
+                              mode=disk[constants.IDISK_MODE],
+                              params=ld_params[0])
       disks.append(disk_dev)
   elif template_name == constants.DT_BLOCK:
-    if len(secondary_nodes) != 0:
+    if secondary_nodes:
       raise errors.ProgrammerError("Wrong template configuration")
 
     for idx, disk in enumerate(disk_info):
@@ -8036,7 +8674,24 @@ def _GenerateDiskTemplate(lu, template_name,
                               logical_id=(constants.BLOCKDEV_DRIVER_MANUAL,
                                           disk[constants.IDISK_ADOPT]),
                               iv_name="disk/%d" % disk_index,
-                              mode=disk[constants.IDISK_MODE])
+                              mode=disk[constants.IDISK_MODE],
+                              params=ld_params[0])
+      disks.append(disk_dev)
+  elif template_name == constants.DT_RBD:
+    if secondary_nodes:
+      raise errors.ProgrammerError("Wrong template configuration")
+
+    names = _GenerateUniqueNames(lu, [".rbd.disk%d" % (base_index + i)
+                                      for i in range(disk_count)])
+
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = objects.Disk(dev_type=constants.LD_RBD,
+                              size=disk[constants.IDISK_SIZE],
+                              logical_id=("rbd", names[idx]),
+                              iv_name="disk/%d" % disk_index,
+                              mode=disk[constants.IDISK_MODE],
+                              params=ld_params[0])
       disks.append(disk_dev)
 
   else:
@@ -8212,6 +8867,11 @@ def _RemoveDisks(lu, instance, target_node=None):
                       " continuing anyway: %s", device.iv_name, node, msg)
         all_result = False
 
+    # if this is a DRBD disk, return its port to the pool
+    if device.dev_type in constants.LDS_DRBD:
+      tcp_port = device.logical_id[2]
+      lu.cfg.AddTcpUdpPort(tcp_port)
+
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
     if target_node:
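
The port recycling added to _RemoveDisks relies on the layout of a DRBD8 logical_id: as used here, the TCP port is the third element of the six-tuple. A tiny illustration with a made-up tuple follows.

# Made-up DRBD8 logical_id; the TCP port is element 2 of the
# (primary, secondary, port, p_minor, s_minor, secret) tuple.
logical_id = ("node1.example.com", "node2.example.com", 11000, 0, 1, "s3cr3t")
tcp_port = logical_id[2]
print("returning TCP port %d to the pool" % tcp_port)
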
@@ -8273,6 +8933,7 @@ def _ComputeDiskSize(disk_template, disks):
     constants.DT_FILE: None,
     constants.DT_SHARED_FILE: 0,
     constants.DT_BLOCK: 0,
+    constants.DT_RBD: 0,
   }
 
   if disk_template not in req_size_dict:
@@ -8589,7 +9250,7 @@ class LUInstanceCreate(LogicalUnit):
                      tags=self.op.tags,
                      os=self.op.os_type,
                      vcpus=self.be_full[constants.BE_VCPUS],
-                     memory=self.be_full[constants.BE_MEMORY],
+                     memory=self.be_full[constants.BE_MAXMEM],
                      disks=self.disks,
                      nics=nics,
                      hypervisor=self.op.hypervisor,
@@ -8634,7 +9295,8 @@ class LUInstanceCreate(LogicalUnit):
       secondary_nodes=self.secondaries,
       status=self.op.start,
       os_type=self.op.os_type,
-      memory=self.be_full[constants.BE_MEMORY],
+      minmem=self.be_full[constants.BE_MINMEM],
+      maxmem=self.be_full[constants.BE_MAXMEM],
       vcpus=self.be_full[constants.BE_VCPUS],
       nics=_NICListToTuple(self, self.nics),
       disk_template=self.op.disk_template,
@@ -8770,6 +9432,12 @@ class LUInstanceCreate(LogicalUnit):
       for name, value in einfo.items(constants.INISECT_BEP):
         if name not in self.op.beparams:
           self.op.beparams[name] = value
+        # Compatibility for the old "memory" be param
+        if name == constants.BE_MEMORY:
+          if constants.BE_MAXMEM not in self.op.beparams:
+            self.op.beparams[constants.BE_MAXMEM] = value
+          if constants.BE_MINMEM not in self.op.beparams:
+            self.op.beparams[constants.BE_MINMEM] = value
     else:
       # try to read the parameters old style, from the main section
       for name in constants.BES_PARAMETERS:
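
The compatibility shim above seeds both the new maxmem and minmem backend parameters from a legacy "memory" value found in an export, unless they were given explicitly. A standalone sketch of that rule, with illustrative key names instead of the real BE_* constants:

def upgrade_legacy_memory(beparams):
    # Seed the new limits from the old single "memory" value, keeping any
    # explicitly provided limit untouched.
    if "memory" in beparams:
        beparams.setdefault("maxmem", beparams["memory"])
        beparams.setdefault("minmem", beparams["memory"])
    return beparams

print(upgrade_legacy_memory({"memory": 512}))                  # seeds both
print(upgrade_legacy_memory({"memory": 512, "minmem": 256}))   # minmem kept
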
@@ -8837,7 +9505,7 @@ class LUInstanceCreate(LogicalUnit):
       # pylint: disable=W0142
       self.instance_file_storage_dir = utils.PathJoin(*joinargs)
 
-  def CheckPrereq(self):
+  def CheckPrereq(self): # pylint: disable=R0914
     """Check prerequisites.
 
     """
@@ -8883,6 +9551,7 @@ class LUInstanceCreate(LogicalUnit):
     for param, value in self.op.beparams.iteritems():
       if value == constants.VALUE_AUTO:
         self.op.beparams[param] = default_beparams[param]
+    objects.UpgradeBeParams(self.op.beparams)
     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
     self.be_full = cluster.SimpleFillBE(self.op.beparams)
 
@@ -8977,8 +9646,9 @@ class LUInstanceCreate(LogicalUnit):
         constants.IDISK_SIZE: size,
         constants.IDISK_MODE: mode,
         constants.IDISK_VG: data_vg,
-        constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg),
         }
+      if constants.IDISK_METAVG in disk:
+        new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
       if constants.IDISK_ADOPT in disk:
         new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
       self.disks.append(new_disk)
@@ -9030,6 +9700,14 @@ class LUInstanceCreate(LogicalUnit):
     if self.op.iallocator is not None:
       self._RunAllocator()
 
+    # Release all unneeded node locks
+    _ReleaseLocks(self, locking.LEVEL_NODE,
+                  keep=filter(None, [self.op.pnode, self.op.snode,
+                                     self.op.src_node]))
+    _ReleaseLocks(self, locking.LEVEL_NODE_RES,
+                  keep=filter(None, [self.op.pnode, self.op.snode,
+                                     self.op.src_node]))
+
     #### node related checks
 
     # check primary node
@@ -9058,12 +9736,47 @@ class LUInstanceCreate(LogicalUnit):
       _CheckNodeVmCapable(self, self.op.snode)
       self.secondaries.append(self.op.snode)
 
+      snode = self.cfg.GetNodeInfo(self.op.snode)
+      if pnode.group != snode.group:
+        self.LogWarning("The primary and secondary nodes are in two"
+                        " different node groups; the disk parameters"
+                        " from the first disk's node group will be"
+                        " used")
+
     nodenames = [pnode.name] + self.secondaries
 
+    # Verify instance specs
+    ispec = {
+      constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
+      constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
+      constants.ISPEC_DISK_COUNT: len(self.disks),
+      constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
+      constants.ISPEC_NIC_COUNT: len(self.nics),
+      }
+
+    group_info = self.cfg.GetNodeGroup(pnode.group)
+    ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+    res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
+    if not self.op.ignore_ipolicy and res:
+      raise errors.OpPrereqError(("Instance allocation to group %s violates"
+                                  " policy: %s") % (pnode.group,
+                                                    utils.CommaJoin(res)),
+                                  errors.ECODE_INVAL)
+
+    # disk parameters (not customizable at instance or node level)
+    # just use the primary node parameters, ignoring the secondary.
+    self.diskparams = group_info.diskparams
+
     if not self.adopt_disks:
-      # Check lv size requirements, if not adopting
-      req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
-      _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
+      if self.op.disk_template == constants.DT_RBD:
+        # _CheckRADOSFreeSpace() is just a placeholder.
+        # Any function that checks prerequisites can be placed here.
+        # Check if there is enough space on the RADOS cluster.
+        _CheckRADOSFreeSpace()
+      else:
+        # Check lv size requirements, if not adopting
+        req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
+        _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
 
     elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
       all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
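
The ispec built above is checked against the effective group instance policy; conceptually, each metric outside its policy range produces one violation string that ends up in the OpPrereqError. Here is a simplified standalone sketch of that comparison, not the real _ComputeIPolicyInstanceSpecViolation; the keys and bounds are made up and list-valued metrics are ignored.

def compute_spec_violations(ipolicy, ispec):
    # Report every scalar metric whose value falls outside [min, max].
    violations = []
    for key, value in ispec.items():
        bounds = ipolicy.get(key)
        if bounds is not None and not bounds[0] <= value <= bounds[1]:
            violations.append("%s value %s not in range %s-%s" %
                              (key, value, bounds[0], bounds[1]))
    return violations

policy = {"memory-size": (128, 4096), "cpu-count": (1, 8)}
print(compute_spec_violations(policy, {"memory-size": 8192, "cpu-count": 2}))
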
@@ -9144,10 +9857,11 @@ class LUInstanceCreate(LogicalUnit):
     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
 
     # memory check on primary node
+    #TODO(dynmem): use MINMEM for checking
     if self.op.start:
       _CheckNodeFreeMemory(self, self.pnode.name,
                            "creating instance %s" % self.op.instance_name,
-                           self.be_full[constants.BE_MEMORY],
+                           self.be_full[constants.BE_MAXMEM],
                            self.op.hypervisor)
 
     self.dry_run_result = list(nodenames)
@@ -9177,13 +9891,14 @@ class LUInstanceCreate(LogicalUnit):
                                   self.instance_file_storage_dir,
                                   self.op.file_driver,
                                   0,
-                                  feedback_fn)
+                                  feedback_fn,
+                                  self.diskparams)
 
     iobj = objects.Instance(name=instance, os=self.op.os_type,
                             primary_node=pnode_name,
                             nics=self.nics, disks=disks,
                             disk_template=self.op.disk_template,
-                            admin_up=False,
+                            admin_state=constants.ADMINST_DOWN,
                             network_port=network_port,
                             beparams=self.op.beparams,
                             hvparams=self.op.hvparams,
@@ -9363,7 +10078,7 @@ class LUInstanceCreate(LogicalUnit):
     assert not self.owned_locks(locking.LEVEL_NODE_RES)
 
     if self.op.start:
-      iobj.admin_up = True
+      iobj.admin_state = constants.ADMINST_UP
       self.cfg.Update(iobj, feedback_fn)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
@@ -9374,6 +10089,14 @@ class LUInstanceCreate(LogicalUnit):
     return list(iobj.all_nodes)
 
 
+def _CheckRADOSFreeSpace():
+  """Compute disk size requirements inside the RADOS cluster.
+
+  """
+  # For the RADOS cluster we assume there is always enough space.
+  pass
+
+
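
Since the placeholder above always succeeds, a real implementation would have to query the RADOS cluster itself. A purely hypothetical sketch, assuming "ceph df --format json" is available on the node and that its output carries a "total_avail_bytes" field; both the invocation and the field name vary between Ceph versions and are assumptions here, not part of the code above.

import json
import subprocess

def check_rados_free_space(required_mb):
    # Ask Ceph for cluster-wide usage; command and JSON field are assumptions.
    out = subprocess.check_output(["ceph", "df", "--format", "json"])
    stats = json.loads(out.decode("utf-8"))["stats"]
    avail_mb = stats["total_avail_bytes"] // (1024 * 1024)
    if avail_mb < required_mb:
        raise RuntimeError("Not enough space in the RADOS cluster: %d MB"
                           " free, %d MB required" % (avail_mb, required_mb))
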
 class LUInstanceConsole(NoHooksLU):
   """Connect to an instance's console.
 
@@ -9411,10 +10134,12 @@ class LUInstanceConsole(NoHooksLU):
     node_insts.Raise("Can't get node information from %s" % node)
 
     if instance.name not in node_insts.payload:
-      if instance.admin_up:
+      if instance.admin_state == constants.ADMINST_UP:
         state = constants.INSTST_ERRORDOWN
-      else:
+      elif instance.admin_state == constants.ADMINST_DOWN:
         state = constants.INSTST_ADMINDOWN
+      else:
+        state = constants.INSTST_ADMINOFFLINE
       raise errors.OpExecError("Instance %s is not running (state %s)" %
                                (instance.name, state))
 
@@ -9460,6 +10185,7 @@ class LUInstanceReplaceDisks(LogicalUnit):
     self._ExpandAndLockInstance()
 
     assert locking.LEVEL_NODE not in self.needed_locks
+    assert locking.LEVEL_NODE_RES not in self.needed_locks
     assert locking.LEVEL_NODEGROUP not in self.needed_locks
 
     assert self.op.iallocator is None or self.op.remote_node is None, \
@@ -9482,9 +10208,12 @@ class LUInstanceReplaceDisks(LogicalUnit):
         # iallocator will select a new node in the same group
         self.needed_locks[locking.LEVEL_NODEGROUP] = []
 
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
+
     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                    self.op.iallocator, self.op.remote_node,
-                                   self.op.disks, False, self.op.early_release)
+                                   self.op.disks, False, self.op.early_release,
+                                   self.op.ignore_ipolicy)
 
     self.tasklets = [self.replacer]
 
@@ -9495,6 +10224,8 @@ class LUInstanceReplaceDisks(LogicalUnit):
       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
 
       self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      # Lock all groups used by instance optimistically; this requires going
+      # via the node before it's locked, requiring verification later on
       self.needed_locks[locking.LEVEL_NODEGROUP] = \
         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
 
@@ -9509,6 +10240,10 @@ class LUInstanceReplaceDisks(LogicalUnit):
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
       else:
         self._LockInstancesNodes()
+    elif level == locking.LEVEL_NODE_RES:
+      # Reuse node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -9545,6 +10280,7 @@ class LUInstanceReplaceDisks(LogicalUnit):
     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
             self.op.iallocator is None)
 
+    # Verify if node group locks are still correct
     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
     if owned_groups:
       _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
@@ -9559,7 +10295,7 @@ class TLReplaceDisks(Tasklet):
 
   """
   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
-               disks, delay_iallocator, early_release):
+               disks, delay_iallocator, early_release, ignore_ipolicy):
     """Initializes this class.
 
     """
@@ -9573,6 +10309,7 @@ class TLReplaceDisks(Tasklet):
     self.disks = disks
     self.delay_iallocator = delay_iallocator
     self.early_release = early_release
+    self.ignore_ipolicy = ignore_ipolicy
 
     # Runtime data
     self.instance = None
@@ -9795,6 +10532,26 @@ class TLReplaceDisks(Tasklet):
       if not self.disks:
         self.disks = range(len(self.instance.disks))
 
+    # TODO: This is ugly, but right now we can't distinguish between
+    # internally and externally submitted opcodes. We should fix that.
+    if self.remote_node_info:
+      # We change the node, let's verify it still meets instance policy
+      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
+      ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
+                                       new_group_info)
+      _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
+                              ignore=self.ignore_ipolicy)
+
+    # TODO: compute disk parameters
+    primary_node_info = self.cfg.GetNodeInfo(instance.primary_node)
+    secondary_node_info = self.cfg.GetNodeInfo(secondary_node)
+    if primary_node_info.group != secondary_node_info.group:
+      self.lu.LogInfo("The instance primary and secondary nodes are in two"
+                      " different node groups; the disk parameters of the"
+                      " primary node's group will be applied.")
+
+    self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams
+
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
@@ -9803,8 +10560,9 @@ class TLReplaceDisks(Tasklet):
                                                           self.target_node]
                               if node_name is not None)
 
-    # Release unneeded node locks
+    # Release unneeded node and node resource locks
     _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
 
     # Release any owned node group
     if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
@@ -9833,6 +10591,8 @@ class TLReplaceDisks(Tasklet):
       assert set(owned_nodes) == set(self.node_secondary_ip), \
           ("Incorrect node locks, owning %s, expected %s" %
            (owned_nodes, self.node_secondary_ip.keys()))
+      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
+              self.lu.owned_locks(locking.LEVEL_NODE_RES))
 
       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
       assert list(owned_instances) == [self.instance_name], \
@@ -9848,7 +10608,7 @@ class TLReplaceDisks(Tasklet):
     feedback_fn("Replacing disk(s) %s for %s" %
                 (utils.CommaJoin(self.disks), self.instance.name))
 
-    activate_disks = (not self.instance.admin_up)
+    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
 
     # Activate the instance disks if we're replacing them on a down instance
     if activate_disks:
@@ -9868,9 +10628,11 @@ class TLReplaceDisks(Tasklet):
       if activate_disks:
         _SafeShutdownInstanceDisks(self.lu, self.instance)
 
+    assert not self.lu.owned_locks(locking.LEVEL_NODE)
+
     if __debug__:
       # Verify owned locks
-      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
+      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
       nodes = frozenset(self.node_secondary_ip)
       assert ((self.early_release and not owned_nodes) or
               (not self.early_release and not (set(owned_nodes) - nodes))), \
@@ -9949,12 +10711,14 @@ class TLReplaceDisks(Tasklet):
       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
       names = _GenerateUniqueNames(self.lu, lv_names)
 
+      _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
+
       vg_data = dev.children[0].logical_id[0]
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vg_data, names[0]))
+                             logical_id=(vg_data, names[0]), params=data_p)
       vg_meta = dev.children[1].logical_id[0]
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
-                             logical_id=(vg_meta, names[1]))
+                             logical_id=(vg_meta, names[1]), params=meta_p)
 
       new_lvs = [lv_data, lv_meta]
       old_lvs = [child.Copy() for child in dev.children]
@@ -10105,21 +10869,28 @@ class TLReplaceDisks(Tasklet):
                                      "volumes"))
         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
 
-    cstep = 5
+    cstep = itertools.count(5)
+
     if self.early_release:
-      self.lu.LogStep(cstep, steps_total, "Removing old storage")
-      cstep += 1
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
       self._RemoveOldStorage(self.target_node, iv_names)
-      # WARNING: we release both node locks here, do not do other RPCs
-      # than WaitForSync to the primary node
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
-                    names=[self.target_node, self.other_node])
+      # TODO: Check if releasing locks early still makes sense
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+    else:
+      # Release all resource locks except those used by the instance
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+                    keep=self.node_secondary_ip.keys())
+
+    # Release all node locks while waiting for sync
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
+    # TODO: Can the instance lock be downgraded here? Take the optional disk
+    # shutdown in the caller into consideration.
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(cstep, steps_total, "Sync devices")
-    cstep += 1
+    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
     _WaitForSync(self.lu, self.instance)
 
     # Check all devices manually
@@ -10127,8 +10898,7 @@ class TLReplaceDisks(Tasklet):
 
     # Step: remove old storage
     if not self.early_release:
-      self.lu.LogStep(cstep, steps_total, "Removing old storage")
-      cstep += 1
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
       self._RemoveOldStorage(self.target_node, iv_names)
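
Replacing the manual cstep counter with itertools.count(5), as done above, means every LogStep call simply pulls the next number, so the early-release and normal paths cannot drift apart. A minimal illustration:

import itertools

cstep = itertools.count(5)
print(next(cstep))  # 5 -- e.g. "Removing old storage" (early-release path)
print(next(cstep))  # 6 -- e.g. "Sync devices"
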
 
   def _ExecDrbd8Secondary(self, feedback_fn):
@@ -10205,10 +10975,12 @@ class TLReplaceDisks(Tasklet):
       iv_names[idx] = (dev, dev.children, new_net_id)
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                     new_net_id)
+      drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                               logical_id=new_alone_id,
                               children=dev.children,
-                              size=dev.size)
+                              size=dev.size,
+                              params=drbd_params)
       try:
         _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                               _GetInstanceInfoText(self.instance), False)
@@ -10247,6 +11019,9 @@ class TLReplaceDisks(Tasklet):
 
     self.cfg.Update(self.instance, feedback_fn)
 
+    # Release all node locks (the configuration has been updated)
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
     # and now perform the drbd attach
     self.lu.LogInfo("Attaching primary drbds to new secondary"
                     " (standalone => connected)")
@@ -10263,23 +11038,26 @@ class TLReplaceDisks(Tasklet):
                            to_node, msg,
                            hint=("please do a gnt-instance info to see the"
                                  " status of disks"))
-    cstep = 5
+
+    cstep = itertools.count(5)
+
     if self.early_release:
-      self.lu.LogStep(cstep, steps_total, "Removing old storage")
-      cstep += 1
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
       self._RemoveOldStorage(self.target_node, iv_names)
-      # WARNING: we release all node locks here, do not do other RPCs
-      # than WaitForSync to the primary node
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
-                    names=[self.instance.primary_node,
-                           self.target_node,
-                           self.new_node])
+      # TODO: Check if releasing locks early still makes sense
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+    else:
+      # Release all resource locks except those used by the instance
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+                    keep=self.node_secondary_ip.keys())
+
+    # TODO: Can the instance lock be downgraded here? Take the optional disk
+    # shutdown in the caller into consideration.
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(cstep, steps_total, "Sync devices")
-    cstep += 1
+    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
     _WaitForSync(self.lu, self.instance)
 
     # Check all devices manually
@@ -10287,7 +11065,7 @@ class TLReplaceDisks(Tasklet):
 
     # Step: remove old storage
     if not self.early_release:
-      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
       self._RemoveOldStorage(self.target_node, iv_names)
 
 
@@ -10333,7 +11111,7 @@ class LURepairNodeStorage(NoHooksLU):
     """
     # Check whether any instance on this node has faulty disks
     for inst in _GetNodeInstances(self.cfg, self.op.node_name):
-      if not inst.admin_up:
+      if inst.admin_state != constants.ADMINST_UP:
         continue
       check_nodes = set(inst.all_nodes)
       check_nodes.discard(self.op.node_name)
@@ -10359,6 +11137,15 @@ class LUNodeEvacuate(NoHooksLU):
   """
   REQ_BGL = False
 
+  _MODE2IALLOCATOR = {
+    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
+    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
+    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
+    }
+  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
+  assert (frozenset(_MODE2IALLOCATOR.values()) ==
+          constants.IALLOCATOR_NEVAC_MODES)
+
   def CheckArguments(self):
     _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
 
@@ -10373,7 +11160,7 @@ class LUNodeEvacuate(NoHooksLU):
         raise errors.OpPrereqError("Can not use evacuated node as a new"
                                    " secondary node", errors.ECODE_INVAL)
 
-      if self.op.mode != constants.IALLOCATOR_NEVAC_SEC:
+      if self.op.mode != constants.NODE_EVAC_SEC:
         raise errors.OpPrereqError("Without the use of an iallocator only"
                                    " secondary instances can be evacuated",
                                    errors.ECODE_INVAL)
@@ -10386,6 +11173,14 @@ class LUNodeEvacuate(NoHooksLU):
       locking.LEVEL_NODE: [],
       }
 
+    # Determine nodes (via group) optimistically, needs verification once locks
+    # have been acquired
+    self.lock_nodes = self._DetermineNodes()
+
+  def _DetermineNodes(self):
+    """Gets the list of nodes to operate on.
+
+    """
     if self.op.remote_node is None:
       # Iallocator will choose any node(s) in the same group
       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
@@ -10393,26 +11188,34 @@ class LUNodeEvacuate(NoHooksLU):
       group_nodes = frozenset([self.op.remote_node])
 
     # Determine nodes to be locked
-    self.lock_nodes = set([self.op.node_name]) | group_nodes
+    return set([self.op.node_name]) | group_nodes
 
   def _DetermineInstances(self):
     """Builds list of instances to operate on.
 
     """
-    assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES
+    assert self.op.mode in constants.NODE_EVAC_MODES
 
-    if self.op.mode == constants.IALLOCATOR_NEVAC_PRI:
+    if self.op.mode == constants.NODE_EVAC_PRI:
       # Primary instances only
       inst_fn = _GetNodePrimaryInstances
       assert self.op.remote_node is None, \
         "Evacuating primary instances requires iallocator"
-    elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC:
+    elif self.op.mode == constants.NODE_EVAC_SEC:
       # Secondary instances only
       inst_fn = _GetNodeSecondaryInstances
     else:
       # All instances
-      assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL
+      assert self.op.mode == constants.NODE_EVAC_ALL
       inst_fn = _GetNodeInstances
+      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
+      # per instance
+      raise errors.OpPrereqError("Due to an issue with the iallocator"
+                                 " interface it is not possible to evacuate"
+                                 " all instances at once; specify explicitly"
+                                 " whether to evacuate primary or secondary"
+                                 " instances",
+                                 errors.ECODE_INVAL)
 
     return inst_fn(self.cfg, self.op.node_name)
 
@@ -10424,8 +11227,8 @@ class LUNodeEvacuate(NoHooksLU):
         set(i.name for i in self._DetermineInstances())
 
     elif level == locking.LEVEL_NODEGROUP:
-      # Lock node groups optimistically, needs verification once nodes have
-      # been acquired
+      # Lock node groups for all potential target nodes optimistically, needs
+      # verification once nodes have been acquired
       self.needed_locks[locking.LEVEL_NODEGROUP] = \
         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
 
@@ -10438,12 +11241,23 @@ class LUNodeEvacuate(NoHooksLU):
     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
 
-    assert owned_nodes == self.lock_nodes
+    need_nodes = self._DetermineNodes()
+
+    if not owned_nodes.issuperset(need_nodes):
+      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
+                                 " locks were acquired, current nodes are"
+                                 " '%s', used to be '%s'; retry the"
+                                 " operation" %
+                                 (self.op.node_name,
+                                  utils.CommaJoin(need_nodes),
+                                  utils.CommaJoin(owned_nodes)),
+                                 errors.ECODE_STATE)
 
     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
     if owned_groups != wanted_groups:
       raise errors.OpExecError("Node groups changed since locks were acquired,"
-                               " current groups are '%s', used to be '%s'" %
+                               " current groups are '%s', used to be '%s';"
+                               " retry the operation" %
                                (utils.CommaJoin(wanted_groups),
                                 utils.CommaJoin(owned_groups)))
 
@@ -10454,7 +11268,7 @@ class LUNodeEvacuate(NoHooksLU):
     if set(self.instance_names) != owned_instances:
       raise errors.OpExecError("Instances on node '%s' changed since locks"
                                " were acquired, current instances are '%s',"
-                               " used to be '%s'" %
+                               " used to be '%s'; retry the operation" %
                                (self.op.node_name,
                                 utils.CommaJoin(self.instance_names),
                                 utils.CommaJoin(owned_instances)))
@@ -10486,7 +11300,7 @@ class LUNodeEvacuate(NoHooksLU):
     elif self.op.iallocator is not None:
       # TODO: Implement relocation to other group
       ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
-                       evac_mode=self.op.mode,
+                       evac_mode=self._MODE2IALLOCATOR[self.op.mode],
                        instances=list(self.instance_names))
 
       ial.Run(self.op.iallocator)
@@ -10500,7 +11314,7 @@ class LUNodeEvacuate(NoHooksLU):
       jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
 
     elif self.op.remote_node is not None:
-      assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC
+      assert self.op.mode == constants.NODE_EVAC_SEC
       jobs = [
         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                         remote_node=self.op.remote_node,
@@ -10557,9 +11371,10 @@ def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
   (moved, failed, jobs) = alloc_result
 
   if failed:
-    lu.LogWarning("Unable to evacuate instances %s",
-                  utils.CommaJoin("%s (%s)" % (name, reason)
-                                  for (name, reason) in failed))
+    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
+                                 for (name, reason) in failed)
+    lu.LogWarning("Unable to evacuate instances %s", failreason)
+    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
 
   if moved:
     lu.LogInfo("Instances to be moved: %s",
@@ -10584,6 +11399,7 @@ class LUInstanceGrowDisk(LogicalUnit):
     self._ExpandAndLockInstance()
     self.needed_locks[locking.LEVEL_NODE] = []
     self.needed_locks[locking.LEVEL_NODE_RES] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
@@ -10636,7 +11452,8 @@ class LUInstanceGrowDisk(LogicalUnit):
     self.disk = instance.FindDisk(self.op.disk)
 
     if instance.disk_template not in (constants.DT_FILE,
-                                      constants.DT_SHARED_FILE):
+                                      constants.DT_SHARED_FILE,
+                                      constants.DT_RBD):
       # TODO: check the free disk space for file, when that feature will be
       # supported
       _CheckNodesFreeDiskPerVG(self, nodenames,
@@ -10695,9 +11512,9 @@ class LUInstanceGrowDisk(LogicalUnit):
       if disk_abort:
         self.proc.LogWarning("Disk sync-ing has not returned a good"
                              " status; please check the instance")
-      if not instance.admin_up:
+      if instance.admin_state != constants.ADMINST_UP:
         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
-    elif not instance.admin_up:
+    elif instance.admin_state != constants.ADMINST_UP:
       self.proc.LogWarning("Not shutting down the disk even if the instance is"
                            " not supposed to be running because no wait for"
                            " sync mode was requested")
@@ -10836,19 +11653,17 @@ class LUInstanceQueryData(NoHooksLU):
         if remote_info and "state" in remote_info:
           remote_state = "up"
         else:
-          remote_state = "down"
-
-      if instance.admin_up:
-        config_state = "up"
-      else:
-        config_state = "down"
+          if instance.admin_state == constants.ADMINST_UP:
+            remote_state = "down"
+          else:
+            remote_state = instance.admin_state
 
       disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
                   instance.disks)
 
       result[instance.name] = {
         "name": instance.name,
-        "config_state": config_state,
+        "config_state": instance.admin_state,
         "run_state": remote_state,
         "pnode": instance.primary_node,
         "snodes": instance.secondary_nodes,
@@ -10884,7 +11699,9 @@ class LUInstanceSetParams(LogicalUnit):
 
   def CheckArguments(self):
     if not (self.op.nics or self.op.disks or self.op.disk_template or
-            self.op.hvparams or self.op.beparams or self.op.os_name):
+            self.op.hvparams or self.op.beparams or self.op.os_name or
+            self.op.online_inst or self.op.offline_inst or
+            self.op.runtime_mem):
       raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
 
     if self.op.hvparams:
@@ -11000,7 +11817,10 @@ class LUInstanceSetParams(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+    # Can't even acquire node locks in shared mode as upcoming changes in
+    # Ganeti 2.6 will start to modify the node object on disk conversion
     self.needed_locks[locking.LEVEL_NODE] = []
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
@@ -11009,6 +11829,10 @@ class LUInstanceSetParams(LogicalUnit):
       if self.op.disk_template and self.op.remote_node:
         self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
         self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
+    elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -11017,8 +11841,10 @@ class LUInstanceSetParams(LogicalUnit):
 
     """
     args = dict()
-    if constants.BE_MEMORY in self.be_new:
-      args["memory"] = self.be_new[constants.BE_MEMORY]
+    if constants.BE_MINMEM in self.be_new:
+      args["minmem"] = self.be_new[constants.BE_MINMEM]
+    if constants.BE_MAXMEM in self.be_new:
+      args["maxmem"] = self.be_new[constants.BE_MAXMEM]
     if constants.BE_VCPUS in self.be_new:
       args["vcpus"] = self.be_new[constants.BE_VCPUS]
     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
@@ -11059,6 +11885,8 @@ class LUInstanceSetParams(LogicalUnit):
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
     if self.op.disk_template:
       env["NEW_DISK_TEMPLATE"] = self.op.disk_template
+    if self.op.runtime_mem:
+      env["RUNTIME_MEMORY"] = self.op.runtime_mem
 
     return env
 
@@ -11083,6 +11911,8 @@ class LUInstanceSetParams(LogicalUnit):
       "Cannot retrieve locked instance %s" % self.op.instance_name
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
+    pnode_info = self.cfg.GetNodeInfo(pnode)
+    self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams
 
     # OS change
     if self.op.os_name and not self.op.force:
@@ -11103,7 +11933,8 @@ class LUInstanceSetParams(LogicalUnit):
                                    " %s to %s" % (instance.disk_template,
                                                   self.op.disk_template),
                                    errors.ECODE_INVAL)
-      _CheckInstanceDown(self, instance, "cannot change disk template")
+      _CheckInstanceState(self, instance, INSTANCE_DOWN,
+                          msg="cannot change disk template")
       if self.op.disk_template in constants.DTS_INT_MIRROR:
         if self.op.remote_node == pnode:
           raise errors.OpPrereqError("Given new secondary node %s is the same"
@@ -11119,6 +11950,17 @@ class LUInstanceSetParams(LogicalUnit):
         required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
         _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
 
+        snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
+        snode_group = self.cfg.GetNodeGroup(snode_info.group)
+        ipolicy = _CalculateGroupIPolicy(cluster, snode_group)
+        _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info,
+                                ignore=self.op.ignore_ipolicy)
+        if pnode_info.group != snode_info.group:
+          self.LogWarning("The primary and secondary nodes are in two"
+                          " different node groups; the disk parameters"
+                          " from the first disk's node group will be"
+                          " used")
+
     # hvparams processing
     if self.op.hvparams:
       hv_type = instance.hypervisor
@@ -11140,6 +11982,7 @@ class LUInstanceSetParams(LogicalUnit):
     if self.op.beparams:
       i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams,
                                    use_none=True)
+      objects.UpgradeBeParams(i_bedict)
       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
       be_new = cluster.SimpleFillBE(i_bedict)
       self.be_proposed = self.be_new = be_new # the new actual values
@@ -11186,8 +12029,9 @@ class LUInstanceSetParams(LogicalUnit):
 
     self.warn = []
 
-    if (constants.BE_MEMORY in self.op.beparams and not self.op.force and
-        be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]):
+    #TODO(dynmem): do the appropriate check involving MINMEM
+    if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and
+        be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]):
       mem_check_list = [pnode]
       if be_new[constants.BE_AUTO_BALANCE]:
         # either we changed auto_balance to yes or it was from before
@@ -11195,34 +12039,39 @@ class LUInstanceSetParams(LogicalUnit):
       instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                   instance.hypervisor)
       nodeinfo = self.rpc.call_node_info(mem_check_list, None,
-                                         instance.hypervisor)
+                                         [instance.hypervisor])
       pninfo = nodeinfo[pnode]
       msg = pninfo.fail_msg
       if msg:
         # Assume the primary node is unreachable and go ahead
         self.warn.append("Can't get info from primary node %s: %s" %
                          (pnode, msg))
-      elif not isinstance(pninfo.payload.get("memory_free", None), int):
-        self.warn.append("Node data from primary node %s doesn't contain"
-                         " free memory information" % pnode)
-      elif instance_info.fail_msg:
-        self.warn.append("Can't get instance runtime information: %s" %
-                        instance_info.fail_msg)
       else:
-        if instance_info.payload:
-          current_mem = int(instance_info.payload["memory"])
+        (_, _, (pnhvinfo, )) = pninfo.payload
+        if not isinstance(pnhvinfo.get("memory_free", None), int):
+          self.warn.append("Node data from primary node %s doesn't contain"
+                           " free memory information" % pnode)
+        elif instance_info.fail_msg:
+          self.warn.append("Can't get instance runtime information: %s" %
+                          instance_info.fail_msg)
         else:
-          # Assume instance not running
-          # (there is a slight race condition here, but it's not very probable,
-          # and we have no other way to check)
-          current_mem = 0
-        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
-                    pninfo.payload["memory_free"])
-        if miss_mem > 0:
-          raise errors.OpPrereqError("This change will prevent the instance"
-                                     " from starting, due to %d MB of memory"
-                                     " missing on its primary node" % miss_mem,
-                                     errors.ECODE_NORES)
+          if instance_info.payload:
+            current_mem = int(instance_info.payload["memory"])
+          else:
+            # Assume instance not running
+            # (there is a slight race condition here, but it's not very
+            # probable, and we have no other way to check)
+            # TODO: Describe race condition
+            current_mem = 0
+          #TODO(dynmem): do the appropriate check involving MINMEM
+          miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
+                      pnhvinfo["memory_free"])
+          if miss_mem > 0:
+            raise errors.OpPrereqError("This change will prevent the instance"
+                                       " from starting, due to %d MB of memory"
+                                       " missing on its primary node" %
+                                       miss_mem,
+                                       errors.ECODE_NORES)
 
       if be_new[constants.BE_AUTO_BALANCE]:
         for node, nres in nodeinfo.items():
@@ -11230,16 +12079,45 @@ class LUInstanceSetParams(LogicalUnit):
             continue
           nres.Raise("Can't get info from secondary node %s" % node,
                      prereq=True, ecode=errors.ECODE_STATE)
-          if not isinstance(nres.payload.get("memory_free", None), int):
+          (_, _, (nhvinfo, )) = nres.payload
+          if not isinstance(nhvinfo.get("memory_free", None), int):
             raise errors.OpPrereqError("Secondary node %s didn't return free"
                                        " memory information" % node,
                                        errors.ECODE_STATE)
-          elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]:
+          #TODO(dynmem): do the appropriate check involving MINMEM
+          elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]:
             raise errors.OpPrereqError("This change will prevent the instance"
                                        " from failover to its secondary node"
                                        " %s, due to not enough memory" % node,
                                        errors.ECODE_STATE)
 
+    if self.op.runtime_mem:
+      remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                                instance.name,
+                                                instance.hypervisor)
+      remote_info.Raise("Error checking node %s" % instance.primary_node)
+      if not remote_info.payload: # not running already
+        raise errors.OpPrereqError("Instance %s is not running" % instance.name,
+                                   errors.ECODE_STATE)
+
+      current_memory = remote_info.payload["memory"]
+      if (not self.op.force and
+           (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or
+            self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
+        raise errors.OpPrereqError("Instance %s must have memory between %d"
+                                   " and %d MB unless --force is given" %
+                                   (instance.name,
+                                    self.be_proposed[constants.BE_MINMEM],
+                                    self.be_proposed[constants.BE_MAXMEM]),
+                                   errors.ECODE_INVAL)
+
+      if self.op.runtime_mem > current_memory:
+        _CheckNodeFreeMemory(self, instance.primary_node,
+                             "ballooning memory for instance %s" %
+                             instance.name,
+                             self.op.runtime_mem - current_memory,
+                             instance.hypervisor)
+
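
The check above constrains a runtime memory change to the proposed minmem/maxmem range unless --force is given. A standalone sketch of the same bound check with illustrative numbers:

def check_runtime_mem(runtime_mem, minmem, maxmem, force=False):
    # Mirrors the constraint above: the ballooning target must stay within
    # the configured [minmem, maxmem] range unless force is set.
    if not force and not minmem <= runtime_mem <= maxmem:
        raise ValueError("runtime memory %d MB outside [%d, %d] MB" %
                         (runtime_mem, minmem, maxmem))

check_runtime_mem(1024, minmem=512, maxmem=2048)               # within range
check_runtime_mem(4096, minmem=512, maxmem=2048, force=True)   # forced
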
     # NIC processing
     self.nic_pnew = {}
     self.nic_pinst = {}
@@ -11327,7 +12205,8 @@ class LUInstanceSetParams(LogicalUnit):
         if len(instance.disks) == 1:
           raise errors.OpPrereqError("Cannot remove the last disk of"
                                      " an instance", errors.ECODE_INVAL)
-        _CheckInstanceDown(self, instance, "cannot remove disks")
+        _CheckInstanceState(self, instance, INSTANCE_DOWN,
+                            msg="cannot remove disks")
 
       if (disk_op == constants.DDM_ADD and
           len(instance.disks) >= constants.MAX_DISKS):
@@ -11342,7 +12221,15 @@ class LUInstanceSetParams(LogicalUnit):
                                      (disk_op, len(instance.disks)),
                                      errors.ECODE_INVAL)
 
-    return
+    # disabling the instance
+    if self.op.offline_inst:
+      _CheckInstanceState(self, instance, INSTANCE_DOWN,
+                          msg="cannot change instance state to offline")
+
+    # enabling the instance
+    if self.op.online_inst:
+      _CheckInstanceState(self, instance, INSTANCE_OFFLINE,
+                          msg="cannot make instance go online")
 
   def _ConvertPlainToDrbd(self, feedback_fn):
     """Converts an instance from plain to drbd.
@@ -11353,13 +12240,16 @@ class LUInstanceSetParams(LogicalUnit):
     pnode = instance.primary_node
     snode = self.op.remote_node
 
+    assert instance.disk_template == constants.DT_PLAIN
+
     # create a fake disk info for _GenerateDiskTemplate
     disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
                   constants.IDISK_VG: d.logical_id[0]}
                  for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
-                                      disk_info, None, None, 0, feedback_fn)
+                                      disk_info, None, None, 0, feedback_fn,
+                                      self.diskparams)
     info = _GetInstanceInfoText(instance)
     feedback_fn("Creating aditional volumes...")
     # first, create the missing data and meta devices
@@ -11389,6 +12279,9 @@ class LUInstanceSetParams(LogicalUnit):
     instance.disks = new_disks
     self.cfg.Update(instance, feedback_fn)
 
+    # Release node locks while waiting for sync
+    _ReleaseLocks(self, locking.LEVEL_NODE)
+
     # disks are created, waiting for sync
     disk_abort = not _WaitForSync(self, instance,
                                   oneshot=not self.op.wait_for_sync)
@@ -11396,12 +12289,17 @@ class LUInstanceSetParams(LogicalUnit):
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance, please cleanup manually")
 
+    # Node resource locks will be released by caller
+
   def _ConvertDrbdToPlain(self, feedback_fn):
     """Converts an instance from drbd to plain.
 
     """
     instance = self.instance
+
     assert len(instance.secondary_nodes) == 1
+    assert instance.disk_template == constants.DT_DRBD8
+
     pnode = instance.primary_node
     snode = instance.secondary_nodes[0]
     feedback_fn("Converting template to plain")
@@ -11419,6 +12317,9 @@ class LUInstanceSetParams(LogicalUnit):
     instance.disk_template = constants.DT_PLAIN
     self.cfg.Update(instance, feedback_fn)
 
+    # Release locks in case removing disks takes a while
+    _ReleaseLocks(self, locking.LEVEL_NODE)
+
     feedback_fn("Removing volumes on the secondary node...")
     for disk in old_disks:
       self.cfg.SetDiskID(disk, snode)
@@ -11436,6 +12337,13 @@ class LUInstanceSetParams(LogicalUnit):
         self.LogWarning("Could not remove metadata for disk %d on node %s,"
                         " continuing anyway: %s", idx, pnode, msg)
 
+    # these were DRBD disks, so return their TCP ports to the pool
+    for disk in old_disks:
+      tcp_port = disk.logical_id[2]
+      self.cfg.AddTcpUdpPort(tcp_port)
+
+    # Node resource locks will be released by caller
+
   def Exec(self, feedback_fn):
     """Modifies an instance.
 
@@ -11447,8 +12355,21 @@ class LUInstanceSetParams(LogicalUnit):
     for warn in self.warn:
       feedback_fn("WARNING: %s" % warn)
 
+    assert ((self.op.disk_template is None) ^
+            bool(self.owned_locks(locking.LEVEL_NODE_RES))), \
+      "Not owning any node resource locks"
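+    # (node resource locks are held if and only if the disk template is
+    # being changed, which is what the assertion above verifies)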
+
     result = []
     instance = self.instance
+
+    # runtime memory
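+    # (ballooning resizes the memory of the running instance on its primary
+    # node; the configured be/minmem and be/maxmem limits stay untouched)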
+    if self.op.runtime_mem:
+      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
+                                                     instance,
+                                                     self.op.runtime_mem)
+      rpcres.Raise("Cannot modify instance runtime memory")
+      result.append(("runtime_memory", self.op.runtime_mem))
+
     # disk changes
     for disk_op, disk_dict in self.op.disks:
       if disk_op == constants.DDM_REMOVE:
@@ -11462,6 +12383,11 @@ class LUInstanceSetParams(LogicalUnit):
             self.LogWarning("Could not remove disk/%d on node %s: %s,"
                             " continuing anyway", device_idx, node, msg)
         result.append(("disk/%d" % device_idx, "remove"))
+
+        # if this is a DRBD disk, return its port to the pool
+        if device.dev_type in constants.LDS_DRBD:
+          tcp_port = device.logical_id[2]
+          self.cfg.AddTcpUdpPort(tcp_port)
       elif disk_op == constants.DDM_ADD:
         # add a new disk
         if instance.disk_template in (constants.DT_FILE,
@@ -11478,7 +12404,9 @@ class LUInstanceSetParams(LogicalUnit):
                                          [disk_dict],
                                          file_path,
                                          file_driver,
-                                         disk_idx_base, feedback_fn)[0]
+                                         disk_idx_base,
+                                         feedback_fn,
+                                         self.diskparams)[0]
         instance.disks.append(new_disk)
         info = _GetInstanceInfoText(instance)
 
@@ -11504,6 +12432,16 @@ class LUInstanceSetParams(LogicalUnit):
                        disk_dict[constants.IDISK_MODE]))
 
     if self.op.disk_template:
+      if __debug__:
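+        # sanity check: template conversion needs node and node resource
+        # locks for all instance nodes plus, if given, the new secondary node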
+        check_nodes = set(instance.all_nodes)
+        if self.op.remote_node:
+          check_nodes.add(self.op.remote_node)
+        for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+          owned = self.owned_locks(level)
+          assert not (check_nodes - owned), \
+            ("Not owning the correct locks, owning %r, expected at least %r" %
+             (owned, check_nodes))
+
       r_shut = _ShutdownInstanceDisks(self, instance)
       if not r_shut:
         raise errors.OpExecError("Cannot shutdown instance disks, unable to"
@@ -11516,6 +12454,15 @@ class LUInstanceSetParams(LogicalUnit):
         raise
       result.append(("disk_template", self.op.disk_template))
 
+      assert instance.disk_template == self.op.disk_template, \
+        ("Expected disk template '%s', found '%s'" %
+         (self.op.disk_template, instance.disk_template))
+
+    # Release node and resource locks if there are any (they might already have
+    # been released during disk conversion)
+    _ReleaseLocks(self, locking.LEVEL_NODE)
+    _ReleaseLocks(self, locking.LEVEL_NODE_RES)
+
     # NIC changes
     for nic_op, nic_dict in self.op.nics:
       if nic_op == constants.DDM_REMOVE:
@@ -11566,8 +12513,20 @@ class LUInstanceSetParams(LogicalUnit):
       for key, val in self.op.osparams.iteritems():
         result.append(("os/%s" % key, val))
 
+    # online/offline instance
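+    # ("online" only clears the offline flag: the instance ends up
+    # administratively down, it is not started here)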
+    if self.op.online_inst:
+      self.cfg.MarkInstanceDown(instance.name)
+      result.append(("admin_state", constants.ADMINST_DOWN))
+    if self.op.offline_inst:
+      self.cfg.MarkInstanceOffline(instance.name)
+      result.append(("admin_state", constants.ADMINST_OFFLINE))
+
     self.cfg.Update(instance, feedback_fn)
 
+    assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
+                self.owned_locks(locking.LEVEL_NODE)), \
+      "All node locks should have been released by now"
+
     return result
 
   _DISK_CONVERSIONS = {
@@ -11890,7 +12849,8 @@ class LUBackupExport(LogicalUnit):
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
-    if (self.op.remove_instance and self.instance.admin_up and
+    if (self.op.remove_instance and
+        self.instance.admin_state == constants.ADMINST_UP and
         not self.op.shutdown):
       raise errors.OpPrereqError("Can not remove instance without shutting it"
                                  " down before")
@@ -12020,7 +12980,7 @@ class LUBackupExport(LogicalUnit):
     for disk in instance.disks:
       self.cfg.SetDiskID(disk, src_node)
 
-    activate_disks = (not instance.admin_up)
+    activate_disks = (instance.admin_state != constants.ADMINST_UP)
 
     if activate_disks:
       # Activate the instance disks if we're exporting a stopped instance
@@ -12033,7 +12993,8 @@ class LUBackupExport(LogicalUnit):
 
       helper.CreateSnapshots()
       try:
-        if (self.op.shutdown and instance.admin_up and
+        if (self.op.shutdown and
+            instance.admin_state == constants.ADMINST_UP and
             not self.op.remove_instance):
           assert not activate_disks
           feedback_fn("Starting instance %s" % instance.name)
@@ -12182,6 +13143,33 @@ class LUGroupAdd(LogicalUnit):
     if self.op.ndparams:
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
 
+    if self.op.hv_state:
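+      # a brand new group has no existing hypervisor state to merge with,
+      # hence the None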
+      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+    else:
+      self.new_hv_state = None
+
+    if self.op.disk_state:
+      self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+    else:
+      self.new_disk_state = None
+
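+    # make sure every disk template has an entry and fall back to the
+    # cluster-wide disk parameters if none were given for the new group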
+    if self.op.diskparams:
+      for templ in constants.DISK_TEMPLATES:
+        if templ not in self.op.diskparams:
+          self.op.diskparams[templ] = {}
+        utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES)
+    else:
+      self.op.diskparams = self.cfg.GetClusterInfo().diskparams
+
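+    # the new group's ipolicy only has to be syntactically valid once filled
+    # with the cluster-level defaults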
+    if self.op.ipolicy:
+      cluster = self.cfg.GetClusterInfo()
+      full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
+      try:
+        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy)
+      except errors.ConfigurationError, err:
+        raise errors.OpPrereqError("Invalid instance policy: %s" % err,
+                                   errors.ECODE_INVAL)
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -12204,7 +13192,11 @@ class LUGroupAdd(LogicalUnit):
     group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                   uuid=self.group_uuid,
                                   alloc_policy=self.op.alloc_policy,
-                                  ndparams=self.op.ndparams)
+                                  ndparams=self.op.ndparams,
+                                  diskparams=self.op.diskparams,
+                                  ipolicy=self.op.ipolicy,
+                                  hv_state_static=self.new_hv_state,
+                                  disk_state_static=self.new_disk_state)
 
     self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
     del self.remove_locks[locking.LEVEL_NODEGROUP]
@@ -12289,13 +13281,9 @@ class LUGroupAssignNodes(NoHooksLU):
     """Assign nodes to a new group.
 
     """
-    for node in self.op.nodes:
-      self.node_data[node].group = self.group_uuid
-
-    # FIXME: Depends on side-effects of modifying the result of
-    # C{cfg.GetAllNodesInfo}
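+    # hand all (node, target group) pairs to the configuration at once
+    # instead of modifying and saving each node object separately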
+    mods = [(node_name, self.group_uuid) for node_name in self.op.nodes]
 
-    self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
+    self.cfg.AssignGroupNodes(mods)
 
   @staticmethod
   def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
@@ -12354,6 +13342,7 @@ class _GroupQuery(_QueryBase):
     lu.needed_locks = {}
 
     self._all_groups = lu.cfg.GetAllNodeGroupsInfo()
+    self._cluster = lu.cfg.GetClusterInfo()
     name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values())
 
     if not self.names:
@@ -12419,7 +13408,8 @@ class _GroupQuery(_QueryBase):
           # Do not pass on node information if it was not requested.
           group_to_nodes = None
 
-    return query.GroupQueryData([self._all_groups[uuid]
+    return query.GroupQueryData(self._cluster,
+                                [self._all_groups[uuid]
                                  for uuid in self.wanted],
                                 group_to_nodes, group_to_instances)
 
@@ -12455,7 +13445,11 @@ class LUGroupSetParams(LogicalUnit):
   def CheckArguments(self):
     all_changes = [
       self.op.ndparams,
+      self.op.diskparams,
       self.op.alloc_policy,
+      self.op.hv_state,
+      self.op.disk_state,
+      self.op.ipolicy,
       ]
 
     if all_changes.count(None) == len(all_changes):
@@ -12467,14 +13461,32 @@ class LUGroupSetParams(LogicalUnit):
     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
 
     self.needed_locks = {
+      locking.LEVEL_INSTANCE: [],
       locking.LEVEL_NODEGROUP: [self.group_uuid],
       }
 
+    self.share_locks[locking.LEVEL_INSTANCE] = 1
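+    # shared locks suffice: the instances are only read to warn about new
+    # policy violations, they are not modified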
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_INSTANCE:
+      assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+      # Lock instances optimistically, needs verification once group lock has
+      # been acquired
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+          self.cfg.GetNodeGroupInstances(self.group_uuid)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+
+    # Check if locked instances are still correct
+    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
+
     self.group = self.cfg.GetNodeGroup(self.group_uuid)
+    cluster = self.cfg.GetClusterInfo()
 
     if self.group is None:
       raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
@@ -12485,6 +13497,43 @@ class LUGroupSetParams(LogicalUnit):
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = new_ndparams
 
+    if self.op.diskparams:
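+      # merge the per-template updates into the group's current disk
+      # parameters and verify the resulting value types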
+      self.new_diskparams = dict()
+      for templ in constants.DISK_TEMPLATES:
+        if templ not in self.op.diskparams:
+          self.op.diskparams[templ] = {}
+        new_templ_params = _GetUpdatedParams(self.group.diskparams[templ],
+                                             self.op.diskparams[templ])
+        utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES)
+        self.new_diskparams[templ] = new_templ_params
+
+    if self.op.hv_state:
+      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+                                                 self.group.hv_state_static)
+
+    if self.op.disk_state:
+      self.new_disk_state = \
+        _MergeAndVerifyDiskState(self.op.disk_state,
+                                 self.group.disk_state_static)
+
+    if self.op.ipolicy:
+      self.new_ipolicy = _GetUpdatedIPolicy(self.group.ipolicy,
+                                            self.op.ipolicy,
+                                            group_policy=True)
+
+      new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
+      inst_filter = lambda inst: inst.name in owned_instances
+      instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values()
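+      # only warn (do not fail) about instances that start violating the
+      # policy because of this change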
+      violations = \
+          _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
+                                                               self.group),
+                                        new_ipolicy, instances)
+
+      if violations:
+        self.LogWarning("After the ipolicy change the following instances"
+                        " violate them: %s",
+                        utils.CommaJoin(violations))
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -12511,9 +13560,22 @@ class LUGroupSetParams(LogicalUnit):
       self.group.ndparams = self.new_ndparams
       result.append(("ndparams", str(self.group.ndparams)))
 
+    if self.op.diskparams:
+      self.group.diskparams = self.new_diskparams
+      result.append(("diskparams", str(self.group.diskparams)))
+
     if self.op.alloc_policy:
       self.group.alloc_policy = self.op.alloc_policy
 
+    if self.op.hv_state:
+      self.group.hv_state_static = self.new_hv_state
+
+    if self.op.disk_state:
+      self.group.disk_state_static = self.new_disk_state
+
+    if self.op.ipolicy:
+      self.group.ipolicy = self.new_ipolicy
+
     self.cfg.Update(self.group, feedback_fn)
     return result
 
@@ -13238,10 +14300,10 @@ class IAllocator(object):
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
     else:
-      hypervisor_name = cluster_info.enabled_hypervisors[0]
+      hypervisor_name = cluster_info.primary_hypervisor
 
-    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
-                                        hypervisor_name)
+    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
+                                        [hypervisor_name])
     node_iinfo = \
       self.rpc.call_all_instances_info(node_list,
                                        cluster_info.enabled_hypervisors)
@@ -13263,9 +14325,11 @@ class IAllocator(object):
     """Compute node groups data.
 
     """
+    cluster = cfg.GetClusterInfo()
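+    # export each group's effective (cluster-filled) instance policy to the
+    # allocator as well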
     ng = dict((guuid, {
       "name": gdata.name,
       "alloc_policy": gdata.alloc_policy,
+      "ipolicy": _CalculateGroupIPolicy(cluster, gdata),
       })
       for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
 
@@ -13303,6 +14367,7 @@ class IAllocator(object):
     @param node_results: the basic node structures as filled from the config
 
     """
+    # TODO(dynmem): compute the right data on MAX and MIN memory
     # make a copy of the current dict
     node_results = dict(node_results)
     for nname, nresult in node_data.items():
@@ -13313,7 +14378,7 @@ class IAllocator(object):
         nresult.Raise("Can't get data for node %s" % nname)
         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                 nname)
-        remote_info = nresult.payload
+        remote_info = _MakeLegacyNodeInfo(nresult.payload)
 
         for attr in ["memory_total", "memory_free", "memory_dom0",
                      "vg_size", "vg_free", "cpu_total"]:
@@ -13328,16 +14393,16 @@ class IAllocator(object):
         i_p_mem = i_p_up_mem = 0
         for iinfo, beinfo in i_list:
           if iinfo.primary_node == nname:
-            i_p_mem += beinfo[constants.BE_MEMORY]
+            i_p_mem += beinfo[constants.BE_MAXMEM]
             if iinfo.name not in node_iinfo[nname].payload:
               i_used_mem = 0
             else:
               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
-            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
+            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
             remote_info["memory_free"] -= max(0, i_mem_diff)
 
-            if iinfo.admin_up:
-              i_p_up_mem += beinfo[constants.BE_MEMORY]
+            if iinfo.admin_state == constants.ADMINST_UP:
+              i_p_up_mem += beinfo[constants.BE_MAXMEM]
 
         # compute memory used by instances
         pnr_dyn = {
@@ -13376,9 +14441,9 @@ class IAllocator(object):
         nic_data.append(nic_dict)
       pir = {
         "tags": list(iinfo.GetTags()),
-        "admin_up": iinfo.admin_up,
+        "admin_state": iinfo.admin_state,
         "vcpus": beinfo[constants.BE_VCPUS],
-        "memory": beinfo[constants.BE_MEMORY],
+        "memory": beinfo[constants.BE_MAXMEM],
         "os": iinfo.os,
         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
         "nics": nic_data,