Acquire node resource lock when removing instance
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 8ec6f4a..7bfb1e3 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -59,10 +59,15 @@ from ganeti import query
 from ganeti import qlang
 from ganeti import opcodes
 from ganeti import ht
+from ganeti import rpc
 
 import ganeti.masterd.instance # pylint: disable=W0611
 
 
+#: Size of DRBD meta block device
+DRBD_META_SIZE = 128
+
+
 class ResultWithJobs:
   """Data container for LU results with jobs.
 
@@ -108,7 +113,7 @@ class LogicalUnit(object):
   HTYPE = None
   REQ_BGL = True
 
-  def __init__(self, processor, op, context, rpc):
+  def __init__(self, processor, op, context, rpc_runner):
     """Constructor for LogicalUnit.
 
     This needs to be overridden in derived classes in order to check op
@@ -122,7 +127,7 @@ class LogicalUnit(object):
     # readability alias
     self.owned_locks = context.glm.list_owned
     self.context = context
-    self.rpc = rpc
+    self.rpc = rpc_runner
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
@@ -344,7 +349,8 @@ class LogicalUnit(object):
                                                 self.op.instance_name)
     self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
 
-  def _LockInstancesNodes(self, primary_only=False):
+  def _LockInstancesNodes(self, primary_only=False,
+                          level=locking.LEVEL_NODE):
     """Helper function to declare instances' nodes for locking.
 
     This function should be called after locking one or more instances to lock
@@ -365,9 +371,10 @@ class LogicalUnit(object):
 
     @type primary_only: boolean
     @param primary_only: only lock primary nodes of locked instances
+    @param level: Which lock level to use for locking nodes
 
     """
-    assert locking.LEVEL_NODE in self.recalculate_locks, \
+    assert level in self.recalculate_locks, \
       "_LockInstancesNodes helper function called with no nodes to recalculate"
 
     # TODO: check if we're really been called with the instance locks held
@@ -382,12 +389,14 @@ class LogicalUnit(object):
       if not primary_only:
         wanted_nodes.extend(instance.secondary_nodes)
 
-    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
-      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
-    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
-      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
+    if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
+      self.needed_locks[level] = wanted_nodes
+    elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
+      self.needed_locks[level].extend(wanted_nodes)
+    else:
+      raise errors.ProgrammerError("Unknown recalculation mode")
 
-    del self.recalculate_locks[locking.LEVEL_NODE]
+    del self.recalculate_locks[level]
 
 
 class NoHooksLU(LogicalUnit): # pylint: disable=W0223
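Note: the new level argument lets callers recalculate node locks at a level other than locking.LEVEL_NODE. A minimal usage sketch (illustrative only, mirroring the LUClusterRepairDiskSizes change further down in this patch):

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      # recalculate the node resource locks from the instance locks held
      self._LockInstancesNodes(primary_only=True, level=level)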
@@ -753,7 +762,7 @@ def _RunPostHook(lu, node_name):
   """Runs the post-hook for an opcode on a single node.
 
   """
-  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+  hm = lu.proc.BuildHooksManager(lu)
   try:
     hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
   except:
@@ -1204,13 +1213,13 @@ def _GetStorageTypeArgs(cfg, storage_type):
   return []
 
 
-def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
+def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
   faulty = []
 
   for dev in instance.disks:
     cfg.SetDiskID(dev, node_name)
 
-  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
+  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
   result.Raise("Failed to get disk status from node %s" % node_name,
                prereq=prereq, ecode=errors.ECODE_ENVIRON)
 
@@ -1350,15 +1359,17 @@ class LUClusterDestroy(LogicalUnit):
     """Destroys the cluster.
 
     """
-    master = self.cfg.GetMasterNode()
+    master_params = self.cfg.GetMasterNetworkParameters()
 
     # Run post hooks on master node before it's removed
-    _RunPostHook(self, master)
+    _RunPostHook(self, master_params.name)
 
-    result = self.rpc.call_node_deactivate_master_ip(master)
+    ems = self.cfg.GetUseExternalMipScript()
+    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                     master_params, ems)
     result.Raise("Could not disable the master role")
 
-    return master
+    return master_params.name
 
 
 def _VerifyCertificate(filename):
@@ -1931,6 +1942,26 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       _ErrorIf(bool(missing), constants.CV_ENODENET, node,
                "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
 
+  def _VerifyNodeUserScripts(self, ninfo, nresult):
+    """Check the results of user scripts presence and executability on the node
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+
+    """
+    node = ninfo.name
+
+    test = not constants.NV_USERSCRIPTS in nresult
+    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
+                  "did not return user scripts information")
+
+    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
+    if not test:
+      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
+                    "user scripts not present or not executable: %s" %
+                    utils.CommaJoin(sorted(broken_scripts)))
+
   def _VerifyNodeNetwork(self, ninfo, nresult):
     """Check the node network connectivity results.
 
@@ -2080,7 +2111,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
   @classmethod
   def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
-                   (files_all, files_all_opt, files_mc, files_vm)):
+                   (files_all, files_opt, files_mc, files_vm)):
     """Verifies file checksums collected from all nodes.
 
     @param errorif: Callback for reporting errors
@@ -2089,14 +2120,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     @param all_nvinfo: RPC results
 
     """
-    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
-            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
-           "Found file listed in more than one file list"
-
     # Define functions determining which nodes to consider for a file
     files2nodefn = [
       (files_all, None),
-      (files_all_opt, None),
       (files_mc, lambda node: (node.master_candidate or
                                node.name == master_node)),
       (files_vm, lambda node: node.vm_capable),
@@ -2113,7 +2139,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
                         frozenset(map(operator.attrgetter("name"), filenodes)))
                        for filename in files)
 
-    assert set(nodefiles) == (files_all | files_all_opt | files_mc | files_vm)
+    assert set(nodefiles) == (files_all | files_mc | files_vm)
 
     fileinfo = dict((filename, {}) for filename in nodefiles)
     ignore_nodes = set()
@@ -2155,7 +2181,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       # Nodes missing file
       missing_file = expected_nodes - with_file
 
-      if filename in files_all_opt:
+      if filename in files_opt:
         # All or no nodes
         errorif(missing_file and missing_file != expected_nodes,
                 constants.CV_ECLUSTERFILECHECK, None,
@@ -2644,6 +2670,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
 
+    user_scripts = []
+    if self.cfg.GetUseExternalMipScript():
+      user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
     node_verify_param = {
       constants.NV_FILELIST:
         utils.UniqueSequence(filename
@@ -2666,6 +2696,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       constants.NV_MASTERIP: (master_node, master_ip),
       constants.NV_OSLIST: None,
       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
+      constants.NV_USERSCRIPTS: user_scripts,
       }
 
     if vg_name is not None:
@@ -2824,6 +2855,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       nimg.call_ok = self._VerifyNode(node_i, nresult)
       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
+      self._VerifyNodeUserScripts(node_i, nresult)
       self._VerifyOob(node_i, nresult)
 
       if nimg.vm_capable:
@@ -3154,7 +3186,7 @@ class LUGroupVerifyDisks(NoHooksLU):
       # any leftover items in nv_dict are missing LVs, let's arrange the data
       # better
       for key, inst in nv_dict.iteritems():
-        res_missing.setdefault(inst, []).append(key)
+        res_missing.setdefault(inst, []).append(list(key))
 
     return (res_nodes, list(res_instances), res_missing)
 
@@ -3169,21 +3201,21 @@ class LUClusterRepairDiskSizes(NoHooksLU):
     if self.op.instances:
       self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
-        locking.LEVEL_NODE: [],
+        locking.LEVEL_NODE_RES: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
         }
-      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
     else:
       self.wanted_names = None
       self.needed_locks = {
-        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_NODE_RES: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
     self.share_locks = _ShareAll()
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE and self.wanted_names is not None:
-      self._LockInstancesNodes(primary_only=True)
+    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
+      self._LockInstancesNodes(primary_only=True, level=level)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -3234,6 +3266,11 @@ class LUClusterRepairDiskSizes(NoHooksLU):
       for idx, disk in enumerate(instance.disks):
         per_node_disks[pnode].append((instance, idx, disk))
 
+    assert not (frozenset(per_node_disks.keys()) -
+                self.owned_locks(locking.LEVEL_NODE_RES)), \
+      "Not owning correct locks"
+    assert not self.owned_locks(locking.LEVEL_NODE)
+
     changed = []
     for node, dskl in per_node_disks.items():
       newl = [v[2].Copy() for v in dskl]
@@ -3323,29 +3360,33 @@ class LUClusterRename(LogicalUnit):
 
     """
     clustername = self.op.name
-    ip = self.ip
+    new_ip = self.ip
 
     # shutdown the master IP
-    master = self.cfg.GetMasterNode()
-    result = self.rpc.call_node_deactivate_master_ip(master)
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                     master_params, ems)
     result.Raise("Could not disable the master role")
 
     try:
       cluster = self.cfg.GetClusterInfo()
       cluster.cluster_name = clustername
-      cluster.master_ip = ip
+      cluster.master_ip = new_ip
       self.cfg.Update(cluster, feedback_fn)
 
       # update the known hosts file
       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
       node_list = self.cfg.GetOnlineNodeList()
       try:
-        node_list.remove(master)
+        node_list.remove(master_params.name)
       except ValueError:
         pass
       _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
     finally:
-      result = self.rpc.call_node_activate_master_ip(master)
+      master_params.ip = new_ip
+      result = self.rpc.call_node_activate_master_ip(master_params.name,
+                                                     master_params, ems)
       msg = result.fail_msg
       if msg:
         self.LogWarning("Could not re-enable the master role on"
@@ -3675,6 +3716,9 @@ class LUClusterSetParams(LogicalUnit):
     if self.op.reserved_lvs is not None:
       self.cluster.reserved_lvs = self.op.reserved_lvs
 
+    if self.op.use_external_mip_script is not None:
+      self.cluster.use_external_mip_script = self.op.use_external_mip_script
+
     def helper_os(aname, mods, desc):
       desc += " OS list"
       lst = getattr(self.cluster, aname)
@@ -3699,33 +3743,40 @@ class LUClusterSetParams(LogicalUnit):
       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
 
     if self.op.master_netdev:
-      master = self.cfg.GetMasterNode()
+      master_params = self.cfg.GetMasterNetworkParameters()
+      ems = self.cfg.GetUseExternalMipScript()
       feedback_fn("Shutting down master ip on the current netdev (%s)" %
                   self.cluster.master_netdev)
-      result = self.rpc.call_node_deactivate_master_ip(master)
+      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+                                                       master_params, ems)
       result.Raise("Could not disable the master ip")
       feedback_fn("Changing master_netdev from %s to %s" %
-                  (self.cluster.master_netdev, self.op.master_netdev))
+                  (master_params.netdev, self.op.master_netdev))
       self.cluster.master_netdev = self.op.master_netdev
 
     if self.op.master_netmask:
-      master = self.cfg.GetMasterNode()
+      master_params = self.cfg.GetMasterNetworkParameters()
       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
-      result = self.rpc.call_node_change_master_netmask(master,
-                                                        self.op.master_netmask)
+      result = self.rpc.call_node_change_master_netmask(master_params.name,
+                                                        master_params.netmask,
+                                                        self.op.master_netmask,
+                                                        master_params.ip,
+                                                        master_params.netdev)
       if result.fail_msg:
         msg = "Could not change the master IP netmask: %s" % result.fail_msg
-        self.LogWarning(msg)
         feedback_fn(msg)
-      else:
-        self.cluster.master_netmask = self.op.master_netmask
+
+      self.cluster.master_netmask = self.op.master_netmask
 
     self.cfg.Update(self.cluster, feedback_fn)
 
     if self.op.master_netdev:
+      master_params = self.cfg.GetMasterNetworkParameters()
       feedback_fn("Starting the master ip on the new master netdev (%s)" %
                   self.op.master_netdev)
-      result = self.rpc.call_node_activate_master_ip(master)
+      ems = self.cfg.GetUseExternalMipScript()
+      result = self.rpc.call_node_activate_master_ip(master_params.name,
+                                                     master_params, ems)
       if result.fail_msg:
         self.LogWarning("Could not re-enable the master ip on"
                         " the master, please restart manually: %s",
@@ -3760,6 +3811,7 @@ def _ComputeAncillaryFiles(cluster, redist):
     constants.CLUSTER_DOMAIN_SECRET_FILE,
     constants.SPICE_CERT_FILE,
     constants.SPICE_CACERT_FILE,
+    constants.RAPI_USERS_FILE,
     ])
 
   if not redist:
@@ -3772,27 +3824,43 @@ def _ComputeAncillaryFiles(cluster, redist):
   if cluster.modify_etc_hosts:
     files_all.add(constants.ETC_HOSTS)
 
-  # Files which must either exist on all nodes or on none
-  files_all_opt = set([
+  # Files which are optional, these must:
+  # - be present in one other category as well
+  # - either exist or not exist on all nodes of that category (mc, vm all)
+  files_opt = set([
     constants.RAPI_USERS_FILE,
     ])
 
   # Files which should only be on master candidates
   files_mc = set()
+
   if not redist:
     files_mc.add(constants.CLUSTER_CONF_FILE)
 
+    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
+    # replication
+    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
+
   # Files which should only be on VM-capable nodes
   files_vm = set(filename
     for hv_name in cluster.enabled_hypervisors
-    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
+
+  files_opt |= set(filename
+    for hv_name in cluster.enabled_hypervisors
+    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
 
-  # Filenames must be unique
-  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
-          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+  # Filenames in each category must be unique
+  all_files_set = files_all | files_mc | files_vm
+  assert (len(all_files_set) ==
+          sum(map(len, [files_all, files_mc, files_vm]))), \
          "Found file listed in more than one file list"
 
-  return (files_all, files_all_opt, files_mc, files_vm)
+  # Optional files must be present in one other category
+  assert all_files_set.issuperset(files_opt), \
+         "Optional file not in a different required list"
+
+  return (files_all, files_opt, files_mc, files_vm)
 
 
 def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
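In short, optional files are no longer a fifth list but a subset of the other categories; e.g. constants.RAPI_USERS_FILE now appears in both files_all and files_opt, so both assertions above hold. A sketch of the invariant (not part of the patch):

  files_all = set([constants.RAPI_USERS_FILE])
  files_opt = set([constants.RAPI_USERS_FILE])
  assert (files_all | files_mc | files_vm).issuperset(files_opt)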
@@ -3826,7 +3894,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
       nodelist.remove(master_info.name)
 
   # Gather file lists
-  (files_all, files_all_opt, files_mc, files_vm) = \
+  (files_all, _, files_mc, files_vm) = \
     _ComputeAncillaryFiles(cluster, True)
 
   # Never re-distribute configuration file from here
@@ -3836,7 +3904,6 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
 
   filemap = [
     (online_nodes, files_all),
-    (online_nodes, files_all_opt),
     (vm_nodes, files_vm),
     ]
 
@@ -3876,8 +3943,10 @@ class LUClusterActivateMasterIp(NoHooksLU):
     """Activate the master IP.
 
     """
-    master = self.cfg.GetMasterNode()
-    self.rpc.call_node_activate_master_ip(master)
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    self.rpc.call_node_activate_master_ip(master_params.name,
+                                          master_params, ems)
 
 
 class LUClusterDeactivateMasterIp(NoHooksLU):
@@ -3888,8 +3957,10 @@ class LUClusterDeactivateMasterIp(NoHooksLU):
     """Deactivate the master IP.
 
     """
-    master = self.cfg.GetMasterNode()
-    self.rpc.call_node_deactivate_master_ip(master)
+    master_params = self.cfg.GetMasterNetworkParameters()
+    ems = self.cfg.GetUseExternalMipScript()
+    self.rpc.call_node_deactivate_master_ip(master_params.name, master_params,
+                                            ems)
 
 
 def _WaitForSync(lu, instance, disks=None, oneshot=False):
@@ -4411,6 +4482,9 @@ class LUNodeRemove(LogicalUnit):
 
     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
 
+    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+      "Not owning BGL"
+
     # Promote nodes to master candidate as needed
     _AdjustCandidatePool(self, exceptions=[node.name])
     self.context.RemoveNode(node.name)
@@ -4524,6 +4598,9 @@ class LUNodeQuery(NoHooksLU):
   def ExpandNames(self):
     self.nq.ExpandNames(self)
 
+  def DeclareLocks(self, level):
+    self.nq.DeclareLocks(self, level)
+
   def Exec(self, feedback_fn):
     return self.nq.OldStyleQuery(self)
 
@@ -4542,8 +4619,9 @@ class LUNodeQueryvols(NoHooksLU):
                        selected=self.op.output_fields)
 
   def ExpandNames(self):
+    self.share_locks = _ShareAll()
     self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
+
     if not self.op.nodes:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
@@ -4610,8 +4688,8 @@ class LUNodeQueryStorage(NoHooksLU):
                        selected=self.op.output_fields)
 
   def ExpandNames(self):
+    self.share_locks = _ShareAll()
     self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
 
     if self.op.nodes:
       self.needed_locks[locking.LEVEL_NODE] = \
@@ -5077,6 +5155,9 @@ class LUNodeAdd(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
+    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+      "Not owning BGL"
+
     # We adding a new node so we assume it's powered
     new_node.powered = True
 
@@ -5215,6 +5296,13 @@ class LUNodeSetParams(LogicalUnit):
     self.lock_all = self.op.auto_promote and self.might_demote
     self.lock_instances = self.op.secondary_ip is not None
 
+  def _InstanceFilter(self, instance):
+    """Filter for getting affected instances.
+
+    """
+    return (instance.disk_template in constants.DTS_INT_MIRROR and
+            self.op.node_name in instance.all_nodes)
+
   def ExpandNames(self):
     if self.lock_all:
       self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
@@ -5222,28 +5310,8 @@ class LUNodeSetParams(LogicalUnit):
       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
     if self.lock_instances:
-      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
-
-  def DeclareLocks(self, level):
-    # If we have locked all instances, before waiting to lock nodes, release
-    # all the ones living on nodes unrelated to the current operation.
-    if level == locking.LEVEL_NODE and self.lock_instances:
-      self.affected_instances = []
-      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
-        instances_keep = []
-
-        # Build list of instances to release
-        locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
-        for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i):
-          if (instance.disk_template in constants.DTS_INT_MIRROR and
-              self.op.node_name in instance.all_nodes):
-            instances_keep.append(instance_name)
-            self.affected_instances.append(instance)
-
-        _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
-
-        assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) ==
-                set(instances_keep))
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5275,6 +5343,25 @@ class LUNodeSetParams(LogicalUnit):
     """
     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
 
+    if self.lock_instances:
+      affected_instances = \
+        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+
+      # Verify instance locks
+      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+      wanted_instances = frozenset(affected_instances.keys())
+      if wanted_instances - owned_instances:
+        raise errors.OpPrereqError("Instances affected by changing node %s's"
+                                   " secondary IP address have changed since"
+                                   " locks were acquired, wanted '%s', have"
+                                   " '%s'; retry the operation" %
+                                   (self.op.node_name,
+                                    utils.CommaJoin(wanted_instances),
+                                    utils.CommaJoin(owned_instances)),
+                                   errors.ECODE_STATE)
+    else:
+      affected_instances = None
+
     if (self.op.master_candidate is not None or
         self.op.drained is not None or
         self.op.offline is not None):
@@ -5364,7 +5451,9 @@ class LUNodeSetParams(LogicalUnit):
 
     if old_role == self._ROLE_OFFLINE and new_role != old_role:
       # Trying to transition out of offline status
-      result = self.rpc.call_version([node.name])[node.name]
+      # TODO: Use standard RPC runner, but make sure it works when the node is
+      # still marked offline
+      result = rpc.BootstrapRunner().call_version([node.name])[node.name]
       if result.fail_msg:
         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                    " to report its version: %s" %
@@ -5383,15 +5472,19 @@ class LUNodeSetParams(LogicalUnit):
         raise errors.OpPrereqError("Cannot change the secondary ip on a single"
                                    " homed cluster", errors.ECODE_INVAL)
 
+      assert not (frozenset(affected_instances) -
+                  self.owned_locks(locking.LEVEL_INSTANCE))
+
       if node.offline:
-        if self.affected_instances:
-          raise errors.OpPrereqError("Cannot change secondary ip: offline"
-                                     " node has instances (%s) configured"
-                                     " to use it" % self.affected_instances)
+        if affected_instances:
+          raise errors.OpPrereqError("Cannot change secondary IP address:"
+                                     " offline node has instances (%s)"
+                                     " configured to use it" %
+                                     utils.CommaJoin(affected_instances.keys()))
       else:
         # On online nodes, check that no instances are running, and that
         # the node has the new ip and we can reach it.
-        for instance in self.affected_instances:
+        for instance in affected_instances.values():
           _CheckInstanceDown(self, instance, "cannot change secondary ip")
 
         _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
@@ -5544,6 +5637,7 @@ class LUClusterQuery(NoHooksLU):
       "candidate_pool_size": cluster.candidate_pool_size,
       "master_netdev": cluster.master_netdev,
       "master_netmask": cluster.master_netmask,
+      "use_external_mip_script": cluster.use_external_mip_script,
       "volume_group_name": cluster.volume_group_name,
       "drbd_usermode_helper": cluster.drbd_usermode_helper,
       "file_storage_dir": cluster.file_storage_dir,
@@ -6070,9 +6164,11 @@ class LUInstanceStartup(LogicalUnit):
 
       _StartInstanceDisks(self, instance, force)
 
-      result = self.rpc.call_instance_start(node_current, instance,
-                                            self.op.hvparams, self.op.beparams,
-                                            self.op.startup_paused)
+      result = \
+        self.rpc.call_instance_start(node_current,
+                                     (instance, self.op.hvparams,
+                                      self.op.beparams),
+                                     self.op.startup_paused)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -6162,8 +6258,8 @@ class LUInstanceReboot(LogicalUnit):
         self.LogInfo("Instance %s was already stopped, starting now",
                      instance.name)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      result = self.rpc.call_instance_start(node_current, instance,
-                                            None, None, False)
+      result = self.rpc.call_instance_start(node_current,
+                                            (instance, None, None), False)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -6324,9 +6420,9 @@ class LUInstanceReinstall(LogicalUnit):
     try:
       feedback_fn("Running the instance OS create scripts...")
       # FIXME: pass debug option from opcode to backend
-      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
-                                             self.op.debug_level,
-                                             osparams=self.os_inst)
+      result = self.rpc.call_instance_os_add(inst.primary_node,
+                                             (inst, self.os_inst), True,
+                                             self.op.debug_level)
       result.Raise("Could not install OS for instance %s on node %s" %
                    (inst.name, inst.primary_node))
     finally:
@@ -6360,6 +6456,10 @@ class LUInstanceRecreateDisks(LogicalUnit):
       # otherwise we need to lock all nodes for disk re-creation
       primary_only = bool(self.op.nodes)
       self._LockInstancesNodes(primary_only=primary_only)
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6406,7 +6506,8 @@ class LUInstanceRecreateDisks(LogicalUnit):
                                  self.op.instance_name, errors.ECODE_INVAL)
     # if we replace nodes *and* the old primary is offline, we don't
     # check
-    assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+    assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
+    assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
     if not (self.op.nodes and old_pnode.offline):
       _CheckInstanceDown(self, instance, "cannot recreate disks")
@@ -6430,6 +6531,9 @@ class LUInstanceRecreateDisks(LogicalUnit):
     """
     instance = self.instance
 
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
     to_skip = []
     mods = [] # keeps track of needed logical_id changes
 
@@ -6598,11 +6702,16 @@ class LUInstanceRemove(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
     self.needed_locks[locking.LEVEL_NODE] = []
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6651,6 +6760,12 @@ class LUInstanceRemove(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+    assert not (set(instance.all_nodes) -
+                self.owned_locks(locking.LEVEL_NODE)), \
+      "Not owning correct locks"
+
     _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
 
@@ -7027,8 +7142,8 @@ class LUInstanceMove(LogicalUnit):
         _ShutdownInstanceDisks(self, instance)
         raise errors.OpExecError("Can't activate the instance's disks")
 
-      result = self.rpc.call_instance_start(target_node, instance,
-                                            None, None, False)
+      result = self.rpc.call_instance_start(target_node,
+                                            (instance, None, None), False)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -7691,7 +7806,7 @@ class TLMigrateInstance(Tasklet):
 
       self.feedback_fn("* starting the instance on the target node %s" %
                        target_node)
-      result = self.rpc.call_instance_start(target_node, instance, None, None,
+      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                             False)
       msg = result.fail_msg
       if msg:
                                             False)
       msg = result.fail_msg
       if msg:
@@ -7823,7 +7938,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgnames[0], names[0]))
-  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
                           logical_id=(vgnames[1], names[1]))
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
@@ -8143,7 +8258,7 @@ def _ComputeDiskSizePerVG(disk_template, disks):
     constants.DT_DISKLESS: {},
     constants.DT_PLAIN: _compute(disks, 0),
     # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: _compute(disks, 128),
+    constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
     constants.DT_FILE: {},
     constants.DT_SHARED_FILE: {},
   }
@@ -8164,7 +8279,8 @@ def _ComputeDiskSize(disk_template, disks):
     constants.DT_DISKLESS: None,
     constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
     # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
+    constants.DT_DRBD8:
+      sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
     constants.DT_FILE: None,
     constants.DT_SHARED_FILE: 0,
     constants.DT_BLOCK: 0,
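For reference, the DRBD sizing above simply adds DRBD_META_SIZE MiB of metadata per disk; with two hypothetical 10 GiB disks:

  disks = [{constants.IDISK_SIZE: 10240}, {constants.IDISK_SIZE: 10240}]
  # DT_DRBD8 total: (10240 + 128) + (10240 + 128) == 20736 MiB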
@@ -8210,9 +8326,11 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
 
   """
   nodenames = _FilterVmNodes(lu, nodenames)
-  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
-                                                  hvname,
-                                                  hvparams)
+
+  cluster = lu.cfg.GetClusterInfo()
+  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+
+  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
   for node in nodenames:
     info = hvinfo[node]
     if info.offline:
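Hypervisor parameters are now filled with the cluster-level defaults before being validated on the nodes. A rough illustration of the objects.FillDict overlay (hypothetical values, not from this patch):

  defaults = {"root_path": "/dev/vda1", "kernel_args": "ro"}
  overrides = {"root_path": "/dev/vda2"}
  hvfull = objects.FillDict(defaults, overrides)
  # hvfull == {"root_path": "/dev/vda2", "kernel_args": "ro"}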
@@ -8238,7 +8356,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
 
   """
   nodenames = _FilterVmNodes(lu, nodenames)
-  result = lu.rpc.call_os_validate(required, nodenames, osname,
+  result = lu.rpc.call_os_validate(nodenames, required, osname,
                                    [constants.OS_VALIDATE_PARAMETERS],
                                    osparams)
   for node, nres in result.items():
@@ -8431,7 +8549,11 @@ class LUInstanceCreate(LogicalUnit):
     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
 
     if self.op.iallocator:
+      # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
+      # specifying a group on instance creation and then selecting nodes from
+      # that group
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+      self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
     else:
       self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
       nodelist = [self.op.pnode]
@@ -8439,6 +8561,9 @@ class LUInstanceCreate(LogicalUnit):
         self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
         nodelist.append(self.op.snode)
       self.needed_locks[locking.LEVEL_NODE] = nodelist
+      # Lock resources of instance's primary and secondary nodes (copy to
+      # prevent accidential modification)
+      self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist)
 
     # in case of import lock the source node too
     if self.op.mode == constants.INSTANCE_IMPORT:
@@ -9045,6 +9170,10 @@ class LUInstanceCreate(LogicalUnit):
     instance = self.op.instance_name
     pnode_name = self.pnode.name
 
+    assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
+                self.owned_locks(locking.LEVEL_NODE)), \
+      "Node locks differ from node resource locks"
+
     ht_kind = self.op.hypervisor
     if ht_kind in constants.HTS_REQ_PORT:
       network_port = self.cfg.AllocatePort()
@@ -9147,6 +9276,9 @@ class LUInstanceCreate(LogicalUnit):
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
+    # Release all node resource locks
+    _ReleaseLocks(self, locking.LEVEL_NODE_RES)
+
     if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
       if self.op.mode == constants.INSTANCE_CREATE:
         if not self.op.no_install:
@@ -9164,7 +9296,7 @@ class LUInstanceCreate(LogicalUnit):
           feedback_fn("* running the instance OS create scripts...")
           # FIXME: pass debug option from opcode to backend
           os_add_result = \
-            self.rpc.call_instance_os_add(pnode_name, iobj, False,
+            self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
                                           self.op.debug_level)
           if pause_sync:
             feedback_fn("* resuming disk sync")
@@ -9239,13 +9371,15 @@ class LUInstanceCreate(LogicalUnit):
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                      % self.op.mode)
 
+    assert not self.owned_locks(locking.LEVEL_NODE_RES)
+
     if self.op.start:
       iobj.admin_up = True
       self.cfg.Update(iobj, feedback_fn)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      result = self.rpc.call_instance_start(pnode_name, iobj,
-                                            None, None, False)
+      result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
+                                            False)
       result.Raise("Could not start instance")
 
     return list(iobj.all_nodes)
@@ -9262,6 +9396,7 @@ class LUInstanceConsole(NoHooksLU):
   REQ_BGL = False
 
   def ExpandNames(self):
+    self.share_locks = _ShareAll()
     self._ExpandAndLockInstance()
 
   def CheckPrereq(self):
@@ -9829,7 +9964,7 @@ class TLReplaceDisks(Tasklet):
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                              logical_id=(vg_data, names[0]))
       vg_meta = dev.children[1].logical_id[0]
-      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
                              logical_id=(vg_meta, names[1]))
 
       new_lvs = [lv_data, lv_meta]
@@ -10459,11 +10594,16 @@ class LUInstanceGrowDisk(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
     self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
+    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE][:]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -10520,10 +10660,18 @@ class LUInstanceGrowDisk(LogicalUnit):
     instance = self.instance
     disk = self.disk
 
+    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
     disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    feedback_fn("Growing disk %s of instance '%s' by %s" %
+                (self.op.disk, instance.name,
+                 utils.FormatUnit(self.op.amount, "h")))
+
     # First run all grow ops in dry-run mode
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
@@ -10546,6 +10694,13 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     disk.RecordGrow(self.op.amount)
     self.cfg.Update(instance, feedback_fn)
+
+    # Changes have been recorded, release node lock
+    _ReleaseLocks(self, locking.LEVEL_NODE)
+
+    # Downgrade lock while waiting for sync
+    self.glm.downgrade(locking.LEVEL_INSTANCE)
+
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance, disks=[disk])
       if disk_abort:
@@ -10558,6 +10713,9 @@ class LUInstanceGrowDisk(LogicalUnit):
                            " not supposed to be running because no wait for"
                            " sync mode was requested")
 
+    assert self.owned_locks(locking.LEVEL_NODE_RES)
+    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+
 
 class LUInstanceQueryData(NoHooksLU):
   """Query runtime instance data.
@@ -11890,8 +12048,8 @@ class LUBackupExport(LogicalUnit):
             not self.op.remove_instance):
           assert not activate_disks
           feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance,
-                                                None, None, False)
+          result = self.rpc.call_instance_start(src_node,
+                                                (instance, None, None), False)
           msg = result.fail_msg
           if msg:
             feedback_fn("Failed to start instance: %s" % msg)
@@ -13023,9 +13181,9 @@ class IAllocator(object):
   # pylint: disable=R0902
   # lots of instance attributes
 
-  def __init__(self, cfg, rpc, mode, **kwargs):
+  def __init__(self, cfg, rpc_runner, mode, **kwargs):
     self.cfg = cfg
-    self.rpc = rpc
+    self.rpc = rpc_runner
     # init buffer variables
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy