Call blockdev_create RPCs with the exclusive_storage flag
[ganeti-local] / lib / cmdlib.py
index 3aed190..c91c2d1 100644 (file)
@@ -697,6 +697,39 @@ def _SupportsOob(cfg, node):
   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
 
 
+def _IsExclusiveStorageEnabledNode(cfg, node):
+  """Tells whether the exclusive_storage flag applies to a node object.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: Cluster configuration used to look up the node parameters
+  @type node: L{objects.Node}
+  @param node: Node object whose parameters are inspected
+  @rtype: bool
+  @return: The exclusive_storage entry of the parameters that
+      C{cfg.GetNdParams} returns for the node
+
+  """
+  ndparams = cfg.GetNdParams(node)
+  return ndparams[constants.ND_EXCLUSIVE_STORAGE]
+
+
+def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
+  """Tells whether the exclusive_storage flag applies to a node, by name.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: Cluster configuration used to resolve the node
+  @type nodename: string
+  @param nodename: Name of the node to look up
+  @rtype: bool
+  @return: The exclusive_storage value, as computed by
+      L{_IsExclusiveStorageEnabledNode} for the resolved node
+  @raise errors.OpPrereqError: if no node with that name is configured
+
+  """
+  node_info = cfg.GetNodeInfo(nodename)
+  if node_info is not None:
+    return _IsExclusiveStorageEnabledNode(cfg, node_info)
+
+  raise errors.OpPrereqError("Invalid node name %s" % nodename,
+                             errors.ECODE_NOENT)
+
+
 def _CopyLockList(names):
   """Makes a copy of a list of lock names.
 
@@ -1367,27 +1400,6 @@ def _BuildNetworkHookEnv(name, subnet, gateway, network6, gateway6,
   return env
 
 
-def _BuildNetworkHookEnvByObject(net):
-  """Builds network related env varliables for hooks
-
-  @type net: L{objects.Network}
-  @param net: the network object
-
-  """
-  args = {
-    "name": net.name,
-    "subnet": net.network,
-    "gateway": net.gateway,
-    "network6": net.network6,
-    "gateway6": net.gateway6,
-    "network_type": net.network_type,
-    "mac_prefix": net.mac_prefix,
-    "tags": net.tags,
-  }
-
-  return _BuildNetworkHookEnv(**args) # pylint: disable=W0142
-
-
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           minmem, maxmem, vcpus, nics, disk_template, disks,
                           bep, hvp, hypervisor_name, tags):
@@ -2445,19 +2457,20 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
                                             constants.MIN_VG_SIZE)
       _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
 
-    # check pv names
-    pvlist = nresult.get(constants.NV_PVLIST, None)
-    test = pvlist is None
+    # check pv names (and possibly sizes)
+    pvlist_dict = nresult.get(constants.NV_PVLIST, None)
+    test = pvlist_dict is None
     _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
     if not test:
+      pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
       # check that ':' is not present in PV names, since it's a
       # special character for lvcreate (denotes the range of PEs to
       # use on the PV)
-      for _, pvname, owner_vg in pvlist:
-        test = ":" in pvname
+      for pv in pvlist:
+        test = ":" in pv.name
         _ErrorIf(test, constants.CV_ENODELVM, node,
                  "Invalid character ':' in PV '%s' of VG '%s'",
-                 pvname, owner_vg)
+                 pv.name, pv.vg_name)
 
   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
     """Check the node bridges.
@@ -4814,10 +4827,11 @@ class LUOobCommand(NoHooksLU):
       locking.LEVEL_NODE: lock_names,
       }
 
+    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
+
     if not self.op.node_names:
       # Acquire node allocation lock only if all nodes are affected
       self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -6530,7 +6544,13 @@ class _ClusterQuery(_QueryBase):
       drain_flag = NotImplemented
 
     if query.CQ_WATCHER_PAUSE in self.requested_data:
-      watcher_pause = utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
+      master_name = lu.cfg.GetMasterNode()
+
+      result = lu.rpc.call_get_watcher_pause(master_name)
+      result.Raise("Can't retrieve watcher pause from master node '%s'" %
+                   master_name)
+
+      watcher_pause = result.payload
     else:
       watcher_pause = NotImplemented
 
@@ -6781,9 +6801,9 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
   """Checks if a node has enough free memory.
 
-  This function check if a given node has the needed amount of free
+  This function checks if a given node has the needed amount of free
   memory. In case the node has less memory or we cannot get the
-  information from the node, this function raise an OpPrereqError
+  information from the node, this function raises an OpPrereqError
   exception.
 
   @type lu: C{LogicalUnit}
@@ -6821,11 +6841,11 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
 
 
 def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
-  """Checks if nodes have enough free disk space in the all VGs.
+  """Checks if nodes have enough free disk space in all the VGs.
 
-  This function check if all given nodes have the needed amount of
+  This function checks if all given nodes have the needed amount of
   free disk. In case any node has less disk or we cannot get the
-  information from the node, this function raise an OpPrereqError
+  information from the node, this function raises an OpPrereqError
   exception.
 
   @type lu: C{LogicalUnit}
@@ -6846,9 +6866,9 @@ def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
   """Checks if nodes have enough free disk space in the specified VG.
 
-  This function check if all given nodes have the needed amount of
+  This function checks if all given nodes have the needed amount of
   free disk. In case any node has less disk or we cannot get the
-  information from the node, this function raise an OpPrereqError
+  information from the node, this function raises an OpPrereqError
   exception.
 
   @type lu: C{LogicalUnit}
@@ -7170,7 +7190,10 @@ class LUInstanceShutdown(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
-    _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
+    if not self.op.force:
+      _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
+    else:
+      self.LogWarning("Ignoring offline instance check")
 
     self.primary_offline = \
       self.cfg.GetNodeInfo(self.instance.primary_node).offline
@@ -7188,7 +7211,9 @@ class LUInstanceShutdown(LogicalUnit):
     node_current = instance.primary_node
     timeout = self.op.timeout
 
-    if not self.op.no_remember:
+    # If the instance is offline we shouldn't mark it as down, as that
+    # resets the offline flag.
+    if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
       self.cfg.MarkInstanceDown(instance.name)
 
     if self.primary_offline:
@@ -7299,7 +7324,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   REQ_BGL = False
 
-  _MODIFYABLE = frozenset([
+  _MODIFYABLE = compat.UniqueFrozenset([
     constants.IDISK_SIZE,
     constants.IDISK_MODE,
     ])
@@ -7311,6 +7336,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     # TODO: Implement support changing VG while recreating
     constants.IDISK_VG,
     constants.IDISK_METAVG,
+    constants.IDISK_PROVIDER,
     ]))
 
   def _RunAllocator(self):
@@ -7867,6 +7893,8 @@ def _DeclareLocksForMigration(lu, level):
 
     instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
 
+    # Node locks are already declared here rather than at LEVEL_NODE as we need
+    # the instance object anyway to declare the node allocation lock.
     if instance.disk_template in constants.DTS_EXT_MIRROR:
       if lu.op.target_node is None:
         lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
@@ -7880,7 +7908,8 @@ def _DeclareLocksForMigration(lu, level):
 
   elif level == locking.LEVEL_NODE:
     # Node locks are declared together with the node allocation lock
-    assert lu.needed_locks[locking.LEVEL_NODE]
+    assert (lu.needed_locks[locking.LEVEL_NODE] or
+            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
 
   elif level == locking.LEVEL_NODE_RES:
     # Copy node locks
@@ -8855,9 +8884,9 @@ class TLMigrateInstance(Tasklet):
       self._GoReconnect(False)
       self._WaitUntilSync()
 
-    # If the instance's disk template is `rbd' and there was a successful
-    # migration, unmap the device from the source node.
-    if self.instance.disk_template == constants.DT_RBD:
+    # If the instance's disk template is `rbd' or `ext' and there was a
+    # successful migration, unmap the device from the source node.
+    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
       disks = _ExpandCheckDisks(instance, instance.disks)
       self.feedback_fn("* unmapping instance's disks from %s" % source_node)
       for disk in disks:
@@ -8988,12 +9017,13 @@ def _CreateBlockDev(lu, node, instance, device, force_create, info,
 
   """
   (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+  excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
-                              force_open)
+                              force_open, excl_stor)
 
 
 def _CreateBlockDevInner(lu, node, instance, device, force_create,
-                         info, force_open):
+                         info, force_open, excl_stor):
   """Create a tree of block devices on a given node.
 
   If this device type has to be created on secondaries, create it and
@@ -9020,6 +9050,8 @@ def _CreateBlockDevInner(lu, node, instance, device, force_create,
       L{backend.BlockdevCreate} function where it specifies
       whether we run on primary or not, and it affects both
       the child assembly and the device own Open() execution
+  @type excl_stor: boolean
+  @param excl_stor: Whether exclusive_storage is active for the node
 
   """
   if device.CreateOnSecondary():
@@ -9028,15 +9060,17 @@ def _CreateBlockDevInner(lu, node, instance, device, force_create,
   if device.children:
     for child in device.children:
       _CreateBlockDevInner(lu, node, instance, child, force_create,
-                           info, force_open)
+                           info, force_open, excl_stor)
 
   if not force_create:
     return
 
-  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
+  _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+                        excl_stor)
 
 
-def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
+def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+                          excl_stor):
   """Create a single block device on a given node.
 
   This will not recurse over children of the device, so they must be
@@ -9055,11 +9089,14 @@ def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
       L{backend.BlockdevCreate} function where it specifies
       whether we run on primary or not, and it affects both
       the child assembly and the device own Open() execution
+  @type excl_stor: boolean
+  @param excl_stor: Whether exclusive_storage is active for the node
 
   """
   lu.cfg.SetDiskID(device, node)
   result = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, force_open, info)
+                                       instance.name, force_open, info,
+                                       excl_stor)
   result.Raise("Can't create block device %s on"
                " node %s for instance %s" % (device, node, instance.name))
   if device.physical_id is None:
@@ -9107,6 +9144,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
 _DISK_TEMPLATE_NAME_PREFIX = {
   constants.DT_PLAIN: "",
   constants.DT_RBD: ".rbd",
+  constants.DT_EXT: ".ext",
   }
 
 
@@ -9116,6 +9154,7 @@ _DISK_TEMPLATE_DEVICE_TYPE = {
   constants.DT_SHARED_FILE: constants.LD_FILE,
   constants.DT_BLOCK: constants.LD_BLOCKDEV,
   constants.DT_RBD: constants.LD_RBD,
+  constants.DT_EXT: constants.LD_EXT,
   }
 
 
@@ -9127,8 +9166,6 @@ def _GenerateDiskTemplate(
   """Generate the entire disk layout for a given template type.
 
   """
-  #TODO: compute space requirements
-
   vgname = lu.cfg.GetVGName()
   disk_count = len(disk_info)
   disks = []
@@ -9197,12 +9234,27 @@ def _GenerateDiskTemplate(
                                        disk[constants.IDISK_ADOPT])
     elif template_name == constants.DT_RBD:
       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
+    elif template_name == constants.DT_EXT:
+      def logical_id_fn(idx, _, disk):
+        """Builds the logical ID of an ext disk as (provider, name).
+
+        The provider is read from the disk's parameters and the name is
+        taken from C{names} at position C{idx}. A missing provider at this
+        point is reported as a programming error.
+
+        """
+        provider = disk.get(constants.IDISK_PROVIDER, None)
+        if provider is None:
+          # The message must be interpolated explicitly; extra constructor
+          # arguments are stored as-is and never applied to the format
+          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
+                                       " not found" %
+                                       (constants.DT_EXT,
+                                        constants.IDISK_PROVIDER))
+        return (provider, names[idx])
     else:
       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
 
     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
 
     for idx, disk in enumerate(disk_info):
+      params = {}
+      # Only for the Ext template add disk_info to params
+      if template_name == constants.DT_EXT:
+        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
+        for key in disk:
+          if key not in constants.IDISK_PARAMS:
+            params[key] = disk[key]
       disk_index = idx + base_index
       size = disk[constants.IDISK_SIZE]
       feedback_fn("* disk %s, size %s" %
@@ -9211,7 +9263,7 @@ def _GenerateDiskTemplate(
                                 logical_id=logical_id_fn(idx, disk_index, disk),
                                 iv_name="disk/%d" % disk_index,
                                 mode=disk[constants.IDISK_MODE],
-                                params={}))
+                                params=params))
 
   return disks
 
@@ -9661,7 +9713,7 @@ def _ComputeDisks(op, default_vg):
   @param op: The instance opcode
   @param default_vg: The default_vg to assume
 
-  @return: The computer disks
+  @return: The computed disks
 
   """
   disks = []
@@ -9679,16 +9731,37 @@ def _ComputeDisks(op, default_vg):
       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                  errors.ECODE_INVAL)
 
+    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
+    if ext_provider and op.disk_template != constants.DT_EXT:
+      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
+                                 " disk template, not %s" %
+                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
+                                 op.disk_template), errors.ECODE_INVAL)
+
     data_vg = disk.get(constants.IDISK_VG, default_vg)
     new_disk = {
       constants.IDISK_SIZE: size,
       constants.IDISK_MODE: mode,
       constants.IDISK_VG: data_vg,
       }
+
     if constants.IDISK_METAVG in disk:
       new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
     if constants.IDISK_ADOPT in disk:
       new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
+
+    # For extstorage, demand the `provider' option and add any
+    # additional parameters (ext-params) to the dict
+    if op.disk_template == constants.DT_EXT:
+      if ext_provider:
+        new_disk[constants.IDISK_PROVIDER] = ext_provider
+        for key in disk:
+          if key not in constants.IDISK_PARAMS:
+            new_disk[key] = disk[key]
+      else:
+        raise errors.OpPrereqError("Missing provider for template '%s'" %
+                                   constants.DT_EXT, errors.ECODE_INVAL)
+
     disks.append(new_disk)
 
   return disks
@@ -9712,6 +9785,16 @@ def _ComputeFullBeParams(op, cluster):
   return cluster.SimpleFillBE(op.beparams)
 
 
+def _CheckOpportunisticLocking(op):
+  """Rejects opcodes asking for opportunistic locking without an iallocator.
+
+  @param op: Opcode providing the C{opportunistic_locking} and
+      C{iallocator} attributes
+  @raise errors.OpPrereqError: if opportunistic locking is requested while
+      no instance allocator is set
+
+  """
+  if not op.opportunistic_locking or op.iallocator:
+    return
+
+  raise errors.OpPrereqError("Opportunistic locking is only available in"
+                             " combination with an instance allocator",
+                             errors.ECODE_INVAL)
+
+
 class LUInstanceCreate(LogicalUnit):
   """Create an instance.
 
@@ -9745,7 +9828,8 @@ class LUInstanceCreate(LogicalUnit):
     # check disks. parameter names and consistent adopt/no-adopt strategy
     has_adopt = has_no_adopt = False
     for disk in self.op.disks:
-      utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
+      if self.op.disk_template != constants.DT_EXT:
+        utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
       if constants.IDISK_ADOPT in disk:
         has_adopt = True
       else:
@@ -9807,6 +9891,8 @@ class LUInstanceCreate(LogicalUnit):
                         " template")
         self.op.snode = None
 
+    _CheckOpportunisticLocking(self.op)
+
     self._cds = _GetClusterDomainSecret()
 
     if self.op.mode == constants.INSTANCE_IMPORT:
@@ -9957,10 +10043,17 @@ class LUInstanceCreate(LogicalUnit):
     ial.Run(self.op.iallocator)
 
     if not ial.success:
+      # When opportunistic locks are used only a temporary failure is generated
+      if self.op.opportunistic_locking:
+        ecode = errors.ECODE_TEMP_NORES
+      else:
+        ecode = errors.ECODE_NORES
+
       raise errors.OpPrereqError("Can't compute nodes using"
                                  " iallocator '%s': %s" %
                                  (self.op.iallocator, ial.info),
-                                 errors.ECODE_NORES)
+                                 ecode)
+
     self.op.pnode = ial.result[0]
     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                  self.op.instance_name, self.op.iallocator,
@@ -10260,7 +10353,7 @@ class LUInstanceCreate(LogicalUnit):
       self._RevertToDefaults(cluster)
 
     # NIC buildup
-    self.nics = _ComputeNics(self.op, cluster, self.hostname1.ip, self.cfg,
+    self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg,
                              self.proc.GetECId())
 
     # disk checks/pre-build
@@ -10373,10 +10466,10 @@ class LUInstanceCreate(LogicalUnit):
                                          " or does not belong to network %s" %
                                          (nic.ip, net),
                                          errors.ECODE_NOTUNIQUE)
-      else:
-        # net is None, ip None or given
-        if self.op.conflicts_check:
-          _CheckForConflictingIp(self, nic.ip, self.pnode.name)
+
+      # net is None, ip None or given
+      elif self.op.conflicts_check:
+        _CheckForConflictingIp(self, nic.ip, self.pnode.name)
 
     # mirror node verification
     if self.op.disk_template in constants.DTS_INT_MIRROR:
@@ -10422,6 +10515,9 @@ class LUInstanceCreate(LogicalUnit):
         # Any function that checks prerequisites can be placed here.
         # Check if there is enough space on the RADOS cluster.
         _CheckRADOSFreeSpace()
+      elif self.op.disk_template == constants.DT_EXT:
+        # FIXME: Function that checks prereqs if needed
+        pass
       else:
         # Check lv size requirements, if not adopting
         req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
@@ -10526,6 +10622,9 @@ class LUInstanceCreate(LogicalUnit):
 
     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
 
+    #TODO: _CheckExtParams (remotely)
+    # Check parameters for extstorage
+
     # memory check on primary node
     #TODO(dynmem): use MINMEM for checking
     if self.op.start:
@@ -10815,6 +10914,8 @@ class LUInstanceMultiAlloc(NoHooksLU):
                                    " or set a cluster-wide default iallocator",
                                    errors.ECODE_INVAL)
 
+    _CheckOpportunisticLocking(self.op)
+
     dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
     if dups:
       raise errors.OpPrereqError("There are duplicate instance names: %s" %
@@ -10887,7 +10988,7 @@ class LUInstanceMultiAlloc(NoHooksLU):
     self.ia_result = ial.result
 
     if self.op.dry_run:
-      self.dry_run_rsult = objects.FillDict(self._ConstructPartialResult(), {
+      self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
         constants.JOB_IDS_KEY: [],
         })
 
@@ -11414,8 +11515,8 @@ class TLReplaceDisks(Tasklet):
 
     feedback_fn("Replacing disk(s) %s for instance '%s'" %
                 (utils.CommaJoin(self.disks), self.instance.name))
-    feedback_fn("Current primary node: %s", self.instance.primary_node)
-    feedback_fn("Current seconary node: %s",
+    feedback_fn("Current primary node: %s" % self.instance.primary_node)
+    feedback_fn("Current seconary node: %s" %
                 utils.CommaJoin(self.instance.secondary_nodes))
 
     activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
@@ -11536,11 +11637,13 @@ class TLReplaceDisks(Tasklet):
       new_lvs = [lv_data, lv_meta]
       old_lvs = [child.Copy() for child in dev.children]
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
+      excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
 
       # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
         _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
-                             _GetInstanceInfoText(self.instance), False)
+                             _GetInstanceInfoText(self.instance), False,
+                             excl_stor)
 
     return iv_names
 
@@ -11749,13 +11852,15 @@ class TLReplaceDisks(Tasklet):
     # Step: create new storage
     self.lu.LogStep(3, steps_total, "Allocate new storage")
     disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+    excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
     for idx, dev in enumerate(disks):
       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                       (self.new_node, idx))
       # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
         _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
-                             True, _GetInstanceInfoText(self.instance), False)
+                             True, _GetInstanceInfoText(self.instance), False,
+                             excl_stor)
 
     # Step 4: dbrd minors and drbd setups changes
     # after this, we must manually remove the drbd minors on both the
@@ -11799,7 +11904,8 @@ class TLReplaceDisks(Tasklet):
       try:
         _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                               anno_new_drbd,
-                              _GetInstanceInfoText(self.instance), False)
+                              _GetInstanceInfoText(self.instance), False,
+                              excl_stor)
       except errors.GenericError:
         self.cfg.ReleaseDRBDMinors(self.instance.name)
         raise
@@ -12305,7 +12411,8 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     if instance.disk_template not in (constants.DT_FILE,
                                       constants.DT_SHARED_FILE,
-                                      constants.DT_RBD):
+                                      constants.DT_RBD,
+                                      constants.DT_EXT):
       # TODO: check the free disk space for file, when that feature will be
       # supported
       _CheckNodesFreeDiskPerVG(self, nodenames,
@@ -12805,7 +12912,10 @@ class LUInstanceSetParams(LogicalUnit):
     for (op, _, params) in mods:
       assert ht.TDict(params)
 
-      utils.ForceDictType(params, key_types)
+      # If 'key_types' is an empty dict, we assume we have an
+      # 'ext' template and thus do not ForceDictType
+      if key_types:
+        utils.ForceDictType(params, key_types)
 
       if op == constants.DDM_REMOVE:
         if params:
@@ -12841,9 +12951,18 @@ class LUInstanceSetParams(LogicalUnit):
 
       params[constants.IDISK_SIZE] = size
 
-    elif op == constants.DDM_MODIFY and constants.IDISK_SIZE in params:
-      raise errors.OpPrereqError("Disk size change not possible, use"
-                                 " grow-disk", errors.ECODE_INVAL)
+    elif op == constants.DDM_MODIFY:
+      if constants.IDISK_SIZE in params:
+        raise errors.OpPrereqError("Disk size change not possible, use"
+                                   " grow-disk", errors.ECODE_INVAL)
+      if constants.IDISK_MODE not in params:
+        raise errors.OpPrereqError("Disk 'mode' is the only kind of"
+                                   " modification supported, but missing",
+                                   errors.ECODE_NOENT)
+      if len(params) > 1:
+        raise errors.OpPrereqError("Disk modification doesn't support"
+                                   " additional arbitrary parameters",
+                                   errors.ECODE_INVAL)
 
   @staticmethod
   def _VerifyNicModification(op, params):
@@ -12907,10 +13026,6 @@ class LUInstanceSetParams(LogicalUnit):
     self.op.nics = self._UpgradeDiskNicMods(
       "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)
 
-    # Check disk modifications
-    self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
-                    self._VerifyDiskModification)
-
     if self.op.disks and self.op.disk_template is not None:
       raise errors.OpPrereqError("Disk template conversion and other disk"
                                  " changes not supported at the same time",
@@ -13040,6 +13155,10 @@ class LUInstanceSetParams(LogicalUnit):
         raise errors.OpPrereqError("Cannot set the NIC IP address to None"
                                    " on a routed NIC", errors.ECODE_INVAL)
 
+    elif new_mode == constants.NIC_MODE_OVS:
+      # TODO: check OVS link
+      self.LogInfo("OVS links are currently not checked for correctness")
+
     if constants.INIC_MAC in params:
       mac = params[constants.INIC_MAC]
       if mac is None:
@@ -13098,10 +13217,10 @@ class LUInstanceSetParams(LogicalUnit):
         elif new_ip.lower() == constants.NIC_IP_POOL:
           raise errors.OpPrereqError("ip=pool, but no network found",
                                      errors.ECODE_INVAL)
-        else:
-          # new net is None
-          if self.op.conflicts_check:
-            _CheckForConflictingIp(self, new_ip, pnode)
+
+        # new net is None
+        elif self.op.conflicts_check:
+          _CheckForConflictingIp(self, new_ip, pnode)
 
       if old_ip:
         if old_net:
@@ -13121,7 +13240,7 @@ class LUInstanceSetParams(LogicalUnit):
     private.params = new_params
     private.filled = new_filled_params
 
-  def CheckPrereq(self):
+  def CheckPrereq(self): # pylint: disable=R0914
     """Check prerequisites.
 
     This only checks the instance list against the existing names.
@@ -13147,10 +13266,46 @@ class LUInstanceSetParams(LogicalUnit):
     # dictionary with instance information after the modification
     ispec = {}
 
+    # Check disk modifications. This is done here and not in CheckArguments
+    # (as with NICs), because we need to know the instance's disk template
+    if instance.disk_template == constants.DT_EXT:
+      self._CheckMods("disk", self.op.disks, {},
+                      self._VerifyDiskModification)
+    else:
+      self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
+                      self._VerifyDiskModification)
+
     # Prepare disk/NIC modifications
     self.diskmod = PrepareContainerMods(self.op.disks, None)
     self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)
 
+    # Check the validity of the `provider' parameter
+    if instance.disk_template in constants.DT_EXT:
+      for mod in self.diskmod:
+        ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
+        if mod[0] == constants.DDM_ADD:
+          if ext_provider is None:
+            raise errors.OpPrereqError("Instance template is '%s' and parameter"
+                                       " '%s' missing, during disk add" %
+                                       (constants.DT_EXT,
+                                        constants.IDISK_PROVIDER),
+                                       errors.ECODE_NOENT)
+        elif mod[0] == constants.DDM_MODIFY:
+          if ext_provider:
+            raise errors.OpPrereqError("Parameter '%s' is invalid during disk"
+                                       " modification" %
+                                       constants.IDISK_PROVIDER,
+                                       errors.ECODE_INVAL)
+    else:
+      for mod in self.diskmod:
+        ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
+        if ext_provider is not None:
+          raise errors.OpPrereqError("Parameter '%s' is only valid for"
+                                     " instances of type '%s'" %
+                                     (constants.IDISK_PROVIDER,
+                                      constants.DT_EXT),
+                                     errors.ECODE_INVAL)
+
     # OS change
     if self.op.os_name and not self.op.force:
       _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
@@ -13400,12 +13555,9 @@ class LUInstanceSetParams(LogicalUnit):
     ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes)
     ispec[constants.ISPEC_DISK_SIZE] = disk_sizes
 
-    if self.op.offline is not None:
-      if self.op.offline:
-        msg = "can't change to offline"
-      else:
-        msg = "can't change to online"
-      _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE, msg=msg)
+    if self.op.offline is not None and self.op.offline:
+      _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE,
+                          msg="can't change to offline")
 
     # Pre-compute NIC changes (necessary to use result in hooks)
     self._nic_chgdesc = []
@@ -13469,15 +13621,18 @@ class LUInstanceSetParams(LogicalUnit):
                                       self.diskparams)
     anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
                                         self.diskparams)
+    p_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, pnode)
+    s_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, snode)
     info = _GetInstanceInfoText(instance)
     feedback_fn("Creating additional volumes...")
     # first, create the missing data and meta devices
     for disk in anno_disks:
       # unfortunately this is... not too nice
       _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
-                            info, True)
+                            info, True, p_excl_stor)
       for child in disk.children:
-        _CreateSingleBlockDev(self, snode, instance, child, info, True)
+        _CreateSingleBlockDev(self, snode, instance, child, info, True,
+                              s_excl_stor)
     # at this stage, all new LVs have been created, we can rename the
     # old ones
     feedback_fn("Renaming original volumes...")
@@ -13489,9 +13644,10 @@ class LUInstanceSetParams(LogicalUnit):
     feedback_fn("Initializing DRBD devices...")
     # all child devices are in place, we can now create the DRBD devices
     for disk in anno_disks:
-      for node in [pnode, snode]:
+      for (node, excl_stor) in [(pnode, p_excl_stor), (snode, s_excl_stor)]:
         f_create = node == pnode
-        _CreateSingleBlockDev(self, node, instance, disk, info, f_create)
+        _CreateSingleBlockDev(self, node, instance, disk, info, f_create,
+                              excl_stor)
 
     # at this point, the instance has been modified
     instance.disk_template = constants.DT_DRBD8
@@ -15742,8 +15898,10 @@ class LUNetworkAdd(LogicalUnit):
 
     if self.op.conflicts_check:
       self.share_locks[locking.LEVEL_NODE] = 1
+      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
       self.needed_locks = {
         locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
         }
     else:
       self.needed_locks = {}
@@ -15758,8 +15916,8 @@ class LUNetworkAdd(LogicalUnit):
     uuid = self.cfg.LookupNetwork(self.op.network_name)
 
     if uuid:
-      raise errors.OpPrereqError("Network '%s' already defined" %
-                                 self.op.network, errors.ECODE_EXISTS)
+      raise errors.OpPrereqError(("Network with name '%s' already exists" %
+                                  self.op.network_name), errors.ECODE_EXISTS)
 
     # Check tag validity
     for tag in self.op.tags:
@@ -15798,7 +15956,7 @@ class LUNetworkAdd(LogicalUnit):
     try:
       pool = network.AddressPool.InitializeNetwork(nobj)
     except errors.AddressPoolError, e:
-      raise errors.OpExecError("Cannot create IP pool for this network. %s" % e)
+      raise errors.OpExecError("Cannot create IP pool for this network: %s" % e)
 
     # Check if we need to reserve the nodes and the cluster master IP
     # These may not be allocated to any instances in routed mode, as
@@ -15849,8 +16007,7 @@ class LUNetworkRemove(LogicalUnit):
 
     if not self.network_uuid:
       raise errors.OpPrereqError(("Network '%s' not found" %
-                                  self.op.network_name),
-                                 errors.ECODE_INVAL)
+                                  self.op.network_name), errors.ECODE_NOENT)
 
     self.share_locks[locking.LEVEL_NODEGROUP] = 1
     self.needed_locks = {
@@ -15922,8 +16079,7 @@ class LUNetworkSetParams(LogicalUnit):
     self.network_uuid = self.cfg.LookupNetwork(self.op.network_name)
     if self.network_uuid is None:
       raise errors.OpPrereqError(("Network '%s' not found" %
-                                  self.op.network_name),
-                                 errors.ECODE_INVAL)
+                                  self.op.network_name), errors.ECODE_NOENT)
 
     self.needed_locks = {
       locking.LEVEL_NETWORK: [self.network_uuid],
@@ -15949,8 +16105,9 @@ class LUNetworkSetParams(LogicalUnit):
       else:
         self.gateway = self.op.gateway
         if self.pool.IsReserved(self.gateway):
-          raise errors.OpPrereqError("%s is already reserved" %
-                                     self.gateway, errors.ECODE_INVAL)
+          raise errors.OpPrereqError("Gateway IP address '%s' is already"
+                                     " reserved" % self.gateway,
+                                     errors.ECODE_STATE)
 
     if self.op.network_type:
       if self.op.network_type == constants.VALUE_NONE:
@@ -16095,11 +16252,8 @@ class _NetworkQuery(_QueryBase):
     """
     do_instances = query.NETQ_INST in self.requested_data
     do_groups = do_instances or (query.NETQ_GROUP in self.requested_data)
-    do_stats = query.NETQ_STATS in self.requested_data
 
-    network_to_groups = None
     network_to_instances = None
-    stats = None
 
     # For NETQ_GROUP, we need to map network->[groups]
     if do_groups:
@@ -16118,33 +16272,30 @@ class _NetworkQuery(_QueryBase):
           group_instances = [instance for instance in all_instances.values()
                              if instance.primary_node in group_nodes]
 
-        for net_uuid in group.networks.keys():
-          if net_uuid in network_to_groups:
-            netparams = group.networks[net_uuid]
-            mode = netparams[constants.NIC_MODE]
-            link = netparams[constants.NIC_LINK]
-            info = group.name + "(" + mode + ", " + link + ")"
+        for net_uuid in self.wanted:
+          netparams = group.networks.get(net_uuid, None)
+          if netparams:
+            info = (group.name, netparams[constants.NIC_MODE],
+                    netparams[constants.NIC_LINK])
+
             network_to_groups[net_uuid].append(info)
 
-            if do_instances:
-              for instance in group_instances:
-                for nic in instance.nics:
-                  if nic.network == self._all_networks[net_uuid].name:
-                    network_to_instances[net_uuid].append(instance.name)
-                    break
-
-    if do_stats:
-      stats = {}
-      for uuid, net in self._all_networks.items():
-        if uuid in self.wanted:
-          pool = network.AddressPool(net)
-          stats[uuid] = {
-            "free_count": pool.GetFreeCount(),
-            "reserved_count": pool.GetReservedCount(),
-            "map": pool.GetMap(),
-            "external_reservations":
-              utils.CommaJoin(pool.GetExternalReservations()),
-            }
+          if do_instances:
+            for instance in group_instances:
+              for nic in instance.nics:
+                if nic.network == self._all_networks[net_uuid].name:
+                  network_to_instances[net_uuid].append(instance.name)
+                  break
+    else:
+      network_to_groups = None
+
+    if query.NETQ_STATS in self.requested_data:
+      stats = \
+        dict((uuid,
+              self._GetStats(network.AddressPool(self._all_networks[uuid])))
+             for uuid in self.wanted)
+    else:
+      stats = None
 
     return query.NetworkQueryData([self._all_networks[uuid]
                                    for uuid in self.wanted],
@@ -16152,6 +16303,19 @@ class _NetworkQuery(_QueryBase):
                                    network_to_instances,
                                    stats)
 
+  @staticmethod
+  def _GetStats(pool):
+    """Returns free/reserved counts, map and external reservations of a pool.
+
+    """
+    return {
+      "free_count": pool.GetFreeCount(),
+      "reserved_count": pool.GetReservedCount(),
+      "map": pool.GetMap(),
+      "external_reservations":
+        utils.CommaJoin(pool.GetExternalReservations()),
+      }
+
 
 class LUNetworkQuery(NoHooksLU):
   """Logical unit for querying networks.
@@ -16186,13 +16350,13 @@ class LUNetworkConnect(LogicalUnit):
 
     self.network_uuid = self.cfg.LookupNetwork(self.network_name)
     if self.network_uuid is None:
-      raise errors.OpPrereqError("Network %s does not exist" %
-                                 self.network_name, errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Network '%s' does not exist" %
+                                 self.network_name, errors.ECODE_NOENT)
 
     self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
     if self.group_uuid is None:
-      raise errors.OpPrereqError("Group %s does not exist" %
-                                 self.group_name, errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Group '%s' does not exist" %
+                                 self.group_name, errors.ECODE_NOENT)
 
     self.needed_locks = {
       locking.LEVEL_INSTANCE: [],
@@ -16231,9 +16395,6 @@ class LUNetworkConnect(LogicalUnit):
 
     assert self.group_uuid in owned_groups
 
-    l = lambda value: utils.CommaJoin("%s: %s/%s" % (i[0], i[1], i[2])
-                                      for i in value)
-
     self.netparams = {
       constants.NIC_MODE: self.network_mode,
       constants.NIC_LINK: self.network_link,
@@ -16251,28 +16412,10 @@ class LUNetworkConnect(LogicalUnit):
       return
 
     if self.op.conflicts_check:
-      # Check if locked instances are still correct
-      owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
-      _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
-
-      nobj = self.cfg.GetNetwork(self.network_uuid)
-      pool = network.AddressPool(nobj)
-      conflicting_instances = []
-
-      for (_, instance) in self.cfg.GetMultiInstanceInfo(owned_instances):
-        for idx, nic in enumerate(instance.nics):
-          if pool.Contains(nic.ip):
-            conflicting_instances.append((instance.name, idx, nic.ip))
-
-      if conflicting_instances:
-        self.LogWarning("Following occurences use IPs from network %s"
-                        " that is about to connect to nodegroup %s: %s" %
-                        (self.network_name, self.group.name,
-                        l(conflicting_instances)))
-        raise errors.OpPrereqError("Conflicting IPs found."
-                                   " Please remove/modify"
-                                   " corresponding NICs",
-                                   errors.ECODE_INVAL)
+      pool = network.AddressPool(self.cfg.GetNetwork(self.network_uuid))
+
+      _NetworkConflictCheck(self, lambda nic: pool.Contains(nic.ip),
+                            "connect to")
 
   def Exec(self, feedback_fn):
     if self.connected:
@@ -16282,6 +16425,53 @@ class LUNetworkConnect(LogicalUnit):
     self.cfg.Update(self.group, feedback_fn)
 
 
+def _NetworkConflictCheck(lu, check_fn, action):
+  """Checks for network interface conflicts with a network.
+
+  @type lu: L{LogicalUnit}
+  @param lu: Logical unit providing cfg, group_uuid, group and network_name
+  @type check_fn: callable taking one L{objects.NIC} and returning boolean
+  @param check_fn: Function checking for conflict
+  @type action: string
+  @param action: Verb phrase for the warning, e.g. "connect to"
+  @raise errors.OpPrereqError: If conflicting IP addresses are found.
+
+  """
+  # Check if locked instances are still correct
+  owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
+  _CheckNodeGroupInstances(lu.cfg, lu.group_uuid, owned_instances)
+
+  conflicts = []
+
+  for (_, instance) in lu.cfg.GetMultiInstanceInfo(owned_instances):
+    instconflicts = [(idx, nic.ip)
+                     for (idx, nic) in enumerate(instance.nics)
+                     if check_fn(nic)]
+
+    if instconflicts:
+      conflicts.append((instance.name, instconflicts))
+
+  if conflicts:
+    lu.LogWarning("IP addresses from network '%s', which is about to %s"
+                  " node group '%s', are in use: %s" %
+                  (lu.network_name, action, lu.group.name,
+                   utils.CommaJoin(("%s: %s" %
+                                    (name, _FmtNetworkConflict(details)))
+                                   for (name, details) in conflicts)))
+
+    raise errors.OpPrereqError("Conflicting IP addresses found;"
+                               " remove/modify the corresponding network"
+                               " interfaces", errors.ECODE_STATE)
+
+
+def _FmtNetworkConflict(details):
+  """Formats (NIC index, IP address) pairs for L{_NetworkConflictCheck}.
+
+  """
+  return utils.CommaJoin("nic%s/%s" % (idx, ipaddr)
+                         for (idx, ipaddr) in details)
+
+
 class LUNetworkDisconnect(LogicalUnit):
   """Disconnect a network to a nodegroup
 
@@ -16296,13 +16486,13 @@ class LUNetworkDisconnect(LogicalUnit):
 
     self.network_uuid = self.cfg.LookupNetwork(self.network_name)
     if self.network_uuid is None:
-      raise errors.OpPrereqError("Network %s does not exist" %
-                                 self.network_name, errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Network '%s' does not exist" %
+                                 self.network_name, errors.ECODE_NOENT)
 
     self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
     if self.group_uuid is None:
-      raise errors.OpPrereqError("Group %s does not exist" %
-                                 self.group_name, errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Group '%s' does not exist" %
+                                 self.group_name, errors.ECODE_NOENT)
 
     self.needed_locks = {
       locking.LEVEL_INSTANCE: [],
@@ -16335,9 +16525,6 @@ class LUNetworkDisconnect(LogicalUnit):
 
     assert self.group_uuid in owned_groups
 
-    l = lambda value: utils.CommaJoin("%s: %s/%s" % (i[0], i[1], i[2])
-                                      for i in value)
-
     self.group = self.cfg.GetNodeGroup(self.group_uuid)
     self.connected = True
     if self.network_uuid not in self.group.networks:
@@ -16347,27 +16534,8 @@ class LUNetworkDisconnect(LogicalUnit):
       return
 
     if self.op.conflicts_check:
-      # Check if locked instances are still correct
-      owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
-      _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
-
-      conflicting_instances = []
-
-      for (_, instance) in self.cfg.GetMultiInstanceInfo(owned_instances):
-        for idx, nic in enumerate(instance.nics):
-          if nic.network == self.network_name:
-            conflicting_instances.append((instance.name, idx, nic.ip))
-
-      if conflicting_instances:
-        self.LogWarning("Following occurences use IPs from network %s"
-                           " that is about to disconnected from the nodegroup"
-                           " %s: %s" %
-                           (self.network_name, self.group.name,
-                            l(conflicting_instances)))
-        raise errors.OpPrereqError("Conflicting IPs."
-                                   " Please remove/modify"
-                                   " corresponding NICS",
-                                   errors.ECODE_INVAL)
+      _NetworkConflictCheck(self, lambda nic: nic.network == self.network_name,
+                            "disconnect from")
 
   def Exec(self, feedback_fn):
     if not self.connected:
@@ -16405,18 +16573,18 @@ def _GetQueryImplementation(name):
 
 
 def _CheckForConflictingIp(lu, ip, node):
-  """In case of conflicting ip raise error.
+  """In case of conflicting IP address raise error.
 
   @type ip: string
-  @param ip: ip address
+  @param ip: IP address
   @type node: string
   @param node: node name
 
   """
   (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
   if conf_net is not None:
-    raise errors.OpPrereqError("Conflicting IP found:"
-                               " %s <> %s." % (ip, conf_net),
-                               errors.ECODE_INVAL)
+    raise errors.OpPrereqError(("Conflicting IP address found: '%s' != '%s'" %
+                                (ip, conf_net)),
+                               errors.ECODE_STATE)
 
   return (None, None)