Move Ipolicy utility function to cmdlib/common.py
[ganeti-local] / lib / cmdlib / common.py
index 2d675b8..73132c2 100644 (file)
@@ -48,148 +48,167 @@ CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
   ]))
 
 
-def _ExpandItemName(fn, name, kind):
+def _ExpandItemName(expand_fn, name, kind):
   """Expand an item name.
 
-  @param fn: the function to use for expansion
+  @param expand_fn: the function to use for expansion
   @param name: requested item name
   @param kind: text description ('Node' or 'Instance')
-  @return: the resolved (full) name
+  @return: the result of the expand_fn, if successful
   @raise errors.OpPrereqError: if the item is not found
 
   """
-  full_name = fn(name)
-  if full_name is None:
+  (uuid, full_name) = expand_fn(name)
+  if uuid is None or full_name is None:
     raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                                errors.ECODE_NOENT)
-  return full_name
+  return (uuid, full_name)
 
 
-def _ExpandInstanceName(cfg, name):
+def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
   """Wrapper over L{_ExpandItemName} for instance."""
-  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
+  (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
+  if expected_uuid is not None and uuid != expected_uuid:
+    raise errors.OpPrereqError(
+      "The instances UUID '%s' does not match the expected UUID '%s' for"
+      " instance '%s'. Maybe the instance changed since you submitted this"
+      " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
+  return (uuid, full_name)
 
 
-def _ExpandNodeName(cfg, name):
-  """Wrapper over L{_ExpandItemName} for nodes."""
-  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
+def ExpandNodeUuidAndName(cfg, expected_uuid, name):
+  """Expand a short node name into the node UUID and full name.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type expected_uuid: string
+  @param expected_uuid: expected UUID for the node (or None if there is no
+        expectation). If it does not match, a L{errors.OpPrereqError} is
+        raised.
+  @type name: string
+  @param name: the short node name
+
+  """
+  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
+  if expected_uuid is not None and uuid != expected_uuid:
+    raise errors.OpPrereqError(
+      "The nodes UUID '%s' does not match the expected UUID '%s' for node"
+      " '%s'. Maybe the node changed since you submitted this job." %
+      (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
+  return (uuid, full_name)
 
 
-def _ShareAll():
+def ShareAll():
   """Returns a dict declaring all lock levels shared.
 
   """
   return dict.fromkeys(locking.LEVELS, 1)
 
 
-def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
+def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
   """Checks if the instances in a node group are still correct.
 
   @type cfg: L{config.ConfigWriter}
   @param cfg: The cluster configuration
   @type group_uuid: string
   @param group_uuid: Node group UUID
-  @type owned_instances: set or frozenset
-  @param owned_instances: List of currently owned instances
+  @type owned_instance_names: set or frozenset
+  @param owned_instance_names: List of currently owned instances
 
   """
-  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
-  if owned_instances != wanted_instances:
+  wanted_instances = frozenset(cfg.GetInstanceNames(
+                                 cfg.GetNodeGroupInstances(group_uuid)))
+  if owned_instance_names != wanted_instances:
     raise errors.OpPrereqError("Instances in node group '%s' changed since"
                                " locks were acquired, wanted '%s', have '%s';"
                                " retry the operation" %
                                (group_uuid,
                                 utils.CommaJoin(wanted_instances),
-                                utils.CommaJoin(owned_instances)),
+                                utils.CommaJoin(owned_instance_names)),
                                errors.ECODE_STATE)
 
   return wanted_instances
 
 
-def _GetWantedNodes(lu, nodes):
+def GetWantedNodes(lu, short_node_names):
   """Returns list of checked and expanded node names.
 
   @type lu: L{LogicalUnit}
   @param lu: the logical unit on whose behalf we execute
-  @type nodes: list
-  @param nodes: list of node names or None for all nodes
-  @rtype: list
-  @return: the list of nodes, sorted
+  @type short_node_names: list
+  @param short_node_names: list of node names or None for all nodes
+  @rtype: tuple of lists
+  @return: tupe with (list of node UUIDs, list of node names)
   @raise errors.ProgrammerError: if the nodes parameter is wrong type
 
   """
-  if nodes:
-    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
+  if short_node_names:
+    node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
+                  for name in short_node_names]
+  else:
+    node_uuids = lu.cfg.GetNodeList()
 
-  return utils.NiceSort(lu.cfg.GetNodeList())
+  return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])
 
 
-def _GetWantedInstances(lu, instances):
+def GetWantedInstances(lu, short_inst_names):
   """Returns list of checked and expanded instance names.
 
   @type lu: L{LogicalUnit}
   @param lu: the logical unit on whose behalf we execute
-  @type instances: list
-  @param instances: list of instance names or None for all instances
-  @rtype: list
-  @return: the list of instances, sorted
+  @type short_inst_names: list
+  @param short_inst_names: list of instance names or None for all instances
+  @rtype: tuple of lists
+  @return: tuple of (instance UUIDs, instance names)
   @raise errors.OpPrereqError: if the instances parameter is wrong type
   @raise errors.OpPrereqError: if any of the passed instances is not found
 
   """
-  if instances:
-    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
+  if short_inst_names:
+    inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, name)[0]
+                  for name in short_inst_names]
   else:
-    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
-  return wanted
+    inst_uuids = lu.cfg.GetInstanceList()
+  return (inst_uuids, [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids])
 
 
-def _RunPostHook(lu, node_name):
+def RunPostHook(lu, node_name):
   """Runs the post-hook for an opcode on a single node.
 
   """
   hm = lu.proc.BuildHooksManager(lu)
   try:
-    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
+    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
   except Exception, err: # pylint: disable=W0703
     lu.LogWarning("Errors occurred running hooks on %s: %s",
                   node_name, err)
 
 
-def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
+def RedistributeAncillaryFiles(lu):
   """Distribute additional files which are part of the cluster configuration.
 
   ConfigWriter takes care of distributing the config and ssconf files, but
   there are more files which should be distributed to all nodes. This function
   makes sure those are copied.
 
-  @param lu: calling logical unit
-  @param additional_nodes: list of nodes not in the config to distribute to
-  @type additional_vm: boolean
-  @param additional_vm: whether the additional nodes are vm-capable or not
-
   """
   # Gather target nodes
   cluster = lu.cfg.GetClusterInfo()
   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
 
-  online_nodes = lu.cfg.GetOnlineNodeList()
-  online_set = frozenset(online_nodes)
-  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
-
-  if additional_nodes is not None:
-    online_nodes.extend(additional_nodes)
-    if additional_vm:
-      vm_nodes.extend(additional_nodes)
+  online_node_uuids = lu.cfg.GetOnlineNodeList()
+  online_node_uuid_set = frozenset(online_node_uuids)
+  vm_node_uuids = list(online_node_uuid_set.intersection(
+                         lu.cfg.GetVmCapableNodeList()))
 
   # Never distribute to master node
-  for nodelist in [online_nodes, vm_nodes]:
-    if master_info.name in nodelist:
-      nodelist.remove(master_info.name)
+  for node_uuids in [online_node_uuids, vm_node_uuids]:
+    if master_info.uuid in node_uuids:
+      node_uuids.remove(master_info.uuid)
 
   # Gather file lists
   (files_all, _, files_mc, files_vm) = \
-    _ComputeAncillaryFiles(cluster, True)
+    ComputeAncillaryFiles(cluster, True)
 
   # Never re-distribute configuration file from here
   assert not (pathutils.CLUSTER_CONF_FILE in files_all or
@@ -197,17 +216,17 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   assert not files_mc, "Master candidates not handled in this function"
 
   filemap = [
-    (online_nodes, files_all),
-    (vm_nodes, files_vm),
+    (online_node_uuids, files_all),
+    (vm_node_uuids, files_vm),
     ]
 
   # Upload the files
-  for (node_list, files) in filemap:
+  for (node_uuids, files) in filemap:
     for fname in files:
-      _UploadHelper(lu, node_list, fname)
+      UploadHelper(lu, node_uuids, fname)
 
 
-def _ComputeAncillaryFiles(cluster, redist):
+def ComputeAncillaryFiles(cluster, redist):
   """Compute files external to Ganeti which need to be consistent.
 
   @type redist: boolean
@@ -251,8 +270,8 @@ def _ComputeAncillaryFiles(cluster, redist):
     files_mc.add(pathutils.CLUSTER_CONF_FILE)
 
   # File storage
-  if (not redist and (constants.ENABLE_FILE_STORAGE or
-                        constants.ENABLE_SHARED_FILE_STORAGE)):
+  if (not redist and (cluster.IsFileStorageEnabled() or
+                        cluster.IsSharedFileStorageEnabled())):
     files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
     files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
 
@@ -286,21 +305,21 @@ def _ComputeAncillaryFiles(cluster, redist):
   return (files_all, files_opt, files_mc, files_vm)
 
 
-def _UploadHelper(lu, nodes, fname):
+def UploadHelper(lu, node_uuids, fname):
   """Helper for uploading a file and showing warnings.
 
   """
   if os.path.exists(fname):
-    result = lu.rpc.call_upload_file(nodes, fname)
-    for to_node, to_result in result.items():
+    result = lu.rpc.call_upload_file(node_uuids, fname)
+    for to_node_uuids, to_result in result.items():
       msg = to_result.fail_msg
       if msg:
         msg = ("Copy of file %s to node %s failed: %s" %
-               (fname, to_node, msg))
+               (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
         lu.LogWarning(msg)
 
 
-def _MergeAndVerifyHvState(op_input, obj_input):
+def MergeAndVerifyHvState(op_input, obj_input):
   """Combines the hv state from an opcode with the one of the object
 
   @param op_input: The input dict from the opcode
@@ -322,7 +341,7 @@ def _MergeAndVerifyHvState(op_input, obj_input):
   return None
 
 
-def _MergeAndVerifyDiskState(op_input, obj_input):
+def MergeAndVerifyDiskState(op_input, obj_input):
   """Combines the disk state from an opcode with the one of the object
 
   @param op_input: The input dict from the opcode
@@ -345,7 +364,7 @@ def _MergeAndVerifyDiskState(op_input, obj_input):
   return None
 
 
-def _CheckOSParams(lu, required, nodenames, osname, osparams):
+def CheckOSParams(lu, required, node_uuids, osname, osparams):
   """OS parameters validation.
 
   @type lu: L{LogicalUnit}
@@ -353,8 +372,8 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
   @type required: boolean
   @param required: whether the validation should fail if the OS is not
       found
-  @type nodenames: list
-  @param nodenames: the list of nodes on which we should check
+  @type node_uuids: list
+  @param node_uuids: the list of nodes on which we should check
   @type osname: string
   @param osname: the name of the hypervisor we should use
   @type osparams: dict
@@ -362,20 +381,21 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
-  nodenames = _FilterVmNodes(lu, nodenames)
-  result = lu.rpc.call_os_validate(nodenames, required, osname,
+  node_uuids = _FilterVmNodes(lu, node_uuids)
+  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                    [constants.OS_VALIDATE_PARAMETERS],
                                    osparams)
-  for node, nres in result.items():
+  for node_uuid, nres in result.items():
     # we don't check for offline cases since this should be run only
     # against the master node and/or an instance's nodes
-    nres.Raise("OS Parameters validation failed on node %s" % node)
+    nres.Raise("OS Parameters validation failed on node %s" %
+               lu.cfg.GetNodeName(node_uuid))
     if not nres.payload:
       lu.LogInfo("OS %s not found on node %s, validation skipped",
-                 osname, node)
+                 osname, lu.cfg.GetNodeName(node_uuid))
 
 
-def _CheckHVParams(lu, nodenames, hvname, hvparams):
+def CheckHVParams(lu, node_uuids, hvname, hvparams):
   """Hypervisor parameter validation.
 
   This function abstract the hypervisor parameter validation to be
@@ -383,8 +403,8 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
 
   @type lu: L{LogicalUnit}
   @param lu: the logical unit for which we check
-  @type nodenames: list
-  @param nodenames: the list of nodes on which we should check
+  @type node_uuids: list
+  @param node_uuids: the list of nodes on which we should check
   @type hvname: string
   @param hvname: the name of the hypervisor we should use
   @type hvparams: dict
@@ -392,20 +412,21 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
-  nodenames = _FilterVmNodes(lu, nodenames)
+  node_uuids = _FilterVmNodes(lu, node_uuids)
 
   cluster = lu.cfg.GetClusterInfo()
   hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
 
-  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
-  for node in nodenames:
-    info = hvinfo[node]
+  hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
+  for node_uuid in node_uuids:
+    info = hvinfo[node_uuid]
     if info.offline:
       continue
-    info.Raise("Hypervisor parameter validation failed on node %s" % node)
+    info.Raise("Hypervisor parameter validation failed on node %s" %
+               lu.cfg.GetNodeName(node_uuid))
 
 
-def _AdjustCandidatePool(lu, exceptions):
+def AdjustCandidatePool(lu, exceptions):
   """Adjust the candidate pool after node operations.
 
   """
@@ -413,15 +434,15 @@ def _AdjustCandidatePool(lu, exceptions):
   if mod_list:
     lu.LogInfo("Promoted nodes to master candidate role: %s",
                utils.CommaJoin(node.name for node in mod_list))
-    for name in mod_list:
-      lu.context.ReaddNode(name)
+    for node in mod_list:
+      lu.context.ReaddNode(node)
   mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
   if mc_now > mc_max:
     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
                (mc_now, mc_max))
 
 
-def _CheckNodePVs(nresult, exclusive_storage):
+def CheckNodePVs(nresult, exclusive_storage):
   """Check node PVs.
 
   """
@@ -475,10 +496,10 @@ def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
   return None
 
 
-def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
-                                 nic_count, disk_sizes, spindle_use,
-                                 disk_template,
-                                 _compute_fn=_ComputeMinMaxSpec):
+def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
+                                nic_count, disk_sizes, spindle_use,
+                                disk_template,
+                                _compute_fn=_ComputeMinMaxSpec):
   """Verifies ipolicy against provided specs.
 
   @type ipolicy: dict
@@ -530,8 +551,8 @@ def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
   return ret + min_errs
 
 
-def _ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
-                                     _compute_fn=_ComputeIPolicySpecViolation):
+def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
+                                    _compute_fn=ComputeIPolicySpecViolation):
   """Compute if instance meets the specs of ipolicy.
 
   @type ipolicy: dict
@@ -541,20 +562,33 @@ def _ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
   @type cfg: L{config.ConfigWriter}
   @param cfg: Cluster configuration
   @param _compute_fn: The function to verify ipolicy (unittest only)
-  @see: L{_ComputeIPolicySpecViolation}
+  @see: L{ComputeIPolicySpecViolation}
 
   """
+  ret = []
   be_full = cfg.GetClusterInfo().FillBE(instance)
   mem_size = be_full[constants.BE_MAXMEM]
   cpu_count = be_full[constants.BE_VCPUS]
-  spindle_use = be_full[constants.BE_SPINDLE_USE]
+  es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
+  if any(es_flags.values()):
+    # With exclusive storage use the actual spindles
+    try:
+      spindle_use = sum([disk.spindles for disk in instance.disks])
+    except TypeError:
+      ret.append("Number of spindles not configured for disks of instance %s"
+                 " while exclusive storage is enabled, try running gnt-cluster"
+                 " repair-disk-sizes" % instance.name)
+      # _ComputeMinMaxSpec ignores 'None's
+      spindle_use = None
+  else:
+    spindle_use = be_full[constants.BE_SPINDLE_USE]
   disk_count = len(instance.disks)
   disk_sizes = [disk.size for disk in instance.disks]
   nic_count = len(instance.nics)
   disk_template = instance.disk_template
 
-  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
-                     disk_sizes, spindle_use, disk_template)
+  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+                           disk_sizes, spindle_use, disk_template)
 
 
 def _ComputeViolatingInstances(ipolicy, instances, cfg):
@@ -569,10 +603,10 @@ def _ComputeViolatingInstances(ipolicy, instances, cfg):
 
   """
   return frozenset([inst.name for inst in instances
-                    if _ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
+                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
 
 
-def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
+def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
   """Computes a set of any instances that would violate the new ipolicy.
 
   @param old_ipolicy: The current (still in-place) ipolicy
@@ -588,7 +622,7 @@ def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
           _ComputeViolatingInstances(old_ipolicy, instances, cfg))
 
 
-def _GetUpdatedParams(old_params, update_dict,
+def GetUpdatedParams(old_params, update_dict,
                       use_default=True, use_none=False):
   """Return the new version of a parameter dictionary.
 
@@ -621,7 +655,7 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
-def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
+def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
   """Return the new version of an instance policy.
 
   @param group_policy: whether this policy applies to a group and thus
@@ -660,8 +694,8 @@ def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
         if group_policy:
           msg = "%s cannot appear in group instance specs" % key
           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-        ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
-                                         use_none=False, use_default=False)
+        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
+                                        use_none=False, use_default=False)
         utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
       else:
         # FIXME: we assume all others are lists; this should be redone
@@ -675,7 +709,7 @@ def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
   return ipolicy
 
 
-def _AnnotateDiskParams(instance, devs, cfg):
+def AnnotateDiskParams(instance, devs, cfg):
   """Little helper wrapper to the rpc annotation method.
 
   @param instance: The instance object
@@ -690,7 +724,7 @@ def _AnnotateDiskParams(instance, devs, cfg):
                                 cfg.GetInstanceDiskParams(instance))
 
 
-def _SupportsOob(cfg, node):
+def SupportsOob(cfg, node):
   """Tells if node supports OOB.
 
   @type cfg: L{config.ConfigWriter}
@@ -713,7 +747,7 @@ def _UpdateAndVerifySubDict(base, updates, type_check):
 
   """
   def fn(old, value):
-    new = _GetUpdatedParams(old, value)
+    new = GetUpdatedParams(old, value)
     utils.ForceDictType(new, type_check)
     return new
 
@@ -723,22 +757,22 @@ def _UpdateAndVerifySubDict(base, updates, type_check):
   return ret
 
 
-def _FilterVmNodes(lu, nodenames):
+def _FilterVmNodes(lu, node_uuids):
   """Filters out non-vm_capable nodes from a list.
 
   @type lu: L{LogicalUnit}
   @param lu: the logical unit for which we check
-  @type nodenames: list
-  @param nodenames: the list of nodes on which we should check
+  @type node_uuids: list
+  @param node_uuids: the list of nodes on which we should check
   @rtype: list
   @return: the list of vm-capable nodes
 
   """
   vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
-  return [name for name in nodenames if name not in vm_nodes]
+  return [uuid for uuid in node_uuids if uuid not in vm_nodes]
 
 
-def _GetDefaultIAllocator(cfg, ialloc):
+def GetDefaultIAllocator(cfg, ialloc):
   """Decides on which iallocator to use.
 
   @type cfg: L{config.ConfigWriter}
@@ -761,54 +795,53 @@ def _GetDefaultIAllocator(cfg, ialloc):
   return ialloc
 
 
-def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
-                              cur_group_uuid):
+def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
+                             cur_group_uuid):
   """Checks if node groups for locked instances are still correct.
 
   @type cfg: L{config.ConfigWriter}
   @param cfg: Cluster configuration
   @type instances: dict; string as key, L{objects.Instance} as value
-  @param instances: Dictionary, instance name as key, instance object as value
+  @param instances: Dictionary, instance UUID as key, instance object as value
   @type owned_groups: iterable of string
   @param owned_groups: List of owned groups
-  @type owned_nodes: iterable of string
-  @param owned_nodes: List of owned nodes
+  @type owned_node_uuids: iterable of string
+  @param owned_node_uuids: List of owned nodes
   @type cur_group_uuid: string or None
   @param cur_group_uuid: Optional group UUID to check against instance's groups
 
   """
-  for (name, inst) in instances.items():
-    assert owned_nodes.issuperset(inst.all_nodes), \
-      "Instance %s's nodes changed while we kept the lock" % name
+  for (uuid, inst) in instances.items():
+    assert owned_node_uuids.issuperset(inst.all_nodes), \
+      "Instance %s's nodes changed while we kept the lock" % inst.name
 
-    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
+    inst_groups = CheckInstanceNodeGroups(cfg, uuid, owned_groups)
 
     assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
-      "Instance %s has no node in group %s" % (name, cur_group_uuid)
+      "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)
 
 
-def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
-                             primary_only=False):
+def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
   """Checks if the owned node groups are still correct for an instance.
 
   @type cfg: L{config.ConfigWriter}
   @param cfg: The cluster configuration
-  @type instance_name: string
-  @param instance_name: Instance name
+  @type inst_uuid: string
+  @param inst_uuid: Instance UUID
   @type owned_groups: set or frozenset
   @param owned_groups: List of currently owned node groups
   @type primary_only: boolean
   @param primary_only: Whether to check node groups for only the primary node
 
   """
-  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
+  inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)
 
   if not owned_groups.issuperset(inst_groups):
     raise errors.OpPrereqError("Instance %s's node groups changed since"
                                " locks were acquired, current groups are"
                                " are '%s', owning groups '%s'; retry the"
                                " operation" %
-                               (instance_name,
+                               (cfg.GetInstanceName(inst_uuid),
                                 utils.CommaJoin(inst_groups),
                                 utils.CommaJoin(owned_groups)),
                                errors.ECODE_STATE)
@@ -816,7 +849,7 @@ def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
   return inst_groups
 
 
-def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
+def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
   """Unpacks the result of change-group and node-evacuate iallocator requests.
 
   Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
@@ -842,21 +875,22 @@ def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
 
   if moved:
     lu.LogInfo("Instances to be moved: %s",
-               utils.CommaJoin("%s (to %s)" %
-                               (name, _NodeEvacDest(use_nodes, group, nodes))
-                               for (name, group, nodes) in moved))
+               utils.CommaJoin(
+                 "%s (to %s)" %
+                 (name, _NodeEvacDest(use_nodes, group, node_names))
+                 for (name, group, node_names) in moved))
 
   return [map(compat.partial(_SetOpEarlyRelease, early_release),
               map(opcodes.OpCode.LoadOpCode, ops))
           for ops in jobs]
 
 
-def _NodeEvacDest(use_nodes, group, nodes):
+def _NodeEvacDest(use_nodes, group, node_names):
   """Returns group or nodes depending on caller's choice.
 
   """
   if use_nodes:
-    return utils.CommaJoin(nodes)
+    return utils.CommaJoin(node_names)
   else:
     return group
 
@@ -873,20 +907,21 @@ def _SetOpEarlyRelease(early_release, op):
   return op
 
 
-def _MapInstanceDisksToNodes(instances):
+def MapInstanceLvsToNodes(instances):
   """Creates a map from (node, volume) to instance name.
 
   @type instances: list of L{objects.Instance}
-  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
+  @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
+          object as value
 
   """
-  return dict(((node, vol), inst.name)
+  return dict(((node_uuid, vol), inst)
               for inst in instances
-              for (node, vols) in inst.MapLVsByNode().items()
+              for (node_uuid, vols) in inst.MapLVsByNode().items()
               for vol in vols)
 
 
-def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
+def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
   """Make sure that none of the given paramters is global.
 
   If a global parameter is found, an L{errors.OpPrereqError} exception is
@@ -915,7 +950,7 @@ def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
 
-def _IsExclusiveStorageEnabledNode(cfg, node):
+def IsExclusiveStorageEnabledNode(cfg, node):
   """Whether exclusive_storage is in effect for the given node.
 
   @type cfg: L{config.ConfigWriter}
@@ -929,7 +964,7 @@ def _IsExclusiveStorageEnabledNode(cfg, node):
   return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
 
 
-def _CheckInstanceState(lu, instance, req_states, msg=None):
+def CheckInstanceState(lu, instance, req_states, msg=None):
   """Ensure that an instance is in one of the required states.
 
   @param lu: the LU on behalf of which we make the check
@@ -947,10 +982,13 @@ def _CheckInstanceState(lu, instance, req_states, msg=None):
                                errors.ECODE_STATE)
 
   if constants.ADMINST_UP not in req_states:
-    pnode = instance.primary_node
-    if not lu.cfg.GetNodeInfo(pnode).offline:
-      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-      ins_l.Raise("Can't contact node %s for instance information" % pnode,
+    pnode_uuid = instance.primary_node
+    if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
+      all_hvparams = lu.cfg.GetClusterInfo().hvparams
+      ins_l = lu.rpc.call_instance_list(
+                [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
+      ins_l.Raise("Can't contact node %s for instance information" %
+                  lu.cfg.GetNodeName(pnode_uuid),
                   prereq=True, ecode=errors.ECODE_ENVIRON)
       if instance.name in ins_l.payload:
         raise errors.OpPrereqError("Instance %s is running, %s" %
@@ -960,7 +998,7 @@ def _CheckInstanceState(lu, instance, req_states, msg=None):
                      " is down")
 
 
-def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
+def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
   """Check the sanity of iallocator and node arguments and use the
   cluster-wide iallocator if appropriate.
 
@@ -996,15 +1034,16 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
                                  " iallocator", errors.ECODE_INVAL)
 
 
-def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
+def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
   faulty = []
 
   for dev in instance.disks:
-    cfg.SetDiskID(dev, node_name)
+    cfg.SetDiskID(dev, node_uuid)
 
-  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
-                                                                instance))
-  result.Raise("Failed to get disk status from node %s" % node_name,
+  result = rpc_runner.call_blockdev_getmirrorstatus(
+             node_uuid, (instance.disks, instance))
+  result.Raise("Failed to get disk status from node %s" %
+               cfg.GetNodeName(node_uuid),
                prereq=prereq, ecode=errors.ECODE_ENVIRON)
 
   for idx, bdev_status in enumerate(result.payload):
@@ -1012,3 +1051,89 @@ def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
       faulty.append(idx)
 
   return faulty
+
+
+def CheckNodeOnline(lu, node_uuid, msg=None):
+  """Ensure that a given node is online.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node_uuid: the node to check
+  @param msg: if passed, should be a message to replace the default one
+  @raise errors.OpPrereqError: if the node is offline
+
+  """
+  if msg is None:
+    msg = "Can't use offline node"
+  if lu.cfg.GetNodeInfo(node_uuid).offline:
+    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
+                               errors.ECODE_STATE)
+
+
+def CheckDiskTemplateEnabled(cluster, disk_template):
+  """Helper function to check if a disk template is enabled.
+
+  @type cluster: C{objects.Cluster}
+  @param cluster: the cluster's configuration
+  @type disk_template: str
+  @param disk_template: the disk template to be checked
+
+  """
+  assert disk_template is not None
+  if disk_template not in constants.DISK_TEMPLATES:
+    raise errors.OpPrereqError("'%s' is not a valid disk template."
+                               " Valid disk templates are: %s" %
+                               (disk_template,
+                                ",".join(constants.DISK_TEMPLATES)))
+  if not disk_template in cluster.enabled_disk_templates:
+    raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster."
+                               " Enabled disk templates are: %s" %
+                               (disk_template,
+                                ",".join(cluster.enabled_disk_templates)))
+
+
+def CheckStorageTypeEnabled(cluster, storage_type):
+  """Helper function to check if a storage type is enabled.
+
+  @type cluster: C{objects.Cluster}
+  @param cluster: the cluster's configuration
+  @type storage_type: str
+  @param storage_type: the storage type to be checked
+
+  """
+  assert storage_type is not None
+  assert storage_type in constants.STORAGE_TYPES
+  # special case for lvm-pv, because it cannot be enabled
+  # via disk templates
+  if storage_type == constants.ST_LVM_PV:
+    CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG)
+  else:
+    possible_disk_templates = \
+        utils.storage.GetDiskTemplatesOfStorageType(storage_type)
+    for disk_template in possible_disk_templates:
+      if disk_template in cluster.enabled_disk_templates:
+        return
+    raise errors.OpPrereqError("No disk template of storage type '%s' is"
+                               " enabled in this cluster. Enabled disk"
+                               " templates are: %s" % (storage_type,
+                               ",".join(cluster.enabled_disk_templates)))
+
+
+def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
+  """Checks ipolicy disk templates against enabled disk tempaltes.
+
+  @type ipolicy: dict
+  @param ipolicy: the new ipolicy
+  @type enabled_disk_templates: list of string
+  @param enabled_disk_templates: list of enabled disk templates on the
+    cluster
+  @raises errors.OpPrereqError: if there is at least one allowed disk
+    template that is not also enabled.
+
+  """
+  assert constants.IPOLICY_DTS in ipolicy
+  allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
+  not_enabled = set(allowed_disk_templates) - set(enabled_disk_templates)
+  if not_enabled:
+    raise errors.OpPrereqError("The following disk template are allowed"
+                               " by the ipolicy, but not enabled on the"
+                               " cluster: %s" % utils.CommaJoin(not_enabled))