Explicitly ask for the default iallocator in commands
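This change lets an opcode ask for the cluster-wide default iallocator explicitly: if neither a node nor an iallocator is given, or if the iallocator slot is set to constants.DEFAULT_IALLOCATOR_SHORTCUT, _CheckIAllocatorOrNode fills the slot with the cluster default (an empty node list now also counts as "no node specified"). The patch additionally teaches LUInstanceRecreateDisks to pick new nodes via an iallocator and moves several file-path constants from constants to pathutils. Below is a minimal standalone sketch of the new fallback rule only; the helper name and the shortcut's literal value are assumptions for illustration, and the real logic lives in _CheckIAllocatorOrNode further down.

# Sketch of the new default-iallocator fallback (not the patched function
# itself); DEFAULT_IALLOCATOR_SHORTCUT's value is assumed to be ".".
DEFAULT_IALLOCATOR_SHORTCUT = "."

def resolve_iallocator(node, ialloc, cluster_default):
  """Return (node, iallocator) after applying the new fallback rules."""
  if node == []:
    # An empty node list now counts as "no node specified"
    node = None
  if node is not None and ialloc is not None:
    raise ValueError("Do not specify both, iallocator and node")
  if (node is None and ialloc is None) or \
     ialloc == DEFAULT_IALLOCATOR_SHORTCUT:
    if not cluster_default:
      raise ValueError("No iallocator or node given and no cluster-wide"
                       " default iallocator found")
    return (None, cluster_default)
  return (node, ialloc)

# The shortcut explicitly requests the cluster default:
assert resolve_iallocator(None, DEFAULT_IALLOCATOR_SHORTCUT, "hail") == \
  (None, "hail")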
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 652a940..c1eff02 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -48,7 +48,6 @@ from ganeti import hypervisor
 from ganeti import locking
 from ganeti import constants
 from ganeti import objects
-from ganeti import serializer
 from ganeti import ssconf
 from ganeti import uidpool
 from ganeti import compat
@@ -60,13 +59,12 @@ from ganeti import opcodes
 from ganeti import ht
 from ganeti import rpc
 from ganeti import runtime
+from ganeti import pathutils
+from ganeti.masterd import iallocator
 
 import ganeti.masterd.instance # pylint: disable=W0611
 
 
-#: Size of DRBD meta block device
-DRBD_META_SIZE = 128
-
 # States of instance
 INSTANCE_DOWN = [constants.ADMINST_DOWN]
 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
@@ -585,20 +583,6 @@ def _ShareAll():
   return dict.fromkeys(locking.LEVELS, 1)
 
 
-def _MakeLegacyNodeInfo(data):
-  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
-
-  Converts the data into a single dictionary. This is fine for most use cases,
-  but some require information from more than one volume group or hypervisor.
-
-  """
-  (bootid, (vg_info, ), (hv_info, )) = data
-
-  return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
-    "bootid": bootid,
-    })
-
-
 def _AnnotateDiskParams(instance, devs, cfg):
   """Little helper wrapper to the rpc annotation method.
 
@@ -640,7 +624,8 @@ def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
       "Instance %s has no node in group %s" % (name, cur_group_uuid)
 
 
-def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
+def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
+                             primary_only=False):
   """Checks if the owned node groups are still correct for an instance.
 
   @type cfg: L{config.ConfigWriter}
@@ -649,9 +634,11 @@ def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
   @param instance_name: Instance name
   @type owned_groups: set or frozenset
   @param owned_groups: List of currently owned node groups
+  @type primary_only: boolean
+  @param primary_only: Whether to check node groups for only the primary node
 
   """
-  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
+  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
 
   if not owned_groups.issuperset(inst_groups):
     raise errors.OpPrereqError("Instance %s's node groups changed since"
@@ -957,9 +944,8 @@ def _RunPostHook(lu, node_name):
   hm = lu.proc.BuildHooksManager(lu)
   try:
     hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
-  except:
-    # pylint: disable=W0702
-    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
+  except Exception, err: # pylint: disable=W0703
+    lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err))
 
 
 def _CheckOutputFields(static, dynamic, selected):
@@ -1086,7 +1072,7 @@ def _GetClusterDomainSecret():
   """Reads the cluster domain secret.
 
   """
-  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
+  return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
                                strict=True)
 
 
@@ -1108,13 +1094,16 @@ def _CheckInstanceState(lu, instance, req_states, msg=None):
 
   if constants.ADMINST_UP not in req_states:
     pnode = instance.primary_node
-    ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-    ins_l.Raise("Can't contact node %s for instance information" % pnode,
-                prereq=True, ecode=errors.ECODE_ENVIRON)
-
-    if instance.name in ins_l.payload:
-      raise errors.OpPrereqError("Instance %s is running, %s" %
-                                 (instance.name, msg), errors.ECODE_STATE)
+    if not lu.cfg.GetNodeInfo(pnode).offline:
+      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+      ins_l.Raise("Can't contact node %s for instance information" % pnode,
+                  prereq=True, ecode=errors.ECODE_ENVIRON)
+      if instance.name in ins_l.payload:
+        raise errors.OpPrereqError("Instance %s is running, %s" %
+                                   (instance.name, msg), errors.ECODE_STATE)
+    else:
+      lu.LogWarning("Primary node offline, ignoring check that instance"
+                     " is down")
 
 
 def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
@@ -1205,8 +1194,8 @@ def _ComputeIPolicyInstanceViolation(ipolicy, instance,
                      disk_sizes, spindle_use)
 
 
-def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
-    _compute_fn=_ComputeIPolicySpecViolation):
+def _ComputeIPolicyInstanceSpecViolation(
+  ipolicy, instance_spec, _compute_fn=_ComputeIPolicySpecViolation):
   """Compute if instance specs meets the specs of ipolicy.
 
   @type ipolicy: dict
@@ -1498,13 +1487,6 @@ def _DecideSelfPromotion(lu, exceptions=None):
   return mc_now < mc_should
 
 
-def _CalculateGroupIPolicy(cluster, group):
-  """Calculate instance policy for group.
-
-  """
-  return cluster.SimpleFillIPolicy(group.ipolicy)
-
-
 def _ComputeViolatingInstances(ipolicy, instances):
   """Computes a set of instances who violates given ipolicy.
 
@@ -1628,8 +1610,9 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
   cluster-wide iallocator if appropriate.
 
   Check that at most one of (iallocator, node) is specified. If none is
-  specified, then the LU's opcode's iallocator slot is filled with the
-  cluster-wide default iallocator.
+  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
+  then the LU's opcode's iallocator slot is filled with the cluster-wide
+  default iallocator.
 
   @type iallocator_slot: string
   @param iallocator_slot: the name of the opcode iallocator slot
@@ -1638,12 +1621,15 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
 
   """
   node = getattr(lu.op, node_slot, None)
-  iallocator = getattr(lu.op, iallocator_slot, None)
+  ialloc = getattr(lu.op, iallocator_slot, None)
+  if node == []:
+    node = None
 
-  if node is not None and iallocator is not None:
+  if node is not None and ialloc is not None:
     raise errors.OpPrereqError("Do not specify both, iallocator and node",
                                errors.ECODE_INVAL)
-  elif node is None and iallocator is None:
+  elif ((node is None and ialloc is None) or
+        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
     default_iallocator = lu.cfg.GetDefaultIAllocator()
     if default_iallocator:
       setattr(lu.op, iallocator_slot, default_iallocator)
@@ -1652,30 +1638,30 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
                                  " cluster-wide default iallocator found;"
                                  " please specify either an iallocator or a"
                                  " node, or set a cluster-wide default"
-                                 " iallocator")
+                                 " iallocator", errors.ECODE_INVAL)
 
 
-def _GetDefaultIAllocator(cfg, iallocator):
+def _GetDefaultIAllocator(cfg, ialloc):
   """Decides on which iallocator to use.
 
   @type cfg: L{config.ConfigWriter}
   @param cfg: Cluster configuration object
-  @type iallocator: string or None
-  @param iallocator: Iallocator specified in opcode
+  @type ialloc: string or None
+  @param ialloc: Iallocator specified in opcode
   @rtype: string
   @return: Iallocator name
 
   """
-  if not iallocator:
+  if not ialloc:
     # Use default iallocator
-    iallocator = cfg.GetDefaultIAllocator()
+    ialloc = cfg.GetDefaultIAllocator()
 
-  if not iallocator:
+  if not ialloc:
     raise errors.OpPrereqError("No iallocator was specified, neither in the"
                                " opcode nor as a cluster-wide default",
                                errors.ECODE_INVAL)
 
-  return iallocator
+  return ialloc
 
 
 class LUClusterPostInit(LogicalUnit):
@@ -1917,10 +1903,11 @@ class LUClusterVerify(NoHooksLU):
       # Always depend on global verification
       depends_fn = lambda: [(-len(jobs), [])]
 
-    jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
-                                            ignore_errors=self.op.ignore_errors,
-                                            depends=depends_fn())]
-                for group in groups)
+    jobs.extend(
+      [opcodes.OpClusterVerifyGroup(group_name=group,
+                                    ignore_errors=self.op.ignore_errors,
+                                    depends=depends_fn())]
+      for group in groups)
 
     # Fix up all parameters
     for op in itertools.chain(*jobs): # pylint: disable=W0142
@@ -1982,7 +1969,7 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
 
     feedback_fn("* Verifying cluster certificate files")
 
-    for cert_filename in constants.ALL_CERT_FILES:
+    for cert_filename in pathutils.ALL_CERT_FILES:
       (errcode, msg) = _VerifyCertificate(cert_filename)
       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
 
@@ -2421,7 +2408,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     node_vol_should = {}
     instanceconfig.MapLVsByNode(node_vol_should)
 
-    ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
+    cluster = self.cfg.GetClusterInfo()
+    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                            self.group_info)
     err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
     _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
 
@@ -2642,7 +2631,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     if drbd_helper:
       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
-      test = (helper_result == None)
+      test = (helper_result is None)
       _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                "no drbd usermode helper returned")
       if helper_result:
@@ -3084,7 +3073,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     user_scripts = []
     if self.cfg.GetUseExternalMipScript():
-      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
 
     node_verify_param = {
       constants.NV_FILELIST:
@@ -3115,9 +3104,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       node_verify_param[constants.NV_VGLIST] = None
       node_verify_param[constants.NV_LVLIST] = vg_name
       node_verify_param[constants.NV_PVLIST] = [vg_name]
-      node_verify_param[constants.NV_DRBDLIST] = None
 
     if drbd_helper:
+      node_verify_param[constants.NV_DRBDLIST] = None
       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
 
     # bridge checks
@@ -3214,10 +3203,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       if master_node not in self.my_node_info:
         additional_nodes.append(master_node)
         vf_node_info.append(self.all_node_info[master_node])
-      # Add the first vm_capable node we find which is not included
+      # Add the first vm_capable node we find which is not included,
+      # excluding the master node (which we already have)
       for node in absent_nodes:
         nodeinfo = self.all_node_info[node]
-        if nodeinfo.vm_capable and not nodeinfo.offline:
+        if (nodeinfo.vm_capable and not nodeinfo.offline and
+            node != master_node):
           additional_nodes.append(node)
           vf_node_info.append(self.all_node_info[node])
           break
@@ -3569,9 +3560,9 @@ class LUGroupVerifyDisks(NoHooksLU):
     res_instances = set()
     res_missing = {}
 
-    nv_dict = _MapInstanceDisksToNodes([inst
-            for inst in self.instances.values()
-            if inst.admin_state == constants.ADMINST_UP])
+    nv_dict = _MapInstanceDisksToNodes(
+      [inst for inst in self.instances.values()
+       if inst.admin_state == constants.ADMINST_UP])
 
     if nv_dict:
       nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
@@ -3790,13 +3781,13 @@ class LUClusterRename(LogicalUnit):
       self.cfg.Update(cluster, feedback_fn)
 
       # update the known hosts file
-      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
+      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
       node_list = self.cfg.GetOnlineNodeList()
       try:
         node_list.remove(master_params.name)
       except ValueError:
         pass
-      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
+      _UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
     finally:
       master_params.ip = new_ip
       result = self.rpc.call_node_activate_master_ip(master_params.name,
@@ -3824,10 +3815,10 @@ def _ValidateNetmask(cfg, netmask):
     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
   except errors.ProgrammerError:
     raise errors.OpPrereqError("Invalid primary ip family: %s." %
-                               ip_family)
+                               ip_family, errors.ECODE_INVAL)
   if not ipcls.ValidateNetmask(netmask):
     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
-                                (netmask))
+                                (netmask), errors.ECODE_INVAL)
 
 
 class LUClusterSetParams(LogicalUnit):
@@ -3989,8 +3980,8 @@ class LUClusterSetParams(LogicalUnit):
                                if compat.any(node in group.members
                                              for node in inst.all_nodes)])
         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
-        new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
-                                                                   group),
+        ipol = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group)
+        new = _ComputeNewInstanceViolations(ipol,
                                             new_ipolicy, instances)
         if new:
           violations.update(new)
@@ -4026,7 +4017,7 @@ class LUClusterSetParams(LogicalUnit):
                               " address" % (instance.name, nic_idx))
       if nic_errors:
         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
-                                   "\n".join(nic_errors))
+                                   "\n".join(nic_errors), errors.ECODE_INVAL)
 
     # hypervisor list/parameters
     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
@@ -4292,46 +4283,48 @@ def _ComputeAncillaryFiles(cluster, redist):
   """
   # Compute files for all nodes
   files_all = set([
-    constants.SSH_KNOWN_HOSTS_FILE,
-    constants.CONFD_HMAC_KEY,
-    constants.CLUSTER_DOMAIN_SECRET_FILE,
-    constants.SPICE_CERT_FILE,
-    constants.SPICE_CACERT_FILE,
-    constants.RAPI_USERS_FILE,
+    pathutils.SSH_KNOWN_HOSTS_FILE,
+    pathutils.CONFD_HMAC_KEY,
+    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
+    pathutils.SPICE_CERT_FILE,
+    pathutils.SPICE_CACERT_FILE,
+    pathutils.RAPI_USERS_FILE,
     ])
 
   if not redist:
-    files_all.update(constants.ALL_CERT_FILES)
+    files_all.update(pathutils.ALL_CERT_FILES)
     files_all.update(ssconf.SimpleStore().GetFileList())
   else:
     # we need to ship at least the RAPI certificate
-    files_all.add(constants.RAPI_CERT_FILE)
+    files_all.add(pathutils.RAPI_CERT_FILE)
 
   if cluster.modify_etc_hosts:
     files_all.add(constants.ETC_HOSTS)
 
   if cluster.use_external_mip_script:
-    files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
 
   # Files which are optional, these must:
   # - be present in one other category as well
   # - either exist or not exist on all nodes of that category (mc, vm all)
   files_opt = set([
-    constants.RAPI_USERS_FILE,
+    pathutils.RAPI_USERS_FILE,
     ])
 
   # Files which should only be on master candidates
   files_mc = set()
 
   if not redist:
-    files_mc.add(constants.CLUSTER_CONF_FILE)
+    files_mc.add(pathutils.CLUSTER_CONF_FILE)
 
   # Files which should only be on VM-capable nodes
-  files_vm = set(filename
+  files_vm = set(
+    filename
     for hv_name in cluster.enabled_hypervisors
     for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
 
-  files_opt |= set(filename
+  files_opt |= set(
+    filename
     for hv_name in cluster.enabled_hypervisors
     for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
 
@@ -4384,8 +4377,8 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
     _ComputeAncillaryFiles(cluster, True)
 
   # Never re-distribute configuration file from here
-  assert not (constants.CLUSTER_CONF_FILE in files_all or
-              constants.CLUSTER_CONF_FILE in files_vm)
+  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
+              pathutils.CLUSTER_CONF_FILE in files_vm)
   assert not files_mc, "Master candidates not handled in this function"
 
   filemap = [
@@ -4754,10 +4747,10 @@ class LUOobCommand(NoHooksLU):
                     type(result.payload))
 
     if self.op.command in [
-        constants.OOB_POWER_ON,
-        constants.OOB_POWER_OFF,
-        constants.OOB_POWER_CYCLE,
-        ]:
+      constants.OOB_POWER_ON,
+      constants.OOB_POWER_OFF,
+      constants.OOB_POWER_CYCLE,
+      ]:
       if result.payload is not None:
         errs.append("%s is expected to not return payload but got '%s'" %
                     (self.op.command, result.payload))
@@ -5059,7 +5052,7 @@ class _NodeQuery(_QueryBase):
 
       node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
                                         [lu.cfg.GetHypervisorType()])
-      live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload))
+      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
                        for (name, nresult) in node_data.items()
                        if not nresult.fail_msg and nresult.payload)
     else:
@@ -5367,10 +5360,11 @@ class _InstanceQuery(_QueryBase):
       live_data = {}
 
     if query.IQ_DISKUSAGE in self.requested_data:
+      gmi = ganeti.masterd.instance
       disk_usage = dict((inst.name,
-                         _ComputeDiskSize(inst.disk_template,
-                                          [{constants.IDISK_SIZE: disk.size}
-                                           for disk in inst.disks]))
+                         gmi.ComputeDiskSize(inst.disk_template,
+                                             [{constants.IDISK_SIZE: disk.size}
+                                              for disk in inst.disks]))
                         for inst in instance_list)
     else:
       disk_usage = None
@@ -5633,7 +5627,7 @@ class LUNodeAdd(LogicalUnit):
     if not newbie_singlehomed:
       # check reachability from my secondary ip to newbie's secondary ip
       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
-                           source=myself.secondary_ip):
+                              source=myself.secondary_ip):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                    " based ping to node daemon port",
                                    errors.ECODE_ENVIRON)
@@ -5811,10 +5805,10 @@ class LUNodeSetParams(LogicalUnit):
                                  errors.ECODE_INVAL)
 
     # Boolean value that tells us whether we might be demoting from MC
-    self.might_demote = (self.op.master_candidate == False or
-                         self.op.offline == True or
-                         self.op.drained == True or
-                         self.op.master_capable == False)
+    self.might_demote = (self.op.master_candidate is False or
+                         self.op.offline is True or
+                         self.op.drained is True or
+                         self.op.master_capable is False)
 
     if self.op.secondary_ip:
       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
@@ -5915,7 +5909,7 @@ class LUNodeSetParams(LogicalUnit):
                                  " it a master candidate" % node.name,
                                  errors.ECODE_STATE)
 
-    if self.op.vm_capable == False:
+    if self.op.vm_capable is False:
       (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
       if ipri or isec:
         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
@@ -5941,7 +5935,7 @@ class LUNodeSetParams(LogicalUnit):
 
     # Check for ineffective changes
     for attr in self._FLAGS:
-      if (getattr(self.op, attr) == False and getattr(node, attr) == False):
+      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
         setattr(self.op, attr, None)
 
@@ -5951,24 +5945,25 @@ class LUNodeSetParams(LogicalUnit):
     # TODO: We might query the real power state if it supports OOB
     if _SupportsOob(self.cfg, node):
       if self.op.offline is False and not (node.powered or
-                                           self.op.powered == True):
+                                           self.op.powered is True):
         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                     " offline status can be reset") %
-                                   self.op.node_name)
+                                   self.op.node_name, errors.ECODE_STATE)
     elif self.op.powered is not None:
       raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                   " as it does not support out-of-band"
-                                  " handling") % self.op.node_name)
+                                  " handling") % self.op.node_name,
+                                 errors.ECODE_STATE)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
-    if (self.op.drained == False or self.op.offline == False or
+    if (self.op.drained is False or self.op.offline is False or
         (self.op.master_capable and not node.master_capable)):
       if _DecideSelfPromotion(self):
         self.op.master_candidate = True
         self.LogInfo("Auto-promoting node to master candidate")
 
     # If we're no longer master capable, we'll demote ourselves from MC
-    if self.op.master_capable == False and node.master_candidate:
+    if self.op.master_capable is False and node.master_candidate:
       self.LogInfo("Demoting from master candidate")
       self.op.master_candidate = False
 
@@ -6035,10 +6030,10 @@ class LUNodeSetParams(LogicalUnit):
 
       if node.offline:
         if affected_instances:
-          raise errors.OpPrereqError("Cannot change secondary IP address:"
-                                     " offline node has instances (%s)"
-                                     " configured to use it" %
-                                     utils.CommaJoin(affected_instances.keys()))
+          msg = ("Cannot change secondary IP address: offline node has"
+                 " instances (%s) configured to use it" %
+                 utils.CommaJoin(affected_instances.keys()))
+          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
       else:
         # On online nodes, check that no instances are running, and that
         # the node has the new ip and we can reach it.
@@ -6294,12 +6289,12 @@ class _ClusterQuery(_QueryBase):
       cluster = NotImplemented
 
     if query.CQ_QUEUE_DRAINED in self.requested_data:
-      drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
     else:
       drain_flag = NotImplemented
 
     if query.CQ_WATCHER_PAUSE in self.requested_data:
-      watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+      watcher_pause = utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
     else:
       watcher_pause = NotImplemented
 
@@ -7082,6 +7077,59 @@ class LUInstanceRecreateDisks(LogicalUnit):
     constants.IDISK_METAVG,
     ]))
 
+  def _RunAllocator(self):
+    """Run the allocator based on input opcode.
+
+    """
+    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
+
+    # FIXME
+    # The allocator should actually run in "relocate" mode, but current
+    # allocators don't support relocating all the nodes of an instance at
+    # the same time. As a workaround we use "allocate" mode, but this is
+    # suboptimal for two reasons:
+    # - The instance name passed to the allocator is present in the list of
+    #   existing instances, so there could be a conflict within the
+    #   internal structures of the allocator. This doesn't happen with the
+    #   current allocators, but it's a liability.
+    # - The allocator counts the resources used by the instance twice: once
+    #   because the instance exists already, and once because it tries to
+    #   allocate a new instance.
+    # The allocator could choose some of the nodes on which the instance is
+    # running, but that's not a problem. If the instance nodes are broken,
+    # they should be already be marked as drained or offline, and hence
+    # skipped by the allocator. If instance disks have been lost for other
+    # reasons, then recreating the disks on the same nodes should be fine.
+    disk_template = self.instance.disk_template
+    spindle_use = be_full[constants.BE_SPINDLE_USE]
+    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
+                                        disk_template=disk_template,
+                                        tags=list(self.instance.GetTags()),
+                                        os=self.instance.os,
+                                        nics=[{}],
+                                        vcpus=be_full[constants.BE_VCPUS],
+                                        memory=be_full[constants.BE_MAXMEM],
+                                        spindle_use=spindle_use,
+                                        disks=[{constants.IDISK_SIZE: d.size,
+                                                constants.IDISK_MODE: d.mode}
+                                                for d in self.instance.disks],
+                                        hypervisor=self.instance.hypervisor)
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+    ial.Run(self.op.iallocator)
+
+    assert req.RequiredNodes() == len(self.instance.all_nodes)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+                                 " %s" % (self.op.iallocator, ial.info),
+                                 errors.ECODE_NORES)
+
+    self.op.nodes = ial.result
+    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+                 self.op.instance_name, self.op.iallocator,
+                 utils.CommaJoin(ial.result))
+
   def CheckArguments(self):
     if self.op.disks and ht.TPositiveInt(self.op.disks[0]):
       # Normalize and convert deprecated list of disk indices
@@ -7093,6 +7141,11 @@ class LUInstanceRecreateDisks(LogicalUnit):
                                  " once: %s" % utils.CommaJoin(duplicates),
                                  errors.ECODE_INVAL)
 
+    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
+    # when neither iallocator nor nodes are specified
+    if self.op.iallocator or self.op.nodes:
+      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
+
     for (idx, params) in self.op.disks:
       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
       unsupported = frozenset(params.keys()) - self._MODIFYABLE
@@ -7110,14 +7163,40 @@ class LUInstanceRecreateDisks(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
+      if self.op.iallocator:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
     self.needed_locks[locking.LEVEL_NODE_RES] = []
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      # if we replace the nodes, we only need to lock the old primary,
-      # otherwise we need to lock all nodes for disk re-creation
-      primary_only = bool(self.op.nodes)
-      self._LockInstancesNodes(primary_only=primary_only)
+    if level == locking.LEVEL_NODEGROUP:
+      assert self.op.iallocator is not None
+      assert not self.op.nodes
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      # Lock the primary group used by the instance optimistically; this
+      # requires going via the node before it's locked, requiring
+      # verification later on
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
+
+    elif level == locking.LEVEL_NODE:
+      # If an allocator is used, then we lock all the nodes in the current
+      # instance group, as we don't know yet which ones will be selected;
+      # if we replace the nodes without using an allocator, locks are
+      # already declared in ExpandNames; otherwise, we need to lock all the
+      # instance nodes for disk re-creation
+      if self.op.iallocator:
+        assert not self.op.nodes
+        assert not self.needed_locks[locking.LEVEL_NODE]
+        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
+
+        # Lock member nodes of the group of the primary node
+        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
+          self.needed_locks[locking.LEVEL_NODE].extend(
+            self.cfg.GetNodeGroup(group_uuid).members)
+      elif not self.op.nodes:
+        self._LockInstancesNodes(primary_only=False)
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
@@ -7161,18 +7240,25 @@ class LUInstanceRecreateDisks(LogicalUnit):
       primary_node = self.op.nodes[0]
     else:
       primary_node = instance.primary_node
-    _CheckNodeOnline(self, primary_node)
+    if not self.op.iallocator:
+      _CheckNodeOnline(self, primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
 
+    # Verify if node group locks are still correct
+    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+    if owned_groups:
+      # Node group locks are acquired only for the primary node (and only
+      # when the allocator is used)
+      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+                               primary_only=True)
+
     # if we replace nodes *and* the old primary is offline, we don't
-    # check
-    assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
-    assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
+    # check the instance state
     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
-    if not (self.op.nodes and old_pnode.offline):
+    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
       _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                           msg="cannot recreate disks")
 
@@ -7186,7 +7272,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                  errors.ECODE_INVAL)
 
-    if (self.op.nodes and
+    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
       raise errors.OpPrereqError("Can't recreate disks partially and"
                                  " change the nodes at the same time",
@@ -7194,6 +7280,12 @@ class LUInstanceRecreateDisks(LogicalUnit):
 
     self.instance = instance
 
+    if self.op.iallocator:
+      self._RunAllocator()
+      # Release unneeded node and node resource locks
+      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+
   def Exec(self, feedback_fn):
     """Recreate the disks.
 
@@ -7250,6 +7342,9 @@ class LUInstanceRecreateDisks(LogicalUnit):
     if self.op.nodes:
       self.cfg.Update(instance, feedback_fn)
 
+    # All touched nodes must be locked
+    mylocks = self.owned_locks(locking.LEVEL_NODE)
+    assert mylocks.issuperset(frozenset(instance.all_nodes))
     _CreateDisks(self, instance, to_skip=to_skip)
 
 
@@ -7746,8 +7841,9 @@ class LUInstanceMove(LogicalUnit):
     _CheckNodeOnline(self, target_node)
     _CheckNodeNotDrained(self, target_node)
     _CheckNodeVmCapable(self, target_node)
-    ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
-                                     self.cfg.GetNodeGroup(node.group))
+    cluster = self.cfg.GetClusterInfo()
+    group_info = self.cfg.GetNodeGroup(node.group)
+    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
     _CheckTargetNodeIPolicy(self, ipolicy, instance, node,
                             ignore=self.op.ignore_ipolicy)
 
@@ -8023,7 +8119,8 @@ class TLMigrateInstance(Tasklet):
       # Check that the target node is correct in terms of instance policy
       nodeinfo = self.cfg.GetNodeInfo(self.target_node)
       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
-      ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                              group_info)
       _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
                               ignore=self.ignore_ipolicy)
 
@@ -8033,7 +8130,8 @@ class TLMigrateInstance(Tasklet):
       if self.target_node == instance.primary_node:
         raise errors.OpPrereqError("Cannot migrate instance %s"
                                    " to its primary (%s)" %
-                                   (instance.name, instance.primary_node))
+                                   (instance.name, instance.primary_node),
+                                   errors.ECODE_STATE)
 
       if len(self.lu.tasklets) == 1:
         # It is safe to release locks only when we're the only tasklet
@@ -8062,7 +8160,8 @@ class TLMigrateInstance(Tasklet):
                                    errors.ECODE_INVAL)
       nodeinfo = self.cfg.GetNodeInfo(target_node)
       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
-      ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                              group_info)
       _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
                               ignore=self.ignore_ipolicy)
 
@@ -8083,14 +8182,9 @@ class TLMigrateInstance(Tasklet):
     # check if failover must be forced instead of migration
     if (not self.cleanup and not self.failover and
         i_be[constants.BE_ALWAYS_FAILOVER]):
-      if self.fallback:
-        self.lu.LogInfo("Instance configured to always failover; fallback"
-                        " to failover")
-        self.failover = True
-      else:
-        raise errors.OpPrereqError("This instance has been configured to"
-                                   " always failover, please allow failover",
-                                   errors.ECODE_STATE)
+      self.lu.LogInfo("Instance configured to always failover; fallback"
+                      " to failover")
+      self.failover = True
 
     # check bridge existance
     _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
@@ -8148,11 +8242,9 @@ class TLMigrateInstance(Tasklet):
 
     """
     # FIXME: add a self.ignore_ipolicy option
-    ial = IAllocator(self.cfg, self.rpc,
-                     mode=constants.IALLOCATOR_MODE_RELOC,
-                     name=self.instance_name,
-                     relocate_from=[self.instance.primary_node],
-                     )
+    req = iallocator.IAReqRelocate(name=self.instance_name,
+                                   relocate_from=[self.instance.primary_node])
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
 
     ial.Run(self.lu.op.iallocator)
 
@@ -8161,15 +8253,10 @@ class TLMigrateInstance(Tasklet):
                                  " iallocator '%s': %s" %
                                  (self.lu.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
-    if len(ial.result) != ial.required_nodes:
-      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
-                                 " of nodes (%s), required %s" %
-                                 (self.lu.op.iallocator, len(ial.result),
-                                  ial.required_nodes), errors.ECODE_FAULT)
     self.target_node = ial.result[0]
     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance_name, self.lu.op.iallocator,
-                 utils.CommaJoin(ial.result))
+                    self.instance_name, self.lu.op.iallocator,
+                    utils.CommaJoin(ial.result))
 
   def _WaitUntilSync(self):
     """Poll with custom rpc for disk sync.
@@ -8339,8 +8426,8 @@ class TLMigrateInstance(Tasklet):
       # Don't raise an exception here, as we stil have to try to revert the
       # disk status, even if this step failed.
 
-    abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
-        instance, False, self.live)
+    abort_result = self.rpc.call_instance_finalize_migration_src(
+      source_node, instance, False, self.live)
     abort_msg = abort_result.fail_msg
     if abort_msg:
       logging.error("Aborting migration failed on source node %s: %s",
@@ -8747,7 +8834,8 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgnames[0], names[0]),
                           params={})
-  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
+  dev_meta = objects.Disk(dev_type=constants.LD_LV,
+                          size=constants.DRBD_META_SIZE,
                           logical_id=(vgnames[1], names[1]),
                           params={})
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
@@ -8774,10 +8862,11 @@ _DISK_TEMPLATE_DEVICE_TYPE = {
   }
 
 
-def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
-    secondary_nodes, disk_info, file_storage_dir, file_driver, base_index,
-    feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
-    _req_shr_file_storage=opcodes.RequireSharedFileStorage):
+def _GenerateDiskTemplate(
+  lu, template_name, instance_name, primary_node, secondary_nodes,
+  disk_info, file_storage_dir, file_driver, base_index,
+  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
+  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
   """Generate the entire disk layout for a given template type.
 
   """
@@ -8888,7 +8977,7 @@ def _CalcEta(time_taken, written, total_size):
   return (total_size - written) * avg_time
 
 
-def _WipeDisks(lu, instance):
+def _WipeDisks(lu, instance, disks=None):
   """Wipes instance disks.
 
   @type lu: L{LogicalUnit}
@@ -8900,66 +8989,85 @@ def _WipeDisks(lu, instance):
   """
   node = instance.primary_node
 
-  for device in instance.disks:
+  if disks is None:
+    disks = [(idx, disk, 0)
+             for (idx, disk) in enumerate(instance.disks)]
+
+  for (_, device, _) in disks:
     lu.cfg.SetDiskID(device, node)
 
-  logging.info("Pause sync of instance %s disks", instance.name)
+  logging.info("Pausing synchronization of disks of instance '%s'",
+               instance.name)
   result = lu.rpc.call_blockdev_pause_resume_sync(node,
-                                                  (instance.disks, instance),
+                                                  (map(compat.snd, disks),
+                                                   instance),
                                                   True)
+  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
 
   for idx, success in enumerate(result.payload):
     if not success:
-      logging.warn("pause-sync of instance %s for disks %d failed",
-                   instance.name, idx)
+      logging.warn("Pausing synchronization of disk %s of instance '%s'"
+                   " failed", idx, instance.name)
 
   try:
-    for idx, device in enumerate(instance.disks):
+    for (idx, device, offset) in disks:
       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
-      # MAX_WIPE_CHUNK at max
-      wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
-                            constants.MIN_WIPE_CHUNK_PERCENT)
-      # we _must_ make this an int, otherwise rounding errors will
-      # occur
-      wipe_chunk_size = int(wipe_chunk_size)
-
-      lu.LogInfo("* Wiping disk %d", idx)
-      logging.info("Wiping disk %d for instance %s, node %s using"
-                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
+      wipe_chunk_size = \
+        int(min(constants.MAX_WIPE_CHUNK,
+                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
 
-      offset = 0
       size = device.size
       last_output = 0
       start_time = time.time()
 
+      if offset == 0:
+        info_text = ""
+      else:
+        info_text = (" (from %s to %s)" %
+                     (utils.FormatUnit(offset, "h"),
+                      utils.FormatUnit(size, "h")))
+
+      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
+
+      logging.info("Wiping disk %d for instance %s on node %s using"
+                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+
       while offset < size:
         wipe_size = min(wipe_chunk_size, size - offset)
+
         logging.debug("Wiping disk %d, offset %s, chunk %s",
                       idx, offset, wipe_size)
+
         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                            wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
+
         now = time.time()
         offset += wipe_size
         if now - last_output >= 60:
           eta = _CalcEta(now - start_time, offset, size)
-          lu.LogInfo(" - done: %.1f%% ETA: %s" %
-                     (offset / float(size) * 100, utils.FormatSeconds(eta)))
+          lu.LogInfo(" - done: %.1f%% ETA: %s",
+                     offset / float(size) * 100, utils.FormatSeconds(eta))
           last_output = now
   finally:
-    logging.info("Resume sync of instance %s disks", instance.name)
+    logging.info("Resuming synchronization of disks for instance '%s'",
+                 instance.name)
 
     result = lu.rpc.call_blockdev_pause_resume_sync(node,
-                                                    (instance.disks, instance),
+                                                    (map(compat.snd, disks),
+                                                     instance),
                                                     False)
 
-    for idx, success in enumerate(result.payload):
-      if not success:
-        lu.LogWarning("Resume sync of disk %d failed, please have a"
-                      " look at the status and troubleshoot the issue", idx)
-        logging.warn("resume-sync of instance %s for disks %d failed",
-                     instance.name, idx)
+    if result.fail_msg:
+      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
+                    node, result.fail_msg)
+    else:
+      for idx, success in enumerate(result.payload):
+        if not success:
+          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
+                        " failed", idx, instance.name)
 
 
 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
@@ -9086,7 +9194,7 @@ def _ComputeDiskSizePerVG(disk_template, disks):
     constants.DT_DISKLESS: {},
     constants.DT_PLAIN: _compute(disks, 0),
     # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
+    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
     constants.DT_FILE: {},
     constants.DT_SHARED_FILE: {},
   }
@@ -9098,30 +9206,6 @@ def _ComputeDiskSizePerVG(disk_template, disks):
   return req_size_dict[disk_template]
 
 
-def _ComputeDiskSize(disk_template, disks):
-  """Compute disk size requirements in the volume group
-
-  """
-  # Required free disk space as a function of disk and swap space
-  req_size_dict = {
-    constants.DT_DISKLESS: None,
-    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
-    # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8:
-      sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
-    constants.DT_FILE: None,
-    constants.DT_SHARED_FILE: 0,
-    constants.DT_BLOCK: 0,
-    constants.DT_RBD: 0,
-  }
-
-  if disk_template not in req_size_dict:
-    raise errors.ProgrammerError("Disk template '%s' size requirement"
-                                 " is unknown" % disk_template)
-
-  return req_size_dict[disk_template]
-
-
 def _FilterVmNodes(lu, nodenames):
   """Filters out non-vm_capable nodes from a list.
 
@@ -9197,6 +9281,163 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
                  osname, node)
 
 
+def _CreateInstanceAllocRequest(op, disks, nics, beparams):
+  """Wrapper around IAReqInstanceAlloc.
+
+  @param op: The instance opcode
+  @param disks: The computed disks
+  @param nics: The computed nics
+  @param beparams: The full filled beparams
+
+  @returns: A filled L{iallocator.IAReqInstanceAlloc}
+
+  """
+  spindle_use = beparams[constants.BE_SPINDLE_USE]
+  return iallocator.IAReqInstanceAlloc(name=op.instance_name,
+                                       disk_template=op.disk_template,
+                                       tags=op.tags,
+                                       os=op.os_type,
+                                       vcpus=beparams[constants.BE_VCPUS],
+                                       memory=beparams[constants.BE_MAXMEM],
+                                       spindle_use=spindle_use,
+                                       disks=disks,
+                                       nics=[n.ToDict() for n in nics],
+                                       hypervisor=op.hypervisor)
+
+
+def _ComputeNics(op, cluster, default_ip, cfg, proc):
+  """Computes the nics.
+
+  @param op: The instance opcode
+  @param cluster: Cluster configuration object
+  @param default_ip: The default ip to assign
+  @param cfg: An instance of the configuration object
+  @param proc: The executer instance
+
+  @returns: The build up nics
+
+  """
+  nics = []
+  for idx, nic in enumerate(op.nics):
+    nic_mode_req = nic.get(constants.INIC_MODE, None)
+    nic_mode = nic_mode_req
+    if nic_mode is None or nic_mode == constants.VALUE_AUTO:
+      nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
+
+    # in routed mode, for the first nic, the default ip is 'auto'
+    if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
+      default_ip_mode = constants.VALUE_AUTO
+    else:
+      default_ip_mode = constants.VALUE_NONE
+
+    # ip validity checks
+    ip = nic.get(constants.INIC_IP, default_ip_mode)
+    if ip is None or ip.lower() == constants.VALUE_NONE:
+      nic_ip = None
+    elif ip.lower() == constants.VALUE_AUTO:
+      if not op.name_check:
+        raise errors.OpPrereqError("IP address set to auto but name checks"
+                                   " have been skipped",
+                                   errors.ECODE_INVAL)
+      nic_ip = default_ip
+    else:
+      if not netutils.IPAddress.IsValid(ip):
+        raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
+                                   errors.ECODE_INVAL)
+      nic_ip = ip
+
+    # TODO: check the ip address for uniqueness
+    if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
+      raise errors.OpPrereqError("Routed nic mode requires an ip address",
+                                 errors.ECODE_INVAL)
+
+    # MAC address verification
+    mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
+    if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+      mac = utils.NormalizeAndValidateMac(mac)
+
+      try:
+        # TODO: We need to factor this out
+        cfg.ReserveMAC(mac, proc.GetECId())
+      except errors.ReservationError:
+        raise errors.OpPrereqError("MAC address %s already in use"
+                                   " in cluster" % mac,
+                                   errors.ECODE_NOTUNIQUE)
+
+    #  Build nic parameters
+    link = nic.get(constants.INIC_LINK, None)
+    if link == constants.VALUE_AUTO:
+      link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK]
+    nicparams = {}
+    if nic_mode_req:
+      nicparams[constants.NIC_MODE] = nic_mode
+    if link:
+      nicparams[constants.NIC_LINK] = link
+
+    check_params = cluster.SimpleFillNIC(nicparams)
+    objects.NIC.CheckParameterSyntax(check_params)
+    nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+
+  return nics
+
+
+def _ComputeDisks(op, default_vg):
+  """Computes the instance disks.
+
+  @param op: The instance opcode
+  @param default_vg: The default_vg to assume
+
+  @return: The computer disks
+
+  """
+  disks = []
+  for disk in op.disks:
+    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
+    if mode not in constants.DISK_ACCESS_SET:
+      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+                                 mode, errors.ECODE_INVAL)
+    size = disk.get(constants.IDISK_SIZE, None)
+    if size is None:
+      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+    try:
+      size = int(size)
+    except (TypeError, ValueError):
+      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+                                 errors.ECODE_INVAL)
+
+    data_vg = disk.get(constants.IDISK_VG, default_vg)
+    new_disk = {
+      constants.IDISK_SIZE: size,
+      constants.IDISK_MODE: mode,
+      constants.IDISK_VG: data_vg,
+      }
+    if constants.IDISK_METAVG in disk:
+      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
+    if constants.IDISK_ADOPT in disk:
+      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
+    disks.append(new_disk)
+
+  return disks
+
+
+def _ComputeFullBeParams(op, cluster):
+  """Computes the full beparams.
+
+  @param op: The instance opcode
+  @param cluster: The cluster config object
+
+  @return: The fully filled beparams
+
+  """
+  default_beparams = cluster.beparams[constants.PP_DEFAULT]
+  for param, value in op.beparams.iteritems():
+    if value == constants.VALUE_AUTO:
+      op.beparams[param] = default_beparams[param]
+  objects.UpgradeBeParams(op.beparams)
+  utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
+  return cluster.SimpleFillBE(op.beparams)
+
+
 class LUInstanceCreate(LogicalUnit):
   """Create an instance.
 
@@ -9415,26 +9656,15 @@ class LUInstanceCreate(LogicalUnit):
           self.needed_locks[locking.LEVEL_NODE].append(src_node)
         if not os.path.isabs(src_path):
           self.op.src_path = src_path = \
-            utils.PathJoin(constants.EXPORT_DIR, src_path)
+            utils.PathJoin(pathutils.EXPORT_DIR, src_path)
 
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
     """
-    nics = [n.ToDict() for n in self.nics]
-    ial = IAllocator(self.cfg, self.rpc,
-                     mode=constants.IALLOCATOR_MODE_ALLOC,
-                     name=self.op.instance_name,
-                     disk_template=self.op.disk_template,
-                     tags=self.op.tags,
-                     os=self.op.os_type,
-                     vcpus=self.be_full[constants.BE_VCPUS],
-                     memory=self.be_full[constants.BE_MAXMEM],
-                     spindle_use=self.be_full[constants.BE_SPINDLE_USE],
-                     disks=self.disks,
-                     nics=nics,
-                     hypervisor=self.op.hypervisor,
-                     )
+    req = _CreateInstanceAllocRequest(self.op, self.disks,
+                                      self.nics, self.be_full)
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
 
     ial.Run(self.op.iallocator)
 
@@ -9443,16 +9673,14 @@ class LUInstanceCreate(LogicalUnit):
                                  " iallocator '%s': %s" %
                                  (self.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
-    if len(ial.result) != ial.required_nodes:
-      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
-                                 " of nodes (%s), required %s" %
-                                 (self.op.iallocator, len(ial.result),
-                                  ial.required_nodes), errors.ECODE_FAULT)
     self.op.pnode = ial.result[0]
     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                  self.op.instance_name, self.op.iallocator,
                  utils.CommaJoin(ial.result))
-    if ial.required_nodes == 2:
+
+    assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
+
+    if req.RequiredNodes() == 2:
       self.op.snode = ial.result[1]
 
   def BuildHooksEnv(self):
@@ -9521,7 +9749,7 @@ class LUInstanceCreate(LogicalUnit):
         if src_path in exp_list[node].payload:
           found = True
           self.op.src_node = src_node = node
-          self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+          self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
                                                        src_path)
           break
       if not found:
@@ -9561,7 +9789,9 @@ class LUInstanceCreate(LogicalUnit):
         if self.op.disk_template not in constants.DISK_TEMPLATES:
           raise errors.OpPrereqError("Disk template specified in configuration"
                                      " file is not one of the allowed values:"
-                                     " %s" % " ".join(constants.DISK_TEMPLATES))
+                                     " %s" %
+                                     " ".join(constants.DISK_TEMPLATES),
+                                     errors.ECODE_INVAL)
       else:
         raise errors.OpPrereqError("No disk template specified and the export"
                                    " is missing the disk_template information",
@@ -9674,7 +9904,8 @@ class LUInstanceCreate(LogicalUnit):
 
       cfg_storagedir = get_fsd_fn()
       if not cfg_storagedir:
-        raise errors.OpPrereqError("Cluster file storage dir not defined")
+        raise errors.OpPrereqError("Cluster file storage dir not defined",
+                                   errors.ECODE_STATE)
       joinargs.append(cfg_storagedir)
 
       if self.op.file_storage_dir is not None:
@@ -9711,8 +9942,8 @@ class LUInstanceCreate(LogicalUnit):
     enabled_hvs = cluster.enabled_hypervisors
     if self.op.hypervisor not in enabled_hvs:
       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
-                                 " cluster (%s)" % (self.op.hypervisor,
-                                  ",".join(enabled_hvs)),
+                                 " cluster (%s)" %
+                                 (self.op.hypervisor, ",".join(enabled_hvs)),
                                  errors.ECODE_STATE)
 
     # Check tag validity
@@ -9730,13 +9961,7 @@ class LUInstanceCreate(LogicalUnit):
     _CheckGlobalHvParams(self.op.hvparams)
 
     # fill and remember the beparams dict
-    default_beparams = cluster.beparams[constants.PP_DEFAULT]
-    for param, value in self.op.beparams.iteritems():
-      if value == constants.VALUE_AUTO:
-        self.op.beparams[param] = default_beparams[param]
-    objects.UpgradeBeParams(self.op.beparams)
-    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
-    self.be_full = cluster.SimpleFillBE(self.op.beparams)
+    self.be_full = _ComputeFullBeParams(self.op, cluster)
 
     # build os parameters
     self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)
@@ -9747,94 +9972,12 @@ class LUInstanceCreate(LogicalUnit):
       self._RevertToDefaults(cluster)
 
     # NIC buildup
-    self.nics = []
-    for idx, nic in enumerate(self.op.nics):
-      nic_mode_req = nic.get(constants.INIC_MODE, None)
-      nic_mode = nic_mode_req
-      if nic_mode is None or nic_mode == constants.VALUE_AUTO:
-        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
-
-      # in routed mode, for the first nic, the default ip is 'auto'
-      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
-        default_ip_mode = constants.VALUE_AUTO
-      else:
-        default_ip_mode = constants.VALUE_NONE
-
-      # ip validity checks
-      ip = nic.get(constants.INIC_IP, default_ip_mode)
-      if ip is None or ip.lower() == constants.VALUE_NONE:
-        nic_ip = None
-      elif ip.lower() == constants.VALUE_AUTO:
-        if not self.op.name_check:
-          raise errors.OpPrereqError("IP address set to auto but name checks"
-                                     " have been skipped",
-                                     errors.ECODE_INVAL)
-        nic_ip = self.hostname1.ip
-      else:
-        if not netutils.IPAddress.IsValid(ip):
-          raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
-                                     errors.ECODE_INVAL)
-        nic_ip = ip
-
-      # TODO: check the ip address for uniqueness
-      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
-        raise errors.OpPrereqError("Routed nic mode requires an ip address",
-                                   errors.ECODE_INVAL)
-
-      # MAC address verification
-      mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
-      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-        mac = utils.NormalizeAndValidateMac(mac)
-
-        try:
-          self.cfg.ReserveMAC(mac, self.proc.GetECId())
-        except errors.ReservationError:
-          raise errors.OpPrereqError("MAC address %s already in use"
-                                     " in cluster" % mac,
-                                     errors.ECODE_NOTUNIQUE)
-
-      #  Build nic parameters
-      link = nic.get(constants.INIC_LINK, None)
-      if link == constants.VALUE_AUTO:
-        link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK]
-      nicparams = {}
-      if nic_mode_req:
-        nicparams[constants.NIC_MODE] = nic_mode
-      if link:
-        nicparams[constants.NIC_LINK] = link
-
-      check_params = cluster.SimpleFillNIC(nicparams)
-      objects.NIC.CheckParameterSyntax(check_params)
-      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+    self.nics = _ComputeNics(self.op, cluster, self.hostname1.ip, self.cfg,
+                             self.proc)
 
     # disk checks/pre-build
     default_vg = self.cfg.GetVGName()
-    self.disks = []
-    for disk in self.op.disks:
-      mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
-      if mode not in constants.DISK_ACCESS_SET:
-        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
-                                   mode, errors.ECODE_INVAL)
-      size = disk.get(constants.IDISK_SIZE, None)
-      if size is None:
-        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
-      try:
-        size = int(size)
-      except (TypeError, ValueError):
-        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
-                                   errors.ECODE_INVAL)
-
-      data_vg = disk.get(constants.IDISK_VG, default_vg)
-      new_disk = {
-        constants.IDISK_SIZE: size,
-        constants.IDISK_MODE: mode,
-        constants.IDISK_VG: data_vg,
-        }
-      if constants.IDISK_METAVG in disk:
-        new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
-      if constants.IDISK_ADOPT in disk:
-        new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
-      self.disks.append(new_disk)
+    self.disks = _ComputeDisks(self.op, default_vg)
 
     if self.op.mode == constants.INSTANCE_IMPORT:
       disk_images = []
@@ -9939,13 +10082,12 @@ class LUInstanceCreate(LogicalUnit):
       }
 
     group_info = self.cfg.GetNodeGroup(pnode.group)
-    ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
     res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
     if not self.op.ignore_ipolicy and res:
-      raise errors.OpPrereqError(("Instance allocation to group %s violates"
-                                  " policy: %s") % (pnode.group,
-                                                    utils.CommaJoin(res)),
-                                  errors.ECODE_INVAL)
+      msg = ("Instance allocation to group %s (%s) violates policy: %s" %
+             (pnode.group, group_info.name, utils.CommaJoin(res)))
+      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
     if not self.adopt_disks:
       if self.op.disk_template == constants.DT_RBD:
@@ -10287,6 +10429,137 @@ class LUInstanceCreate(LogicalUnit):
     return list(iobj.all_nodes)
 
 
+class LUInstanceMultiAlloc(NoHooksLU):
+  """Allocates multiple instances at the same time.
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Check arguments.
+
+    """
+    nodes = []
+    for inst in self.op.instances:
+      if inst.iallocator is not None:
+        raise errors.OpPrereqError("iallocator are not allowed to be set on"
+                                   " instance objects", errors.ECODE_INVAL)
+      nodes.append(bool(inst.pnode))
+      if inst.disk_template in constants.DTS_INT_MIRROR:
+        nodes.append(bool(inst.snode))
+
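+    # Either every instance specifies its nodes or none does; mixing both
+    # is rejected below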
+    has_nodes = compat.any(nodes)
+    if compat.all(nodes) ^ has_nodes:
+      raise errors.OpPrereqError("There are instance objects providing"
+                                 " pnode/snode while others do not",
+                                 errors.ECODE_INVAL)
+
+    if self.op.iallocator is None:
+      default_iallocator = self.cfg.GetDefaultIAllocator()
+      if default_iallocator:
+        self.op.iallocator = default_iallocator
+      elif not has_nodes:
+        raise errors.OpPrereqError("No iallocator or nodes on the instances"
+                                   " given and no cluster-wide default"
+                                   " iallocator found; please specify either"
+                                   " an iallocator or nodes on the instances"
+                                   " or set a cluster-wide default iallocator",
+                                   errors.ECODE_INVAL)
+
+    dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
+    if dups:
+      raise errors.OpPrereqError("There are duplicate instance names: %s" %
+                                 utils.CommaJoin(dups), errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """Calculate the locks.
+
+    """
+    self.share_locks = _ShareAll()
+    self.needed_locks = {}
+
+    if self.op.iallocator:
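+      # The iallocator may place instances on any node, so all nodes (and
+      # their resources) have to be locked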
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+      self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
+    else:
+      nodeslist = []
+      for inst in self.op.instances:
+        inst.pnode = _ExpandNodeName(self.cfg, inst.pnode)
+        nodeslist.append(inst.pnode)
+        if inst.snode is not None:
+          inst.snode = _ExpandNodeName(self.cfg, inst.snode)
+          nodeslist.append(inst.snode)
+
+      self.needed_locks[locking.LEVEL_NODE] = nodeslist
+      # Lock resources of instance's primary and secondary nodes (copy to
+      # prevent accidental modification)
+      self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)
+
+  def CheckPrereq(self):
+    """Check prerequisite.
+
+    """
+    cluster = self.cfg.GetClusterInfo()
+    default_vg = self.cfg.GetVGName()
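+    # Build one allocation request per instance and hand them all to the
+    # iallocator in a single multi-allocation request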
+    insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg),
+                                         _ComputeNics(op, cluster, None,
+                                                      self.cfg, self.proc),
+                                         _ComputeFullBeParams(op, cluster))
+             for op in self.op.instances]
+    req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+    ial.Run(self.op.iallocator)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using"
+                                 " iallocator '%s': %s" %
+                                 (self.op.iallocator, ial.info),
+                                 errors.ECODE_NORES)
+
+    self.ia_result = ial.result
+
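+    # For dry runs, report which instances could be allocated without
+    # submitting any jobs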
+    if self.op.dry_run:
+      self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
+        constants.JOB_IDS_KEY: [],
+        })
+
+  def _ConstructPartialResult(self):
+    """Contructs the partial result.
+
+    """
+    (allocatable, failed) = self.ia_result
+    return {
+      opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
+        map(compat.fst, allocatable),
+      opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
+      }
+
+  def Exec(self, feedback_fn):
+    """Executes the opcode.
+
+    """
+    op2inst = dict((op.instance_name, op) for op in self.op.instances)
+    (allocatable, failed) = self.ia_result
+
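+    # Create one single-opcode job per allocatable instance, filling in the
+    # node(s) chosen by the iallocator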
+    jobs = []
+    for (name, nodes) in allocatable:
+      op = op2inst.pop(name)
+
+      if len(nodes) > 1:
+        (op.pnode, op.snode) = nodes
+      else:
+        (op.pnode,) = nodes
+
+      jobs.append([op])
+
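+    # Every instance that was not allocated must appear in the failed list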
+    missing = set(op2inst.keys()) - set(failed)
+    assert not missing, \
+      ("Iallocator returned an incomplete result: %s" %
+       utils.CommaJoin(missing))
+
+    return ResultWithJobs(jobs, **self._ConstructPartialResult())
+
+
 def _CheckRADOSFreeSpace():
   """Compute disk size requirements inside the RADOS cluster.
 
@@ -10433,9 +10706,10 @@ class LUInstanceReplaceDisks(LogicalUnit):
         assert not self.needed_locks[locking.LEVEL_NODE]
 
         # Lock member nodes of all locked groups
-        self.needed_locks[locking.LEVEL_NODE] = [node_name
-          for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
-          for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+        self.needed_locks[locking.LEVEL_NODE] = \
+            [node_name
+             for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
+             for node_name in self.cfg.GetNodeGroup(group_uuid).members]
       else:
         self._LockInstancesNodes()
     elif level == locking.LEVEL_NODE_RES:
@@ -10518,22 +10792,22 @@ class TLReplaceDisks(Tasklet):
     self.node_secondary_ip = None
 
   @staticmethod
-  def CheckArguments(mode, remote_node, iallocator):
+  def CheckArguments(mode, remote_node, ialloc):
     """Helper function for users of this class.
 
     """
     # check for valid parameter combination
     if mode == constants.REPLACE_DISK_CHG:
-      if remote_node is None and iallocator is None:
+      if remote_node is None and ialloc is None:
         raise errors.OpPrereqError("When changing the secondary either an"
                                    " iallocator script must be used or the"
                                    " new node given", errors.ECODE_INVAL)
 
-      if remote_node is not None and iallocator is not None:
+      if remote_node is not None and ialloc is not None:
         raise errors.OpPrereqError("Give either the iallocator or the new"
                                    " secondary, not both", errors.ECODE_INVAL)
 
-    elif remote_node is not None or iallocator is not None:
+    elif remote_node is not None or ialloc is not None:
       # Not replacing the secondary
       raise errors.OpPrereqError("The iallocator and new node options can"
                                  " only be used when changing the"
@@ -10544,10 +10818,9 @@ class TLReplaceDisks(Tasklet):
     """Compute a new secondary node using an IAllocator.
 
     """
-    ial = IAllocator(lu.cfg, lu.rpc,
-                     mode=constants.IALLOCATOR_MODE_RELOC,
-                     name=instance_name,
-                     relocate_from=list(relocate_from))
+    req = iallocator.IAReqRelocate(name=instance_name,
+                                   relocate_from=list(relocate_from))
+    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
 
     ial.Run(iallocator_name)
 
@@ -10556,13 +10829,6 @@ class TLReplaceDisks(Tasklet):
                                  " %s" % (iallocator_name, ial.info),
                                  errors.ECODE_NORES)
 
-    if len(ial.result) != ial.required_nodes:
-      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
-                                 " of nodes (%s), required %s" %
-                                 (iallocator_name,
-                                  len(ial.result), ial.required_nodes),
-                                 errors.ECODE_FAULT)
-
     remote_node_name = ial.result[0]
 
     lu.LogInfo("Selected new secondary for instance '%s': %s",
@@ -10735,8 +11001,9 @@ class TLReplaceDisks(Tasklet):
     if self.remote_node_info:
       # We change the node, lets verify it still meets instance policy
       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
-      ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
-                                       new_group_info)
+      cluster = self.cfg.GetClusterInfo()
+      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                              new_group_info)
       _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                               ignore=self.ignore_ipolicy)
 
@@ -10906,7 +11173,8 @@ class TLReplaceDisks(Tasklet):
                              logical_id=(vg_data, names[0]),
                              params=data_disk.params)
       vg_meta = meta_disk.logical_id[0]
-      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
+      lv_meta = objects.Disk(dev_type=constants.LD_LV,
+                             size=constants.DRBD_META_SIZE,
                              logical_id=(vg_meta, names[1]),
                              params=meta_disk.params)
 
@@ -11493,9 +11761,10 @@ class LUNodeEvacuate(NoHooksLU):
 
     elif self.op.iallocator is not None:
       # TODO: Implement relocation to other group
-      ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
-                       evac_mode=self._MODE2IALLOCATOR[self.op.mode],
-                       instances=list(self.instance_names))
+      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
+      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
+                                     instances=list(self.instance_names))
+      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
 
       ial.Run(self.op.iallocator)
 
@@ -11581,6 +11850,23 @@ def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
           for ops in jobs]
 
 
+def _DiskSizeInBytesToMebibytes(lu, size):
+  """Converts a disk size in bytes to mebibytes.
+
+  Warns and rounds up if the size isn't an even multiple of 1 MiB.
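+
+  For example, a size of one GiB plus 512 bytes is converted to 1025 MiB;
+  the accompanying warning reports that the remaining 1048064 bytes of the
+  last mebibyte will not be wiped.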
+
+  """
+  (mib, remainder) = divmod(size, 1024 * 1024)
+
+  if remainder != 0:
+    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
+                  " to not overwrite existing data (%s bytes will not be"
+                  " wiped)", (1024 * 1024) - remainder)
+    mib += 1
+
+  return mib
+
+
 class LUInstanceGrowDisk(LogicalUnit):
   """Grow a disk of an instance.
 
@@ -11682,6 +11968,8 @@ class LUInstanceGrowDisk(LogicalUnit):
     assert (self.owned_locks(locking.LEVEL_NODE) ==
             self.owned_locks(locking.LEVEL_NODE_RES))
 
+    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
+
     disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
@@ -11696,7 +11984,27 @@ class LUInstanceGrowDisk(LogicalUnit):
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                            True, True)
-      result.Raise("Grow request failed to node %s" % node)
+      result.Raise("Dry-run grow request failed to node %s" % node)
+
+    if wipe_disks:
+      # Get disk size from primary node for wiping
+      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
+      result.Raise("Failed to retrieve disk size from node '%s'" %
+                   instance.primary_node)
+
+      (disk_size_in_bytes, ) = result.payload
+
+      if disk_size_in_bytes is None:
+        raise errors.OpExecError("Failed to retrieve disk size from primary"
+                                 " node '%s'" % instance.primary_node)
+
+      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
+
+      assert old_disk_size >= disk.size, \
+        ("Retrieved disk size too small (got %s, should be at least %s)" %
+         (old_disk_size, disk.size))
+    else:
+      old_disk_size = None
 
     # We know that (as far as we can test) operations across different
     # nodes will succeed, time to run it for real on the backing storage
@@ -11722,6 +12030,15 @@ class LUInstanceGrowDisk(LogicalUnit):
     # Downgrade lock while waiting for sync
     self.glm.downgrade(locking.LEVEL_INSTANCE)
 
+    assert wipe_disks ^ (old_disk_size is None)
+
+    if wipe_disks:
+      assert instance.disks[self.op.disk] == disk
+
+      # Wipe newly added disk space
+      _WipeDisks(self, instance,
+                 disks=[(self.op.disk, disk, old_disk_size)])
+
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance, disks=[disk])
       if disk_abort:
@@ -12223,12 +12540,10 @@ class LUInstanceSetParams(LogicalUnit):
     if self.op.hvparams:
       _CheckGlobalHvParams(self.op.hvparams)
 
-    self.op.disks = \
-      self._UpgradeDiskNicMods("disk", self.op.disks,
-        opcodes.OpInstanceSetParams.TestDiskModifications)
-    self.op.nics = \
-      self._UpgradeDiskNicMods("NIC", self.op.nics,
-        opcodes.OpInstanceSetParams.TestNicModifications)
+    self.op.disks = self._UpgradeDiskNicMods(
+      "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications)
+    self.op.nics = self._UpgradeDiskNicMods(
+      "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)
 
     # Check disk modifications
     self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
@@ -12427,7 +12742,8 @@ class LUInstanceSetParams(LogicalUnit):
 
         snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
         snode_group = self.cfg.GetNodeGroup(snode_info.group)
-        ipolicy = _CalculateGroupIPolicy(cluster, snode_group)
+        ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                                snode_group)
         _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info,
                                 ignore=self.op.ignore_ipolicy)
         if pnode_info.group != snode_info.group:
@@ -12528,7 +12844,7 @@ class LUInstanceSetParams(LogicalUnit):
                            " free memory information" % pnode)
         elif instance_info.fail_msg:
           self.warn.append("Can't get instance runtime information: %s" %
-                          instance_info.fail_msg)
+                           instance_info.fail_msg)
         else:
           if instance_info.payload:
             current_mem = int(instance_info.payload["memory"])
@@ -12545,8 +12861,7 @@ class LUInstanceSetParams(LogicalUnit):
             raise errors.OpPrereqError("This change will prevent the instance"
                                        " from starting, due to %d MB of memory"
                                        " missing on its primary node" %
-                                       miss_mem,
-                                       errors.ECODE_NORES)
+                                       miss_mem, errors.ECODE_NORES)
 
       if be_new[constants.BE_AUTO_BALANCE]:
         for node, nres in nodeinfo.items():
@@ -12572,8 +12887,8 @@ class LUInstanceSetParams(LogicalUnit):
                                                 instance.hypervisor)
       remote_info.Raise("Error checking node %s" % instance.primary_node)
       if not remote_info.payload: # not running already
-        raise errors.OpPrereqError("Instance %s is not running" % instance.name,
-                                   errors.ECODE_STATE)
+        raise errors.OpPrereqError("Instance %s is not running" %
+                                   instance.name, errors.ECODE_STATE)
 
       current_memory = remote_info.payload["memory"]
       if (not self.op.force and
@@ -12581,7 +12896,8 @@ class LUInstanceSetParams(LogicalUnit):
             self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
         raise errors.OpPrereqError("Instance %s must have memory between %d"
                                    " and %d MB of memory unless --force is"
-                                   " given" % (instance.name,
+                                   " given" %
+                                   (instance.name,
                                     self.be_proposed[constants.BE_MINMEM],
                                     self.be_proposed[constants.BE_MAXMEM]),
                                    errors.ECODE_INVAL)
@@ -12595,8 +12911,7 @@ class LUInstanceSetParams(LogicalUnit):
 
     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Disk operations not supported for"
-                                 " diskless instances",
-                                 errors.ECODE_INVAL)
+                                 " diskless instances", errors.ECODE_INVAL)
 
     def _PrepareNicCreate(_, params, private):
       self._PrepareNicModification(params, private, None, {}, cluster, pnode)
@@ -13094,8 +13409,9 @@ class LUInstanceChangeGroup(LogicalUnit):
 
     assert instances == [self.op.instance_name], "Instance not locked"
 
-    ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
-                     instances=instances, target_groups=list(self.target_uuids))
+    req = iallocator.IAReqGroupChange(instances=instances,
+                                      target_groups=list(self.target_uuids))
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
 
     ial.Run(self.op.iallocator)
 
@@ -13103,8 +13419,7 @@ class LUInstanceChangeGroup(LogicalUnit):
       raise errors.OpPrereqError("Can't compute solution for changing group of"
                                  " instance '%s' using iallocator '%s': %s" %
                                  (self.op.instance_name, self.op.iallocator,
-                                  ial.info),
-                                 errors.ECODE_NORES)
+                                  ial.info), errors.ECODE_NORES)
 
     jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
 
@@ -13333,7 +13648,7 @@ class LUBackupExport(LogicalUnit):
         self.instance.admin_state == constants.ADMINST_UP and
         not self.op.shutdown):
       raise errors.OpPrereqError("Can not remove instance without shutting it"
-                                 " down before")
+                                 " down before", errors.ECODE_STATE)
 
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
@@ -13363,7 +13678,8 @@ class LUBackupExport(LogicalUnit):
       try:
         (key_name, hmac_digest, hmac_salt) = self.x509_key_name
       except (TypeError, ValueError), err:
-        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
+        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
+                                   errors.ECODE_INVAL)
 
       if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
         raise errors.OpPrereqError("HMAC for X509 key name is wrong",
@@ -13990,7 +14306,7 @@ class LUGroupSetParams(LogicalUnit):
 
     if self.op.ndparams:
       new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
-      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = new_ndparams
 
     if self.op.diskparams:
@@ -14028,9 +14344,10 @@ class LUGroupSetParams(LogicalUnit):
       new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
       inst_filter = lambda inst: inst.name in owned_instances
       instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values()
+      gmi = ganeti.masterd.instance
       violations = \
-          _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
-                                                               self.group),
+          _ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
+                                                                  self.group),
                                         new_ipolicy, instances)
 
       if violations:
@@ -14118,9 +14435,8 @@ class LUGroupRemove(LogicalUnit):
 
     # Verify the cluster would not be left group-less.
     if len(self.cfg.GetNodeGroupList()) == 1:
-      raise errors.OpPrereqError("Group '%s' is the only group,"
-                                 " cannot be removed" %
-                                 self.op.group_name,
+      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
+                                 " removed" % self.op.group_name,
                                  errors.ECODE_STATE)
 
   def BuildHooksEnv(self):
@@ -14349,8 +14665,9 @@ class LUGroupEvacuate(LogicalUnit):
 
     assert self.group_uuid not in self.target_uuids
 
-    ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
-                     instances=instances, target_groups=self.target_uuids)
+    req = iallocator.IAReqGroupChange(instances=instances,
+                                      target_groups=self.target_uuids)
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
 
     ial.Run(self.op.iallocator)
 
@@ -14723,519 +15040,6 @@ class LUTestJqueue(NoHooksLU):
     return True
 
 
-class IAllocator(object):
-  """IAllocator framework.
-
-  An IAllocator instance has three sets of attributes:
-    - cfg that is needed to query the cluster
-    - input data (all members of the _KEYS class attribute are required)
-    - four buffer attributes (in|out_data|text), that represent the
-      input (to the external script) in text and data structure format,
-      and the output from it, again in two formats
-    - the result variables from the script (success, info, nodes) for
-      easy usage
-
-  """
-  # pylint: disable=R0902
-  # lots of instance attributes
-
-  def __init__(self, cfg, rpc_runner, mode, **kwargs):
-    self.cfg = cfg
-    self.rpc = rpc_runner
-    # init buffer variables
-    self.in_text = self.out_text = self.in_data = self.out_data = None
-    # init all input fields so that pylint is happy
-    self.mode = mode
-    self.memory = self.disks = self.disk_template = self.spindle_use = None
-    self.os = self.tags = self.nics = self.vcpus = None
-    self.hypervisor = None
-    self.relocate_from = None
-    self.name = None
-    self.instances = None
-    self.evac_mode = None
-    self.target_groups = []
-    # computed fields
-    self.required_nodes = None
-    # init result fields
-    self.success = self.info = self.result = None
-
-    try:
-      (fn, keydata, self._result_check) = self._MODE_DATA[self.mode]
-    except KeyError:
-      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
-                                   " IAllocator" % self.mode)
-
-    keyset = [n for (n, _) in keydata]
-
-    for key in kwargs:
-      if key not in keyset:
-        raise errors.ProgrammerError("Invalid input parameter '%s' to"
-                                     " IAllocator" % key)
-      setattr(self, key, kwargs[key])
-
-    for key in keyset:
-      if key not in kwargs:
-        raise errors.ProgrammerError("Missing input parameter '%s' to"
-                                     " IAllocator" % key)
-    self._BuildInputData(compat.partial(fn, self), keydata)
-
-  def _ComputeClusterData(self):
-    """Compute the generic allocator input data.
-
-    This is the data that is independent of the actual operation.
-
-    """
-    cfg = self.cfg
-    cluster_info = cfg.GetClusterInfo()
-    # cluster data
-    data = {
-      "version": constants.IALLOCATOR_VERSION,
-      "cluster_name": cfg.GetClusterName(),
-      "cluster_tags": list(cluster_info.GetTags()),
-      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
-      "ipolicy": cluster_info.ipolicy,
-      }
-    ninfo = cfg.GetAllNodesInfo()
-    iinfo = cfg.GetAllInstancesInfo().values()
-    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
-
-    # node data
-    node_list = [n.name for n in ninfo.values() if n.vm_capable]
-
-    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      hypervisor_name = self.hypervisor
-    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
-      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
-    else:
-      hypervisor_name = cluster_info.primary_hypervisor
-
-    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
-                                        [hypervisor_name])
-    node_iinfo = \
-      self.rpc.call_all_instances_info(node_list,
-                                       cluster_info.enabled_hypervisors)
-
-    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
-
-    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
-    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
-                                                 i_list, config_ndata)
-    assert len(data["nodes"]) == len(ninfo), \
-        "Incomplete node data computed"
-
-    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
-
-    self.in_data = data
-
-  @staticmethod
-  def _ComputeNodeGroupData(cfg):
-    """Compute node groups data.
-
-    """
-    cluster = cfg.GetClusterInfo()
-    ng = dict((guuid, {
-      "name": gdata.name,
-      "alloc_policy": gdata.alloc_policy,
-      "ipolicy": _CalculateGroupIPolicy(cluster, gdata),
-      })
-      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
-
-    return ng
-
-  @staticmethod
-  def _ComputeBasicNodeData(cfg, node_cfg):
-    """Compute global node data.
-
-    @rtype: dict
-    @returns: a dict of name: (node dict, node config)
-
-    """
-    # fill in static (config-based) values
-    node_results = dict((ninfo.name, {
-      "tags": list(ninfo.GetTags()),
-      "primary_ip": ninfo.primary_ip,
-      "secondary_ip": ninfo.secondary_ip,
-      "offline": ninfo.offline,
-      "drained": ninfo.drained,
-      "master_candidate": ninfo.master_candidate,
-      "group": ninfo.group,
-      "master_capable": ninfo.master_capable,
-      "vm_capable": ninfo.vm_capable,
-      "ndparams": cfg.GetNdParams(ninfo),
-      })
-      for ninfo in node_cfg.values())
-
-    return node_results
-
-  @staticmethod
-  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
-                              node_results):
-    """Compute global node data.
-
-    @param node_results: the basic node structures as filled from the config
-
-    """
-    #TODO(dynmem): compute the right data on MAX and MIN memory
-    # make a copy of the current dict
-    node_results = dict(node_results)
-    for nname, nresult in node_data.items():
-      assert nname in node_results, "Missing basic data for node %s" % nname
-      ninfo = node_cfg[nname]
-
-      if not (ninfo.offline or ninfo.drained):
-        nresult.Raise("Can't get data for node %s" % nname)
-        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
-                                nname)
-        remote_info = _MakeLegacyNodeInfo(nresult.payload)
-
-        for attr in ["memory_total", "memory_free", "memory_dom0",
-                     "vg_size", "vg_free", "cpu_total"]:
-          if attr not in remote_info:
-            raise errors.OpExecError("Node '%s' didn't return attribute"
-                                     " '%s'" % (nname, attr))
-          if not isinstance(remote_info[attr], int):
-            raise errors.OpExecError("Node '%s' returned invalid value"
-                                     " for '%s': %s" %
-                                     (nname, attr, remote_info[attr]))
-        # compute memory used by primary instances
-        i_p_mem = i_p_up_mem = 0
-        for iinfo, beinfo in i_list:
-          if iinfo.primary_node == nname:
-            i_p_mem += beinfo[constants.BE_MAXMEM]
-            if iinfo.name not in node_iinfo[nname].payload:
-              i_used_mem = 0
-            else:
-              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
-            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
-            remote_info["memory_free"] -= max(0, i_mem_diff)
-
-            if iinfo.admin_state == constants.ADMINST_UP:
-              i_p_up_mem += beinfo[constants.BE_MAXMEM]
-
-        # compute memory used by instances
-        pnr_dyn = {
-          "total_memory": remote_info["memory_total"],
-          "reserved_memory": remote_info["memory_dom0"],
-          "free_memory": remote_info["memory_free"],
-          "total_disk": remote_info["vg_size"],
-          "free_disk": remote_info["vg_free"],
-          "total_cpus": remote_info["cpu_total"],
-          "i_pri_memory": i_p_mem,
-          "i_pri_up_memory": i_p_up_mem,
-          }
-        pnr_dyn.update(node_results[nname])
-        node_results[nname] = pnr_dyn
-
-    return node_results
-
-  @staticmethod
-  def _ComputeInstanceData(cluster_info, i_list):
-    """Compute global instance data.
-
-    """
-    instance_data = {}
-    for iinfo, beinfo in i_list:
-      nic_data = []
-      for nic in iinfo.nics:
-        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
-        nic_dict = {
-          "mac": nic.mac,
-          "ip": nic.ip,
-          "mode": filled_params[constants.NIC_MODE],
-          "link": filled_params[constants.NIC_LINK],
-          }
-        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
-          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
-        nic_data.append(nic_dict)
-      pir = {
-        "tags": list(iinfo.GetTags()),
-        "admin_state": iinfo.admin_state,
-        "vcpus": beinfo[constants.BE_VCPUS],
-        "memory": beinfo[constants.BE_MAXMEM],
-        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
-        "os": iinfo.os,
-        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
-        "nics": nic_data,
-        "disks": [{constants.IDISK_SIZE: dsk.size,
-                   constants.IDISK_MODE: dsk.mode}
-                  for dsk in iinfo.disks],
-        "disk_template": iinfo.disk_template,
-        "hypervisor": iinfo.hypervisor,
-        }
-      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
-                                                 pir["disks"])
-      instance_data[iinfo.name] = pir
-
-    return instance_data
-
-  def _AddNewInstance(self):
-    """Add new instance data to allocator structure.
-
-    This in combination with _AllocatorGetClusterData will create the
-    correct structure needed as input for the allocator.
-
-    The checks for the completeness of the opcode must have already been
-    done.
-
-    """
-    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
-
-    if self.disk_template in constants.DTS_INT_MIRROR:
-      self.required_nodes = 2
-    else:
-      self.required_nodes = 1
-
-    request = {
-      "name": self.name,
-      "disk_template": self.disk_template,
-      "tags": self.tags,
-      "os": self.os,
-      "vcpus": self.vcpus,
-      "memory": self.memory,
-      "spindle_use": self.spindle_use,
-      "disks": self.disks,
-      "disk_space_total": disk_space,
-      "nics": self.nics,
-      "required_nodes": self.required_nodes,
-      "hypervisor": self.hypervisor,
-      }
-
-    return request
-
-  def _AddRelocateInstance(self):
-    """Add relocate instance data to allocator structure.
-
-    This in combination with _IAllocatorGetClusterData will create the
-    correct structure needed as input for the allocator.
-
-    The checks for the completeness of the opcode must have already been
-    done.
-
-    """
-    instance = self.cfg.GetInstanceInfo(self.name)
-    if instance is None:
-      raise errors.ProgrammerError("Unknown instance '%s' passed to"
-                                   " IAllocator" % self.name)
-
-    if instance.disk_template not in constants.DTS_MIRRORED:
-      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
-                                 errors.ECODE_INVAL)
-
-    if instance.disk_template in constants.DTS_INT_MIRROR and \
-        len(instance.secondary_nodes) != 1:
-      raise errors.OpPrereqError("Instance has not exactly one secondary node",
-                                 errors.ECODE_STATE)
-
-    self.required_nodes = 1
-    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
-    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
-
-    request = {
-      "name": self.name,
-      "disk_space_total": disk_space,
-      "required_nodes": self.required_nodes,
-      "relocate_from": self.relocate_from,
-      }
-    return request
-
-  def _AddNodeEvacuate(self):
-    """Get data for node-evacuate requests.
-
-    """
-    return {
-      "instances": self.instances,
-      "evac_mode": self.evac_mode,
-      }
-
-  def _AddChangeGroup(self):
-    """Get data for node-evacuate requests.
-
-    """
-    return {
-      "instances": self.instances,
-      "target_groups": self.target_groups,
-      }
-
-  def _BuildInputData(self, fn, keydata):
-    """Build input data structures.
-
-    """
-    self._ComputeClusterData()
-
-    request = fn()
-    request["type"] = self.mode
-    for keyname, keytype in keydata:
-      if keyname not in request:
-        raise errors.ProgrammerError("Request parameter %s is missing" %
-                                     keyname)
-      val = request[keyname]
-      if not keytype(val):
-        raise errors.ProgrammerError("Request parameter %s doesn't pass"
-                                     " validation, value %s, expected"
-                                     " type %s" % (keyname, val, keytype))
-    self.in_data["request"] = request
-
-    self.in_text = serializer.Dump(self.in_data)
-
-  _STRING_LIST = ht.TListOf(ht.TString)
-  _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
-     # pylint: disable=E1101
-     # Class '...' has no 'OP_ID' member
-     "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
-                          opcodes.OpInstanceMigrate.OP_ID,
-                          opcodes.OpInstanceReplaceDisks.OP_ID])
-     })))
-
-  _NEVAC_MOVED = \
-    ht.TListOf(ht.TAnd(ht.TIsLength(3),
-                       ht.TItems([ht.TNonEmptyString,
-                                  ht.TNonEmptyString,
-                                  ht.TListOf(ht.TNonEmptyString),
-                                 ])))
-  _NEVAC_FAILED = \
-    ht.TListOf(ht.TAnd(ht.TIsLength(2),
-                       ht.TItems([ht.TNonEmptyString,
-                                  ht.TMaybeString,
-                                 ])))
-  _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
-                          ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
-
-  _MODE_DATA = {
-    constants.IALLOCATOR_MODE_ALLOC:
-      (_AddNewInstance,
-       [
-        ("name", ht.TString),
-        ("memory", ht.TInt),
-        ("spindle_use", ht.TInt),
-        ("disks", ht.TListOf(ht.TDict)),
-        ("disk_template", ht.TString),
-        ("os", ht.TString),
-        ("tags", _STRING_LIST),
-        ("nics", ht.TListOf(ht.TDict)),
-        ("vcpus", ht.TInt),
-        ("hypervisor", ht.TString),
-        ], ht.TList),
-    constants.IALLOCATOR_MODE_RELOC:
-      (_AddRelocateInstance,
-       [("name", ht.TString), ("relocate_from", _STRING_LIST)],
-       ht.TList),
-     constants.IALLOCATOR_MODE_NODE_EVAC:
-      (_AddNodeEvacuate, [
-        ("instances", _STRING_LIST),
-        ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
-        ], _NEVAC_RESULT),
-     constants.IALLOCATOR_MODE_CHG_GROUP:
-      (_AddChangeGroup, [
-        ("instances", _STRING_LIST),
-        ("target_groups", _STRING_LIST),
-        ], _NEVAC_RESULT),
-    }
-
-  def Run(self, name, validate=True, call_fn=None):
-    """Run an instance allocator and return the results.
-
-    """
-    if call_fn is None:
-      call_fn = self.rpc.call_iallocator_runner
-
-    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
-    result.Raise("Failure while running the iallocator script")
-
-    self.out_text = result.payload
-    if validate:
-      self._ValidateResult()
-
-  def _ValidateResult(self):
-    """Process the allocator results.
-
-    This will process and if successful save the result in
-    self.out_data and the other parameters.
-
-    """
-    try:
-      rdict = serializer.Load(self.out_text)
-    except Exception, err:
-      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
-
-    if not isinstance(rdict, dict):
-      raise errors.OpExecError("Can't parse iallocator results: not a dict")
-
-    # TODO: remove backwards compatiblity in later versions
-    if "nodes" in rdict and "result" not in rdict:
-      rdict["result"] = rdict["nodes"]
-      del rdict["nodes"]
-
-    for key in "success", "info", "result":
-      if key not in rdict:
-        raise errors.OpExecError("Can't parse iallocator results:"
-                                 " missing key '%s'" % key)
-      setattr(self, key, rdict[key])
-
-    if not self._result_check(self.result):
-      raise errors.OpExecError("Iallocator returned invalid result,"
-                               " expected %s, got %s" %
-                               (self._result_check, self.result),
-                               errors.ECODE_INVAL)
-
-    if self.mode == constants.IALLOCATOR_MODE_RELOC:
-      assert self.relocate_from is not None
-      assert self.required_nodes == 1
-
-      node2group = dict((name, ndata["group"])
-                        for (name, ndata) in self.in_data["nodes"].items())
-
-      fn = compat.partial(self._NodesToGroups, node2group,
-                          self.in_data["nodegroups"])
-
-      instance = self.cfg.GetInstanceInfo(self.name)
-      request_groups = fn(self.relocate_from + [instance.primary_node])
-      result_groups = fn(rdict["result"] + [instance.primary_node])
-
-      if self.success and not set(result_groups).issubset(request_groups):
-        raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
-                                 " differ from original groups (%s)" %
-                                 (utils.CommaJoin(result_groups),
-                                  utils.CommaJoin(request_groups)))
-
-    elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
-      assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
-
-    self.out_data = rdict
-
-  @staticmethod
-  def _NodesToGroups(node2group, groups, nodes):
-    """Returns a list of unique group names for a list of nodes.
-
-    @type node2group: dict
-    @param node2group: Map from node name to group UUID
-    @type groups: dict
-    @param groups: Group information
-    @type nodes: list
-    @param nodes: Node names
-
-    """
-    result = set()
-
-    for node in nodes:
-      try:
-        group_uuid = node2group[node]
-      except KeyError:
-        # Ignore unknown node
-        pass
-      else:
-        try:
-          group = groups[group_uuid]
-        except KeyError:
-          # Can't find group, let's use UUID
-          group_name = group_uuid
-        else:
-          group_name = group["name"]
-
-        result.add(group_name)
-
-    return sorted(result)
-
-
 class LUTestAllocator(NoHooksLU):
   """Run allocator tests.
 
@@ -15248,7 +15052,8 @@ class LUTestAllocator(NoHooksLU):
     This checks the opcode parameters depending on the director and mode test.
 
     """
-    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
+    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
+                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
       for attr in ["memory", "disks", "disk_template",
                    "os", "tags", "nics", "vcpus"]:
         if not hasattr(self.op, attr):
@@ -15301,38 +15106,44 @@ class LUTestAllocator(NoHooksLU):
 
     """
     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
-      ial = IAllocator(self.cfg, self.rpc,
-                       mode=self.op.mode,
-                       name=self.op.name,
-                       memory=self.op.memory,
-                       disks=self.op.disks,
-                       disk_template=self.op.disk_template,
-                       os=self.op.os,
-                       tags=self.op.tags,
-                       nics=self.op.nics,
-                       vcpus=self.op.vcpus,
-                       hypervisor=self.op.hypervisor,
-                       )
+      req = iallocator.IAReqInstanceAlloc(name=self.op.name,
+                                          memory=self.op.memory,
+                                          disks=self.op.disks,
+                                          disk_template=self.op.disk_template,
+                                          os=self.op.os,
+                                          tags=self.op.tags,
+                                          nics=self.op.nics,
+                                          vcpus=self.op.vcpus,
+                                          spindle_use=self.op.spindle_use,
+                                          hypervisor=self.op.hypervisor)
     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
-      ial = IAllocator(self.cfg, self.rpc,
-                       mode=self.op.mode,
-                       name=self.op.name,
-                       relocate_from=list(self.relocate_from),
-                       )
+      req = iallocator.IAReqRelocate(name=self.op.name,
+                                     relocate_from=list(self.relocate_from))
     elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
-      ial = IAllocator(self.cfg, self.rpc,
-                       mode=self.op.mode,
-                       instances=self.op.instances,
-                       target_groups=self.op.target_groups)
+      req = iallocator.IAReqGroupChange(instances=self.op.instances,
+                                        target_groups=self.op.target_groups)
     elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
-      ial = IAllocator(self.cfg, self.rpc,
-                       mode=self.op.mode,
-                       instances=self.op.instances,
-                       evac_mode=self.op.evac_mode)
+      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
+                                     evac_mode=self.op.evac_mode)
+    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
+      disk_template = self.op.disk_template
+      insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
+                                             memory=self.op.memory,
+                                             disks=self.op.disks,
+                                             disk_template=disk_template,
+                                             os=self.op.os,
+                                             tags=self.op.tags,
+                                             nics=self.op.nics,
+                                             vcpus=self.op.vcpus,
+                                             spindle_use=self.op.spindle_use,
+                                             hypervisor=self.op.hypervisor)
+               for idx in range(self.op.count)]
+      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
     else:
       raise errors.ProgrammerError("Uncatched mode %s in"
                                    " LUTestAllocator.Exec", self.op.mode)
 
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
     if self.op.direction == constants.IALLOCATOR_DIR_IN:
       result = ial.in_text
     else: