Add infrastructure for building numeric namefield filters
[ganeti-local] lib/cmdlib.py
index 34902d1..7a12229 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -640,7 +640,8 @@ def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
       "Instance %s has no node in group %s" % (name, cur_group_uuid)
 
 
-def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
+def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
+                             primary_only=False):
   """Checks if the owned node groups are still correct for an instance.
 
   @type cfg: L{config.ConfigWriter}
@@ -649,9 +650,11 @@ def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
   @param instance_name: Instance name
   @type owned_groups: set or frozenset
   @param owned_groups: List of currently owned node groups
+  @type primary_only: boolean
+  @param primary_only: Whether to check node groups for only the primary node
 
   """
-  inst_groups = cfg.GetInstanceNodeGroups(instance_name)
+  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
 
   if not owned_groups.issuperset(inst_groups):
     raise errors.OpPrereqError("Instance %s's node groups changed since"
@@ -815,7 +818,7 @@ def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
           # in a nicer way
           ipolicy[key] = list(value)
   try:
-    objects.InstancePolicy.CheckParameterSyntax(ipolicy)
+    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
   except errors.ConfigurationError, err:
     raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                errors.ECODE_INVAL)
@@ -1611,7 +1614,8 @@ def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
   for dev in instance.disks:
     cfg.SetDiskID(dev, node_name)
 
-  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
+  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
+                                                                instance))
   result.Raise("Failed to get disk status from node %s" % node_name,
                prereq=prereq, ecode=errors.ECODE_ENVIRON)
 
@@ -1651,7 +1655,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
                                  " cluster-wide default iallocator found;"
                                  " please specify either an iallocator or a"
                                  " node, or set a cluster-wide default"
-                                 " iallocator")
+                                 " iallocator", errors.ECODE_INVAL)
 
 
 def _GetDefaultIAllocator(cfg, iallocator):
@@ -2923,12 +2927,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
       node_disks[nname] = disks
 
-      # Creating copies as SetDiskID below will modify the objects and that can
-      # lead to incorrect data returned from nodes
-      devonly = [dev.Copy() for (_, dev) in disks]
-
-      for dev in devonly:
-        self.cfg.SetDiskID(dev, nname)
+      # _AnnotateDiskParams already makes copies of the disks
+      devonly = []
+      for (inst, dev) in disks:
+        (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
+        self.cfg.SetDiskID(anno_disk, nname)
+        devonly.append(anno_disk)
 
       node_disks_devonly[nname] = devonly
 
@@ -3823,10 +3827,10 @@ def _ValidateNetmask(cfg, netmask):
     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
   except errors.ProgrammerError:
     raise errors.OpPrereqError("Invalid primary ip family: %s." %
-                               ip_family)
+                               ip_family, errors.ECODE_INVAL)
   if not ipcls.ValidateNetmask(netmask):
     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
-                                (netmask))
+                                (netmask), errors.ECODE_INVAL)
 
 
 class LUClusterSetParams(LogicalUnit):
@@ -3856,6 +3860,11 @@ class LUClusterSetParams(LogicalUnit):
     if self.op.diskparams:
       for dt_params in self.op.diskparams.values():
         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+      try:
+        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
+      except errors.OpPrereqError, err:
+        raise errors.OpPrereqError("While verifying diskparams options: %s" %
+                                   err, errors.ECODE_INVAL)
 
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
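
The new check above calls utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS), whose behaviour is not shown in this patch. Below is a rough, self-contained sketch of the kind of validation such a helper is assumed to perform (rejecting per-template option keys that have no default); the dictionaries and the function are illustrative stand-ins, not Ganeti's actual utils implementation.

# Illustrative sketch only -- approximates what a VerifyDictOptions-style
# check is assumed to do; not the real ganeti.utils implementation.
DISK_DT_DEFAULTS_SKETCH = {
  "drbd": {"resync-rate": 61440, "metavg": "xenvg"},   # hypothetical values
  "plain": {"stripes": 1},
}

def verify_dict_options_sketch(diskparams, defaults):
  """Reject per-template option keys that have no known default."""
  for template, params in diskparams.items():
    unknown = set(params) - set(defaults.get(template, {}))
    if unknown:
      raise ValueError("Unknown disk parameters for %s: %s" %
                       (template, ", ".join(sorted(unknown))))

# A typo such as "resync_rate" would now be rejected before it reaches the
# cluster configuration:
#   verify_dict_options_sketch({"drbd": {"resync_rate": 1024}},
#                              DISK_DT_DEFAULTS_SKETCH)
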
@@ -4020,7 +4029,7 @@ class LUClusterSetParams(LogicalUnit):
                               " address" % (instance.name, nic_idx))
       if nic_errors:
         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
-                                   "\n".join(nic_errors))
+                                   "\n".join(nic_errors), errors.ECODE_INVAL)
 
     # hypervisor list/parameters
     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
@@ -4304,6 +4313,9 @@ def _ComputeAncillaryFiles(cluster, redist):
   if cluster.modify_etc_hosts:
     files_all.add(constants.ETC_HOSTS)
 
+  if cluster.use_external_mip_script:
+    files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
   # Files which are optional, these must:
   # - be present in one other category as well
   # - either exist or not exist on all nodes of that category (mc, vm all)
@@ -4317,10 +4329,6 @@ def _ComputeAncillaryFiles(cluster, redist):
   if not redist:
     files_mc.add(constants.CLUSTER_CONF_FILE)
 
-    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
-    # replication
-    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
-
   # Files which should only be on VM-capable nodes
   files_vm = set(filename
     for hv_name in cluster.enabled_hypervisors
@@ -4361,7 +4369,8 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
 
   online_nodes = lu.cfg.GetOnlineNodeList()
-  vm_nodes = lu.cfg.GetVmCapableNodeList()
+  online_set = frozenset(online_nodes)
+  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
 
   if additional_nodes is not None:
     online_nodes.extend(additional_nodes)
@@ -4470,7 +4479,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False):
     max_time = 0
     done = True
     cumul_degraded = False
-    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
+    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
     msg = rstats.fail_msg
     if msg:
       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
@@ -5948,11 +5957,12 @@ class LUNodeSetParams(LogicalUnit):
                                            self.op.powered == True):
         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                     " offline status can be reset") %
-                                   self.op.node_name)
+                                   self.op.node_name, errors.ECODE_STATE)
     elif self.op.powered is not None:
       raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                   " as it does not support out-of-band"
-                                  " handling") % self.op.node_name)
+                                  " handling") % self.op.node_name,
+                                 errors.ECODE_STATE)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
     if (self.op.drained == False or self.op.offline == False or
@@ -5996,23 +6006,43 @@ class LUNodeSetParams(LogicalUnit):
                         " without using re-add. Please make sure the node"
                         " is healthy!")
 
+    # When changing the secondary ip, verify if this is a single-homed to
+    # multi-homed transition or vice versa, and apply the relevant
+    # restrictions.
     if self.op.secondary_ip:
       # Ok even without locking, because this can't be changed by any LU
       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
       master_singlehomed = master.secondary_ip == master.primary_ip
-      if master_singlehomed and self.op.secondary_ip:
-        raise errors.OpPrereqError("Cannot change the secondary ip on a single"
-                                   " homed cluster", errors.ECODE_INVAL)
+      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
+        if self.op.force and node.name == master.name:
+          self.LogWarning("Transitioning from single-homed to multi-homed"
+                          " cluster. All nodes will require a secondary ip.")
+        else:
+          raise errors.OpPrereqError("Changing the secondary ip on a"
+                                     " single-homed cluster requires the"
+                                     " --force option to be passed, and the"
+                                     " target node to be the master",
+                                     errors.ECODE_INVAL)
+      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
+        if self.op.force and node.name == master.name:
+          self.LogWarning("Transitioning from multi-homed to single-homed"
+                          " cluster. Secondary IPs will have to be removed.")
+        else:
+          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
+                                     " same as the primary IP on a multi-homed"
+                                     " cluster, unless the --force option is"
+                                     " passed, and the target node is the"
+                                     " master", errors.ECODE_INVAL)
 
       assert not (frozenset(affected_instances) -
                   self.owned_locks(locking.LEVEL_INSTANCE))
 
       if node.offline:
         if affected_instances:
-          raise errors.OpPrereqError("Cannot change secondary IP address:"
-                                     " offline node has instances (%s)"
-                                     " configured to use it" %
-                                     utils.CommaJoin(affected_instances.keys()))
+          msg = ("Cannot change secondary IP address: offline node has"
+                 " instances (%s) configured to use it" %
+                 utils.CommaJoin(affected_instances.keys()))
+          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
       else:
         # On online nodes, check that no instances are running, and that
         # the node has the new ip and we can reach it.
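
Condensing the checks added above into one place may make the rule easier to follow: a secondary-ip change that would switch the cluster between single-homed and multi-homed operation is only accepted with --force, and only when the target node is the master. The helper below is an illustrative restatement of that logic, not code from the patch.

# Illustrative restatement of the secondary-ip rules implemented above.
def secondary_ip_change_error(master_singlehomed, new_ip, node_primary_ip,
                              force, node_is_master):
  """Return an error message if the change is not allowed, else None."""
  transition = None
  if master_singlehomed and new_ip != node_primary_ip:
    transition = "single-homed to multi-homed"
  elif not master_singlehomed and new_ip == node_primary_ip:
    transition = "multi-homed to single-homed"
  if transition and not (force and node_is_master):
    return ("%s transition requires --force and must be started on the"
            " master node" % transition)
  return None
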
@@ -6316,6 +6346,10 @@ class LUInstanceActivateDisks(NoHooksLU):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block devices")
 
+    if self.op.wait_for_sync:
+      if not _WaitForSync(self, self.instance):
+        raise errors.OpExecError("Some disks of the instance are degraded!")
+
     return disks_info
 
 
@@ -6368,10 +6402,12 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                                              False, idx)
       msg = result.fail_msg
       if msg:
+        is_offline_secondary = (node in instance.secondary_nodes and
+                                result.offline)
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
                            " (is_primary=False, pass=1): %s",
                            inst_disk.iv_name, node, msg)
-        if not ignore_secondaries:
+        if not (ignore_secondaries or is_offline_secondary):
           disks_ok = False
 
   # FIXME: race condition on drbd migration to primary
@@ -6504,7 +6540,7 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
   for disk in disks:
     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(top_disk, node)
-      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
       msg = result.fail_msg
       if msg:
         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
@@ -6977,9 +7013,6 @@ class LUInstanceReinstall(LogicalUnit):
       "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
                      " offline, cannot reinstall")
-    for node in instance.secondary_nodes:
-      _CheckNodeOnline(self, node, "Instance secondary node offline,"
-                       " cannot reinstall")
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
@@ -7053,6 +7086,64 @@ class LUInstanceRecreateDisks(LogicalUnit):
     constants.IDISK_METAVG,
     ]))
 
+  def _RunAllocator(self):
+    """Run the allocator based on input opcode.
+
+    """
+    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
+
+    # FIXME
+    # The allocator should actually run in "relocate" mode, but current
+    # allocators don't support relocating all the nodes of an instance at
+    # the same time. As a workaround we use "allocate" mode, but this is
+    # suboptimal for two reasons:
+    # - The instance name passed to the allocator is present in the list of
+    #   existing instances, so there could be a conflict within the
+    #   internal structures of the allocator. This doesn't happen with the
+    #   current allocators, but it's a liability.
+    # - The allocator counts the resources used by the instance twice: once
+    #   because the instance exists already, and once because it tries to
+    #   allocate a new instance.
+    # The allocator could choose some of the nodes on which the instance is
+    # running, but that's not a problem. If the instance nodes are broken,
+    # they should already be marked as drained or offline, and hence
+    # skipped by the allocator. If instance disks have been lost for other
+    # reasons, then recreating the disks on the same nodes should be fine.
+    ial = IAllocator(self.cfg, self.rpc,
+                     mode=constants.IALLOCATOR_MODE_ALLOC,
+                     name=self.op.instance_name,
+                     disk_template=self.instance.disk_template,
+                     tags=list(self.instance.GetTags()),
+                     os=self.instance.os,
+                     nics=[{}],
+                     vcpus=be_full[constants.BE_VCPUS],
+                     memory=be_full[constants.BE_MAXMEM],
+                     spindle_use=be_full[constants.BE_SPINDLE_USE],
+                     disks=[{constants.IDISK_SIZE: d.size,
+                             constants.IDISK_MODE: d.mode}
+                            for d in self.instance.disks],
+                     hypervisor=self.instance.hypervisor)
+
+    assert ial.required_nodes == len(self.instance.all_nodes)
+
+    ial.Run(self.op.iallocator)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+                                 " %s" % (self.op.iallocator, ial.info),
+                                 errors.ECODE_NORES)
+
+    if len(ial.result) != ial.required_nodes:
+      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+                                 " of nodes (%s), required %s" %
+                                 (self.op.iallocator, len(ial.result),
+                                  ial.required_nodes), errors.ECODE_FAULT)
+
+    self.op.nodes = ial.result
+    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+                 self.op.instance_name, self.op.iallocator,
+                 utils.CommaJoin(ial.result))
+
   def CheckArguments(self):
     if self.op.disks and ht.TPositiveInt(self.op.disks[0]):
       # Normalize and convert deprecated list of disk indices
@@ -7064,6 +7155,10 @@ class LUInstanceRecreateDisks(LogicalUnit):
                                  " once: %s" % utils.CommaJoin(duplicates),
                                  errors.ECODE_INVAL)
 
+    if self.op.iallocator and self.op.nodes:
+      raise errors.OpPrereqError("Give either the iallocator or the new"
+                                 " nodes, not both", errors.ECODE_INVAL)
+
     for (idx, params) in self.op.disks:
       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
       unsupported = frozenset(params.keys()) - self._MODIFYABLE
@@ -7081,14 +7176,42 @@ class LUInstanceRecreateDisks(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
+      if self.op.iallocator:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
     self.needed_locks[locking.LEVEL_NODE_RES] = []
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      # if we replace the nodes, we only need to lock the old primary,
-      # otherwise we need to lock all nodes for disk re-creation
-      primary_only = bool(self.op.nodes)
-      self._LockInstancesNodes(primary_only=primary_only)
+    if level == locking.LEVEL_NODEGROUP:
+      assert self.op.iallocator is not None
+      assert not self.op.nodes
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      # Lock the primary group used by the instance optimistically; this
+      # requires reading the group through the node before its lock is
+      # held, so the assumption has to be verified again later on
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
+
+    elif level == locking.LEVEL_NODE:
+      # If an allocator is used, then we lock all the nodes in the current
+      # instance group, as we don't know yet which ones will be selected;
+      # if we replace the nodes without using an allocator, we only need to
+      # lock the old primary for doing RPCs (FIXME: we don't lock nodes for
+      # RPC anymore), otherwise we need to lock all the instance nodes for
+      # disk re-creation
+      if self.op.iallocator:
+        assert not self.op.nodes
+        assert not self.needed_locks[locking.LEVEL_NODE]
+        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
+
+        # Lock member nodes of the group of the primary node
+        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
+          self.needed_locks[locking.LEVEL_NODE].extend(
+            self.cfg.GetNodeGroup(group_uuid).members)
+      else:
+        primary_only = bool(self.op.nodes)
+        self._LockInstancesNodes(primary_only=primary_only)
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
@@ -7132,18 +7255,27 @@ class LUInstanceRecreateDisks(LogicalUnit):
       primary_node = self.op.nodes[0]
     else:
       primary_node = instance.primary_node
-    _CheckNodeOnline(self, primary_node)
+    if not self.op.iallocator:
+      _CheckNodeOnline(self, primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
 
+    # Verify if node group locks are still correct
+    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+    if owned_groups:
+      # Node group locks are acquired only for the primary node (and only
+      # when the allocator is used)
+      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+                               primary_only=True)
+
     # if we replace nodes *and* the old primary is offline, we don't
     # check
     assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
     assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
-    if not (self.op.nodes and old_pnode.offline):
+    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
       _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                           msg="cannot recreate disks")
 
@@ -7157,7 +7289,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                  errors.ECODE_INVAL)
 
-    if (self.op.nodes and
+    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
       raise errors.OpPrereqError("Can't recreate disks partially and"
                                  " change the nodes at the same time",
@@ -7165,6 +7297,13 @@ class LUInstanceRecreateDisks(LogicalUnit):
 
     self.instance = instance
 
+    if self.op.iallocator:
+      self._RunAllocator()
+
+    # Release unneeded node and node resource locks
+    _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+    _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+
   def Exec(self, feedback_fn):
     """Recreate the disks.
 
@@ -8004,7 +8143,8 @@ class TLMigrateInstance(Tasklet):
       if self.target_node == instance.primary_node:
         raise errors.OpPrereqError("Cannot migrate instance %s"
                                    " to its primary (%s)" %
-                                   (instance.name, instance.primary_node))
+                                   (instance.name, instance.primary_node),
+                                   errors.ECODE_STATE)
 
       if len(self.lu.tasklets) == 1:
         # It is safe to release locks only when we're the only tasklet
@@ -8488,7 +8628,7 @@ class TLMigrateInstance(Tasklet):
       disks = _ExpandCheckDisks(instance, instance.disks)
       self.feedback_fn("* unmapping instance's disks from %s" % source_node)
       for disk in disks:
-        result = self.rpc.call_blockdev_shutdown(source_node, disk)
+        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
         msg = result.fail_msg
         if msg:
           logging.error("Migration was successful, but couldn't unmap the"
@@ -8999,18 +9139,20 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
 
   all_result = True
   ports_to_release = set()
-  for (idx, device) in enumerate(instance.disks):
+  anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg)
+  for (idx, device) in enumerate(anno_disks):
     if target_node:
       edata = [(target_node, device)]
     else:
       edata = device.ComputeNodeTree(instance.primary_node)
     for node, disk in edata:
       lu.cfg.SetDiskID(disk, node)
-      msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
-      if msg:
+      result = lu.rpc.call_blockdev_remove(node, disk)
+      if result.fail_msg:
         lu.LogWarning("Could not remove disk %s on node %s,"
-                      " continuing anyway: %s", idx, node, msg)
-        all_result = False
+                      " continuing anyway: %s", idx, node, result.fail_msg)
+        if not (result.offline and node != instance.primary_node):
+          all_result = False
 
     # if this is a DRBD disk, return its port to the pool
     if device.dev_type in constants.LDS_DRBD:
@@ -9530,7 +9672,9 @@ class LUInstanceCreate(LogicalUnit):
         if self.op.disk_template not in constants.DISK_TEMPLATES:
           raise errors.OpPrereqError("Disk template specified in configuration"
                                      " file is not one of the allowed values:"
-                                     " %s" % " ".join(constants.DISK_TEMPLATES))
+                                     " %s" %
+                                     " ".join(constants.DISK_TEMPLATES),
+                                     errors.ECODE_INVAL)
       else:
         raise errors.OpPrereqError("No disk template specified and the export"
                                    " is missing the disk_template information",
@@ -9643,7 +9787,8 @@ class LUInstanceCreate(LogicalUnit):
 
       cfg_storagedir = get_fsd_fn()
       if not cfg_storagedir:
-        raise errors.OpPrereqError("Cluster file storage dir not defined")
+        raise errors.OpPrereqError("Cluster file storage dir not defined",
+                                   errors.ECODE_STATE)
       joinargs.append(cfg_storagedir)
 
       if self.op.file_storage_dir is not None:
@@ -10138,6 +10283,11 @@ class LUInstanceCreate(LogicalUnit):
     _ReleaseLocks(self, locking.LEVEL_NODE_RES)
 
     if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
+      # we need to set the disk IDs to the primary node, since the
+      # preceding code might or might not have done it, depending on
+      # disk template and other options
+      for disk in iobj.disks:
+        self.cfg.SetDiskID(disk, pnode_name)
       if self.op.mode == constants.INSTANCE_CREATE:
         if not self.op.no_install:
           pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
@@ -11149,7 +11299,8 @@ class TLReplaceDisks(Tasklet):
     for idx, dev in enumerate(self.instance.disks):
       self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
       self.cfg.SetDiskID(dev, self.target_node)
-      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
+      msg = self.rpc.call_blockdev_shutdown(self.target_node,
+                                            (dev, self.instance)).fail_msg
       if msg:
         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                            "node: %s" % (idx, msg),
@@ -11658,23 +11809,23 @@ class LUInstanceGrowDisk(LogicalUnit):
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
-                                           True)
+                                           True, True)
       result.Raise("Grow request failed to node %s" % node)
 
     # We know that (as far as we can test) operations across different
-    # nodes will succeed, time to run it for real
+    # nodes will succeed, time to run it for real on the backing storage
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
-                                           False)
+                                           False, True)
       result.Raise("Grow request failed to node %s" % node)
 
-      # TODO: Rewrite code to work properly
-      # DRBD goes into sync mode for a short amount of time after executing the
-      # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
-      # calling "resize" in sync mode fails. Sleeping for a short amount of
-      # time is a work-around.
-      time.sleep(5)
+    # And now execute it for logical storage, on the primary node
+    node = instance.primary_node
+    self.cfg.SetDiskID(disk, node)
+    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                         False, False)
+    result.Raise("Grow request failed to node %s" % node)
 
     disk.RecordGrow(self.delta)
     self.cfg.Update(instance, feedback_fn)
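
The rewritten grow path above replaces the DRBD sleep workaround with three passes of call_blockdev_grow. The two trailing booleans are not named in this hunk; from the surrounding comments they appear to be a dry-run flag and a "grow the backing storage" flag, which is an assumption. A condensed view of the sequence (SetDiskID and error handling omitted):

# Condensed view of the sequence above; the flag meanings (dryrun,
# backingstore) are inferred from the comments and may not match the real
# RPC signature exactly.
def grow_disk_sequence(rpc, instance, disk, delta):
  for node in instance.all_nodes:        # 1. dry run everywhere first
    rpc.call_blockdev_grow(node, (disk, instance), delta, True, True)
  for node in instance.all_nodes:        # 2. grow the backing storage
    rpc.call_blockdev_grow(node, (disk, instance), delta, False, True)
  # 3. grow the logical device (e.g. DRBD) on the primary node only
  rpc.call_blockdev_grow(instance.primary_node, (disk, instance),
                         delta, False, False)
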
@@ -12328,8 +12479,6 @@ class LUInstanceSetParams(LogicalUnit):
     private.params = new_params
     private.filled = new_filled_params
 
-    return (None, None)
-
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -12510,8 +12659,7 @@ class LUInstanceSetParams(LogicalUnit):
             raise errors.OpPrereqError("This change will prevent the instance"
                                        " from starting, due to %d MB of memory"
                                        " missing on its primary node" %
-                                       miss_mem,
-                                       errors.ECODE_NORES)
+                                       miss_mem, errors.ECODE_NORES)
 
       if be_new[constants.BE_AUTO_BALANCE]:
         for node, nres in nodeinfo.items():
@@ -12537,8 +12685,8 @@ class LUInstanceSetParams(LogicalUnit):
                                                 instance.hypervisor)
       remote_info.Raise("Error checking node %s" % instance.primary_node)
       if not remote_info.payload: # not running already
-        raise errors.OpPrereqError("Instance %s is not running" % instance.name,
-                                   errors.ECODE_STATE)
+        raise errors.OpPrereqError("Instance %s is not running" %
+                                   instance.name, errors.ECODE_STATE)
 
       current_memory = remote_info.payload["memory"]
       if (not self.op.force and
@@ -12560,16 +12708,16 @@ class LUInstanceSetParams(LogicalUnit):
 
     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Disk operations not supported for"
-                                 " diskless instances",
-                                 errors.ECODE_INVAL)
+                                 " diskless instances", errors.ECODE_INVAL)
 
     def _PrepareNicCreate(_, params, private):
-      return self._PrepareNicModification(params, private, None, {},
-                                          cluster, pnode)
+      self._PrepareNicModification(params, private, None, {}, cluster, pnode)
+      return (None, None)
 
     def _PrepareNicMod(_, nic, params, private):
-      return self._PrepareNicModification(params, private, nic.ip,
-                                          nic.nicparams, cluster, pnode)
+      self._PrepareNicModification(params, private, nic.ip,
+                                   nic.nicparams, cluster, pnode)
+      return None
 
     # Verify NIC changes (operating on copy)
     nics = instance.nics[:]
@@ -12681,8 +12829,8 @@ class LUInstanceSetParams(LogicalUnit):
     snode = instance.secondary_nodes[0]
     feedback_fn("Converting template to plain")
 
-    old_disks = instance.disks
-    new_disks = [d.children[0] for d in old_disks]
+    old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
+    new_disks = [d.children[0] for d in instance.disks]
 
     # copy over size and mode
     for parent, child in zip(old_disks, new_disks):
@@ -12772,7 +12920,8 @@ class LUInstanceSetParams(LogicalUnit):
     """Removes a disk.
 
     """
-    for node, disk in root.ComputeNodeTree(self.instance.primary_node):
+    (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
+    for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
       self.cfg.SetDiskID(disk, node)
       msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
       if msg:
@@ -13066,8 +13215,7 @@ class LUInstanceChangeGroup(LogicalUnit):
       raise errors.OpPrereqError("Can't compute solution for changing group of"
                                  " instance '%s' using iallocator '%s': %s" %
                                  (self.op.instance_name, self.op.iallocator,
-                                  ial.info),
-                                 errors.ECODE_NORES)
+                                  ial.info), errors.ECODE_NORES)
 
     jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
 
@@ -13296,7 +13444,7 @@ class LUBackupExport(LogicalUnit):
         self.instance.admin_state == constants.ADMINST_UP and
         not self.op.shutdown):
       raise errors.OpPrereqError("Can not remove instance without shutting it"
-                                 " down before")
+                                 " down before", errors.ECODE_STATE)
 
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
@@ -13326,7 +13474,8 @@ class LUBackupExport(LogicalUnit):
       try:
         (key_name, hmac_digest, hmac_salt) = self.x509_key_name
       except (TypeError, ValueError), err:
-        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
+        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
+                                   errors.ECODE_INVAL)
 
       if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
         raise errors.OpPrereqError("HMAC for X509 key name is wrong",
@@ -13602,6 +13751,11 @@ class LUGroupAdd(LogicalUnit):
           utils.ForceDictType(self.op.diskparams[templ],
                               constants.DISK_DT_TYPES)
       self.new_diskparams = self.op.diskparams
+      try:
+        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+      except errors.OpPrereqError, err:
+        raise errors.OpPrereqError("While verifying diskparams options: %s" %
+                                   err, errors.ECODE_INVAL)
     else:
       self.new_diskparams = {}
 
@@ -13609,7 +13763,7 @@ class LUGroupAdd(LogicalUnit):
       cluster = self.cfg.GetClusterInfo()
       full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
       try:
-        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy)
+        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
       except errors.ConfigurationError, err:
         raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                    errors.ECODE_INVAL)
@@ -13963,6 +14117,11 @@ class LUGroupSetParams(LogicalUnit):
       # As we've all subdicts of diskparams ready, lets merge the actual
       # dict with all updated subdicts
       self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
+      try:
+        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+      except errors.OpPrereqError, err:
+        raise errors.OpPrereqError("While verifying diskparams options: %s" %
+                                   err, errors.ECODE_INVAL)
 
     if self.op.hv_state:
       self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
@@ -14071,9 +14230,8 @@ class LUGroupRemove(LogicalUnit):
 
     # Verify the cluster would not be left group-less.
     if len(self.cfg.GetNodeGroupList()) == 1:
-      raise errors.OpPrereqError("Group '%s' is the only group,"
-                                 " cannot be removed" %
-                                 self.op.group_name,
+      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
+                                 " removed" % self.op.group_name,
                                  errors.ECODE_STATE)
 
   def BuildHooksEnv(self):