Hold node resource lock while moving instance
[ganeti-local] / lib / cmdlib.py
index 8ec6f4a..f6afefe 100644 (file)
@@ -59,10 +59,15 @@ from ganeti import query
 from ganeti import qlang
 from ganeti import opcodes
 from ganeti import ht
+from ganeti import rpc
 
 import ganeti.masterd.instance # pylint: disable=W0611
 
 
+#: Size of DRBD meta block device
+DRBD_META_SIZE = 128
+
+
 class ResultWithJobs:
   """Data container for LU results with jobs.
 
@@ -108,7 +113,7 @@ class LogicalUnit(object):
   HTYPE = None
   REQ_BGL = True
 
-  def __init__(self, processor, op, context, rpc):
+  def __init__(self, processor, op, context, rpc_runner):
     """Constructor for LogicalUnit.
 
     This needs to be overridden in derived classes in order to check op
@@ -122,7 +127,7 @@ class LogicalUnit(object):
     # readability alias
     self.owned_locks = context.glm.list_owned
     self.context = context
-    self.rpc = rpc
+    self.rpc = rpc_runner
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
@@ -344,7 +349,8 @@ class LogicalUnit(object):
                                                 self.op.instance_name)
     self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
 
-  def _LockInstancesNodes(self, primary_only=False):
+  def _LockInstancesNodes(self, primary_only=False,
+                          level=locking.LEVEL_NODE):
     """Helper function to declare instances' nodes for locking.
 
     This function should be called after locking one or more instances to lock
@@ -365,9 +371,10 @@ class LogicalUnit(object):
 
     @type primary_only: boolean
     @param primary_only: only lock primary nodes of locked instances
+    @param level: Which lock level to use for locking nodes
 
     """
-    assert locking.LEVEL_NODE in self.recalculate_locks, \
+    assert level in self.recalculate_locks, \
       "_LockInstancesNodes helper function called with no nodes to recalculate"
 
     # TODO: check if we're really been called with the instance locks held
@@ -382,12 +389,14 @@ class LogicalUnit(object):
       if not primary_only:
         wanted_nodes.extend(instance.secondary_nodes)
 
-    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
-      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
-    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
-      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
+    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
+      self.needed_locks[level] = wanted_nodes
+    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
+      self.needed_locks[level].extend(wanted_nodes)
+    else:
+      raise errors.ProgrammerError("Unknown recalculation mode")
 
-    del self.recalculate_locks[locking.LEVEL_NODE]
+    del self.recalculate_locks[level]
 
 
 class NoHooksLU(LogicalUnit): # pylint: disable=W0223
@@ -753,7 +762,7 @@ def _RunPostHook(lu, node_name):
   """Runs the post-hook for an opcode on a single node.
 
   """
-  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+  hm = lu.proc.BuildHooksManager(lu)
   try:
     hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
   except:
@@ -1204,13 +1213,13 @@ def _GetStorageTypeArgs(cfg, storage_type):
   return []
 
 
-def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
+def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
   faulty = []
 
   for dev in instance.disks:
     cfg.SetDiskID(dev, node_name)
 
-  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
+  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
   result.Raise("Failed to get disk status from node %s" % node_name,
                prereq=prereq, ecode=errors.ECODE_ENVIRON)
 
@@ -1350,15 +1359,17 @@ class LUClusterDestroy(LogicalUnit):
     """Destroys the cluster.
 
     """
-    master = self.cfg.GetMasterNode()
+    master_params = self.cfg.GetMasterNetworkParameters()
 
     # Run post hooks on master node before it's removed
-    _RunPostHook(self, master)
+    _RunPostHook(self, master_params.name)
 
-    result = self.rpc.call_node_deactivate_master_ip(master)
+    ems = self.cfg.GetUseExternalMipScript()
+    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                     master_params, ems)
     result.Raise("Could not disable the master role")
 
-    return master
+    return master_params.name
 
 
 def _VerifyCertificate(filename):
@@ -1931,6 +1942,26 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       _ErrorIf(bool(missing), constants.CV_ENODENET, node,
                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
 
+  def _VerifyNodeUserScripts(self, ninfo, nresult):
+    """Check the results of user scripts presence and executability on the node
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+
+    """
+    node = ninfo.name
+
+    test = not constants.NV_USERSCRIPTS in nresult
+    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
+                  "did not return user scripts information")
+
+    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
+    if not test:
+      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
+                    "user scripts not present or not executable: %s" %
+                    utils.CommaJoin(sorted(broken_scripts)))
+
   def _VerifyNodeNetwork(self, ninfo, nresult):
     """Check the node network connectivity results.
 
@@ -2080,7 +2111,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
   @classmethod
   def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
-                   (files_all, files_all_opt, files_mc, files_vm)):
+                   (files_all, files_opt, files_mc, files_vm)):
     """Verifies file checksums collected from all nodes.
 
     @param errorif: Callback for reporting errors
@@ -2089,14 +2120,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     @param all_nvinfo: RPC results
 
     """
-    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
-            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
-           "Found file listed in more than one file list"
-
     # Define functions determining which nodes to consider for a file
     files2nodefn = [
       (files_all, None),
-      (files_all_opt, None),
       (files_mc, lambda node: (node.master_candidate or
                                node.name == master_node)),
       (files_vm, lambda node: node.vm_capable),
@@ -2113,7 +2139,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
                         frozenset(map(operator.attrgetter("name"), filenodes)))
                        for filename in files)
 
-    assert set(nodefiles) == (files_all | files_all_opt | files_mc | files_vm)
+    assert set(nodefiles) == (files_all | files_mc | files_vm)
 
     fileinfo = dict((filename, {}) for filename in nodefiles)
     ignore_nodes = set()
@@ -2155,7 +2181,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       # Nodes missing file
       missing_file = expected_nodes - with_file
 
-      if filename in files_all_opt:
+      if filename in files_opt:
         # All or no nodes
         errorif(missing_file and missing_file != expected_nodes,
                 constants.CV_ECLUSTERFILECHECK, None,
@@ -2644,6 +2670,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
 
+    user_scripts = []
+    if self.cfg.GetUseExternalMipScript():
+      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
     node_verify_param = {
       constants.NV_FILELIST:
         utils.UniqueSequence(filename
@@ -2666,6 +2696,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       constants.NV_MASTERIP: (master_node, master_ip),
       constants.NV_OSLIST: None,
       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
+      constants.NV_USERSCRIPTS: user_scripts,
       }
 
     if vg_name is not None:
@@ -2824,6 +2855,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       nimg.call_ok = self._VerifyNode(node_i, nresult)
       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
+      self._VerifyNodeUserScripts(node_i, nresult)
       self._VerifyOob(node_i, nresult)
 
       if nimg.vm_capable:
@@ -3154,7 +3186,7 @@ class LUGroupVerifyDisks(NoHooksLU):
       # any leftover items in nv_dict are missing LVs, let's arrange the data
       # better
       for key, inst in nv_dict.iteritems():
-        res_missing.setdefault(inst, []).append(key)
+        res_missing.setdefault(inst, []).append(list(key))
 
     return (res_nodes, list(res_instances), res_missing)
 
@@ -3169,21 +3201,21 @@ class LUClusterRepairDiskSizes(NoHooksLU):
     if self.op.instances:
       self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
-        locking.LEVEL_NODE: [],
+        locking.LEVEL_NODE_RES: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
         }
-      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
     else:
       self.wanted_names = None
       self.needed_locks = {
-        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_NODE_RES: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
     self.share_locks = _ShareAll()
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE and self.wanted_names is not None:
-      self._LockInstancesNodes(primary_only=True)
+    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
+      self._LockInstancesNodes(primary_only=True, level=level)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -3234,6 +3266,11 @@ class LUClusterRepairDiskSizes(NoHooksLU):
       for idx, disk in enumerate(instance.disks):
         per_node_disks[pnode].append((instance, idx, disk))
 
+    assert not (frozenset(per_node_disks.keys()) -
+                self.owned_locks(locking.LEVEL_NODE_RES)), \
+      "Not owning correct locks"
+    assert not self.owned_locks(locking.LEVEL_NODE)
+
     changed = []
     for node, dskl in per_node_disks.items():
       newl = [v[2].Copy() for v in dskl]
@@ -3323,29 +3360,33 @@ class LUClusterRename(LogicalUnit):
 
     """
     clustername = self.op.name
-    ip = self.ip
+    new_ip = self.ip
 
     # shutdown the master IP
-    master = self.cfg.GetMasterNode()
-    result = self.rpc.call_node_deactivate_master_ip(master)
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                     master_params, ems)
     result.Raise("Could not disable the master role")
 
     try:
       cluster = self.cfg.GetClusterInfo()
       cluster.cluster_name = clustername
-      cluster.master_ip = ip
+      cluster.master_ip = new_ip
       self.cfg.Update(cluster, feedback_fn)
 
       # update the known hosts file
       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
       node_list = self.cfg.GetOnlineNodeList()
       try:
-        node_list.remove(master)
+        node_list.remove(master_params.name)
       except ValueError:
         pass
       _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
     finally:
-      result = self.rpc.call_node_activate_master_ip(master)
+      master_params.ip = new_ip
+      result = self.rpc.call_node_activate_master_ip(master_params.name,
+                                                     master_params, ems)
       msg = result.fail_msg
       if msg:
         self.LogWarning("Could not re-enable the master role on"
@@ -3675,6 +3716,9 @@ class LUClusterSetParams(LogicalUnit):
     if self.op.reserved_lvs is not None:
       self.cluster.reserved_lvs = self.op.reserved_lvs
 
+    if self.op.use_external_mip_script is not None:
+      self.cluster.use_external_mip_script = self.op.use_external_mip_script
+
     def helper_os(aname, mods, desc):
       desc += " OS list"
       lst = getattr(self.cluster, aname)
@@ -3699,33 +3743,40 @@ class LUClusterSetParams(LogicalUnit):
       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
 
     if self.op.master_netdev:
-      master = self.cfg.GetMasterNode()
+      master_params = self.cfg.GetMasterNetworkParameters()
+      ems = self.cfg.GetUseExternalMipScript()
       feedback_fn("Shutting down master ip on the current netdev (%s)" %
                   self.cluster.master_netdev)
-      result = self.rpc.call_node_deactivate_master_ip(master)
+      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                       master_params, ems)
       result.Raise("Could not disable the master ip")
       feedback_fn("Changing master_netdev from %s to %s" %
-                  (self.cluster.master_netdev, self.op.master_netdev))
+                  (master_params.netdev, self.op.master_netdev))
       self.cluster.master_netdev = self.op.master_netdev
 
     if self.op.master_netmask:
-      master = self.cfg.GetMasterNode()
+      master_params = self.cfg.GetMasterNetworkParameters()
       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
-      result = self.rpc.call_node_change_master_netmask(master,
-                                                        self.op.master_netmask)
+      result = self.rpc.call_node_change_master_netmask(master_params.name,
+                                                        master_params.netmask,
+                                                        self.op.master_netmask,
+                                                        master_params.ip,
+                                                        master_params.netdev)
       if result.fail_msg:
         msg = "Could not change the master IP netmask: %s" % result.fail_msg
-        self.LogWarning(msg)
         feedback_fn(msg)
-      else:
-        self.cluster.master_netmask = self.op.master_netmask
+
+      self.cluster.master_netmask = self.op.master_netmask
 
     self.cfg.Update(self.cluster, feedback_fn)
 
     if self.op.master_netdev:
+      master_params = self.cfg.GetMasterNetworkParameters()
       feedback_fn("Starting the master ip on the new master netdev (%s)" %
                   self.op.master_netdev)
-      result = self.rpc.call_node_activate_master_ip(master)
+      ems = self.cfg.GetUseExternalMipScript()
+      result = self.rpc.call_node_activate_master_ip(master_params.name,
+                                                     master_params, ems)
       if result.fail_msg:
         self.LogWarning("Could not re-enable the master ip on"
                         " the master, please restart manually: %s",
@@ -3760,6 +3811,7 @@ def _ComputeAncillaryFiles(cluster, redist):
     constants.CLUSTER_DOMAIN_SECRET_FILE,
     constants.SPICE_CERT_FILE,
     constants.SPICE_CACERT_FILE,
+    constants.RAPI_USERS_FILE,
     ])
 
   if not redist:
@@ -3772,27 +3824,43 @@ def _ComputeAncillaryFiles(cluster, redist):
   if cluster.modify_etc_hosts:
     files_all.add(constants.ETC_HOSTS)
 
-  # Files which must either exist on all nodes or on none
-  files_all_opt = set([
+  # Files which are optional, these must:
+  # - be present in one other category as well
+  # - either exist or not exist on all nodes of that category (mc, vm all)
+  files_opt = set([
     constants.RAPI_USERS_FILE,
     ])
 
   # Files which should only be on master candidates
   files_mc = set()
+
   if not redist:
     files_mc.add(constants.CLUSTER_CONF_FILE)
 
+    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
+    # replication
+    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
+
   # Files which should only be on VM-capable nodes
   files_vm = set(filename
     for hv_name in cluster.enabled_hypervisors
-    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
+
+  files_opt |= set(filename
+    for hv_name in cluster.enabled_hypervisors
+    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
 
-  # Filenames must be unique
-  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
-          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+  # Filenames in each category must be unique
+  all_files_set = files_all | files_mc | files_vm
+  assert (len(all_files_set) ==
+          sum(map(len, [files_all, files_mc, files_vm]))), \
          "Found file listed in more than one file list"
 
-  return (files_all, files_all_opt, files_mc, files_vm)
+  # Optional files must be present in one other category
+  assert all_files_set.issuperset(files_opt), \
+         "Optional file not in a different required list"
+
+  return (files_all, files_opt, files_mc, files_vm)
 
 
 def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
@@ -3826,7 +3894,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
       nodelist.remove(master_info.name)
 
   # Gather file lists
-  (files_all, files_all_opt, files_mc, files_vm) = \
+  (files_all, _, files_mc, files_vm) = \
     _ComputeAncillaryFiles(cluster, True)
 
   # Never re-distribute configuration file from here
@@ -3836,7 +3904,6 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
 
   filemap = [
     (online_nodes, files_all),
-    (online_nodes, files_all_opt),
     (vm_nodes, files_vm),
     ]
 
@@ -3876,8 +3943,10 @@ class LUClusterActivateMasterIp(NoHooksLU):
     """Activate the master IP.
 
     """
-    master = self.cfg.GetMasterNode()
-    self.rpc.call_node_activate_master_ip(master)
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    self.rpc.call_node_activate_master_ip(master_params.name,
+                                          master_params, ems)
 
 
 class LUClusterDeactivateMasterIp(NoHooksLU):
@@ -3888,8 +3957,10 @@ class LUClusterDeactivateMasterIp(NoHooksLU):
     """Deactivate the master IP.
 
     """
-    master = self.cfg.GetMasterNode()
-    self.rpc.call_node_deactivate_master_ip(master)
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    self.rpc.call_node_deactivate_master_ip(master_params.name, master_params,
+                                            ems)
 
 
 def _WaitForSync(lu, instance, disks=None, oneshot=False):
@@ -4411,6 +4482,9 @@ class LUNodeRemove(LogicalUnit):
 
     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
 
+    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+      "Not owning BGL"
+
     # Promote nodes to master candidate as needed
     _AdjustCandidatePool(self, exceptions=[node.name])
     self.context.RemoveNode(node.name)
@@ -4524,6 +4598,9 @@ class LUNodeQuery(NoHooksLU):
   def ExpandNames(self):
     self.nq.ExpandNames(self)
 
+  def DeclareLocks(self, level):
+    self.nq.DeclareLocks(self, level)
+
   def Exec(self, feedback_fn):
     return self.nq.OldStyleQuery(self)
 
@@ -4542,8 +4619,9 @@ class LUNodeQueryvols(NoHooksLU):
                        selected=self.op.output_fields)
 
   def ExpandNames(self):
+    self.share_locks = _ShareAll()
     self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
+
     if not self.op.nodes:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
@@ -4610,8 +4688,8 @@ class LUNodeQueryStorage(NoHooksLU):
                        selected=self.op.output_fields)
 
   def ExpandNames(self):
+    self.share_locks = _ShareAll()
     self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
 
     if self.op.nodes:
       self.needed_locks[locking.LEVEL_NODE] = \
@@ -5077,6 +5155,9 @@ class LUNodeAdd(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
+    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+      "Not owning BGL"
+
     # We adding a new node so we assume it's powered
     new_node.powered = True
 
@@ -5215,6 +5296,13 @@ class LUNodeSetParams(LogicalUnit):
     self.lock_all = self.op.auto_promote and self.might_demote
     self.lock_instances = self.op.secondary_ip is not None
 
+  def _InstanceFilter(self, instance):
+    """Filter for getting affected instances.
+
+    """
+    return (instance.disk_template in constants.DTS_INT_MIRROR and
+            self.op.node_name in instance.all_nodes)
+
   def ExpandNames(self):
     if self.lock_all:
       self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
@@ -5222,28 +5310,8 @@ class LUNodeSetParams(LogicalUnit):
       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
     if self.lock_instances:
-      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
-
-  def DeclareLocks(self, level):
-    # If we have locked all instances, before waiting to lock nodes, release
-    # all the ones living on nodes unrelated to the current operation.
-    if level == locking.LEVEL_NODE and self.lock_instances:
-      self.affected_instances = []
-      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
-        instances_keep = []
-
-        # Build list of instances to release
-        locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
-        for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i):
-          if (instance.disk_template in constants.DTS_INT_MIRROR and
-              self.op.node_name in instance.all_nodes):
-            instances_keep.append(instance_name)
-            self.affected_instances.append(instance)
-
-        _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
-
-        assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) ==
-                set(instances_keep))
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5275,6 +5343,25 @@ class LUNodeSetParams(LogicalUnit):
     """
     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
 
+    if self.lock_instances:
+      affected_instances = \
+        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+
+      # Verify instance locks
+      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+      wanted_instances = frozenset(affected_instances.keys())
+      if wanted_instances - owned_instances:
+        raise errors.OpPrereqError("Instances affected by changing node %s's"
+                                   " secondary IP address have changed since"
+                                   " locks were acquired, wanted '%s', have"
+                                   " '%s'; retry the operation" %
+                                   (self.op.node_name,
+                                    utils.CommaJoin(wanted_instances),
+                                    utils.CommaJoin(owned_instances)),
+                                   errors.ECODE_STATE)
+    else:
+      affected_instances = None
+
     if (self.op.master_candidate is not None or
         self.op.drained is not None or
         self.op.offline is not None):
@@ -5364,7 +5451,9 @@ class LUNodeSetParams(LogicalUnit):
 
     if old_role == self._ROLE_OFFLINE and new_role != old_role:
       # Trying to transition out of offline status
-      result = self.rpc.call_version([node.name])[node.name]
+      # TODO: Use standard RPC runner, but make sure it works when the node is
+      # still marked offline
+      result = rpc.BootstrapRunner().call_version([node.name])[node.name]
       if result.fail_msg:
         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                    " to report its version: %s" %
@@ -5383,15 +5472,19 @@ class LUNodeSetParams(LogicalUnit):
         raise errors.OpPrereqError("Cannot change the secondary ip on a single"
                                    " homed cluster", errors.ECODE_INVAL)
 
+      assert not (frozenset(affected_instances) -
+                  self.owned_locks(locking.LEVEL_INSTANCE))
+
       if node.offline:
-        if self.affected_instances:
-          raise errors.OpPrereqError("Cannot change secondary ip: offline"
-                                     " node has instances (%s) configured"
-                                     " to use it" % self.affected_instances)
+        if affected_instances:
+          raise errors.OpPrereqError("Cannot change secondary IP address:"
+                                     " offline node has instances (%s)"
+                                     " configured to use it" %
+                                     utils.CommaJoin(affected_instances.keys()))
       else:
         # On online nodes, check that no instances are running, and that
         # the node has the new ip and we can reach it.
-        for instance in self.affected_instances:
+        for instance in affected_instances.values():
           _CheckInstanceDown(self, instance, "cannot change secondary ip")
 
         _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
@@ -5544,6 +5637,7 @@ class LUClusterQuery(NoHooksLU):
       "candidate_pool_size": cluster.candidate_pool_size,
       "master_netdev": cluster.master_netdev,
       "master_netmask": cluster.master_netmask,
+      "use_external_mip_script": cluster.use_external_mip_script,
       "volume_group_name": cluster.volume_group_name,
       "drbd_usermode_helper": cluster.drbd_usermode_helper,
       "file_storage_dir": cluster.file_storage_dir,
@@ -6070,9 +6164,11 @@ class LUInstanceStartup(LogicalUnit):
 
       _StartInstanceDisks(self, instance, force)
 
-      result = self.rpc.call_instance_start(node_current, instance,
-                                            self.op.hvparams, self.op.beparams,
-                                            self.op.startup_paused)
+      result = \
+        self.rpc.call_instance_start(node_current,
+                                     (instance, self.op.hvparams,
+                                      self.op.beparams),
+                                     self.op.startup_paused)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -6162,8 +6258,8 @@ class LUInstanceReboot(LogicalUnit):
         self.LogInfo("Instance %s was already stopped, starting now",
                      instance.name)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      result = self.rpc.call_instance_start(node_current, instance,
-                                            None, None, False)
+      result = self.rpc.call_instance_start(node_current,
+                                            (instance, None, None), False)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -6324,9 +6420,9 @@ class LUInstanceReinstall(LogicalUnit):
     try:
       feedback_fn("Running the instance OS create scripts...")
       # FIXME: pass debug option from opcode to backend
-      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
-                                             self.op.debug_level,
-                                             osparams=self.os_inst)
+      result = self.rpc.call_instance_os_add(inst.primary_node,
+                                             (inst, self.os_inst), True,
+                                             self.op.debug_level)
       result.Raise("Could not install OS for instance %s on node %s" %
                    (inst.name, inst.primary_node))
     finally:
@@ -6360,6 +6456,10 @@ class LUInstanceRecreateDisks(LogicalUnit):
       # otherwise we need to lock all nodes for disk re-creation
       primary_only = bool(self.op.nodes)
       self._LockInstancesNodes(primary_only=primary_only)
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6406,7 +6506,8 @@ class LUInstanceRecreateDisks(LogicalUnit):
                                  self.op.instance_name, errors.ECODE_INVAL)
     # if we replace nodes *and* the old primary is offline, we don't
     # check
-    assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+    assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
+    assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
     if not (self.op.nodes and old_pnode.offline):
       _CheckInstanceDown(self, instance, "cannot recreate disks")
@@ -6430,6 +6531,9 @@ class LUInstanceRecreateDisks(LogicalUnit):
     """
     instance = self.instance
 
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
     to_skip = []
     mods = [] # keeps track of needed logical_id changes
 
@@ -6598,11 +6702,16 @@ class LUInstanceRemove(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
     self.needed_locks[locking.LEVEL_NODE] = []
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6651,6 +6760,12 @@ class LUInstanceRemove(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+    assert not (set(instance.all_nodes) -
+                self.owned_locks(locking.LEVEL_NODE)), \
+      "Not owning correct locks"
+
     _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
 
@@ -6864,11 +6979,16 @@ class LUInstanceMove(LogicalUnit):
     target_node = _ExpandNodeName(self.cfg, self.op.target_node)
     self.op.target_node = target_node
     self.needed_locks[locking.LEVEL_NODE] = [target_node]
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes(primary_only=True)
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6953,6 +7073,9 @@ class LUInstanceMove(LogicalUnit):
     self.LogInfo("Shutting down instance %s on source node %s",
                  instance.name, source_node)
 
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
     result = self.rpc.call_instance_shutdown(source_node, instance,
                                              self.op.shutdown_timeout)
     msg = result.fail_msg
@@ -7027,8 +7150,8 @@ class LUInstanceMove(LogicalUnit):
         _ShutdownInstanceDisks(self, instance)
         raise errors.OpExecError("Can't activate the instance's disks")
 
-      result = self.rpc.call_instance_start(target_node, instance,
-                                            None, None, False)
+      result = self.rpc.call_instance_start(target_node,
+                                            (instance, None, None), False)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -7691,7 +7814,7 @@ class TLMigrateInstance(Tasklet):
 
       self.feedback_fn("* starting the instance on the target node %s" %
                        target_node)
-      result = self.rpc.call_instance_start(target_node, instance, None, None,
+      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                             False)
       msg = result.fail_msg
       if msg:
@@ -7823,7 +7946,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgnames[0], names[0]))
-  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
                           logical_id=(vgnames[1], names[1]))
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
@@ -8143,7 +8266,7 @@ def _ComputeDiskSizePerVG(disk_template, disks):
     constants.DT_DISKLESS: {},
     constants.DT_PLAIN: _compute(disks, 0),
     # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: _compute(disks, 128),
+    constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
     constants.DT_FILE: {},
     constants.DT_SHARED_FILE: {},
   }
@@ -8164,7 +8287,8 @@ def _ComputeDiskSize(disk_template, disks):
     constants.DT_DISKLESS: None,
     constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
     # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
+    constants.DT_DRBD8:
+      sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
     constants.DT_FILE: None,
     constants.DT_SHARED_FILE: 0,
     constants.DT_BLOCK: 0,
@@ -8210,9 +8334,11 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
 
   """
   nodenames = _FilterVmNodes(lu, nodenames)
-  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
-                                                  hvname,
-                                                  hvparams)
+
+  cluster = lu.cfg.GetClusterInfo()
+  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+
+  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
   for node in nodenames:
     info = hvinfo[node]
     if info.offline:
@@ -8238,7 +8364,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
 
   """
   nodenames = _FilterVmNodes(lu, nodenames)
-  result = lu.rpc.call_os_validate(required, nodenames, osname,
+  result = lu.rpc.call_os_validate(nodenames, required, osname,
                                    [constants.OS_VALIDATE_PARAMETERS],
                                    osparams)
   for node, nres in result.items():
@@ -8431,7 +8557,11 @@ class LUInstanceCreate(LogicalUnit):
     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
 
     if self.op.iallocator:
+      # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
+      # specifying a group on instance creation and then selecting nodes from
+      # that group
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+      self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
     else:
       self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
       nodelist = [self.op.pnode]
@@ -8439,6 +8569,9 @@ class LUInstanceCreate(LogicalUnit):
         self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
         nodelist.append(self.op.snode)
       self.needed_locks[locking.LEVEL_NODE] = nodelist
+      # Lock resources of instance's primary and secondary nodes (copy to
+      # prevent accidential modification)
+      self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist)
 
     # in case of import lock the source node too
     if self.op.mode == constants.INSTANCE_IMPORT:
@@ -9045,6 +9178,10 @@ class LUInstanceCreate(LogicalUnit):
     instance = self.op.instance_name
     pnode_name = self.pnode.name
 
+    assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
+                self.owned_locks(locking.LEVEL_NODE)), \
+      "Node locks differ from node resource locks"
+
     ht_kind = self.op.hypervisor
     if ht_kind in constants.HTS_REQ_PORT:
       network_port = self.cfg.AllocatePort()
@@ -9147,6 +9284,9 @@ class LUInstanceCreate(LogicalUnit):
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
+    # Release all node resource locks
+    _ReleaseLocks(self, locking.LEVEL_NODE_RES)
+
     if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
       if self.op.mode == constants.INSTANCE_CREATE:
         if not self.op.no_install:
@@ -9164,7 +9304,7 @@ class LUInstanceCreate(LogicalUnit):
           feedback_fn("* running the instance OS create scripts...")
           # FIXME: pass debug option from opcode to backend
           os_add_result = \
-            self.rpc.call_instance_os_add(pnode_name, iobj, False,
+            self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
                                           self.op.debug_level)
           if pause_sync:
             feedback_fn("* resuming disk sync")
@@ -9239,13 +9379,15 @@ class LUInstanceCreate(LogicalUnit):
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                      % self.op.mode)
 
+    assert not self.owned_locks(locking.LEVEL_NODE_RES)
+
     if self.op.start:
       iobj.admin_up = True
       self.cfg.Update(iobj, feedback_fn)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      result = self.rpc.call_instance_start(pnode_name, iobj,
-                                            None, None, False)
+      result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
+                                            False)
       result.Raise("Could not start instance")
 
     return list(iobj.all_nodes)
@@ -9262,6 +9404,7 @@ class LUInstanceConsole(NoHooksLU):
   REQ_BGL = False
 
   def ExpandNames(self):
+    self.share_locks = _ShareAll()
     self._ExpandAndLockInstance()
 
   def CheckPrereq(self):
@@ -9829,7 +9972,7 @@ class TLReplaceDisks(Tasklet):
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                              logical_id=(vg_data, names[0]))
       vg_meta = dev.children[1].logical_id[0]
-      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
                              logical_id=(vg_meta, names[1]))
 
       new_lvs = [lv_data, lv_meta]
@@ -10459,11 +10602,16 @@ class LUInstanceGrowDisk(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
     self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
+    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -10520,10 +10668,18 @@ class LUInstanceGrowDisk(LogicalUnit):
     instance = self.instance
     disk = self.disk
 
+    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
     disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    feedback_fn("Growing disk %s of instance '%s' by %s" %
+                (self.op.disk, instance.name,
+                 utils.FormatUnit(self.op.amount, "h")))
+
     # First run all grow ops in dry-run mode
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
@@ -10546,6 +10702,13 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     disk.RecordGrow(self.op.amount)
     self.cfg.Update(instance, feedback_fn)
+
+    # Changes have been recorded, release node lock
+    _ReleaseLocks(self, locking.LEVEL_NODE)
+
+    # Downgrade lock while waiting for sync
+    self.glm.downgrade(locking.LEVEL_INSTANCE)
+
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance, disks=[disk])
       if disk_abort:
@@ -10558,6 +10721,9 @@ class LUInstanceGrowDisk(LogicalUnit):
                            " not supposed to be running because no wait for"
                            " sync mode was requested")
 
+    assert self.owned_locks(locking.LEVEL_NODE_RES)
+    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+
 
 class LUInstanceQueryData(NoHooksLU):
   """Query runtime instance data.
@@ -11890,8 +12056,8 @@ class LUBackupExport(LogicalUnit):
             not self.op.remove_instance):
           assert not activate_disks
           feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance,
-                                                None, None, False)
+          result = self.rpc.call_instance_start(src_node,
+                                                (instance, None, None), False)
           msg = result.fail_msg
           if msg:
             feedback_fn("Failed to start instance: %s" % msg)
@@ -13023,9 +13189,9 @@ class IAllocator(object):
   # pylint: disable=R0902
   # lots of instance attributes
 
-  def __init__(self, cfg, rpc, mode, **kwargs):
+  def __init__(self, cfg, rpc_runner, mode, **kwargs):
     self.cfg = cfg
-    self.rpc = rpc
+    self.rpc = rpc_runner
     # init buffer variables
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy