Get rid of constants.HT_HVM_DEFAULT_BOOT_ORDER
[ganeti-local] / lib / cmdlib.py
index 830cd07..98668b0 100644
@@ -30,10 +30,11 @@ import time
 import tempfile
 import re
 import platform
+import logging
+import copy
+import random
 
-from ganeti import rpc
 from ganeti import ssh
-from ganeti import logger
 from ganeti import utils
 from ganeti import errors
 from ganeti import hypervisor
@@ -42,6 +43,7 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import opcodes
 from ganeti import serializer
+from ganeti import ssconf
 
 
 class LogicalUnit(object):
@@ -54,8 +56,6 @@ class LogicalUnit(object):
     - implement BuildHooksEnv
     - redefine HPATH and HTYPE
     - optionally redefine their run requirements:
-        REQ_MASTER: the LU needs to run on the master node
-        REQ_WSSTORE: the LU needs a writable SimpleStore
         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
 
   Note that all commands require root permissions.
@@ -64,11 +64,9 @@ class LogicalUnit(object):
   HPATH = None
   HTYPE = None
   _OP_REQP = []
-  REQ_MASTER = True
-  REQ_WSSTORE = False
   REQ_BGL = True
 
-  def __init__(self, processor, op, context, sstore):
+  def __init__(self, processor, op, context, rpc):
     """Constructor for LogicalUnit.
 
     This needs to be overriden in derived classes in order to check op
@@ -78,40 +76,56 @@ class LogicalUnit(object):
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
-    self.sstore = sstore
     self.context = context
+    self.rpc = rpc
+    # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
     self.acquired_locks = {}
     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
+    self.add_locks = {}
+    self.remove_locks = {}
     # Used to force good behavior when calling helper functions
     self.recalculate_locks = {}
     self.__ssh = None
+    # logging
+    self.LogWarning = processor.LogWarning
+    self.LogInfo = processor.LogInfo
 
     for attr_name in self._OP_REQP:
       attr_val = getattr(op, attr_name, None)
       if attr_val is None:
         raise errors.OpPrereqError("Required parameter '%s' missing" %
                                    attr_name)
-
-    if not self.cfg.IsCluster():
-      raise errors.OpPrereqError("Cluster not initialized yet,"
-                                 " use 'gnt-cluster init' first.")
-    if self.REQ_MASTER:
-      master = sstore.GetMasterNode()
-      if master != utils.HostInfo().name:
-        raise errors.OpPrereqError("Commands must be run on the master"
-                                   " node %s" % master)
+    self.CheckArguments()
 
   def __GetSSH(self):
     """Returns the SshRunner object
 
     """
     if not self.__ssh:
-      self.__ssh = ssh.SshRunner(self.sstore)
+      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
     return self.__ssh
 
   ssh = property(fget=__GetSSH)
 
+  def CheckArguments(self):
+    """Check syntactic validity for the opcode arguments.
+
+    This method is for doing a simple syntactic check and ensuring the
+    validity of opcode parameters, without any cluster-related
+    checks. While the same can be accomplished in ExpandNames and/or
+    CheckPrereq, doing it separately is better because:
+
+      - ExpandNames is left as a purely lock-related function
+      - CheckPrereq is run after we have acquired locks (and possibly
+        waited for them)
+
+    The function is allowed to change the self.op attribute so that
+    later methods no longer need to worry about missing parameters.
+
+    """
+    pass
+
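As an illustrative sketch only (not part of the patch), a derived LU could implement CheckArguments along these lines; the LU name and the optional "force" field are hypothetical:

    class LUHypotheticalOp(LogicalUnit):
      _OP_REQP = ["instance_name"]

      def CheckArguments(self):
        # purely syntactic checks: no config lookups, no locks held yet
        if not hasattr(self.op, "force"):
          # fill in a default so later methods can rely on the attribute
          self.op.force = False
        if not isinstance(self.op.force, bool):
          raise errors.OpPrereqError("Invalid 'force' parameter (boolean expected)")
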
   def ExpandNames(self):
     """Expand names for this LU.
 
@@ -123,29 +137,29 @@ class LogicalUnit(object):
     LUs which implement this method must also populate the self.needed_locks
     member, as a dict with lock levels as keys, and a list of needed lock names
     as values. Rules:
-      - Use an empty dict if you don't need any lock
-      - If you don't need any lock at a particular level omit that level
-      - Don't put anything for the BGL level
-      - If you want all locks at a level use None as a value
-        (this reflects what LockSet does, and will be replaced before
-        CheckPrereq with the full list of nodes that have been locked)
+
+      - use an empty dict if you don't need any lock
+      - if you don't need any lock at a particular level omit that level
+      - don't put anything for the BGL level
+      - if you want all locks at a level use locking.ALL_SET as a value
 
     If you need to share locks (rather than acquire them exclusively) at one
     level you can modify self.share_locks, setting a true value (usually 1) for
     that level. By default locks are not shared.
 
-    Examples:
-    # Acquire all nodes and one instance
-    self.needed_locks = {
-      locking.LEVEL_NODE: None,
-      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
-    }
-    # Acquire just two nodes
-    self.needed_locks = {
-      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
-    }
-    # Acquire no locks
-    self.needed_locks = {} # No, you can't leave it to the default value None
+    Examples::
+
+      # Acquire all nodes and one instance
+      self.needed_locks = {
+        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
+      }
+      # Acquire just two nodes
+      self.needed_locks = {
+        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+      }
+      # Acquire no locks
+      self.needed_locks = {} # No, you can't leave it to the default value None
 
     """
     # The implementation of this method is mandatory only if the new LU is
@@ -232,11 +246,14 @@ class LogicalUnit(object):
     previous result is passed back unchanged but any LU can define it if it
     wants to use the local cluster hook-scripts somehow.
 
-    Args:
-      phase: the hooks phase that has just been run
-      hooks_results: the results of the multi-node hooks rpc call
-      feedback_fn: function to send feedback back to the caller
-      lu_result: the previous result this LU had, or None in the PRE phase.
+    @param phase: one of L{constants.HOOKS_PHASE_POST} or
+        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
+    @param hooks_results: the results of the multi-node hooks rpc call
+    @param feedback_fn: function used to send feedback back to the caller
+    @param lu_result: the previous Exec result this LU had, or None
+        in the PRE phase
+    @return: the new Exec result, based on the previous result
+        and hook results
 
     """
     return lu_result
@@ -263,7 +280,7 @@ class LogicalUnit(object):
     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
     self.op.instance_name = expanded_name
 
-  def _LockInstancesNodes(self):
+  def _LockInstancesNodes(self, primary_only=False):
     """Helper function to declare instances' nodes for locking.
 
     This function should be called after locking one or more instances to lock
@@ -277,10 +294,13 @@ class LogicalUnit(object):
     In the future it may grow parameters to just lock some instance's nodes, or
     to just lock primaries or secondary nodes, if needed.
 
-    If should be called in DeclareLocks in a way similar to:
+    It should be called in DeclareLocks in a way similar to::
 
-    if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
+      if level == locking.LEVEL_NODE:
+        self._LockInstancesNodes()
+
+    @type primary_only: boolean
+    @param primary_only: only lock primary nodes of locked instances
 
     """
     assert locking.LEVEL_NODE in self.recalculate_locks, \
@@ -295,8 +315,13 @@ class LogicalUnit(object):
     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
       instance = self.context.cfg.GetInstanceInfo(instance_name)
       wanted_nodes.append(instance.primary_node)
-      wanted_nodes.extend(instance.secondary_nodes)
-    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
+      if not primary_only:
+        wanted_nodes.extend(instance.secondary_nodes)
+
+    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
+      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
+    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
+      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
 
     del self.recalculate_locks[locking.LEVEL_NODE]
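For illustration only (not part of the patch), the recalculate_locks/_LockInstancesNodes machinery introduced here is typically driven from an instance-level LU roughly as follows; the LU name is hypothetical and instance-name expansion is elided:

    class LUHypotheticalInstanceOp(LogicalUnit):
      _OP_REQP = ["instance_name"]
      REQ_BGL = False

      def ExpandNames(self):
        # in a real LU the instance name is expanded and validated first
        self.needed_locks = {
          locking.LEVEL_INSTANCE: [self.op.instance_name],
          locking.LEVEL_NODE: [],
          }
        self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          # lock only the primary node(s) of the already-locked instance(s)
          self._LockInstancesNodes(primary_only=True)
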
 
@@ -315,32 +340,43 @@ class NoHooksLU(LogicalUnit):
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.
 
-  Args:
-    nodes: List of nodes (strings) or None for all
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type nodes: list
+  @param nodes: list of node names or None for all nodes
+  @rtype: list
+  @return: the list of nodes, sorted
+  @raise errors.OpPrereqError: if the nodes parameter is wrong type
 
   """
   if not isinstance(nodes, list):
     raise errors.OpPrereqError("Invalid argument type 'nodes'")
 
-  if nodes:
-    wanted = []
+  if not nodes:
+    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
+      " non-empty list of nodes whose name is to be expanded.")
 
-    for name in nodes:
-      node = lu.cfg.ExpandNodeName(name)
-      if node is None:
-        raise errors.OpPrereqError("No such node name '%s'" % name)
-      wanted.append(node)
+  wanted = []
+  for name in nodes:
+    node = lu.cfg.ExpandNodeName(name)
+    if node is None:
+      raise errors.OpPrereqError("No such node name '%s'" % name)
+    wanted.append(node)
 
-  else:
-    wanted = lu.cfg.GetNodeList()
   return utils.NiceSort(wanted)
 
 
 def _GetWantedInstances(lu, instances):
   """Returns list of checked and expanded instance names.
 
-  Args:
-    instances: List of instances (strings) or None for all
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instances: list
+  @param instances: list of instance names or None for all instances
+  @rtype: list
+  @return: the list of instances, sorted
+  @raise errors.OpPrereqError: if the instances parameter is wrong type
+  @raise errors.OpPrereqError: if any of the passed instances is not found
 
   """
   if not isinstance(instances, list):
@@ -356,43 +392,93 @@ def _GetWantedInstances(lu, instances):
       wanted.append(instance)
 
   else:
-    wanted = lu.cfg.GetInstanceList()
-  return utils.NiceSort(wanted)
+    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
+  return wanted
 
 
 def _CheckOutputFields(static, dynamic, selected):
   """Checks whether all selected fields are valid.
 
-  Args:
-    static: Static fields
-    dynamic: Dynamic fields
+  @type static: L{utils.FieldSet}
+  @param static: static fields set
+  @type dynamic: L{utils.FieldSet}
+  @param dynamic: dynamic fields set
 
   """
-  static_fields = frozenset(static)
-  dynamic_fields = frozenset(dynamic)
+  f = utils.FieldSet()
+  f.Extend(static)
+  f.Extend(dynamic)
 
-  all_fields = static_fields | dynamic_fields
-
-  if not all_fields.issuperset(selected):
+  delta = f.NonMatching(selected)
+  if delta:
     raise errors.OpPrereqError("Unknown output fields selected: %s"
-                               % ",".join(frozenset(selected).
-                                          difference(all_fields)))
+                               % ",".join(delta))
+
+
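A rough usage sketch of the FieldSet-based check above (the field names are made up):

    static = utils.FieldSet("name", "pinst_cnt")
    dynamic = utils.FieldSet("mfree", "dtotal")
    # "bogus" matches neither set, so this raises OpPrereqError naming it
    _CheckOutputFields(static=static, dynamic=dynamic,
                       selected=["name", "mfree", "bogus"])
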
+def _CheckBooleanOpField(op, name):
+  """Validates boolean opcode parameters.
+
+  This will ensure that an opcode parameter is either a boolean value,
+  or None (but that it always exists).
+
+  """
+  val = getattr(op, name, None)
+  if not (val is None or isinstance(val, bool)):
+    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
+                               (name, str(val)))
+  setattr(op, name, val)
+
+
+def _CheckNodeOnline(lu, node):
+  """Ensure that a given node is online.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @raise errors.OpPrereqError: if the node is offline
+
+  """
+  if lu.cfg.GetNodeInfo(node).offline:
+    raise errors.OpPrereqError("Can't use offline node %s" % node)
 
 
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           memory, vcpus, nics):
-  """Builds instance related env variables for hooks from single variables.
+  """Builds instance related env variables for hooks
+
+  This builds the hook environment from individual variables.
+
+  @type name: string
+  @param name: the name of the instance
+  @type primary_node: string
+  @param primary_node: the name of the instance's primary node
+  @type secondary_nodes: list
+  @param secondary_nodes: list of secondary nodes as strings
+  @type os_type: string
+  @param os_type: the name of the instance's OS
+  @type status: boolean
+  @param status: the should_run status of the instance
+  @type memory: string
+  @param memory: the memory size of the instance
+  @type vcpus: string
+  @param vcpus: the count of VCPUs the instance has
+  @type nics: list
+  @param nics: list of tuples (ip, bridge, mac) representing
+      the NICs the instance has
+  @rtype: dict
+  @return: the hook environment for this instance
 
-  Args:
-    secondary_nodes: List of secondary nodes as strings
   """
+  if status:
+    str_status = "up"
+  else:
+    str_status = "down"
   env = {
     "OP_TARGET": name,
     "INSTANCE_NAME": name,
     "INSTANCE_PRIMARY": primary_node,
     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
     "INSTANCE_OS_TYPE": os_type,
-    "INSTANCE_STATUS": status,
+    "INSTANCE_STATUS": str_status,
     "INSTANCE_MEMORY": memory,
     "INSTANCE_VCPUS": vcpus,
   }
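For reference, with purely illustrative values, the dictionary built here would look roughly like:

    {
      "OP_TARGET": "inst1.example.com",
      "INSTANCE_NAME": "inst1.example.com",
      "INSTANCE_PRIMARY": "node1.example.com",
      "INSTANCE_SECONDARIES": "node2.example.com",
      "INSTANCE_OS_TYPE": "debian-etch",
      "INSTANCE_STATUS": "up",    # derived from the boolean status argument
      "INSTANCE_MEMORY": 128,
      "INSTANCE_VCPUS": 1,
      # ...plus the per-NIC variables built from the nics argument further down
    }
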
@@ -413,21 +499,30 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   return env
 
 
-def _BuildInstanceHookEnvByObject(instance, override=None):
+def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   """Builds instance related env variables for hooks from an object.
 
-  Args:
-    instance: objects.Instance object of instance
-    override: dict of values to override
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance for which we should build the
+      environment
+  @type override: dict
+  @param override: dictionary with key/values that will override
+      our values
+  @rtype: dict
+  @return: the hook environment dictionary
+
   """
+  bep = lu.cfg.GetClusterInfo().FillBE(instance)
   args = {
     'name': instance.name,
     'primary_node': instance.primary_node,
     'secondary_nodes': instance.secondary_nodes,
     'os_type': instance.os,
-    'status': instance.os,
-    'memory': instance.memory,
-    'vcpus': instance.vcpus,
+    'status': instance.admin_up,
+    'memory': bep[constants.BE_MEMORY],
+    'vcpus': bep[constants.BE_VCPUS],
     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
   }
   if override:
@@ -435,14 +530,32 @@ def _BuildInstanceHookEnvByObject(instance, override=None):
   return _BuildInstanceHookEnv(**args)
 
 
-def _CheckInstanceBridgesExist(instance):
+def _AdjustCandidatePool(lu):
+  """Adjust the candidate pool after node operations.
+
+  """
+  mod_list = lu.cfg.MaintainCandidatePool()
+  if mod_list:
+    lu.LogInfo("Promoted nodes to master candidate role: %s",
+               ", ".join(node.name for node in mod_list))
+    for name in mod_list:
+      lu.context.ReaddNode(name)
+  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
+  if mc_now > mc_max:
+    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
+               (mc_now, mc_max))
+
+
+def _CheckInstanceBridgesExist(lu, instance):
   """Check that the brigdes needed by an instance exist.
 
   """
   # check bridges existence
   brlist = [nic.bridge for nic in instance.nics]
-  if not rpc.call_bridges_exist(instance.primary_node, brlist):
-    raise errors.OpPrereqError("one or more target bridges %s does not"
+  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
+  result.Raise()
+  if not result.data:
+    raise errors.OpPrereqError("One or more target bridges %s does not"
                                " exist on destination node '%s'" %
                                (brlist, instance.primary_node))
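Throughout the patch, module-level rpc.call_* functions are replaced by self.rpc.call_* methods whose return values wrap the remote payload. The calling convention used above can be sketched as follows (attribute names are the ones appearing in this patch, the rest is illustrative):

    result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
    result.Raise()             # abort if the RPC layer itself failed
    if not result.data:        # .data holds the value returned by the node
      raise errors.OpPrereqError("bridge check failed")

    # for calls where a failure should only be reported, not be fatal:
    if result.failed or not result.data:
      lu.LogWarning("call to node failed")
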
 
@@ -461,7 +574,7 @@ class LUDestroyCluster(NoHooksLU):
     Any errors are signalled by raising errors.OpPrereqError.
 
     """
-    master = self.sstore.GetMasterNode()
+    master = self.cfg.GetMasterNode()
 
     nodelist = self.cfg.GetNodeList()
     if len(nodelist) != 1 or nodelist[0] != master:
@@ -476,8 +589,10 @@ class LUDestroyCluster(NoHooksLU):
     """Destroys the cluster.
 
     """
-    master = self.sstore.GetMasterNode()
-    if not rpc.call_node_stop_master(master, False):
+    master = self.cfg.GetMasterNode()
+    result = self.rpc.call_node_stop_master(master, False)
+    result.Raise()
+    if not result.data:
       raise errors.OpExecError("Could not disable the master role")
     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
     utils.CreateBackup(priv_key)
@@ -492,37 +607,72 @@ class LUVerifyCluster(LogicalUnit):
   HPATH = "cluster-verify"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_REQP = ["skip_checks"]
+  REQ_BGL = False
 
-  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
-                  remote_version, feedback_fn):
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+      locking.LEVEL_INSTANCE: locking.ALL_SET,
+    }
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+
+  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
+                  node_result, feedback_fn, master_files,
+                  drbd_map):
     """Run multiple tests against a node.
 
     Test list:
+
       - compares ganeti version
       - checks vg existence and size > 20G
       - checks config file checksum
       - checks ssh to other nodes
 
-    Args:
-      node: name of the node to check
-      file_list: required list of files
-      local_cksum: dictionary of local files and their checksums
+    @type nodeinfo: L{objects.Node}
+    @param nodeinfo: the node to check
+    @param file_list: required list of files
+    @param local_cksum: dictionary of local files and their checksums
+    @param node_result: the results from the node
+    @param feedback_fn: function used to accumulate results
+    @param master_files: list of files that only masters should have
+    @param drbd_map: the used drbd minors for this node, in the
+        form of minor: (instance, must_exist), which correspond to instances
+        and their running status
 
     """
+    node = nodeinfo.name
+
+    # main result, node_result should be a non-empty dict
+    if not node_result or not isinstance(node_result, dict):
+      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
+      return True
+
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
-    if not remote_version:
+    remote_version = node_result.get('version', None)
+    if not (remote_version and isinstance(remote_version, (list, tuple)) and
+            len(remote_version) == 2):
       feedback_fn("  - ERROR: connection to %s failed" % (node))
       return True
 
-    if local_version != remote_version:
-      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
-                      (local_version, node, remote_version))
+    if local_version != remote_version[0]:
+      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
+                  " node %s %s" % (local_version, node, remote_version[0]))
       return True
 
-    # checks vg existance and size > 20G
+    # node seems compatible, we can actually try to look into its results
 
     bad = False
+
+    # full package version
+    if constants.RELEASE_VERSION != remote_version[1]:
+      feedback_fn("  - WARNING: software version mismatch: master %s,"
+                  " node %s %s" %
+                  (constants.RELEASE_VERSION, node, remote_version[1]))
+
+    # checks vg existence and size > 20G
+
+    vglist = node_result.get(constants.NV_VGLIST, None)
     if not vglist:
       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                       (node,))
@@ -535,48 +685,80 @@ class LUVerifyCluster(LogicalUnit):
         bad = True
 
     # checks config file checksum
-    # checks ssh to any
 
-    if 'filelist' not in node_result:
+    remote_cksum = node_result.get(constants.NV_FILELIST, None)
+    if not isinstance(remote_cksum, dict):
       bad = True
       feedback_fn("  - ERROR: node hasn't returned file checksum data")
     else:
-      remote_cksum = node_result['filelist']
       for file_name in file_list:
+        node_is_mc = nodeinfo.master_candidate
+        must_have_file = file_name not in master_files
         if file_name not in remote_cksum:
-          bad = True
-          feedback_fn("  - ERROR: file '%s' missing" % file_name)
+          if node_is_mc or must_have_file:
+            bad = True
+            feedback_fn("  - ERROR: file '%s' missing" % file_name)
         elif remote_cksum[file_name] != local_cksum[file_name]:
-          bad = True
-          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
+          if node_is_mc or must_have_file:
+            bad = True
+            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
+          else:
+            # not candidate and this is not a must-have file
+            bad = True
+            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
+                        " '%s'" % file_name)
+        else:
+          # all good, except non-master/non-must have combination
+          if not node_is_mc and not must_have_file:
+            feedback_fn("  - ERROR: file '%s' should not exist on non master"
+                        " candidates" % file_name)
 
-    if 'nodelist' not in node_result:
+    # checks ssh to any
+
+    if constants.NV_NODELIST not in node_result:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
     else:
-      if node_result['nodelist']:
+      if node_result[constants.NV_NODELIST]:
         bad = True
-        for node in node_result['nodelist']:
+        for node in node_result[constants.NV_NODELIST]:
           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
-                          (node, node_result['nodelist'][node]))
-    if 'node-net-test' not in node_result:
+                          (node, node_result[constants.NV_NODELIST][node]))
+
+    if constants.NV_NODENETTEST not in node_result:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
     else:
-      if node_result['node-net-test']:
+      if node_result[constants.NV_NODENETTEST]:
         bad = True
-        nlist = utils.NiceSort(node_result['node-net-test'].keys())
+        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
         for node in nlist:
           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
-                          (node, node_result['node-net-test'][node]))
+                          (node, node_result[constants.NV_NODENETTEST][node]))
+
+    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
+    if isinstance(hyp_result, dict):
+      for hv_name, hv_result in hyp_result.iteritems():
+        if hv_result is not None:
+          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
+                      (hv_name, hv_result))
+
+    # check used drbd list
+    used_minors = node_result.get(constants.NV_DRBDLIST, [])
+    for minor, (iname, must_exist) in drbd_map.items():
+      if minor not in used_minors and must_exist:
+        feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
+                    (minor, iname))
+        bad = True
+    for minor in used_minors:
+      if minor not in drbd_map:
+        feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
+        bad = True
 
-    hyp_result = node_result.get('hypervisor', None)
-    if hyp_result is not None:
-      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
     return bad
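To make the DRBD check above concrete (values illustrative): drbd_map maps every minor the configuration has allocated on this node to the owning instance and whether it must currently be active, while used_minors is whatever the node actually reports as in use.

    drbd_map = {
      0: ("inst1.example.com", True),    # instance is up, minor must be active
      1: ("inst2.example.com", False),   # allocated, but the instance is down
    }
    used_minors = [0, 2]   # minor 2 would be reported as in use but unallocated
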
 
   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
-                      node_instance, feedback_fn):
+                      node_instance, feedback_fn, n_offline):
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -591,15 +773,19 @@ class LUVerifyCluster(LogicalUnit):
     instanceconfig.MapLVsByNode(node_vol_should)
 
     for node in node_vol_should:
+      if node in n_offline:
+        # ignore missing volumes on offline nodes
+        continue
       for volume in node_vol_should[node]:
         if node not in node_vol_is or volume not in node_vol_is[node]:
           feedback_fn("  - ERROR: volume %s missing on node %s" %
                           (volume, node))
           bad = True
 
-    if not instanceconfig.status == 'down':
-      if (node_current not in node_instance or
-          not instance in node_instance[node_current]):
+    if instanceconfig.admin_up:
+      if ((node_current not in node_instance or
+          not instance in node_instance[node_current]) and
+          node_current not in n_offline):
         feedback_fn("  - ERROR: instance %s not running on node %s" %
                         (instance, node_current))
         bad = True
@@ -666,7 +852,9 @@ class LUVerifyCluster(LogicalUnit):
       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
         needed_mem = 0
         for instance in instances:
-          needed_mem += instance_cfg[instance].memory
+          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
+          if bep[constants.BE_AUTO_BALANCE]:
+            needed_mem += bep[constants.BE_MEMORY]
         if nodeinfo['mfree'] < needed_mem:
           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                       " failovers should node %s fail" % (node, prinode))
@@ -706,10 +894,15 @@ class LUVerifyCluster(LogicalUnit):
       feedback_fn("  - ERROR: %s" % msg)
 
     vg_name = self.cfg.GetVGName()
+    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
+    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
+                        for iname in instancelist)
     i_non_redundant = [] # Non redundant instances
+    i_non_a_balanced = [] # Non auto-balanced instances
+    n_offline = [] # List of offline nodes
     node_volume = {}
     node_instance = {}
     node_info = {}
@@ -717,68 +910,103 @@ class LUVerifyCluster(LogicalUnit):
 
     # FIXME: verify OS list
     # do local checksums
-    file_names = list(self.sstore.GetFileList())
+    master_files = [constants.CLUSTER_CONF_FILE]
+
+    file_names = ssconf.SimpleStore().GetFileList()
     file_names.append(constants.SSL_CERT_FILE)
-    file_names.append(constants.CLUSTER_CONF_FILE)
+    file_names.append(constants.RAPI_CERT_FILE)
+    file_names.extend(master_files)
+
     local_checksums = utils.FingerprintFiles(file_names)
 
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
-    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
-    all_instanceinfo = rpc.call_instance_list(nodelist)
-    all_vglist = rpc.call_vg_list(nodelist)
     node_verify_param = {
-      'filelist': file_names,
-      'nodelist': nodelist,
-      'hypervisor': None,
-      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
-                        for node in nodeinfo]
+      constants.NV_FILELIST: file_names,
+      constants.NV_NODELIST: [node.name for node in nodeinfo
+                              if not node.offline],
+      constants.NV_HYPERVISOR: hypervisors,
+      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
+                                  node.secondary_ip) for node in nodeinfo
+                                 if not node.offline],
+      constants.NV_LVLIST: vg_name,
+      constants.NV_INSTANCELIST: hypervisors,
+      constants.NV_VGLIST: None,
+      constants.NV_VERSION: None,
+      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
+      constants.NV_DRBDLIST: None,
       }
-    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
-    all_rversion = rpc.call_version(nodelist)
-    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
+    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
+                                           self.cfg.GetClusterName())
 
-    for node in nodelist:
-      feedback_fn("* Verifying node %s" % node)
-      result = self._VerifyNode(node, file_names, local_checksums,
-                                all_vglist[node], all_nvinfo[node],
-                                all_rversion[node], feedback_fn)
-      bad = bad or result
+    cluster = self.cfg.GetClusterInfo()
+    master_node = self.cfg.GetMasterNode()
+    all_drbd_map = self.cfg.ComputeDRBDMap()
 
-      # node_volume
-      volumeinfo = all_volumeinfo[node]
+    for node_i in nodeinfo:
+      node = node_i.name
+      nresult = all_nvinfo[node].data
+
+      if node_i.offline:
+        feedback_fn("* Skipping offline node %s" % (node,))
+        n_offline.append(node)
+        continue
+
+      if node == master_node:
+        ntype = "master"
+      elif node_i.master_candidate:
+        ntype = "master candidate"
+      else:
+        ntype = "regular"
+      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
+
+      if all_nvinfo[node].failed or not isinstance(nresult, dict):
+        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+        bad = True
+        continue
+
+      node_drbd = {}
+      for minor, instance in all_drbd_map[node].items():
+        instance = instanceinfo[instance]
+        node_drbd[minor] = (instance.name, instance.admin_up)
+      result = self._VerifyNode(node_i, file_names, local_checksums,
+                                nresult, feedback_fn, master_files,
+                                node_drbd)
+      bad = bad or result
 
-      if isinstance(volumeinfo, basestring):
+      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
+      if isinstance(lvdata, basestring):
         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
-                    (node, volumeinfo[-400:].encode('string_escape')))
+                    (node, utils.SafeEncode(lvdata)))
         bad = True
         node_volume[node] = {}
-      elif not isinstance(volumeinfo, dict):
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+      elif not isinstance(lvdata, dict):
+        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
         bad = True
         continue
       else:
-        node_volume[node] = volumeinfo
+        node_volume[node] = lvdata
 
       # node_instance
-      nodeinstance = all_instanceinfo[node]
-      if type(nodeinstance) != list:
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+      idata = nresult.get(constants.NV_INSTANCELIST, None)
+      if not isinstance(idata, list):
+        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
+                    (node,))
         bad = True
         continue
 
-      node_instance[node] = nodeinstance
+      node_instance[node] = idata
 
       # node_info
-      nodeinfo = all_ninfo[node]
+      nodeinfo = nresult.get(constants.NV_HVINFO, None)
       if not isinstance(nodeinfo, dict):
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
         bad = True
         continue
 
       try:
         node_info[node] = {
           "mfree": int(nodeinfo['memory_free']),
-          "dfree": int(nodeinfo['vg_free']),
+          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
           "pinst": [],
           "sinst": [],
           # dictionary holding all instances this node is secondary for,
@@ -798,10 +1026,11 @@ class LUVerifyCluster(LogicalUnit):
 
     for instance in instancelist:
       feedback_fn("* Verifying instance %s" % instance)
-      inst_config = self.cfg.GetInstanceInfo(instance)
+      inst_config = instanceinfo[instance]
       result =  self._VerifyInstance(instance, inst_config, node_volume,
-                                     node_instance, feedback_fn)
+                                     node_instance, feedback_fn, n_offline)
       bad = bad or result
+      inst_nodes_offline = []
 
       inst_config.MapLVsByNode(node_vol_should)
 
@@ -810,11 +1039,14 @@ class LUVerifyCluster(LogicalUnit):
       pnode = inst_config.primary_node
       if pnode in node_info:
         node_info[pnode]['pinst'].append(instance)
-      else:
+      elif pnode not in n_offline:
         feedback_fn("  - ERROR: instance %s, connection to primary node"
                     " %s failed" % (instance, pnode))
         bad = True
 
+      if pnode in n_offline:
+        inst_nodes_offline.append(pnode)
+
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
       # templates with more than one secondary so that situation is not well
@@ -826,15 +1058,27 @@ class LUVerifyCluster(LogicalUnit):
         feedback_fn("  - WARNING: multiple secondaries for instance %s"
                     % instance)
 
+      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
+        i_non_a_balanced.append(instance)
+
       for snode in inst_config.secondary_nodes:
         if snode in node_info:
           node_info[snode]['sinst'].append(instance)
           if pnode not in node_info[snode]['sinst-by-pnode']:
             node_info[snode]['sinst-by-pnode'][pnode] = []
           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
-        else:
+        elif snode not in n_offline:
           feedback_fn("  - ERROR: instance %s, connection to secondary node"
                       " %s failed" % (instance, snode))
+          bad = True
+        if snode in n_offline:
+          inst_nodes_offline.append(snode)
+
+      if inst_nodes_offline:
+        # warn that the instance lives on offline nodes, and set bad=True
+        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
+                    ", ".join(inst_nodes_offline))
+        bad = True
 
     feedback_fn("* Verifying orphan volumes")
     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
@@ -856,17 +1100,28 @@ class LUVerifyCluster(LogicalUnit):
       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                   % len(i_non_redundant))
 
+    if i_non_a_balanced:
+      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
+                  % len(i_non_a_balanced))
+
+    if n_offline:
+      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
+
     return not bad
 
   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
-    """Analize the post-hooks' result, handle it, and send some
+    """Analize the post-hooks' result
+
+    This method analyses the hook result, handles it, and sends some
     nicely-formatted feedback back to the user.
 
-    Args:
-      phase: the hooks phase that has just been run
-      hooks_results: the results of the multi-node hooks rpc call
-      feedback_fn: function to send feedback back to the caller
-      lu_result: previous Exec result
+    @param phase: one of L{constants.HOOKS_PHASE_POST} or
+        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
+    @param hooks_results: the results of the multi-node hooks rpc call
+    @param feedback_fn: function used to send feedback back to the caller
+    @param lu_result: previous Exec result
+    @return: the new Exec result, based on the previous result
+        and hook results
 
     """
     # We only really run POST phase hooks, and are only interested in
@@ -882,11 +1137,14 @@ class LUVerifyCluster(LogicalUnit):
         for node_name in hooks_results:
           show_node_header = True
           res = hooks_results[node_name]
-          if res is False or not isinstance(res, list):
-            feedback_fn("    Communication failure")
+          if res.failed or res.data is False or not isinstance(res.data, list):
+            if res.offline:
+              # no need to warn or set fail return value
+              continue
+            feedback_fn("    Communication failure in hooks execution")
             lu_result = 1
             continue
-          for script, hkr, output in res:
+          for script, hkr, output in res.data:
             if hkr == constants.HKR_FAIL:
               # The node header is only shown once, if there are
               # failing hooks on that node
@@ -906,6 +1164,14 @@ class LUVerifyDisks(NoHooksLU):
 
   """
   _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+      locking.LEVEL_INSTANCE: locking.ALL_SET,
+    }
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -929,7 +1195,7 @@ class LUVerifyDisks(NoHooksLU):
     nv_dict = {}
     for inst in instances:
       inst_lvs = {}
-      if (inst.status != "up" or
+      if (not inst.admin_up or
           inst.disk_template not in constants.DTS_NET_MIRROR):
         continue
       inst.MapLVsByNode(inst_lvs)
@@ -941,19 +1207,24 @@ class LUVerifyDisks(NoHooksLU):
     if not nv_dict:
       return result
 
-    node_lvs = rpc.call_volume_list(nodes, vg_name)
+    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
 
     to_act = set()
     for node in nodes:
       # node_volume
       lvs = node_lvs[node]
-
+      if lvs.failed:
+        if not lvs.offline:
+          self.LogWarning("Connection to node %s failed: %s" %
+                          (node, lvs.data))
+        continue
+      lvs = lvs.data
       if isinstance(lvs, basestring):
-        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
+        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
         res_nlvm[node] = lvs
       elif not isinstance(lvs, dict):
-        logger.Info("connection to node %s failed or invalid data returned" %
-                    (node,))
+        logging.warning("Connection to node %s failed or invalid data"
+                        " returned", node)
         res_nodes.append(node)
         continue
 
@@ -980,17 +1251,16 @@ class LURenameCluster(LogicalUnit):
   HPATH = "cluster-rename"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_REQP = ["name"]
-  REQ_WSSTORE = True
 
   def BuildHooksEnv(self):
     """Build hooks env.
 
     """
     env = {
-      "OP_TARGET": self.sstore.GetClusterName(),
+      "OP_TARGET": self.cfg.GetClusterName(),
       "NEW_NAME": self.op.name,
       }
-    mn = self.sstore.GetMasterNode()
+    mn = self.cfg.GetMasterNode()
     return env, [mn], [mn]
 
   def CheckPrereq(self):
@@ -1001,8 +1271,8 @@ class LURenameCluster(LogicalUnit):
 
     new_name = hostname.name
     self.ip = new_ip = hostname.ip
-    old_name = self.sstore.GetClusterName()
-    old_ip = self.sstore.GetMasterIP()
+    old_name = self.cfg.GetClusterName()
+    old_ip = self.cfg.GetMasterIP()
     if new_name == old_name and new_ip == old_ip:
       raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                  " cluster has changed")
@@ -1020,46 +1290,47 @@ class LURenameCluster(LogicalUnit):
     """
     clustername = self.op.name
     ip = self.ip
-    ss = self.sstore
 
     # shutdown the master IP
-    master = ss.GetMasterNode()
-    if not rpc.call_node_stop_master(master, False):
+    master = self.cfg.GetMasterNode()
+    result = self.rpc.call_node_stop_master(master, False)
+    if result.failed or not result.data:
       raise errors.OpExecError("Could not disable the master role")
 
     try:
-      # modify the sstore
-      ss.SetKey(ss.SS_MASTER_IP, ip)
-      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
-
-      # Distribute updated ss config to all nodes
-      myself = self.cfg.GetNodeInfo(master)
-      dist_nodes = self.cfg.GetNodeList()
-      if myself.name in dist_nodes:
-        dist_nodes.remove(myself.name)
-
-      logger.Debug("Copying updated ssconf data to all nodes")
-      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
-        fname = ss.KeyToFilename(keyname)
-        result = rpc.call_upload_file(dist_nodes, fname)
-        for to_node in dist_nodes:
-          if not result[to_node]:
-            logger.Error("copy of file %s to node %s failed" %
-                         (fname, to_node))
+      cluster = self.cfg.GetClusterInfo()
+      cluster.cluster_name = clustername
+      cluster.master_ip = ip
+      self.cfg.Update(cluster)
+
+      # update the known hosts file
+      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
+      node_list = self.cfg.GetNodeList()
+      try:
+        node_list.remove(master)
+      except ValueError:
+        pass
+      result = self.rpc.call_upload_file(node_list,
+                                         constants.SSH_KNOWN_HOSTS_FILE)
+      for to_node, to_result in result.iteritems():
+        if to_result.failed or not to_result.data:
+          logging.error("Copy of file %s to node %s failed",
+                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
+
     finally:
-      if not rpc.call_node_start_master(master, False):
-        logger.Error("Could not re-enable the master role on the master,"
-                     " please restart manually.")
+      result = self.rpc.call_node_start_master(master, False)
+      if result.failed or not result.data:
+        self.LogWarning("Could not re-enable the master role on"
+                        " the master, please restart manually.")
 
 
 def _RecursiveCheckIfLVMBased(disk):
   """Check if the given disk or its children are lvm-based.
 
-  Args:
-    disk: ganeti.objects.Disk object
-
-  Returns:
-    boolean indicating whether a LD_LV dev_type was found or not
+  @type disk: L{objects.Disk}
+  @param disk: the disk to check
+  @rtype: boolean
+  @return: boolean indicating whether a LD_LV dev_type was found or not
 
   """
   if disk.children:
@@ -1076,16 +1347,40 @@ class LUSetClusterParams(LogicalUnit):
   HPATH = "cluster-modify"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_REQP = []
+  REQ_BGL = False
+
+  def CheckParameters(self):
+    """Check parameters
+
+    """
+    if not hasattr(self.op, "candidate_pool_size"):
+      self.op.candidate_pool_size = None
+    if self.op.candidate_pool_size is not None:
+      try:
+        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
+      except ValueError, err:
+        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
+                                   str(err))
+      if self.op.candidate_pool_size < 1:
+        raise errors.OpPrereqError("At least one master candidate needed")
+
+  def ExpandNames(self):
+    # FIXME: in the future maybe other cluster params won't require checking on
+    # all nodes to be modified.
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+    }
+    self.share_locks[locking.LEVEL_NODE] = 1
 
   def BuildHooksEnv(self):
     """Build hooks env.
 
     """
     env = {
-      "OP_TARGET": self.sstore.GetClusterName(),
+      "OP_TARGET": self.cfg.GetClusterName(),
       "NEW_VG_NAME": self.op.vg_name,
       }
-    mn = self.sstore.GetMasterNode()
+    mn = self.cfg.GetMasterNode()
     return env, [mn], [mn]
 
   def CheckPrereq(self):
@@ -1095,38 +1390,122 @@ class LUSetClusterParams(LogicalUnit):
     if the given volume group is valid.
 
     """
-    if not self.op.vg_name:
-      instances = [self.cfg.GetInstanceInfo(name)
-                   for name in self.cfg.GetInstanceList()]
+    # FIXME: This only works because there is only one parameter that can be
+    # changed or removed.
+    if self.op.vg_name is not None and not self.op.vg_name:
+      instances = self.cfg.GetAllInstancesInfo().values()
       for inst in instances:
         for disk in inst.disks:
           if _RecursiveCheckIfLVMBased(disk):
             raise errors.OpPrereqError("Cannot disable lvm storage while"
                                        " lvm-based instances exist")
 
+    node_list = self.acquired_locks[locking.LEVEL_NODE]
+
     # if vg_name not None, checks given volume group on all nodes
     if self.op.vg_name:
-      node_list = self.cfg.GetNodeList()
-      vglist = rpc.call_vg_list(node_list)
+      vglist = self.rpc.call_vg_list(node_list)
       for node in node_list:
-        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
+        if vglist[node].failed:
+          # ignoring down node
+          self.LogWarning("Node %s unreachable/error, ignoring" % node)
+          continue
+        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
+                                              self.op.vg_name,
                                               constants.MIN_VG_SIZE)
         if vgstatus:
           raise errors.OpPrereqError("Error on node '%s': %s" %
                                      (node, vgstatus))
 
+    self.cluster = cluster = self.cfg.GetClusterInfo()
+    # validate beparams changes
+    if self.op.beparams:
+      utils.CheckBEParams(self.op.beparams)
+      self.new_beparams = cluster.FillDict(
+        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
+
+    # hypervisor list/parameters
+    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
+    if self.op.hvparams:
+      if not isinstance(self.op.hvparams, dict):
+        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
+      for hv_name, hv_dict in self.op.hvparams.items():
+        if hv_name not in self.new_hvparams:
+          self.new_hvparams[hv_name] = hv_dict
+        else:
+          self.new_hvparams[hv_name].update(hv_dict)
+
+    if self.op.enabled_hypervisors is not None:
+      self.hv_list = self.op.enabled_hypervisors
+    else:
+      self.hv_list = cluster.enabled_hypervisors
+
+    if self.op.hvparams or self.op.enabled_hypervisors is not None:
+      # either the enabled list has changed, or the parameters have, validate
+      for hv_name, hv_params in self.new_hvparams.items():
+        if ((self.op.hvparams and hv_name in self.op.hvparams) or
+            (self.op.enabled_hypervisors and
+             hv_name in self.op.enabled_hypervisors)):
+          # either this is a new hypervisor, or its parameters have changed
+          hv_class = hypervisor.GetHypervisor(hv_name)
+          hv_class.CheckParameterSyntax(hv_params)
+          _CheckHVParams(self, node_list, hv_name, hv_params)
+
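As a worked example of the merge above (hypervisor and parameter names are illustrative, and FillDict is assumed to return a merged copy of its two arguments): if the cluster stores hvparams = {"xen-pvm": {"kernel_path": "/boot/vmlinuz"}} and the opcode passes hvparams = {"xen-pvm": {"root_path": "/dev/sda1"}}, the loop yields:

    new_hvparams = {
      "xen-pvm": {
        "kernel_path": "/boot/vmlinuz",   # inherited from the cluster
        "root_path": "/dev/sda1",         # added/overridden by the opcode
      },
    }

Only hypervisors named in the opcode (or newly enabled) are then re-validated via CheckParameterSyntax and _CheckHVParams.
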
   def Exec(self, feedback_fn):
     """Change the parameters of the cluster.
 
     """
-    if self.op.vg_name != self.cfg.GetVGName():
-      self.cfg.SetVGName(self.op.vg_name)
-    else:
-      feedback_fn("Cluster LVM configuration already in desired"
-                  " state, not changing")
+    if self.op.vg_name is not None:
+      if self.op.vg_name != self.cfg.GetVGName():
+        self.cfg.SetVGName(self.op.vg_name)
+      else:
+        feedback_fn("Cluster LVM configuration already in desired"
+                    " state, not changing")
+    if self.op.hvparams:
+      self.cluster.hvparams = self.new_hvparams
+    if self.op.enabled_hypervisors is not None:
+      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
+    if self.op.beparams:
+      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
+    if self.op.candidate_pool_size is not None:
+      self.cluster.candidate_pool_size = self.op.candidate_pool_size
+
+    self.cfg.Update(self.cluster)
+
+    # we want to update nodes after the cluster so that if any errors
+    # happen, we have recorded and saved the cluster info
+    if self.op.candidate_pool_size is not None:
+      _AdjustCandidatePool(self)
+
 
+class LURedistributeConfig(NoHooksLU):
+  """Force the redistribution of cluster configuration.
 
-def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
+  This is a very simple LU.
+
+  """
+  _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+    }
+    self.share_locks[locking.LEVEL_NODE] = 1
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+
+  def Exec(self, feedback_fn):
+    """Redistribute the configuration.
+
+    """
+    self.cfg.Update(self.cfg.GetClusterInfo())
+
+
+def _WaitForSync(lu, instance, oneshot=False, unlock=False):
   """Sleep and poll for an instance's disk to sync.
 
   """
@@ -1134,33 +1513,33 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
     return True
 
   if not oneshot:
-    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
+    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
 
   node = instance.primary_node
 
   for dev in instance.disks:
-    cfgw.SetDiskID(dev, node)
+    lu.cfg.SetDiskID(dev, node)
 
   retries = 0
   while True:
     max_time = 0
     done = True
     cumul_degraded = False
-    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
-    if not rstats:
-      proc.LogWarning("Can't get any data from node %s" % node)
+    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
+    if rstats.failed or not rstats.data:
+      lu.LogWarning("Can't get any data from node %s", node)
       retries += 1
       if retries >= 10:
         raise errors.RemoteError("Can't contact node %s for mirror data,"
                                  " aborting." % node)
       time.sleep(6)
       continue
+    rstats = rstats.data
     retries = 0
-    for i in range(len(rstats)):
-      mstat = rstats[i]
+    for i, mstat in enumerate(rstats):
       if mstat is None:
-        proc.LogWarning("Can't compute data for node %s/%s" %
-                        (node, instance.disks[i].iv_name))
+        lu.LogWarning("Can't compute data for node %s/%s",
+                      node, instance.disks[i].iv_name)
         continue
       # we ignore the ldisk parameter
       perc_done, est_time, is_degraded, _ = mstat
@@ -1172,19 +1551,19 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
           max_time = est_time
         else:
           rem_time = "no time estimate"
-        proc.LogInfo("- device %s: %5.2f%% done, %s" %
-                     (instance.disks[i].iv_name, perc_done, rem_time))
+        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
+                        (instance.disks[i].iv_name, perc_done, rem_time))
     if done or oneshot:
       break
 
     time.sleep(min(60, max_time))
 
   if done:
-    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
+    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
   return not cumul_degraded
 
 
-def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
+def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
   """Check that mirrors are not degraded.
 
   The ldisk parameter, if True, will change the test from the
@@ -1192,7 +1571,7 @@ def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
   the device(s)) to the ldisk (representing the local storage status).
 
   """
-  cfgw.SetDiskID(dev, node)
+  lu.cfg.SetDiskID(dev, node)
   if ldisk:
     idx = 6
   else:
@@ -1200,15 +1579,15 @@ def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
 
   result = True
   if on_primary or dev.AssembleOnSecondary():
-    rstats = rpc.call_blockdev_find(node, dev)
-    if not rstats:
-      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
+    rstats = lu.rpc.call_blockdev_find(node, dev)
+    if rstats.failed or not rstats.data:
+      logging.warning("Node %s: disk degraded, not found or node down", node)
       result = False
     else:
-      result = result and (not rstats[idx])
+      result = result and (not rstats.data[idx])
   if dev.children:
     for child in dev.children:
-      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
+      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
 
   return result
 
@@ -1218,43 +1597,49 @@ class LUDiagnoseOS(NoHooksLU):
 
   """
   _OP_REQP = ["output_fields", "names"]
+  REQ_BGL = False
+  _FIELDS_STATIC = utils.FieldSet()
+  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
 
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This always succeeds, since this is a pure query LU.
-
-    """
+  def ExpandNames(self):
     if self.op.names:
       raise errors.OpPrereqError("Selective OS query not supported")
 
-    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
-    _CheckOutputFields(static=[],
-                       dynamic=self.dynamic_fields,
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=self._FIELDS_DYNAMIC,
                        selected=self.op.output_fields)
 
+    # Lock all nodes, in shared mode
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+
   @staticmethod
   def _DiagnoseByOS(node_list, rlist):
     """Remaps a per-node return list into an a per-os per-node dictionary
 
-      Args:
-        node_list: a list with the names of all nodes
-        rlist: a map with node names as keys and OS objects as values
+    @param node_list: a list with the names of all nodes
+    @param rlist: a map with node names as keys and OS objects as values
+
+    @rtype: dict
+    @return: a dictionary with osnames as keys and as values another map, with
+        nodes as keys and lists of OS objects as values, e.g.::
 
-      Returns:
-        map: a map with osnames as keys and as value another map, with
-             nodes as
-             keys and list of OS objects as values
-             e.g. {"debian-etch": {"node1": [<object>,...],
-                                   "node2": [<object>,]}
-                  }
+          {"debian-etch": {"node1": [<object>,...],
+                           "node2": [<object>,]}
+          }
 
     """
     all_os = {}
     for node_name, nr in rlist.iteritems():
-      if not nr:
+      if nr.failed or not nr.data:
         continue
-      for os_obj in nr:
+      for os_obj in nr.data:
         if os_obj.name not in all_os:
           # build a list of nodes for this os containing empty lists
           # for each node in node_list
@@ -1268,11 +1653,13 @@ class LUDiagnoseOS(NoHooksLU):
     """Compute the list of OSes.
 
     """
-    node_list = self.cfg.GetNodeList()
-    node_data = rpc.call_os_diagnose(node_list)
+    node_list = self.acquired_locks[locking.LEVEL_NODE]
+    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
+                   if node in node_list]
+    node_data = self.rpc.call_os_diagnose(valid_nodes)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
-    pol = self._DiagnoseByOS(node_list, node_data)
+    pol = self._DiagnoseByOS(valid_nodes, node_data)
     output = []
     for os_name, os_data in pol.iteritems():
       row = []
@@ -1333,18 +1720,15 @@ class LURemoveNode(LogicalUnit):
 
     instance_list = self.cfg.GetInstanceList()
 
-    masternode = self.sstore.GetMasterNode()
+    masternode = self.cfg.GetMasterNode()
     if node.name == masternode:
       raise errors.OpPrereqError("Node is the master node,"
                                  " you need to failover first.")
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
-      if node.name == instance.primary_node:
-        raise errors.OpPrereqError("Instance %s still running on the node,"
-                                   " please remove first." % instance_name)
-      if node.name in instance.secondary_nodes:
-        raise errors.OpPrereqError("Instance %s has node as a secondary,"
+      if node.name in instance.all_nodes:
+        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                    " please remove first." % instance_name)
     self.op.node_name = node.name
     self.node = node
@@ -1354,76 +1738,106 @@ class LURemoveNode(LogicalUnit):
 
     """
     node = self.node
-    logger.Info("stopping the node daemon and removing configs from node %s" %
-                node.name)
+    logging.info("Stopping the node daemon and removing configs from node %s",
+                 node.name)
 
     self.context.RemoveNode(node.name)
 
-    rpc.call_node_leave_cluster(node.name)
+    self.rpc.call_node_leave_cluster(node.name)
+
+    # Promote nodes to master candidate as needed
+    _AdjustCandidatePool(self)
 
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
 
   """
-  _OP_REQP = ["output_fields", "names"]
+  _OP_REQP = ["output_fields", "names", "use_locking"]
   REQ_BGL = False
+  _FIELDS_DYNAMIC = utils.FieldSet(
+    "dtotal", "dfree",
+    "mtotal", "mnode", "mfree",
+    "bootid",
+    "ctotal",
+    )
+
+  _FIELDS_STATIC = utils.FieldSet(
+    "name", "pinst_cnt", "sinst_cnt",
+    "pinst_list", "sinst_list",
+    "pip", "sip", "tags",
+    "serial_no",
+    "master_candidate",
+    "master",
+    "offline",
+    )
 
   def ExpandNames(self):
-    self.dynamic_fields = frozenset([
-      "dtotal", "dfree",
-      "mtotal", "mnode", "mfree",
-      "bootid",
-      "ctotal",
-      ])
-
-    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
-                               "pinst_list", "sinst_list",
-                               "pip", "sip", "tags"],
-                       dynamic=self.dynamic_fields,
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=self._FIELDS_DYNAMIC,
                        selected=self.op.output_fields)
 
     self.needed_locks = {}
     self.share_locks[locking.LEVEL_NODE] = 1
-    # TODO: we could lock nodes only if the user asked for dynamic fields. For
-    # that we need atomic ways to get info for a group of nodes from the
-    # config, though.
-    if not self.op.names:
-      self.needed_locks[locking.LEVEL_NODE] = None
+
+    if self.op.names:
+      self.wanted = _GetWantedNodes(self, self.op.names)
     else:
-      self.needed_locks[locking.LEVEL_NODE] = \
-        _GetWantedNodes(self, self.op.names)
+      self.wanted = locking.ALL_SET
+
+    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
+    self.do_locking = self.do_node_query and self.op.use_locking
+    if self.do_locking:
+      # if we don't request only static fields, we need to lock the nodes
+      self.needed_locks[locking.LEVEL_NODE] = self.wanted
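
Editorial note: the do_node_query/do_locking split above means nodes are only locked (and live RPC data gathered) when the caller asked for a field outside the static set and also requested locking. A rough sketch of that decision follows, with FieldSetSketch as a simplified stand-in for utils.FieldSet, assuming regex-style matching.

import re

class FieldSetSketch(object):
    # Simplified stand-in for utils.FieldSet, only for illustration.
    def __init__(self, *items):
        self.items = [re.compile("^%s$" % value) for value in items]

    def Matches(self, field):
        # True if the field matches one of the declared patterns
        return any(regex.match(field) for regex in self.items)

    def NonMatching(self, fields):
        # the requested fields that are NOT part of this set
        return [field for field in fields if not self.Matches(field)]

static = FieldSetSketch("name", "pip", "sip")
for requested, use_locking in ((["name", "pip"], True),
                               (["name", "mfree"], True),
                               (["name", "mfree"], False)):
    do_node_query = bool(static.NonMatching(requested))
    do_locking = do_node_query and use_locking
    print(requested, use_locking, "->", do_locking)
# only the second combination ends up locking the nodes
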
+
 
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
-    # This of course is valid only if we locked the nodes
-    self.wanted = self.acquired_locks[locking.LEVEL_NODE]
+    # The validation of the node list is done in _GetWantedNodes when the
+    # list is not empty; when it is empty, there is nothing to validate.
+    pass
 
   def Exec(self, feedback_fn):
     """Computes the list of nodes and their attributes.
 
     """
-    nodenames = self.wanted
-    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
+    all_info = self.cfg.GetAllNodesInfo()
+    if self.do_locking:
+      nodenames = self.acquired_locks[locking.LEVEL_NODE]
+    elif self.wanted != locking.ALL_SET:
+      nodenames = self.wanted
+      missing = set(nodenames).difference(all_info.keys())
+      if missing:
+        raise errors.OpExecError(
+          "Some nodes were removed before retrieving their data: %s" % missing)
+    else:
+      nodenames = all_info.keys()
+
+    nodenames = utils.NiceSort(nodenames)
+    nodelist = [all_info[name] for name in nodenames]
 
     # begin data gathering
 
-    if self.dynamic_fields.intersection(self.op.output_fields):
+    if self.do_node_query:
       live_data = {}
-      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
+      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
+                                          self.cfg.GetHypervisorType())
       for name in nodenames:
-        nodeinfo = node_data.get(name, None)
-        if nodeinfo:
+        nodeinfo = node_data[name]
+        if not nodeinfo.failed and nodeinfo.data:
+          nodeinfo = nodeinfo.data
+          fn = utils.TryConvert
           live_data[name] = {
-            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
-            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
-            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
-            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
-            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
-            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
-            "bootid": nodeinfo['bootid'],
+            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
+            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
+            "mfree": fn(int, nodeinfo.get('memory_free', None)),
+            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
+            "dfree": fn(int, nodeinfo.get('vg_free', None)),
+            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
+            "bootid": nodeinfo.get('bootid', None),
             }
         else:
           live_data[name] = {}
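
Editorial note: the fn = utils.TryConvert shortcut above converts the raw RPC values while tolerating missing data; a stand-in with the behaviour the query code relies on (not the real utils implementation) would be:

def try_convert(fn, val):
    # attempt the conversion, fall back to the original value on failure
    try:
        return fn(val)
    except (ValueError, TypeError):
        return val

print(try_convert(int, "2048"))  # 2048
print(try_convert(int, None))    # None (missing data stays as-is)
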
@@ -1446,6 +1860,8 @@ class LUQueryNodes(NoHooksLU):
           if secnode in node_to_secondary:
             node_to_secondary[secnode].add(inst.name)
 
+    master_node = self.cfg.GetMasterNode()
+
     # end data gathering
 
     output = []
@@ -1468,7 +1884,15 @@ class LUQueryNodes(NoHooksLU):
           val = node.secondary_ip
         elif field == "tags":
           val = list(node.GetTags())
-        elif field in self.dynamic_fields:
+        elif field == "serial_no":
+          val = node.serial_no
+        elif field == "master_candidate":
+          val = node.master_candidate
+        elif field == "master":
+          val = node.name == master_node
+        elif field == "offline":
+          val = node.offline
+        elif self._FIELDS_DYNAMIC.Matches(field):
           val = live_data[node.name].get(field, None)
         else:
           raise errors.ParameterError(field)
@@ -1483,6 +1907,22 @@ class LUQueryNodeVolumes(NoHooksLU):
 
   """
   _OP_REQP = ["nodes", "output_fields"]
+  REQ_BGL = False
+  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
+  _FIELDS_STATIC = utils.FieldSet("node")
+
+  def ExpandNames(self):
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=self._FIELDS_DYNAMIC,
+                       selected=self.op.output_fields)
+
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+    if not self.op.nodes:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = \
+        _GetWantedNodes(self, self.op.nodes)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1490,19 +1930,14 @@ class LUQueryNodeVolumes(NoHooksLU):
     This checks that the fields required are valid output fields.
 
     """
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
-
-    _CheckOutputFields(static=["node"],
-                       dynamic=["phys", "vg", "name", "size", "instance"],
-                       selected=self.op.output_fields)
-
+    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
 
   def Exec(self, feedback_fn):
     """Computes the list of nodes and their attributes.
 
     """
     nodenames = self.nodes
-    volumes = rpc.call_node_volumes(nodenames)
+    volumes = self.rpc.call_node_volumes(nodenames)
 
     ilist = [self.cfg.GetInstanceInfo(iname) for iname
              in self.cfg.GetInstanceList()]
@@ -1511,10 +1946,10 @@ class LUQueryNodeVolumes(NoHooksLU):
 
     output = []
     for node in nodenames:
-      if node not in volumes or not volumes[node]:
+      if node not in volumes or volumes[node].failed or not volumes[node].data:
         continue
 
-      node_vols = volumes[node][:]
+      node_vols = volumes[node].data[:]
       node_vols.sort(key=lambda vol: vol['dev'])
 
       for vol in node_vols:
@@ -1623,7 +2058,7 @@ class LUAddNode(LogicalUnit):
 
     # check that the type of the node (single versus dual homed) is the
     # same as for the master
-    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
+    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
     master_singlehomed = myself.secondary_ip == myself.primary_ip
     newbie_singlehomed = secondary_ip == primary_ip
     if master_singlehomed != newbie_singlehomed:
@@ -1645,9 +2080,15 @@ class LUAddNode(LogicalUnit):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                    " based ping to noded port")
 
+    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+    mc_now, _ = self.cfg.GetMasterCandidateStats()
+    master_candidate = mc_now < cp_size
+
     self.new_node = objects.Node(name=node,
                                  primary_ip=primary_ip,
-                                 secondary_ip=secondary_ip)
+                                 secondary_ip=secondary_ip,
+                                 master_candidate=master_candidate,
+                                 offline=False)
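
Editorial note: the master_candidate flag for the new node is decided purely by comparing the current candidate count with the configured pool size; a tiny sketch of that rule, where the (current, _) tuple mirrors how GetMasterCandidateStats is unpacked above:

def new_node_is_candidate(candidate_stats, pool_size):
    # promote the new node only while the candidate pool is not full
    mc_now, _ = candidate_stats
    return mc_now < pool_size

print(new_node_is_candidate((3, 3), 10))    # True: room left in the pool
print(new_node_is_candidate((10, 10), 10))  # False: pool already full
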
 
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
@@ -1657,20 +2098,21 @@ class LUAddNode(LogicalUnit):
     node = new_node.name
 
     # check connectivity
-    result = rpc.call_version([node])[node]
-    if result:
-      if constants.PROTOCOL_VERSION == result:
-        logger.Info("communication to node %s fine, sw version %s match" %
-                    (node, result))
+    result = self.rpc.call_version([node])[node]
+    result.Raise()
+    if result.data:
+      if constants.PROTOCOL_VERSION == result.data:
+        logging.info("Communication to node %s fine, sw version %s match",
+                     node, result.data)
       else:
         raise errors.OpExecError("Version mismatch master version %s,"
                                  " node version %s" %
-                                 (constants.PROTOCOL_VERSION, result))
+                                 (constants.PROTOCOL_VERSION, result.data))
     else:
       raise errors.OpExecError("Cannot get version from the new node")
 
     # setup ssh on node
-    logger.Info("copy ssh key to node %s" % node)
+    logging.info("Copy ssh key to node %s", node)
     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
     keyarray = []
     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
@@ -1684,66 +2126,69 @@ class LUAddNode(LogicalUnit):
       finally:
         f.close()
 
-    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
-                               keyarray[3], keyarray[4], keyarray[5])
+    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
+                                    keyarray[2],
+                                    keyarray[3], keyarray[4], keyarray[5])
 
-    if not result:
-      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
+    msg = result.RemoteFailMsg()
+    if msg:
+      raise errors.OpExecError("Cannot transfer ssh keys to the"
+                               " new node: %s" % msg)
 
     # Add node to our /etc/hosts, and add key to known_hosts
     utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
-      if not rpc.call_node_tcp_ping(new_node.name,
-                                    constants.LOCALHOST_IP_ADDRESS,
-                                    new_node.secondary_ip,
-                                    constants.DEFAULT_NODED_PORT,
-                                    10, False):
+      result = self.rpc.call_node_has_ip_address(new_node.name,
+                                                 new_node.secondary_ip)
+      if result.failed or not result.data:
         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                  " you gave (%s). Please fix and re-run this"
                                  " command." % new_node.secondary_ip)
 
-    node_verify_list = [self.sstore.GetMasterNode()]
+    node_verify_list = [self.cfg.GetMasterNode()]
     node_verify_param = {
       'nodelist': [node],
       # TODO: do a node-net-test as well?
     }
 
-    result = rpc.call_node_verify(node_verify_list, node_verify_param)
+    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
+                                       self.cfg.GetClusterName())
     for verifier in node_verify_list:
-      if not result[verifier]:
+      if result[verifier].failed or not result[verifier].data:
         raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                  " for remote verification" % verifier)
-      if result[verifier]['nodelist']:
-        for failed in result[verifier]['nodelist']:
+      if result[verifier].data['nodelist']:
+        for failed in result[verifier].data['nodelist']:
           feedback_fn("ssh/hostname verification failed %s -> %s" %
-                      (verifier, result[verifier]['nodelist'][failed]))
+                      (verifier, result[verifier].data['nodelist'][failed]))
         raise errors.OpExecError("ssh/hostname verification failed.")
 
     # Distribute updated /etc/hosts and known_hosts to all nodes,
     # including the node just added
-    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
+    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
     dist_nodes = self.cfg.GetNodeList()
     if not self.op.readd:
       dist_nodes.append(node)
     if myself.name in dist_nodes:
       dist_nodes.remove(myself.name)
 
-    logger.Debug("Copying hosts and known_hosts to all nodes")
+    logging.debug("Copying hosts and known_hosts to all nodes")
     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
-      result = rpc.call_upload_file(dist_nodes, fname)
-      for to_node in dist_nodes:
-        if not result[to_node]:
-          logger.Error("copy of file %s to node %s failed" %
-                       (fname, to_node))
-
-    to_copy = self.sstore.GetFileList()
-    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
+      result = self.rpc.call_upload_file(dist_nodes, fname)
+      for to_node, to_result in result.iteritems():
+        if to_result.failed or not to_result.data:
+          logging.error("Copy of file %s to node %s failed", fname, to_node)
+
+    to_copy = []
+    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    if constants.HTS_USE_VNC.intersection(enabled_hypervisors):
       to_copy.append(constants.VNC_PASSWORD_FILE)
+
     for fname in to_copy:
-      result = rpc.call_upload_file([node], fname)
-      if not result[node]:
-        logger.Error("could not copy file %s to node %s" % (fname, node))
+      result = self.rpc.call_upload_file([node], fname)
+      if result[node].failed or not result[node].data:
+        logging.error("Could not copy file %s to node %s", fname, node)
 
     if self.op.readd:
       self.context.ReaddNode(new_node)
@@ -1751,12 +2196,117 @@ class LUAddNode(LogicalUnit):
       self.context.AddNode(new_node)
 
 
+class LUSetNodeParams(LogicalUnit):
+  """Modifies the parameters of a node.
+
+  """
+  HPATH = "node-modify"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+    self.op.node_name = node_name
+    _CheckBooleanOpField(self.op, 'master_candidate')
+    _CheckBooleanOpField(self.op, 'offline')
+    if self.op.master_candidate is None and self.op.offline is None:
+      raise errors.OpPrereqError("Please pass at least one modification")
+    if self.op.offline == True and self.op.master_candidate == True:
+      raise errors.OpPrereqError("Can't set the node into offline and"
+                                 " master_candidate at the same time")
+
+  def ExpandNames(self):
+    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master node.
+
+    """
+    env = {
+      "OP_TARGET": self.op.node_name,
+      "MASTER_CANDIDATE": str(self.op.master_candidate),
+      "OFFLINE": str(self.op.offline),
+      }
+    nl = [self.cfg.GetMasterNode(),
+          self.op.node_name]
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks the requested changes against the node's current role and
+    the cluster's master candidate pool.
+
+    """
+    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+
+    if ((self.op.master_candidate == False or self.op.offline == True)
+        and node.master_candidate):
+      # we will demote the node from master_candidate
+      if self.op.node_name == self.cfg.GetMasterNode():
+        raise errors.OpPrereqError("The master node has to be a"
+                                   " master candidate and online")
+      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+      num_candidates, _ = self.cfg.GetMasterCandidateStats()
+      if num_candidates <= cp_size:
+        msg = ("Not enough master candidates (desired"
+               " %d, new value will be %d)" % (cp_size, num_candidates-1))
+        if self.op.force:
+          self.LogWarning(msg)
+        else:
+          raise errors.OpPrereqError(msg)
+
+    if (self.op.master_candidate == True and node.offline and
+        not self.op.offline == False):
+      raise errors.OpPrereqError("Can't set an offline node to"
+                                 " master_candidate")
+
+    return
+
+  def Exec(self, feedback_fn):
+    """Modifies a node.
+
+    """
+    node = self.node
+
+    result = []
+
+    if self.op.offline is not None:
+      node.offline = self.op.offline
+      result.append(("offline", str(self.op.offline)))
+      if self.op.offline == True and node.master_candidate:
+        node.master_candidate = False
+        result.append(("master_candidate", "auto-demotion due to offline"))
+
+    if self.op.master_candidate is not None:
+      node.master_candidate = self.op.master_candidate
+      result.append(("master_candidate", str(self.op.master_candidate)))
+      if self.op.master_candidate == False:
+        rrc = self.rpc.call_node_demote_from_mc(node.name)
+        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
+            or len(rrc.data) != 2):
+          self.LogWarning("Node rpc error: %s" % rrc.error)
+        elif not rrc.data[0]:
+          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
+
+    # this will trigger configuration file update, if needed
+    self.cfg.Update(node)
+    # this will trigger job queue propagation or cleanup
+    if self.op.node_name != self.cfg.GetMasterNode():
+      self.context.ReaddNode(node)
+
+    return result
+
+
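Editorial note: taken together, CheckArguments and CheckPrereq above enforce a small set of consistency rules. The sketch below restates them with plain values (check_node_params is a hypothetical helper, not part of the patch), which may help when reviewing the branches.

def check_node_params(is_master, is_offline_now, is_candidate_now,
                      set_candidate, set_offline,
                      num_candidates, pool_size, force=False):
    # Returns a list of warnings; raises ValueError on a hard error.
    warnings = []
    if set_candidate is None and set_offline is None:
        raise ValueError("Please pass at least one modification")
    if set_offline is True and set_candidate is True:
        raise ValueError("Can't set offline and master_candidate together")
    if (set_candidate is False or set_offline is True) and is_candidate_now:
        # demoting a current master candidate
        if is_master:
            raise ValueError("The master node has to be a master candidate"
                             " and online")
        if num_candidates <= pool_size:
            msg = ("Not enough master candidates (desired %d, new value will"
                   " be %d)" % (pool_size, num_candidates - 1))
            if force:
                warnings.append(msg)
            else:
                raise ValueError(msg)
    if set_candidate is True and is_offline_now and set_offline is not False:
        raise ValueError("Can't set an offline node to master_candidate")
    return warnings

# demoting one of the last candidates is only a warning when forced
print(check_node_params(False, False, True, False, None, 1, 1, force=True))
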
 class LUQueryClusterInfo(NoHooksLU):
   """Query cluster configuration.
 
   """
   _OP_REQP = []
-  REQ_MASTER = False
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -1772,31 +2322,43 @@ class LUQueryClusterInfo(NoHooksLU):
     """Return cluster config.
 
     """
+    cluster = self.cfg.GetClusterInfo()
     result = {
-      "name": self.sstore.GetClusterName(),
       "software_version": constants.RELEASE_VERSION,
       "protocol_version": constants.PROTOCOL_VERSION,
       "config_version": constants.CONFIG_VERSION,
       "os_api_version": constants.OS_API_VERSION,
       "export_version": constants.EXPORT_VERSION,
-      "master": self.sstore.GetMasterNode(),
       "architecture": (platform.architecture()[0], platform.machine()),
-      "hypervisor_type": self.sstore.GetHypervisorType(),
+      "name": cluster.cluster_name,
+      "master": cluster.master_node,
+      "default_hypervisor": cluster.default_hypervisor,
+      "enabled_hypervisors": cluster.enabled_hypervisors,
+      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
+                        for hypervisor in cluster.enabled_hypervisors]),
+      "beparams": cluster.beparams,
+      "candidate_pool_size": cluster.candidate_pool_size,
       }
 
     return result
 
 
-class LUDumpClusterConfig(NoHooksLU):
-  """Return a text-representation of the cluster-config.
+class LUQueryConfigValues(NoHooksLU):
+  """Return configuration values.
 
   """
   _OP_REQP = []
   REQ_BGL = False
+  _FIELDS_DYNAMIC = utils.FieldSet()
+  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
 
   def ExpandNames(self):
     self.needed_locks = {}
 
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=self._FIELDS_DYNAMIC,
+                       selected=self.op.output_fields)
+
   def CheckPrereq(self):
     """No prerequisites.
 
@@ -1807,7 +2369,18 @@ class LUDumpClusterConfig(NoHooksLU):
     """Dump a representation of the cluster config to the standard output.
 
     """
-    return self.cfg.DumpConfig()
+    values = []
+    for field in self.op.output_fields:
+      if field == "cluster_name":
+        entry = self.cfg.GetClusterName()
+      elif field == "master_node":
+        entry = self.cfg.GetMasterNode()
+      elif field == "drain_flag":
+        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+      else:
+        raise errors.ParameterError(field)
+      values.append(entry)
+    return values
 
 
 class LUActivateInstanceDisks(NoHooksLU):
@@ -1815,6 +2388,16 @@ class LUActivateInstanceDisks(NoHooksLU):
 
   """
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1822,39 +2405,38 @@ class LUActivateInstanceDisks(NoHooksLU):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
-
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Activate the disks.
 
     """
-    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
+    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block devices")
 
     return disks_info
 
 
-def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
+def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
   """Prepare the block devices for an instance.
 
   This sets up the block devices on all nodes.
 
-  Args:
-    instance: a ganeti.objects.Instance object
-    ignore_secondaries: if true, errors on secondary nodes won't result
-                        in an error return from the function
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance for whose disks we assemble
+  @type ignore_secondaries: boolean
+  @param ignore_secondaries: if true, errors on secondary nodes
+      won't result in an error return from the function
+  @return: a tuple of (disks_ok, device_info); disks_ok is False if the
+      operation failed, and device_info is a list of
+      (host, instance_visible_name, node_visible_name) tuples with the
+      mapping from node devices to instance devices
 
-  Returns:
-    false if the operation failed
-    list of (host, instance_visible_name, node_visible_name) if the operation
-         suceeded with the mapping from node devices to instance devices
   """
   device_info = []
   disks_ok = True
@@ -1871,11 +2453,12 @@ def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
   # 1st pass, assemble on all nodes in secondary mode
   for inst_disk in instance.disks:
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
-      cfg.SetDiskID(node_disk, node)
-      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
-      if not result:
-        logger.Error("could not prepare block device %s on node %s"
-                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
+      lu.cfg.SetDiskID(node_disk, node)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
+      if result.failed or not result.data:
+        lu.proc.LogWarning("Could not prepare block device %s on node %s"
+                           " (is_primary=False, pass=1)",
+                           inst_disk.iv_name, node)
         if not ignore_secondaries:
           disks_ok = False
 
@@ -1886,34 +2469,36 @@ def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       if node != instance.primary_node:
         continue
-      cfg.SetDiskID(node_disk, node)
-      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
-      if not result:
-        logger.Error("could not prepare block device %s on node %s"
-                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
+      lu.cfg.SetDiskID(node_disk, node)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
+      if result.failed or not result.data:
+        lu.proc.LogWarning("Could not prepare block device %s on node %s"
+                           " (is_primary=True, pass=2)",
+                           inst_disk.iv_name, node)
         disks_ok = False
-    device_info.append((instance.primary_node, inst_disk.iv_name, result))
+    device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
 
   # leave the disks configured for the primary node
   # this is a workaround that would be fixed better by
   # improving the logical/physical id handling
   for disk in instance.disks:
-    cfg.SetDiskID(disk, instance.primary_node)
+    lu.cfg.SetDiskID(disk, instance.primary_node)
 
   return disks_ok, device_info
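
Editorial note: the two passes above are ordered deliberately; every node first activates the device in secondary (read-only) mode, and only then does the primary node re-assemble it writable. A stripped-down sketch of that ordering follows, ignoring the ignore_secondaries handling and the per-disk node tree, with fake_assemble standing in for the blockdev_assemble RPC.

def assemble_instance_disks(node_disk_pairs, primary_node, assemble):
    ok = True
    # pass 1: all nodes, secondary mode
    for node, disk in node_disk_pairs:
        if not assemble(node, disk, as_primary=False):
            ok = False
    # pass 2: only the primary node, primary (writable) mode
    for node, disk in node_disk_pairs:
        if node == primary_node and not assemble(node, disk, as_primary=True):
            ok = False
    return ok

calls = []
def fake_assemble(node, disk, as_primary):
    calls.append((node, disk, as_primary))
    return True

assemble_instance_disks([("node1", "disk0"), ("node2", "disk0")],
                        "node1", fake_assemble)
print(calls)
# [('node1', 'disk0', False), ('node2', 'disk0', False), ('node1', 'disk0', True)]
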
 
 
-def _StartInstanceDisks(cfg, instance, force):
+def _StartInstanceDisks(lu, instance, force):
   """Start the disks of an instance.
 
   """
-  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
+  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                            ignore_secondaries=force)
   if not disks_ok:
-    _ShutdownInstanceDisks(instance, cfg)
+    _ShutdownInstanceDisks(lu, instance)
     if force is not None and not force:
-      logger.Error("If the message above refers to a secondary node,"
-                   " you can retry the operation using '--force'.")
+      lu.proc.LogWarning("", hint="If the message above refers to a"
+                         " secondary node,"
+                         " you can retry the operation using '--force'.")
     raise errors.OpExecError("Disk consistency error")
 
 
@@ -1922,6 +2507,16 @@ class LUDeactivateInstanceDisks(NoHooksLU):
 
   """
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1929,32 +2524,40 @@ class LUDeactivateInstanceDisks(NoHooksLU):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Deactivate the disks
 
     """
     instance = self.instance
-    ins_l = rpc.call_instance_list([instance.primary_node])
-    ins_l = ins_l[instance.primary_node]
-    if not type(ins_l) is list:
-      raise errors.OpExecError("Can't contact node '%s'" %
-                               instance.primary_node)
+    _SafeShutdownInstanceDisks(self, instance)
+
 
-    if self.instance.name in ins_l:
-      raise errors.OpExecError("Instance is running, can't shutdown"
-                               " block devices.")
+def _SafeShutdownInstanceDisks(lu, instance):
+  """Shutdown block devices of an instance.
 
-    _ShutdownInstanceDisks(instance, self.cfg)
+  This function checks if an instance is running, before calling
+  _ShutdownInstanceDisks.
+
+  """
+  ins_l = lu.rpc.call_instance_list([instance.primary_node],
+                                      [instance.hypervisor])
+  ins_l = ins_l[instance.primary_node]
+  if ins_l.failed or not isinstance(ins_l.data, list):
+    raise errors.OpExecError("Can't contact node '%s'" %
+                             instance.primary_node)
 
+  if instance.name in ins_l.data:
+    raise errors.OpExecError("Instance is running, can't shutdown"
+                             " block devices.")
 
-def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
+  _ShutdownInstanceDisks(lu, instance)
+
+
+def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
   """Shutdown block devices of an instance.
 
   This does the shutdown on all nodes of the instance.
@@ -1966,16 +2569,17 @@ def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
   result = True
   for disk in instance.disks:
     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
-      cfg.SetDiskID(top_disk, node)
-      if not rpc.call_blockdev_shutdown(node, top_disk):
-        logger.Error("could not shutdown block device %s on node %s" %
-                     (disk.iv_name, node))
+      lu.cfg.SetDiskID(top_disk, node)
+      res = lu.rpc.call_blockdev_shutdown(node, top_disk)
+      if res.failed or not res.data:
+        logging.error("Could not shutdown block device %s on node %s",
+                      disk.iv_name, node)
         if not ignore_primary or node != instance.primary_node:
           result = False
   return result
 
 
-def _CheckNodeFreeMemory(cfg, node, reason, requested):
+def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
   """Checks if a node has enough free memory.
 
   This function checks if a given node has the needed amount of free
@@ -1983,19 +2587,23 @@ def _CheckNodeFreeMemory(cfg, node, reason, requested):
   information from the node, this function raises an OpPrereqError
   exception.
 
-  Args:
-    - cfg: a ConfigWriter instance
-    - node: the node name
-    - reason: string to use in the error message
-    - requested: the amount of memory in MiB
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type node: C{str}
+  @param node: the node to check
+  @type reason: C{str}
+  @param reason: string to use in the error message
+  @type requested: C{int}
+  @param requested: the amount of memory in MiB to check for
+  @type hypervisor_name: C{str}
+  @param hypervisor_name: the hypervisor to ask for memory stats
+  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
+      we cannot check the node
 
   """
-  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
-  if not nodeinfo or not isinstance(nodeinfo, dict):
-    raise errors.OpPrereqError("Could not contact node %s for resource"
-                             " information" % (node,))
-
-  free_mem = nodeinfo[node].get('memory_free')
+  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
+  nodeinfo[node].Raise()
+  free_mem = nodeinfo[node].data.get('memory_free')
   if not isinstance(free_mem, int):
     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                              " was '%s'" % (node, free_mem))
@@ -2016,12 +2624,6 @@ class LUStartupInstance(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2032,9 +2634,8 @@ class LUStartupInstance(LogicalUnit):
     env = {
       "FORCE": self.op.force,
       }
-    env.update(_BuildInstanceHookEnvByObject(self.instance))
-    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2047,12 +2648,15 @@ class LUStartupInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    _CheckNodeOnline(self, instance.primary_node)
+
+    bep = self.cfg.GetClusterInfo().FillBE(instance)
     # check bridges existance
-    _CheckInstanceBridgesExist(instance)
+    _CheckInstanceBridgesExist(self, instance)
 
-    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
+    _CheckNodeFreeMemory(self, instance.primary_node,
                          "starting instance %s" % instance.name,
-                         instance.memory)
+                         bep[constants.BE_MEMORY], instance.hypervisor)
 
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -2066,11 +2670,13 @@ class LUStartupInstance(LogicalUnit):
 
     node_current = instance.primary_node
 
-    _StartInstanceDisks(self.cfg, instance, force)
+    _StartInstanceDisks(self, instance, force)
 
-    if not rpc.call_instance_start(node_current, instance, extra_args):
-      _ShutdownInstanceDisks(instance, self.cfg)
-      raise errors.OpExecError("Could not start instance")
+    result = self.rpc.call_instance_start(node_current, instance, extra_args)
+    msg = result.RemoteFailMsg()
+    if msg:
+      _ShutdownInstanceDisks(self, instance)
+      raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LURebootInstance(LogicalUnit):
@@ -2091,13 +2697,6 @@ class LURebootInstance(LogicalUnit):
                                    constants.INSTANCE_REBOOT_HARD,
                                    constants.INSTANCE_REBOOT_FULL))
     self._ExpandAndLockInstance()
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
-      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2108,9 +2707,8 @@ class LURebootInstance(LogicalUnit):
     env = {
       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
       }
-    env.update(_BuildInstanceHookEnvByObject(self.instance))
-    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2123,8 +2721,10 @@ class LURebootInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    _CheckNodeOnline(self, instance.primary_node)
+
     # check bridges existance
-    _CheckInstanceBridgesExist(instance)
+    _CheckInstanceBridgesExist(self, instance)
 
   def Exec(self, feedback_fn):
     """Reboot the instance.
@@ -2139,17 +2739,21 @@ class LURebootInstance(LogicalUnit):
 
     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                        constants.INSTANCE_REBOOT_HARD]:
-      if not rpc.call_instance_reboot(node_current, instance,
-                                      reboot_type, extra_args):
+      result = self.rpc.call_instance_reboot(node_current, instance,
+                                             reboot_type, extra_args)
+      if result.failed or not result.data:
         raise errors.OpExecError("Could not reboot instance")
     else:
-      if not rpc.call_instance_shutdown(node_current, instance):
+      result = self.rpc.call_instance_shutdown(node_current, instance)
+      if result.failed or not result.data:
         raise errors.OpExecError("could not shutdown instance for full reboot")
-      _ShutdownInstanceDisks(instance, self.cfg)
-      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
-      if not rpc.call_instance_start(node_current, instance, extra_args):
-        _ShutdownInstanceDisks(instance, self.cfg)
-        raise errors.OpExecError("Could not start instance for full reboot")
+      _ShutdownInstanceDisks(self, instance)
+      _StartInstanceDisks(self, instance, ignore_secondaries)
+      result = self.rpc.call_instance_start(node_current, instance, extra_args)
+      msg = result.RemoteFailMsg()
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance for"
+                                 " full reboot: %s" % msg)
 
     self.cfg.MarkInstanceUp(instance.name)
 
@@ -2165,12 +2769,6 @@ class LUShutdownInstance(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2178,9 +2776,8 @@ class LUShutdownInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self.instance)
-    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2192,6 +2789,7 @@ class LUShutdownInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -2200,10 +2798,11 @@ class LUShutdownInstance(LogicalUnit):
     instance = self.instance
     node_current = instance.primary_node
     self.cfg.MarkInstanceDown(instance.name)
-    if not rpc.call_instance_shutdown(node_current, instance):
-      logger.Error("could not shutdown instance")
+    result = self.rpc.call_instance_shutdown(node_current, instance)
+    if result.failed or not result.data:
+      self.proc.LogWarning("Could not shutdown instance")
 
-    _ShutdownInstanceDisks(instance, self.cfg)
+    _ShutdownInstanceDisks(self, instance)
 
 
 class LUReinstallInstance(LogicalUnit):
@@ -2217,12 +2816,6 @@ class LUReinstallInstance(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2230,9 +2823,8 @@ class LUReinstallInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self.instance)
-    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2244,15 +2836,18 @@ class LUReinstallInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, instance.primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name)
-    if instance.status != "down":
+    if instance.admin_up:
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
-    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
-    if remote_info:
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    if remote_info.failed or remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -2265,8 +2860,9 @@ class LUReinstallInstance(LogicalUnit):
       if pnode is None:
         raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                    self.op.pnode)
-      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
-      if not os_obj:
+      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+      result.Raise()
+      if not isinstance(result.data, objects.OS):
         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                    " primary node"  % self.op.os_type)
 
@@ -2281,17 +2877,19 @@ class LUReinstallInstance(LogicalUnit):
     if self.op.os_type is not None:
       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
       inst.os = self.op.os_type
-      self.cfg.AddInstance(inst)
+      self.cfg.Update(inst)
 
-    _StartInstanceDisks(self.cfg, inst, None)
+    _StartInstanceDisks(self, inst, None)
     try:
       feedback_fn("Running the instance OS create scripts...")
-      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
+      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
+      msg = result.RemoteFailMsg()
+      if msg:
         raise errors.OpExecError("Could not install OS for instance %s"
-                                 " on node %s" %
-                                 (inst.name, inst.primary_node))
+                                 " on node %s: %s" %
+                                 (inst.name, inst.primary_node, msg))
     finally:
-      _ShutdownInstanceDisks(inst, self.cfg)
+      _ShutdownInstanceDisks(self, inst)
 
 
 class LURenameInstance(LogicalUnit):
@@ -2308,10 +2906,9 @@ class LURenameInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self.instance)
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["INSTANCE_NEW_NAME"] = self.op.new_name
-    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2325,11 +2922,16 @@ class LURenameInstance(LogicalUnit):
     if instance is None:
       raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
-    if instance.status != "down":
+    _CheckNodeOnline(self, instance.primary_node)
+
+    if instance.admin_up:
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
-    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
-    if remote_info:
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise()
+    if remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -2362,7 +2964,7 @@ class LURenameInstance(LogicalUnit):
 
     self.cfg.RenameInstance(inst.name, self.op.new_name)
     # Change the instance lock. This is definitely safe while we hold the BGL
-    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
+    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
@@ -2370,33 +2972,35 @@ class LURenameInstance(LogicalUnit):
 
     if inst.disk_template == constants.DT_FILE:
       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
-      result = rpc.call_file_storage_dir_rename(inst.primary_node,
-                                                old_file_storage_dir,
-                                                new_file_storage_dir)
-
-      if not result:
+      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
+                                                     old_file_storage_dir,
+                                                     new_file_storage_dir)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Could not connect to node '%s' to rename"
                                  " directory '%s' to '%s' (but the instance"
                                  " has been renamed in Ganeti)" % (
                                  inst.primary_node, old_file_storage_dir,
                                  new_file_storage_dir))
 
-      if not result[0]:
+      if not result.data[0]:
         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                  " (but the instance has been renamed in"
                                  " Ganeti)" % (old_file_storage_dir,
                                                new_file_storage_dir))
 
-    _StartInstanceDisks(self.cfg, inst, None)
+    _StartInstanceDisks(self, inst, None)
     try:
-      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
-                                          "sda", "sdb"):
+      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
+                                                 old_name)
+      msg = result.RemoteFailMsg()
+      if msg:
         msg = ("Could not run OS rename script for instance %s on node %s"
-               " (but the instance has been renamed in Ganeti)" %
-               (inst.name, inst.primary_node))
-        logger.Error(msg)
+               " (but the instance has been renamed in Ganeti): %s" %
+               (inst.name, inst.primary_node, msg))
+        self.proc.LogWarning(msg)
     finally:
-      _ShutdownInstanceDisks(inst, self.cfg)
+      _ShutdownInstanceDisks(self, inst)
 
 
 class LURemoveInstance(LogicalUnit):
@@ -2406,6 +3010,16 @@ class LURemoveInstance(LogicalUnit):
   HPATH = "instance-remove"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "ignore_failures"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2413,8 +3027,8 @@ class LURemoveInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self.instance)
-    nl = [self.sstore.GetMasterNode()]
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    nl = [self.cfg.GetMasterNode()]
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2423,124 +3037,156 @@ class LURemoveInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Remove the instance.
 
     """
     instance = self.instance
-    logger.Info("shutting down instance %s on node %s" %
-                (instance.name, instance.primary_node))
+    logging.info("Shutting down instance %s on node %s",
+                 instance.name, instance.primary_node)
 
-    if not rpc.call_instance_shutdown(instance.primary_node, instance):
+    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
+    if result.failed or not result.data:
       if self.op.ignore_failures:
         feedback_fn("Warning: can't shutdown instance")
       else:
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                  (instance.name, instance.primary_node))
 
-    logger.Info("removing block devices for instance %s" % instance.name)
+    logging.info("Removing block devices for instance %s", instance.name)
 
-    if not _RemoveDisks(instance, self.cfg):
+    if not _RemoveDisks(self, instance):
       if self.op.ignore_failures:
         feedback_fn("Warning: can't remove instance's disks")
       else:
         raise errors.OpExecError("Can't remove instance's disks")
 
-    logger.Info("removing instance %s out of cluster config" % instance.name)
+    logging.info("Removing instance %s out of cluster config", instance.name)
 
     self.cfg.RemoveInstance(instance.name)
-    # Remove the new instance from the Ganeti Lock Manager
-    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
+    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
   """Logical unit for querying instances.
 
   """
-  _OP_REQP = ["output_fields", "names"]
+  _OP_REQP = ["output_fields", "names", "use_locking"]
   REQ_BGL = False
+  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
+                                    "admin_state", "admin_ram",
+                                    "disk_template", "ip", "mac", "bridge",
+                                    "sda_size", "sdb_size", "vcpus", "tags",
+                                    "network_port", "beparams",
+                                    "(disk).(size)/([0-9]+)",
+                                    "(disk).(sizes)",
+                                    "(nic).(mac|ip|bridge)/([0-9]+)",
+                                    "(nic).(macs|ips|bridges)",
+                                    "(disk|nic).(count)",
+                                    "serial_no", "hypervisor", "hvparams",] +
+                                  ["hv/%s" % name
+                                   for name in constants.HVS_PARAMETERS] +
+                                  ["be/%s" % name
+                                   for name in constants.BES_PARAMETERS])
+  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
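
Editorial note: the parenthesised entries in _FIELDS_STATIC above are regular-expression patterns, so callers can request a single element such as disk.size/0 or nic.mac/1. A brief illustration follows; the pattern below is a simplified assumption, not the full set declared in the patch.

import re

# simplified version of patterns like "(disk).(size)/([0-9]+)"
field_pattern = re.compile(r"^(disk|nic)\.(size|mac|ip|bridge)/([0-9]+)$")

for field in ("disk.size/0", "nic.mac/1", "disk.sizes"):
    m = field_pattern.match(field)
    print(field, "->", m.groups() if m else "not an indexed field")
# disk.size/0 -> ('disk', 'size', '0')
# nic.mac/1 -> ('nic', 'mac', '1')
# disk.sizes -> not an indexed field
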
+
 
   def ExpandNames(self):
-    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
-    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
-                               "admin_state", "admin_ram",
-                               "disk_template", "ip", "mac", "bridge",
-                               "sda_size", "sdb_size", "vcpus", "tags",
-                               "auto_balance",
-                               "network_port", "kernel_path", "initrd_path",
-                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
-                               "hvm_cdrom_image_path", "hvm_nic_type",
-                               "hvm_disk_type", "vnc_bind_address"],
-                       dynamic=self.dynamic_fields,
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=self._FIELDS_DYNAMIC,
                        selected=self.op.output_fields)
 
     self.needed_locks = {}
     self.share_locks[locking.LEVEL_INSTANCE] = 1
     self.share_locks[locking.LEVEL_NODE] = 1
 
-    # TODO: we could lock instances (and nodes) only if the user asked for
-    # dynamic fields. For that we need atomic ways to get info for a group of
-    # instances from the config, though.
-    if not self.op.names:
-      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
+    if self.op.names:
+      self.wanted = _GetWantedInstances(self, self.op.names)
     else:
-      self.needed_locks[locking.LEVEL_INSTANCE] = \
-        _GetWantedInstances(self, self.op.names)
+      self.wanted = locking.ALL_SET
 
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
+    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
+    self.do_locking = self.do_node_query and self.op.use_locking
+    if self.do_locking:
+      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
-    # TODO: locking of nodes could be avoided when not querying them
-    if level == locking.LEVEL_NODE:
+    if level == locking.LEVEL_NODE and self.do_locking:
       self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
-    # This of course is valid only if we locked the instances
-    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
+    pass
 
   def Exec(self, feedback_fn):
     """Computes the list of nodes and their attributes.
 
     """
-    instance_names = self.wanted
-    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
-                     in instance_names]
+    all_info = self.cfg.GetAllInstancesInfo()
+    if self.wanted == locking.ALL_SET:
+      # caller didn't specify instance names, so ordering is not important
+      if self.do_locking:
+        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      else:
+        instance_names = all_info.keys()
+      instance_names = utils.NiceSort(instance_names)
+    else:
+      # caller did specify names, so we must keep the ordering
+      if self.do_locking:
+        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
+      else:
+        tgt_set = all_info.keys()
+      missing = set(self.wanted).difference(tgt_set)
+      if missing:
+        raise errors.OpExecError("Some instances were removed before"
+                                 " retrieving their data: %s" % missing)
+      instance_names = self.wanted
+
+    instance_list = [all_info[iname] for iname in instance_names]
 
     # begin data gathering
 
     nodes = frozenset([inst.primary_node for inst in instance_list])
+    hv_list = list(set([inst.hypervisor for inst in instance_list]))
 
     bad_nodes = []
-    if self.dynamic_fields.intersection(self.op.output_fields):
+    off_nodes = []
+    if self.do_node_query:
       live_data = {}
-      node_data = rpc.call_all_instances_info(nodes)
+      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
       for name in nodes:
         result = node_data[name]
-        if result:
-          live_data.update(result)
-        elif result == False:
+        if result.offline:
+          # offline nodes will be in both lists
+          off_nodes.append(name)
+        if result.failed:
           bad_nodes.append(name)
-        # else no instance is alive
+        else:
+          if result.data:
+            live_data.update(result.data)
+          # else no instance is alive
     else:
       live_data = dict([(name, {}) for name in instance_names])
 
     # end data gathering
 
+    HVPREFIX = "hv/"
+    BEPREFIX = "be/"
     output = []
     for instance in instance_list:
       iout = []
+      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
+      i_be = self.cfg.GetClusterInfo().FillBE(instance)
       for field in self.op.output_fields:
+        st_match = self._FIELDS_STATIC.Matches(field)
         if field == "name":
           val = instance.name
         elif field == "os":
@@ -2550,29 +3196,29 @@ class LUQueryInstances(NoHooksLU):
         elif field == "snodes":
           val = list(instance.secondary_nodes)
         elif field == "admin_state":
-          val = (instance.status != "down")
+          val = instance.admin_up
         elif field == "oper_state":
           if instance.primary_node in bad_nodes:
             val = None
           else:
             val = bool(live_data.get(instance.name))
         elif field == "status":
-          if instance.primary_node in bad_nodes:
+          if instance.primary_node in off_nodes:
+            val = "ERROR_nodeoffline"
+          elif instance.primary_node in bad_nodes:
             val = "ERROR_nodedown"
           else:
             running = bool(live_data.get(instance.name))
             if running:
-              if instance.status != "down":
+              if instance.admin_up:
                 val = "running"
               else:
                 val = "ERROR_up"
             else:
-              if instance.status != "down":
+              if instance.admin_up:
                 val = "ERROR_down"
               else:
                 val = "ADMIN_down"
-        elif field == "admin_ram":
-          val = instance.memory
         elif field == "oper_ram":
           if instance.primary_node in bad_nodes:
             val = None
@@ -2589,27 +3235,69 @@ class LUQueryInstances(NoHooksLU):
         elif field == "mac":
           val = instance.nics[0].mac
         elif field == "sda_size" or field == "sdb_size":
-          disk = instance.FindDisk(field[:3])
-          if disk is None:
+          idx = ord(field[2]) - ord('a')
+          try:
+            val = instance.FindDisk(idx).size
+          except errors.OpPrereqError:
             val = None
-          else:
-            val = disk.size
-        elif field == "vcpus":
-          val = instance.vcpus
         elif field == "tags":
           val = list(instance.GetTags())
-        elif field in ("network_port", "kernel_path", "initrd_path",
-                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
-                       "hvm_cdrom_image_path", "hvm_nic_type",
-                       "hvm_disk_type", "vnc_bind_address"):
-          val = getattr(instance, field, None)
-          if val is not None:
-            pass
-          elif field in ("hvm_nic_type", "hvm_disk_type",
-                         "kernel_path", "initrd_path"):
-            val = "default"
+        elif field == "serial_no":
+          val = instance.serial_no
+        elif field == "network_port":
+          val = instance.network_port
+        elif field == "hypervisor":
+          val = instance.hypervisor
+        elif field == "hvparams":
+          val = i_hv
+        elif (field.startswith(HVPREFIX) and
+              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
+          val = i_hv.get(field[len(HVPREFIX):], None)
+        elif field == "beparams":
+          val = i_be
+        elif (field.startswith(BEPREFIX) and
+              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
+          val = i_be.get(field[len(BEPREFIX):], None)
+        elif st_match and st_match.groups():
+          # matches a variable list
+          st_groups = st_match.groups()
+          if st_groups and st_groups[0] == "disk":
+            if st_groups[1] == "count":
+              val = len(instance.disks)
+            elif st_groups[1] == "sizes":
+              val = [disk.size for disk in instance.disks]
+            elif st_groups[1] == "size":
+              try:
+                val = instance.FindDisk(st_groups[2]).size
+              except errors.OpPrereqError:
+                val = None
+            else:
+              assert False, "Unhandled disk parameter"
+          elif st_groups[0] == "nic":
+            if st_groups[1] == "count":
+              val = len(instance.nics)
+            elif st_groups[1] == "macs":
+              val = [nic.mac for nic in instance.nics]
+            elif st_groups[1] == "ips":
+              val = [nic.ip for nic in instance.nics]
+            elif st_groups[1] == "bridges":
+              val = [nic.bridge for nic in instance.nics]
+            else:
+              # index-based item
+              nic_idx = int(st_groups[2])
+              if nic_idx >= len(instance.nics):
+                val = None
+              else:
+                if st_groups[1] == "mac":
+                  val = instance.nics[nic_idx].mac
+                elif st_groups[1] == "ip":
+                  val = instance.nics[nic_idx].ip
+                elif st_groups[1] == "bridge":
+                  val = instance.nics[nic_idx].bridge
+                else:
+                  assert False, "Unhandled NIC parameter"
           else:
-            val = "-"
+            assert False, "Unhandled variable parameter"
         else:
           raise errors.ParameterError(field)
         iout.append(val)
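# Editorial sketch, not part of the patch: examples of output_fields handled
# by the extended code above.  The exact spellings of the variable-width
# fields ("disk.*", "nic.*") depend on the _FIELDS_STATIC regexp defined
# elsewhere in this file; the usual gnt-instance list syntax is assumed here.
_EXAMPLE_QUERY_FIELDS = ["name", "status", "oper_ram",
                         "hv/kernel_path", "be/memory",   # per-parameter values
                         "hvparams", "beparams",          # full filled dicts
                         "disk.count", "disk.sizes",      # all disks
                         "nic.count", "nic.mac/0"]        # all NICs / one NIC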
@@ -2630,7 +3318,7 @@ class LUFailoverInstance(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
     self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
@@ -2645,8 +3333,8 @@ class LUFailoverInstance(LogicalUnit):
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       }
-    env.update(_BuildInstanceHookEnvByObject(self.instance))
-    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2659,6 +3347,7 @@ class LUFailoverInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    bep = self.cfg.GetClusterInfo().FillBE(instance)
     if instance.disk_template not in constants.DTS_NET_MIRROR:
       raise errors.OpPrereqError("Instance's disk layout is not"
                                  " network mirrored, cannot failover.")
@@ -2669,13 +3358,17 @@ class LUFailoverInstance(LogicalUnit):
                                    "a mirrored disk template")
 
     target_node = secondary_nodes[0]
+    _CheckNodeOnline(self, target_node)
     # check memory requirements on the secondary node
-    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
-                         instance.name, instance.memory)
+    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
+                         instance.name, bep[constants.BE_MEMORY],
+                         instance.hypervisor)
 
     # check bridge existence
     brlist = [nic.bridge for nic in instance.nics]
-    if not rpc.call_bridges_exist(target_node, brlist):
+    result = self.rpc.call_bridges_exist(target_node, brlist)
+    result.Raise()
+    if not result.data:
       raise errors.OpPrereqError("One or more target bridges %s does not"
                                  " exist on destination node '%s'" %
                                  (brlist, target_node))
@@ -2695,26 +3388,28 @@ class LUFailoverInstance(LogicalUnit):
     feedback_fn("* checking disk consistency between source and target")
     for dev in instance.disks:
       # for drbd, these are drbd over lvm
-      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
-        if instance.status == "up" and not self.op.ignore_consistency:
+      if not _CheckDiskConsistency(self, dev, target_node, False):
+        if instance.admin_up and not self.op.ignore_consistency:
           raise errors.OpExecError("Disk %s is degraded on target node,"
                                    " aborting failover." % dev.iv_name)
 
     feedback_fn("* shutting down instance on source node")
-    logger.Info("Shutting down instance %s on node %s" %
-                (instance.name, source_node))
+    logging.info("Shutting down instance %s on node %s",
+                 instance.name, source_node)
 
-    if not rpc.call_instance_shutdown(source_node, instance):
+    result = self.rpc.call_instance_shutdown(source_node, instance)
+    if result.failed or not result.data:
       if self.op.ignore_consistency:
-        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
-                     " anyway. Please make sure node %s is down"  %
-                     (instance.name, source_node, source_node))
+        self.proc.LogWarning("Could not shutdown instance %s on node %s."
+                             " Proceeding"
+                             " anyway. Please make sure node %s is down",
+                             instance.name, source_node, source_node)
       else:
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                  (instance.name, source_node))
 
     feedback_fn("* deactivating the instance's disks on source node")
-    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
+    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
       raise errors.OpExecError("Can't shut down the instance's disks.")
 
     instance.primary_node = target_node
@@ -2722,75 +3417,470 @@ class LUFailoverInstance(LogicalUnit):
     self.cfg.Update(instance)
 
     # Only start the instance if it's marked as up
-    if instance.status == "up":
+    if instance.admin_up:
       feedback_fn("* activating the instance's disks on target node")
-      logger.Info("Starting instance %s on node %s" %
-                  (instance.name, target_node))
+      logging.info("Starting instance %s on node %s",
+                   instance.name, target_node)
 
-      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
+      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                                ignore_secondaries=True)
       if not disks_ok:
-        _ShutdownInstanceDisks(instance, self.cfg)
+        _ShutdownInstanceDisks(self, instance)
         raise errors.OpExecError("Can't activate the instance's disks")
 
       feedback_fn("* starting the instance on the target node")
-      if not rpc.call_instance_start(target_node, instance, None):
-        _ShutdownInstanceDisks(instance, self.cfg)
-        raise errors.OpExecError("Could not start instance %s on node %s." %
-                                 (instance.name, target_node))
+      result = self.rpc.call_instance_start(target_node, instance, None)
+      msg = result.RemoteFailMsg()
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
 
 
-def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
-  """Create a tree of block devices on the primary node.
+class LUMigrateInstance(LogicalUnit):
+  """Migrate an instance.
 
-  This always creates all devices.
+  This migrates the instance without shutting it down, in contrast to
+  failover, which requires a shutdown.
 
   """
-  if device.children:
-    for child in device.children:
-      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
-        return False
-
-  cfg.SetDiskID(device, node)
-  new_id = rpc.call_blockdev_create(node, device, device.size,
-                                    instance.name, True, info)
-  if not new_id:
-    return False
-  if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+  HPATH = "instance-migrate"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "live", "cleanup"]
+
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    """
+    instance = self.cfg.GetInstanceInfo(
+      self.cfg.ExpandInstanceName(self.op.instance_name))
+    if instance is None:
+      raise errors.OpPrereqError("Instance '%s' not known" %
+                                 self.op.instance_name)
+
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Instance's disk layout is not"
+                                 " drbd8, cannot migrate.")
+
+    secondary_nodes = instance.secondary_nodes
+    if not secondary_nodes:
+      raise errors.ProgrammerError("no secondary node but using "
+                                   "drbd8 disk template")
+
+    i_be = self.cfg.GetClusterInfo().FillBE(instance)
+
+    target_node = secondary_nodes[0]
+    # check memory requirements on the secondary node
+    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
+                         instance.name, i_be[constants.BE_MEMORY],
+                         instance.hypervisor)
+
+    # check bridge existence
+    brlist = [nic.bridge for nic in instance.nics]
+    result = self.rpc.call_bridges_exist(target_node, brlist)
+    if result.failed or not result.data:
+      raise errors.OpPrereqError("One or more target bridges %s does not"
+                                 " exist on destination node '%s'" %
+                                 (brlist, target_node))
+
+    if not self.op.cleanup:
+      result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                 instance)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
+                                   msg)
+
+    self.instance = instance
+
+  def _WaitUntilSync(self):
+    """Poll with custom rpc for disk sync.
+
+    This uses our own step-based rpc call.
+
+    """
+    self.feedback_fn("* wait until resync is done")
+    all_done = False
+    while not all_done:
+      all_done = True
+      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
+                                            self.nodes_ip,
+                                            self.instance.disks)
+      min_percent = 100
+      for node, nres in result.items():
+        msg = nres.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
+                                   (node, msg))
+        node_done, node_percent = nres.data[1]
+        all_done = all_done and node_done
+        if node_percent is not None:
+          min_percent = min(min_percent, node_percent)
+      if not all_done:
+        if min_percent < 100:
+          self.feedback_fn("   - progress: %.1f%%" % min_percent)
+        time.sleep(2)
+
+  def _EnsureSecondary(self, node):
+    """Demote a node to secondary.
+
+    """
+    self.feedback_fn("* switching node %s to secondary mode" % node)
+
+    for dev in self.instance.disks:
+      self.cfg.SetDiskID(dev, node)
+
+    result = self.rpc.call_blockdev_close(node, self.instance.name,
+                                          self.instance.disks)
+    msg = result.RemoteFailMsg()
+    if msg:
+      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
+                               " error %s" % (node, msg))
+
+  def _GoStandalone(self):
+    """Disconnect from the network.
+
+    """
+    self.feedback_fn("* changing into standalone mode")
+    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
+                                               self.instance.disks)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot disconnect disks node %s,"
+                                 " error %s" % (node, msg))
+
+  def _GoReconnect(self, multimaster):
+    """Reconnect to the network.
+
+    """
+    if multimaster:
+      msg = "dual-master"
+    else:
+      msg = "single-master"
+    self.feedback_fn("* changing disks into %s mode" % msg)
+    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
+                                           self.instance.disks,
+                                           self.instance.name, multimaster)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot change disks config on node %s,"
+                                 " error: %s" % (node, msg))
+
+  def _ExecCleanup(self):
+    """Try to cleanup after a failed migration.
+
+    The cleanup is done by:
+      - check that the instance is running only on one node
+        (and update the config if needed)
+      - change disks on its secondary node to secondary
+      - wait until disks are fully synchronized
+      - disconnect from the network
+      - change disks into single-master mode
+      - wait again until disks are fully synchronized
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    # check running on only one node
+    self.feedback_fn("* checking where the instance actually runs"
+                     " (if this hangs, the hypervisor might be in"
+                     " a bad state)")
+    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
+    for node, result in ins_l.items():
+      result.Raise()
+      if not isinstance(result.data, list):
+        raise errors.OpExecError("Can't contact node '%s'" % node)
+
+    runningon_source = instance.name in ins_l[source_node].data
+    runningon_target = instance.name in ins_l[target_node].data
+
+    if runningon_source and runningon_target:
+      raise errors.OpExecError("Instance seems to be running on two nodes,"
+                               " or the hypervisor is confused. You will have"
+                               " to ensure manually that it runs only on one"
+                               " and restart this operation.")
+
+    if not (runningon_source or runningon_target):
+      raise errors.OpExecError("Instance does not seem to be running at all."
+                               " In this case, it's safer to repair by"
+                               " running 'gnt-instance stop' to ensure disk"
+                               " shutdown, and then restarting it.")
+
+    if runningon_target:
+      # the migration has actually succeeded, we need to update the config
+      self.feedback_fn("* instance running on secondary node (%s),"
+                       " updating config" % target_node)
+      instance.primary_node = target_node
+      self.cfg.Update(instance)
+      demoted_node = source_node
+    else:
+      self.feedback_fn("* instance confirmed to be running on its"
+                       " primary node (%s)" % source_node)
+      demoted_node = target_node
+
+    self._EnsureSecondary(demoted_node)
+    try:
+      self._WaitUntilSync()
+    except errors.OpExecError:
+      # we ignore here errors, since if the device is standalone, it
+      # won't be able to sync
+      pass
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def _RevertDiskStatus(self):
+    """Try to revert the disk status after a failed migration.
+
+    """
+    target_node = self.target_node
+    try:
+      self._EnsureSecondary(target_node)
+      self._GoStandalone()
+      self._GoReconnect(False)
+      self._WaitUntilSync()
+    except errors.OpExecError, err:
+      self.LogWarning("Migration failed and I can't reconnect the"
+                      " drives: error '%s'\n"
+                      "Please look and recover the instance status" %
+                      str(err))
+
+  def _AbortMigration(self):
+    """Call the hypervisor code to abort a started migration.
 
+    """
+    instance = self.instance
+    target_node = self.target_node
+    migration_info = self.migration_info
+
+    abort_result = self.rpc.call_finalize_migration(target_node,
+                                                    instance,
+                                                    migration_info,
+                                                    False)
+    abort_msg = abort_result.RemoteFailMsg()
+    if abort_msg:
+      logging.error("Aborting migration failed on target node %s: %s" %
+                    (target_node, abort_msg))
+      # Don't raise an exception here, as we still have to try to revert the
+      # disk status, even if this step failed.
+
+  def _ExecMigration(self):
+    """Migrate an instance.
+
+    The migrate is done by:
+      - change the disks into dual-master mode
+      - wait until disks are fully synchronized again
+      - migrate the instance
+      - change disks on the new secondary node (the old primary) to secondary
+      - wait until disks are fully synchronized
+      - change disks into single-master mode
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    self.feedback_fn("* checking disk consistency between source and target")
+    for dev in instance.disks:
+      if not _CheckDiskConsistency(self, dev, target_node, False):
+        raise errors.OpExecError("Disk %s is degraded or not fully"
+                                 " synchronized on target node,"
+                                 " aborting migrate." % dev.iv_name)
+
+    # First get the migration information from the remote node
+    result = self.rpc.call_migration_info(source_node, instance)
+    msg = result.RemoteFailMsg()
+    if msg:
+      log_err = ("Failed fetching source migration information from %s: %s" %
+                  (source_node, msg))
+      logging.error(log_err)
+      raise errors.OpExecError(log_err)
+
+    self.migration_info = migration_info = result.data[1]
+
+    # Then switch the disks to master/master mode
+    self._EnsureSecondary(target_node)
+    self._GoStandalone()
+    self._GoReconnect(True)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* preparing %s to accept the instance" % target_node)
+    result = self.rpc.call_accept_instance(target_node,
+                                           instance,
+                                           migration_info,
+                                           self.nodes_ip[target_node])
+
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance pre-migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
+                               (instance.name, msg))
+
+    self.feedback_fn("* migrating instance to %s" % target_node)
+    time.sleep(10)
+    result = self.rpc.call_instance_migrate(source_node, instance,
+                                            self.nodes_ip[target_node],
+                                            self.op.live)
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not migrate instance %s: %s" %
+                               (instance.name, msg))
+    time.sleep(10)
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance)
 
-def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
-  """Create a tree of block devices on a secondary node.
+    result = self.rpc.call_finalize_migration(target_node,
+                                              instance,
+                                              migration_info,
+                                              True)
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance migration succeeded, but finalization failed:"
+                    " %s" % msg)
+      raise errors.OpExecError("Could not finalize instance migration: %s" %
+                               msg)
+
+    self._EnsureSecondary(source_node)
+    self._WaitUntilSync()
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def Exec(self, feedback_fn):
+    """Perform the migration.
+
+    """
+    self.feedback_fn = feedback_fn
+
+    self.source_node = self.instance.primary_node
+    self.target_node = self.instance.secondary_nodes[0]
+    self.all_nodes = [self.source_node, self.target_node]
+    self.nodes_ip = {
+      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
+      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
+      }
+    if self.op.cleanup:
+      return self._ExecCleanup()
+    else:
+      return self._ExecMigration()
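# Editorial sketch, not part of the patch: building the opcode that drives
# this LU.  The class name opcodes.OpMigrateInstance is an assumption based
# on the usual naming scheme; the three parameters mirror _OP_REQP above.
def _ExampleMigrateOpcode(instance_name, live=True, cleanup=False):
  from ganeti import opcodes
  return opcodes.OpMigrateInstance(instance_name=instance_name,
                                   live=live,        # live (no-pause) migration
                                   cleanup=cleanup)  # True repairs a failed run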
+
+
+def _CreateBlockDev(lu, node, instance, device, force_create,
+                    info, force_open):
+  """Create a tree of block devices on a given node.
 
   If this device type has to be created on secondaries, create it and
   all its children.
 
   If not, just recurse to children keeping the same 'force_create' value.
 
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @type force_create: boolean
+  @param force_create: whether to force creation of this device; this
+      will be changed to True whenever we find a device which has
+      the CreateOnSecondary() attribute
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passed to the
+      L{backend.CreateBlockDevice} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device's own Open() execution
+
   """
   if device.CreateOnSecondary():
-    force = True
+    force_create = True
+
   if device.children:
     for child in device.children:
-      if not _CreateBlockDevOnSecondary(cfg, node, instance,
-                                        child, force, info):
-        return False
+      _CreateBlockDev(lu, node, instance, child, force_create,
+                      info, force_open)
 
-  if not force:
-    return True
-  cfg.SetDiskID(device, node)
-  new_id = rpc.call_blockdev_create(node, device, device.size,
-                                    instance.name, False, info)
-  if not new_id:
-    return False
+  if not force_create:
+    return
+
+  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
+
+
+def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
+  """Create a single block device on a given node.
+
+  This will not recurse over children of the device, so they must be
+  created in advance.
+
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passed to the
+      L{backend.CreateBlockDevice} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device's own Open() execution
+
+  """
+  lu.cfg.SetDiskID(device, node)
+  result = lu.rpc.call_blockdev_create(node, device, device.size,
+                                       instance.name, force_open, info)
+  msg = result.RemoteFailMsg()
+  if msg:
+    raise errors.OpExecError("Can't create block device %s on"
+                             " node %s for instance %s: %s" %
+                             (device, node, instance.name, msg))
   if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+    device.physical_id = result.data[1]
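# Editorial sketch, not part of the patch: the intended calling convention
# for _CreateBlockDev, mirroring _CreateDisks further down.  Devices are
# created and opened on the primary node; on secondaries creation only
# starts once the recursion hits a device whose CreateOnSecondary() is true
# (e.g. the DRBD8 device itself, so that its LV children get created too).
def _ExampleCreateDevice(lu, instance, device, info):
  for node in instance.all_nodes:
    f_create = node == instance.primary_node
    _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)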
 
 
-def _GenerateUniqueNames(cfg, exts):
+def _GenerateUniqueNames(lu, exts):
   """Generate a suitable LV name.
 
   This will generate a logical volume name for the given instance.
@@ -2798,74 +3888,93 @@ def _GenerateUniqueNames(cfg, exts):
   """
   results = []
   for val in exts:
-    new_id = cfg.GenerateUniqueID()
+    new_id = lu.cfg.GenerateUniqueID()
     results.append("%s%s" % (new_id, val))
   return results
 
 
-def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
+def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
+                         p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
-  port = cfg.AllocatePort()
-  vgname = cfg.GetVGName()
+  port = lu.cfg.AllocatePort()
+  vgname = lu.cfg.GetVGName()
+  shared_secret = lu.cfg.GenerateDRBDSecret()
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgname, names[0]))
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                           logical_id=(vgname, names[1]))
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
-                          logical_id = (primary, secondary, port),
-                          children = [dev_data, dev_meta],
+                          logical_id=(primary, secondary, port,
+                                      p_minor, s_minor,
+                                      shared_secret),
+                          children=[dev_data, dev_meta],
                           iv_name=iv_name)
   return drbd_dev
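# Editorial note, not part of the patch: the DRBD8 logical_id grows here from
# the old (primary, secondary, port) triple to a six-tuple
# (primary, secondary, port, p_minor, s_minor, shared_secret), carrying the
# per-node minor numbers and the shared authentication secret.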
 
 
-def _GenerateDiskTemplate(cfg, template_name,
+def _GenerateDiskTemplate(lu, template_name,
                           instance_name, primary_node,
-                          secondary_nodes, disk_sz, swap_sz,
-                          file_storage_dir, file_driver):
+                          secondary_nodes, disk_info,
+                          file_storage_dir, file_driver,
+                          base_index):
   """Generate the entire disk layout for a given template type.
 
   """
   #TODO: compute space requirements
 
-  vgname = cfg.GetVGName()
+  vgname = lu.cfg.GetVGName()
+  disk_count = len(disk_info)
+  disks = []
   if template_name == constants.DT_DISKLESS:
-    disks = []
+    pass
   elif template_name == constants.DT_PLAIN:
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
-    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
-    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
-                           logical_id=(vgname, names[0]),
-                           iv_name = "sda")
-    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
-                           logical_id=(vgname, names[1]),
-                           iv_name = "sdb")
-    disks = [sda_dev, sdb_dev]
+    names = _GenerateUniqueNames(lu, [".disk%d" % i
+                                      for i in range(disk_count)])
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
+                              logical_id=(vgname, names[idx]),
+                              iv_name="disk/%d" % disk_index,
+                              mode=disk["mode"])
+      disks.append(disk_dev)
   elif template_name == constants.DT_DRBD8:
     if len(secondary_nodes) != 1:
       raise errors.ProgrammerError("Wrong template configuration")
     remote_node = secondary_nodes[0]
-    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
-                                       ".sdb_data", ".sdb_meta"])
-    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
-                                         disk_sz, names[0:2], "sda")
-    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
-                                         swap_sz, names[2:4], "sdb")
-    disks = [drbd_sda_dev, drbd_sdb_dev]
+    minors = lu.cfg.AllocateDRBDMinor(
+      [primary_node, remote_node] * len(disk_info), instance_name)
+
+    names = []
+    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
+                                               for i in range(disk_count)]):
+      names.append(lv_prefix + "_data")
+      names.append(lv_prefix + "_meta")
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
+                                      disk["size"], names[idx*2:idx*2+2],
+                                      "disk/%d" % disk_index,
+                                      minors[idx*2], minors[idx*2+1])
+      disk_dev.mode = disk["mode"]
+      disks.append(disk_dev)
   elif template_name == constants.DT_FILE:
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
-    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
-                                iv_name="sda", logical_id=(file_driver,
-                                "%s/sda" % file_storage_dir))
-    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
-                                iv_name="sdb", logical_id=(file_driver,
-                                "%s/sdb" % file_storage_dir))
-    disks = [file_sda_dev, file_sdb_dev]
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
+                              iv_name="disk/%d" % disk_index,
+                              logical_id=(file_driver,
+                                          "%s/disk%d" % (file_storage_dir,
+                                                         idx)),
+                              mode=disk["mode"])
+      disks.append(disk_dev)
   else:
     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
   return disks
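# Editorial sketch, not part of the patch: the shape of the disk_info
# argument and a typical invocation.  Instance/node names are made up;
# base_index offsets the generated "disk/N" iv_names (useful when adding
# disks to an existing instance).
def _ExampleDrbdDisks(lu):
  disk_info = [{"size": 10240, "mode": constants.DISK_RDWR}]  # sizes in MiB
  return _GenerateDiskTemplate(lu, constants.DT_DRBD8,
                               "instance1.example.com",   # instance name
                               "node1.example.com",       # primary node
                               ["node2.example.com"],     # one secondary
                               disk_info,
                               None, None,                # file storage args
                               0)                         # base_index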
@@ -2878,54 +3987,45 @@ def _GetInstanceInfoText(instance):
   return "originstname+%s" % instance.name
 
 
-def _CreateDisks(cfg, instance):
+def _CreateDisks(lu, instance):
   """Create all disks for an instance.
 
   This abstracts away some work from AddInstance.
 
-  Args:
-    instance: the instance object
-
-  Returns:
-    True or False showing the success of the creation process
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance whose disks we should create
+  @raise errors.OpExecError: in case of an error
 
   """
   info = _GetInstanceInfoText(instance)
+  pnode = instance.primary_node
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = rpc.call_file_storage_dir_create(instance.primary_node,
-                                              file_storage_dir)
+    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
 
-    if not result:
-      logger.Error("Could not connect to node '%s'" % instance.primary_node)
-      return False
+    if result.failed or not result.data:
+      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
 
-    if not result[0]:
-      logger.Error("failed to create directory '%s'" % file_storage_dir)
-      return False
+    if not result.data[0]:
+      raise errors.OpExecError("Failed to create directory '%s'" %
+                               file_storage_dir)
 
+  # Note: this needs to be kept in sync with adding of disks in
+  # LUSetInstanceParams
   for device in instance.disks:
-    logger.Info("creating volume %s for instance %s" %
-                (device.iv_name, instance.name))
-    #HARDCODE
-    for secondary_node in instance.secondary_nodes:
-      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
-                                        device, False, info):
-        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
-                     (device.iv_name, device, secondary_node))
-        return False
+    logging.info("Creating volume %s for instance %s",
+                 device.iv_name, instance.name)
     #HARDCODE
-    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
-                                    instance, device, info):
-      logger.Error("failed to create volume %s on primary!" %
-                   device.iv_name)
-      return False
-
-  return True
+    for node in instance.all_nodes:
+      f_create = node == pnode
+      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
-def _RemoveDisks(instance, cfg):
+def _RemoveDisks(lu, instance):
   """Remove all disks for an instance.
 
   This abstracts away some work from `AddInstance()` and
@@ -2933,47 +4033,47 @@ def _RemoveDisks(instance, cfg):
   be removed, the removal will continue with the other ones (compare
   with `_CreateDisks()`).
 
-  Args:
-    instance: the instance object
-
-  Returns:
-    True or False showing the success of the removal proces
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance whose disks we should remove
+  @rtype: boolean
+  @return: the success of the removal
 
   """
-  logger.Info("removing block devices for instance %s" % instance.name)
+  logging.info("Removing block devices for instance %s", instance.name)
 
   result = True
   for device in instance.disks:
     for node, disk in device.ComputeNodeTree(instance.primary_node):
-      cfg.SetDiskID(disk, node)
-      if not rpc.call_blockdev_remove(node, disk):
-        logger.Error("could not remove block device %s on node %s,"
-                     " continuing anyway" %
-                     (device.iv_name, node))
+      lu.cfg.SetDiskID(disk, node)
+      result = lu.rpc.call_blockdev_remove(node, disk)
+      if result.failed or not result.data:
+        lu.proc.LogWarning("Could not remove block device %s on node %s,"
+                           " continuing anyway", device.iv_name, node)
         result = False
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    if not rpc.call_file_storage_dir_remove(instance.primary_node,
-                                            file_storage_dir):
-      logger.Error("could not remove directory '%s'" % file_storage_dir)
+    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
+                                                 file_storage_dir)
+    if result.failed or not result.data:
+      logging.error("Could not remove directory '%s'", file_storage_dir)
       result = False
 
   return result
 
 
-def _ComputeDiskSize(disk_template, disk_size, swap_size):
+def _ComputeDiskSize(disk_template, disks):
   """Compute disk size requirements in the volume group
 
-  This is currently hard-coded for the two-drive layout.
-
   """
   # Required free disk space as a function of the disk template and sizes
   req_size_dict = {
     constants.DT_DISKLESS: None,
-    constants.DT_PLAIN: disk_size + swap_size,
-    # 256 MB are added for drbd metadata, 128MB for each drbd device
-    constants.DT_DRBD8: disk_size + swap_size + 256,
+    constants.DT_PLAIN: sum(d["size"] for d in disks),
+    # 128 MB are added for drbd metadata for each disk
+    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
     constants.DT_FILE: None,
   }
 
@@ -2984,34 +4084,233 @@ def _ComputeDiskSize(disk_template, disk_size, swap_size):
   return req_size_dict[disk_template]
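# Editorial worked example, not part of the patch: for a DRBD8 instance with
# a 10240 MiB and a 2048 MiB disk, the volume group must provide
# (10240 + 128) + (2048 + 128) = 12544 MiB, while file-based and diskless
# templates need no LVM space at all.
assert _ComputeDiskSize(constants.DT_DRBD8,
                        [{"size": 10240}, {"size": 2048}]) == 12544
assert _ComputeDiskSize(constants.DT_FILE, [{"size": 10240}]) is None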
 
 
+def _CheckHVParams(lu, nodenames, hvname, hvparams):
+  """Hypervisor parameter validation.
+
+  This function abstracts the hypervisor parameter validation to be
+  used in both instance create and instance modify.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit for which we check
+  @type nodenames: list
+  @param nodenames: the list of nodes on which we should check
+  @type hvname: string
+  @param hvname: the name of the hypervisor we should use
+  @type hvparams: dict
+  @param hvparams: the parameters which we need to check
+  @raise errors.OpPrereqError: if the parameters are not valid
+
+  """
+  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
+                                                  hvname,
+                                                  hvparams)
+  for node in nodenames:
+    info = hvinfo[node]
+    if info.offline:
+      continue
+    info.Raise()
+    if not info.data or not isinstance(info.data, (tuple, list)):
+      raise errors.OpPrereqError("Cannot get current information"
+                                 " from node '%s' (%s)" % (node, info.data))
+    if not info.data[0]:
+      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
+                                 " %s" % info.data[1])
+
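# Editorial sketch, not part of the patch: the intended two-step validation
# pattern - parameter syntax is checked locally, then semantics are checked
# on the relevant nodes via the helper above.  The function and variable
# names here are assumptions; FillDict/CheckParameterSyntax are used the same
# way in LUCreateInstance.ExpandNames below.
def _ExampleValidateHvParams(lu, nodenames, hv_name, hv_params):
  cluster = lu.cfg.GetClusterInfo()
  filled_hvp = cluster.FillDict(cluster.hvparams[hv_name], hv_params)
  hypervisor.GetHypervisor(hv_name).CheckParameterSyntax(filled_hvp)  # local
  _CheckHVParams(lu, nodenames, hv_name, filled_hvp)                  # remote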
+
 class LUCreateInstance(LogicalUnit):
   """Create an instance.
 
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "mem_size", "disk_size",
-              "disk_template", "swap_size", "mode", "start", "vcpus",
-              "wait_for_sync", "ip_check", "mac"]
+  _OP_REQP = ["instance_name", "disks", "disk_template",
+              "mode", "start",
+              "wait_for_sync", "ip_check", "nics",
+              "hvparams", "beparams"]
+  REQ_BGL = False
+
+  def _ExpandNode(self, node):
+    """Expands and checks one node name.
+
+    """
+    node_full = self.cfg.ExpandNodeName(node)
+    if node_full is None:
+      raise errors.OpPrereqError("Unknown node %s" % node)
+    return node_full
+
+  def ExpandNames(self):
+    """ExpandNames for CreateInstance.
+
+    Figure out the right locks for instance creation.
+
+    """
+    self.needed_locks = {}
+
+    # set optional parameters to none if they don't exist
+    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
+    # cheap checks, mostly valid constants given
+
+    # verify creation mode
+    if self.op.mode not in (constants.INSTANCE_CREATE,
+                            constants.INSTANCE_IMPORT):
+      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
+                                 self.op.mode)
+
+    # disk template and mirror node verification
+    if self.op.disk_template not in constants.DISK_TEMPLATES:
+      raise errors.OpPrereqError("Invalid disk template name")
+
+    if self.op.hypervisor is None:
+      self.op.hypervisor = self.cfg.GetHypervisorType()
+
+    cluster = self.cfg.GetClusterInfo()
+    enabled_hvs = cluster.enabled_hypervisors
+    if self.op.hypervisor not in enabled_hvs:
+      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
+                                 " cluster (%s)" % (self.op.hypervisor,
+                                  ",".join(enabled_hvs)))
+
+    # check hypervisor parameter syntax (locally)
+
+    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
+                                  self.op.hvparams)
+    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
+    hv_type.CheckParameterSyntax(filled_hvp)
+
+    # fill and remember the beparams dict
+    utils.CheckBEParams(self.op.beparams)
+    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
+                                    self.op.beparams)
+
+    #### instance parameters check
+
+    # instance name verification
+    hostname1 = utils.HostInfo(self.op.instance_name)
+    self.op.instance_name = instance_name = hostname1.name
+
+    # this is just a preventive check, but someone might still add this
+    # instance in the meantime, and creation will fail at lock-add time
+    if instance_name in self.cfg.GetInstanceList():
+      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+                                 instance_name)
+
+    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
+
+    # NIC buildup
+    self.nics = []
+    for nic in self.op.nics:
+      # ip validity checks
+      ip = nic.get("ip", None)
+      if ip is None or ip.lower() == "none":
+        nic_ip = None
+      elif ip.lower() == constants.VALUE_AUTO:
+        nic_ip = hostname1.ip
+      else:
+        if not utils.IsValidIP(ip):
+          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
+                                     " like a valid IP" % ip)
+        nic_ip = ip
+
+      # MAC address verification
+      mac = nic.get("mac", constants.VALUE_AUTO)
+      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+        if not utils.IsValidMac(mac.lower()):
+          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
+                                     mac)
+      # bridge verification
+      bridge = nic.get("bridge", None)
+      if bridge is None:
+        bridge = self.cfg.GetDefBridge()
+      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
+
+    # disk checks/pre-build
+    self.disks = []
+    for disk in self.op.disks:
+      mode = disk.get("mode", constants.DISK_RDWR)
+      if mode not in constants.DISK_ACCESS_SET:
+        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+                                   mode)
+      size = disk.get("size", None)
+      if size is None:
+        raise errors.OpPrereqError("Missing disk size")
+      try:
+        size = int(size)
+      except ValueError:
+        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
+      self.disks.append({"size": size, "mode": mode})
+
+    # used in CheckPrereq for ip ping check
+    self.check_ip = hostname1.ip
+
+    # file storage checks
+    if (self.op.file_driver and
+        not self.op.file_driver in constants.FILE_DRIVER):
+      raise errors.OpPrereqError("Invalid file driver name '%s'" %
+                                 self.op.file_driver)
+
+    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
+      raise errors.OpPrereqError("File storage directory path not absolute")
+
+    ### Node/iallocator related checks
+    if [self.op.iallocator, self.op.pnode].count(None) != 1:
+      raise errors.OpPrereqError("One and only one of iallocator and primary"
+                                 " node must be given")
+
+    if self.op.iallocator:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.op.pnode = self._ExpandNode(self.op.pnode)
+      nodelist = [self.op.pnode]
+      if self.op.snode is not None:
+        self.op.snode = self._ExpandNode(self.op.snode)
+        nodelist.append(self.op.snode)
+      self.needed_locks[locking.LEVEL_NODE] = nodelist
+
+    # in case of import lock the source node too
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      src_node = getattr(self.op, "src_node", None)
+      src_path = getattr(self.op, "src_path", None)
+
+      if src_path is None:
+        self.op.src_path = src_path = self.op.instance_name
+
+      if src_node is None:
+        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+        self.op.src_node = None
+        if os.path.isabs(src_path):
+          raise errors.OpPrereqError("Importing an instance from an absolute"
+                                     " path requires a source node option.")
+      else:
+        self.op.src_node = src_node = self._ExpandNode(src_node)
+        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+          self.needed_locks[locking.LEVEL_NODE].append(src_node)
+        if not os.path.isabs(src_path):
+          self.op.src_path = src_path = \
+            os.path.join(constants.EXPORT_DIR, src_path)
+
+    else: # INSTANCE_CREATE
+      if getattr(self.op, "os_type", None) is None:
+        raise errors.OpPrereqError("No guest OS specified")
 
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
     """
-    disks = [{"size": self.op.disk_size, "mode": "w"},
-             {"size": self.op.swap_size, "mode": "w"}]
-    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
-             "bridge": self.op.bridge}]
-    ial = IAllocator(self.cfg, self.sstore,
+    nics = [n.ToDict() for n in self.nics]
+    ial = IAllocator(self,
                      mode=constants.IALLOCATOR_MODE_ALLOC,
                      name=self.op.instance_name,
                      disk_template=self.op.disk_template,
                      tags=[],
                      os=self.op.os_type,
-                     vcpus=self.op.vcpus,
-                     mem_size=self.op.mem_size,
-                     disks=disks,
+                     vcpus=self.be_full[constants.BE_VCPUS],
+                     mem_size=self.be_full[constants.BE_MEMORY],
+                     disks=self.disks,
                      nics=nics,
+                     hypervisor=self.op.hypervisor,
                      )
 
     ial.Run(self.op.iallocator)
@@ -3023,12 +4322,12 @@ class LUCreateInstance(LogicalUnit):
     if len(ial.nodes) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (len(ial.nodes), ial.required_nodes))
+                                 (self.op.iallocator, len(ial.nodes),
+                                  ial.required_nodes))
     self.op.pnode = ial.nodes[0]
-    logger.ToStdout("Selected nodes for the instance: %s" %
-                    (", ".join(ial.nodes),))
-    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
-                (self.op.instance_name, self.op.iallocator, ial.nodes))
+    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+                 self.op.instance_name, self.op.iallocator,
+                 ", ".join(ial.nodes))
     if ial.required_nodes == 2:
       self.op.snode = ial.nodes[1]
 
@@ -3040,26 +4339,25 @@ class LUCreateInstance(LogicalUnit):
     """
     env = {
       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
-      "INSTANCE_DISK_SIZE": self.op.disk_size,
-      "INSTANCE_SWAP_SIZE": self.op.swap_size,
+      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
       "INSTANCE_ADD_MODE": self.op.mode,
       }
     if self.op.mode == constants.INSTANCE_IMPORT:
       env["INSTANCE_SRC_NODE"] = self.op.src_node
       env["INSTANCE_SRC_PATH"] = self.op.src_path
-      env["INSTANCE_SRC_IMAGE"] = self.src_image
+      env["INSTANCE_SRC_IMAGES"] = self.src_images
 
     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
       primary_node=self.op.pnode,
       secondary_nodes=self.secondaries,
       status=self.instance_status,
       os_type=self.op.os_type,
-      memory=self.op.mem_size,
-      vcpus=self.op.vcpus,
-      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
+      memory=self.be_full[constants.BE_MEMORY],
+      vcpus=self.be_full[constants.BE_VCPUS],
+      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
     ))
 
-    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
+    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
           self.secondaries)
     return env, nl, nl
 
@@ -3068,42 +4366,38 @@ class LUCreateInstance(LogicalUnit):
     """Check prerequisites.
 
     """
-    # set optional parameters to none if they don't exist
-    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
-                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
-                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
-      if not hasattr(self.op, attr):
-        setattr(self.op, attr, None)
-
-    if self.op.mode not in (constants.INSTANCE_CREATE,
-                            constants.INSTANCE_IMPORT):
-      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
-                                 self.op.mode)
-
     if (not self.cfg.GetVGName() and
         self.op.disk_template not in constants.DTS_NOT_LVM):
       raise errors.OpPrereqError("Cluster does not support lvm-based"
                                  " instances")
 
-    if self.op.mode == constants.INSTANCE_IMPORT:
-      src_node = getattr(self.op, "src_node", None)
-      src_path = getattr(self.op, "src_path", None)
-      if src_node is None or src_path is None:
-        raise errors.OpPrereqError("Importing an instance requires source"
-                                   " node and path options")
-      src_node_full = self.cfg.ExpandNodeName(src_node)
-      if src_node_full is None:
-        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
-      self.op.src_node = src_node = src_node_full
 
-      if not os.path.isabs(src_path):
-        raise errors.OpPrereqError("The source path must be absolute")
-
-      export_info = rpc.call_export_info(src_node, src_path)
-
-      if not export_info:
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      src_node = self.op.src_node
+      src_path = self.op.src_path
+
+      if src_node is None:
+        exp_list = self.rpc.call_export_list(
+          self.acquired_locks[locking.LEVEL_NODE])
+        found = False
+        for node in exp_list:
+          if not exp_list[node].failed and src_path in exp_list[node].data:
+            found = True
+            self.op.src_node = src_node = node
+            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
+                                                       src_path)
+            break
+        if not found:
+          raise errors.OpPrereqError("No export found for relative path %s" %
+                                      src_path)
+
+      _CheckNodeOnline(self, src_node)
+      result = self.rpc.call_export_info(src_node, src_path)
+      result.Raise()
+      if not result.data:
         raise errors.OpPrereqError("No export found in dir %s" % src_path)
 
+      export_info = result.data
       if not export_info.has_section(constants.INISECT_EXP):
         raise errors.ProgrammerError("Corrupted export config")
 
@@ -3112,127 +4406,88 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                    (ei_version, constants.EXPORT_VERSION))
 
-      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
-        raise errors.OpPrereqError("Can't import instance with more than"
-                                   " one data disk")
+      # Check that the new instance doesn't have less disks than the export
+      instance_disks = len(self.disks)
+      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
+      if instance_disks < export_disks:
+        raise errors.OpPrereqError("Not enough disks to import."
+                                   " (instance: %d, export: %d)" %
+                                   (instance_disks, export_disks))
 
-      # FIXME: are the old os-es, disk sizes, etc. useful?
       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
-      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
-                                                         'disk0_dump'))
-      self.src_image = diskimage
-    else: # INSTANCE_CREATE
-      if getattr(self.op, "os_type", None) is None:
-        raise errors.OpPrereqError("No guest OS specified")
-
-    #### instance parameters check
-
-    # disk template and mirror node verification
-    if self.op.disk_template not in constants.DISK_TEMPLATES:
-      raise errors.OpPrereqError("Invalid disk template name")
-
-    # instance name verification
-    hostname1 = utils.HostInfo(self.op.instance_name)
+      disk_images = []
+      for idx in range(export_disks):
+        option = 'disk%d_dump' % idx
+        if export_info.has_option(constants.INISECT_INS, option):
+          # FIXME: are the old os-es, disk sizes, etc. useful?
+          export_name = export_info.get(constants.INISECT_INS, option)
+          image = os.path.join(src_path, export_name)
+          disk_images.append(image)
+        else:
+          disk_images.append(False)
 
-    self.op.instance_name = instance_name = hostname1.name
-    instance_list = self.cfg.GetInstanceList()
-    if instance_name in instance_list:
-      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
-                                 instance_name)
+      self.src_images = disk_images
 
-    # ip validity checks
-    ip = getattr(self.op, "ip", None)
-    if ip is None or ip.lower() == "none":
-      inst_ip = None
-    elif ip.lower() == "auto":
-      inst_ip = hostname1.ip
-    else:
-      if not utils.IsValidIP(ip):
-        raise errors.OpPrereqError("given IP address '%s' doesn't look"
-                                   " like a valid IP" % ip)
-      inst_ip = ip
-    self.inst_ip = self.op.ip = inst_ip
+      old_name = export_info.get(constants.INISECT_INS, 'name')
+      # FIXME: int() here could throw a ValueError on broken exports
+      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
+      if self.op.instance_name == old_name:
+        for idx, nic in enumerate(self.nics):
+          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
+            nic_mac_ini = 'nic%d_mac' % idx
+            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
 
+    # ip ping checks (we use the same ip that was resolved in ExpandNames)
     if self.op.start and not self.op.ip_check:
       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                  " adding an instance in start mode")
 
     if self.op.ip_check:
-      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
+      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
-                                   (hostname1.ip, instance_name))
-
-    # MAC address verification
-    if self.op.mac != "auto":
-      if not utils.IsValidMac(self.op.mac.lower()):
-        raise errors.OpPrereqError("invalid MAC address specified: %s" %
-                                   self.op.mac)
-
-    # bridge verification
-    bridge = getattr(self.op, "bridge", None)
-    if bridge is None:
-      self.op.bridge = self.cfg.GetDefBridge()
-    else:
-      self.op.bridge = bridge
-
-    # boot order verification
-    if self.op.hvm_boot_order is not None:
-      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
-        raise errors.OpPrereqError("invalid boot order specified,"
-                                   " must be one or more of [acdn]")
-    # file storage checks
-    if (self.op.file_driver and
-        not self.op.file_driver in constants.FILE_DRIVER):
-      raise errors.OpPrereqError("Invalid file driver name '%s'" %
-                                 self.op.file_driver)
+                                   (self.check_ip, self.op.instance_name))
 
-    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
-      raise errors.OpPrereqError("File storage directory not a relative"
-                                 " path")
     #### allocator run
 
-    if [self.op.iallocator, self.op.pnode].count(None) != 1:
-      raise errors.OpPrereqError("One and only one of iallocator and primary"
-                                 " node must be given")
-
     if self.op.iallocator is not None:
       self._RunAllocator()
 
     #### node related checks
 
     # check primary node
-    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
-    if pnode is None:
-      raise errors.OpPrereqError("Primary node '%s' is unknown" %
-                                 self.op.pnode)
-    self.op.pnode = pnode.name
-    self.pnode = pnode
+    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
+    assert self.pnode is not None, \
+      "Cannot retrieve locked node %s" % self.op.pnode
+    if pnode.offline:
+      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
+                                 pnode.name)
+
     self.secondaries = []
 
     # mirror node verification
     if self.op.disk_template in constants.DTS_NET_MIRROR:
-      if getattr(self.op, "snode", None) is None:
+      if self.op.snode is None:
         raise errors.OpPrereqError("The networked disk templates need"
                                    " a mirror node")
-
-      snode_name = self.cfg.ExpandNodeName(self.op.snode)
-      if snode_name is None:
-        raise errors.OpPrereqError("Unknown secondary node '%s'" %
-                                   self.op.snode)
-      elif snode_name == pnode.name:
+      if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be"
                                    " the primary node.")
-      self.secondaries.append(snode_name)
+      self.secondaries.append(self.op.snode)
+      _CheckNodeOnline(self, self.op.snode)
+
+    nodenames = [pnode.name] + self.secondaries
 
     req_size = _ComputeDiskSize(self.op.disk_template,
-                                self.op.disk_size, self.op.swap_size)
+                                self.disks)
 
     # Check lv size requirements
     if req_size is not None:
-      nodenames = [pnode.name] + self.secondaries
-      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
+      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
+                                         self.op.hypervisor)
       for node in nodenames:
-        info = nodeinfo.get(node, None)
+        info = nodeinfo[node]
+        info.Raise()
+        info = info.data
         if not info:
           raise errors.OpPrereqError("Cannot get current information"
                                      " from node '%s'" % node)
@@ -3245,60 +4500,32 @@ class LUCreateInstance(LogicalUnit):
                                      " %d MB available, %d MB required" %
                                      (node, info['vg_free'], req_size))
 
+    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
+
     # os verification
-    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
-    if not os_obj:
+    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+    result.Raise()
+    if not isinstance(result.data, objects.OS):
       raise errors.OpPrereqError("OS '%s' not in supported os list for"
-                                 " primary node"  % self.op.os_type)
-
-    if self.op.kernel_path == constants.VALUE_NONE:
-      raise errors.OpPrereqError("Can't set instance kernel to none")
-
+                                 " primary node"  % self.op.os_type)
 
     # bridge check on primary node
-    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
-      raise errors.OpPrereqError("target bridge '%s' does not exist on"
-                                 " destination node '%s'" %
-                                 (self.op.bridge, pnode.name))
+    bridges = [n.bridge for n in self.nics]
+    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
+    result.Raise()
+    if not result.data:
+      raise errors.OpPrereqError("One of the target bridges '%s' does not"
+                                 " exist on destination node '%s'" %
+                                 (",".join(bridges), pnode.name))
 
     # memory check on primary node
     if self.op.start:
-      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
+      _CheckNodeFreeMemory(self, self.pnode.name,
                            "creating instance %s" % self.op.instance_name,
-                           self.op.mem_size)
-
-    # hvm_cdrom_image_path verification
-    if self.op.hvm_cdrom_image_path is not None:
-      if not os.path.isabs(self.op.hvm_cdrom_image_path):
-        raise errors.OpPrereqError("The path to the HVM CDROM image must"
-                                   " be an absolute path or None, not %s" %
-                                   self.op.hvm_cdrom_image_path)
-      if not os.path.isfile(self.op.hvm_cdrom_image_path):
-        raise errors.OpPrereqError("The HVM CDROM image must either be a"
-                                   " regular file or a symlink pointing to"
-                                   " an existing regular file, not %s" %
-                                   self.op.hvm_cdrom_image_path)
-
-    # vnc_bind_address verification
-    if self.op.vnc_bind_address is not None:
-      if not utils.IsValidIP(self.op.vnc_bind_address):
-        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
-                                   " like a valid IP address" %
-                                   self.op.vnc_bind_address)
-
-    # Xen HVM device type checks
-    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
-      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
-        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
-                                   " hypervisor" % self.op.hvm_nic_type)
-      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
-        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
-                                   " hypervisor" % self.op.hvm_disk_type)
+                           self.be_full[constants.BE_MEMORY],
+                           self.op.hypervisor)
 
-    if self.op.start:
-      self.instance_status = 'up'
-    else:
-      self.instance_status = 'down'
+    self.instance_status = self.op.start
 
   def Exec(self, feedback_fn):
     """Create and add the instance to the cluster.
@@ -3307,23 +4534,18 @@ class LUCreateInstance(LogicalUnit):
     instance = self.op.instance_name
     pnode_name = self.pnode.name
 
-    if self.op.mac == "auto":
-      mac_address = self.cfg.GenerateMAC()
-    else:
-      mac_address = self.op.mac
-
-    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
-    if self.inst_ip is not None:
-      nic.ip = self.inst_ip
+    for nic in self.nics:
+      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+        nic.mac = self.cfg.GenerateMAC()
 
-    ht_kind = self.sstore.GetHypervisorType()
+    ht_kind = self.op.hypervisor
     if ht_kind in constants.HTS_REQ_PORT:
       network_port = self.cfg.AllocatePort()
     else:
       network_port = None
 
-    if self.op.vnc_bind_address is None:
-      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+    ##if self.op.vnc_bind_address is None:
+    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
 
     # this is needed because os.path.join does not accept None arguments
     if self.op.file_storage_dir is None:
@@ -3333,63 +4555,73 @@ class LUCreateInstance(LogicalUnit):
 
     # build the full file storage dir path
     file_storage_dir = os.path.normpath(os.path.join(
-                                        self.sstore.GetFileStorageDir(),
+                                        self.cfg.GetFileStorageDir(),
                                         string_file_storage_dir, instance))
 
 
-    disks = _GenerateDiskTemplate(self.cfg,
+    disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
                                   instance, pnode_name,
-                                  self.secondaries, self.op.disk_size,
-                                  self.op.swap_size,
+                                  self.secondaries,
+                                  self.disks,
                                   file_storage_dir,
-                                  self.op.file_driver)
+                                  self.op.file_driver,
+                                  0)
 
     iobj = objects.Instance(name=instance, os=self.op.os_type,
                             primary_node=pnode_name,
-                            memory=self.op.mem_size,
-                            vcpus=self.op.vcpus,
-                            nics=[nic], disks=disks,
+                            nics=self.nics, disks=disks,
                             disk_template=self.op.disk_template,
-                            status=self.instance_status,
+                            admin_up=self.instance_status,
                             network_port=network_port,
-                            kernel_path=self.op.kernel_path,
-                            initrd_path=self.op.initrd_path,
-                            hvm_boot_order=self.op.hvm_boot_order,
-                            hvm_acpi=self.op.hvm_acpi,
-                            hvm_pae=self.op.hvm_pae,
-                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
-                            vnc_bind_address=self.op.vnc_bind_address,
-                            hvm_nic_type=self.op.hvm_nic_type,
-                            hvm_disk_type=self.op.hvm_disk_type,
+                            beparams=self.op.beparams,
+                            hvparams=self.op.hvparams,
+                            hypervisor=self.op.hypervisor,
                             )
 
     feedback_fn("* creating instance disks...")
-    if not _CreateDisks(self.cfg, iobj):
-      _RemoveDisks(iobj, self.cfg)
-      raise errors.OpExecError("Device creation failed, reverting...")
+    try:
+      _CreateDisks(self, iobj)
+    except errors.OpExecError:
+      self.LogWarning("Device creation failed, reverting...")
+      try:
+        _RemoveDisks(self, iobj)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance)
+        raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj)
-    # Add the new instance to the Ganeti Lock Manager
-    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
+    # Declare that we don't want to remove the instance lock anymore, as we've
+    # added the instance to the config
+    del self.remove_locks[locking.LEVEL_INSTANCE]
+    # Unlock all the nodes
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      nodes_keep = [self.op.src_node]
+      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
+                       if node != self.op.src_node]
+      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
+      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+    else:
+      self.context.glm.release(locking.LEVEL_NODE)
+      del self.acquired_locks[locking.LEVEL_NODE]
 
     if self.op.wait_for_sync:
-      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
+      disk_abort = not _WaitForSync(self, iobj)
     elif iobj.disk_template in constants.DTS_NET_MIRROR:
       # make sure the disks are not degraded (still sync-ing is ok)
       time.sleep(15)
       feedback_fn("* checking mirrors status")
-      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
+      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
     else:
       disk_abort = False
 
     if disk_abort:
-      _RemoveDisks(iobj, self.cfg)
+      _RemoveDisks(self, iobj)
       self.cfg.RemoveInstance(iobj.name)
-      # Remove the new instance from the Ganeti Lock Manager
-      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
+      # Make sure the instance lock gets removed
+      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
@@ -3399,30 +4631,39 @@ class LUCreateInstance(LogicalUnit):
     if iobj.disk_template != constants.DT_DISKLESS:
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
-        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
-          raise errors.OpExecError("could not add os for instance %s"
-                                   " on node %s" %
-                                   (instance, pnode_name))
+        result = self.rpc.call_instance_os_add(pnode_name, iobj)
+        msg = result.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Could not add os for instance %s"
+                                   " on node %s: %s" %
+                                   (instance, pnode_name, msg))
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
         src_node = self.op.src_node
-        src_image = self.src_image
-        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
-                                                src_node, src_image):
-          raise errors.OpExecError("Could not import os for instance"
-                                   " %s on node %s" %
-                                   (instance, pnode_name))
+        src_images = self.src_images
+        cluster_name = self.cfg.GetClusterName()
+        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
+                                                         src_node, src_images,
+                                                         cluster_name)
+        import_result.Raise()
+        for idx, result in enumerate(import_result.data):
+          if not result:
+            self.LogWarning("Could not import the image %s for instance"
+                            " %s, disk %d, on node %s" %
+                            (src_images[idx], instance, idx, pnode_name))
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                      % self.op.mode)
 
     if self.op.start:
-      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
+      logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      if not rpc.call_instance_start(pnode_name, iobj, None):
-        raise errors.OpExecError("Could not start instance")
+      result = self.rpc.call_instance_start(pnode_name, iobj, None)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LUConnectConsole(NoHooksLU):
@@ -3448,6 +4689,7 @@ class LUConnectConsole(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Connect to the console of an instance
@@ -3456,17 +4698,22 @@ class LUConnectConsole(NoHooksLU):
     instance = self.instance
     node = instance.primary_node
 
-    node_insts = rpc.call_instance_list([node])[node]
-    if node_insts is False:
-      raise errors.OpExecError("Can't connect to node %s." % node)
+    node_insts = self.rpc.call_instance_list([node],
+                                             [instance.hypervisor])[node]
+    node_insts.Raise()
 
-    if instance.name not in node_insts:
+    if instance.name not in node_insts.data:
       raise errors.OpExecError("Instance %s is not running." % instance.name)
 
-    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
+    logging.debug("Connecting to console of %s on %s", instance.name, node)
 
-    hyper = hypervisor.GetHypervisor()
-    console_cmd = hyper.GetShellCommandForConsole(instance)
+    hyper = hypervisor.GetHypervisor(instance.hypervisor)
+    cluster = self.cfg.GetClusterInfo()
+    # beparams and hvparams are passed separately, to avoid editing the
+    # instance and then saving the defaults in the instance itself.
+    hvparams = cluster.FillHV(instance)
+    beparams = cluster.FillBE(instance)
+    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
 
     # build ssh cmdline
     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
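The beparams/hvparams comment above implies that the effective parameters are computed on the fly by overlaying the instance's own settings on the cluster-wide defaults, rather than being written back into the instance. A rough, self-contained sketch of that overlay idea (the helper and the parameter values are hypothetical, not Ganeti's actual FillHV/FillBE implementation):

# Hypothetical illustration of the "fill" idea: start from the cluster
# defaults and let instance-level settings win, without ever saving the
# merged result back into the instance object.
def fill_params(cluster_defaults, instance_overrides):
  effective = dict(cluster_defaults)      # copy, so the defaults stay untouched
  effective.update(instance_overrides)    # instance-specific values override
  return effective

cluster_hv = {"kernel_path": "/boot/vmlinuz-2.6-xenU", "acpi": True}   # made-up defaults
instance_hv = {"acpi": False}                                          # made-up override
print fill_params(cluster_hv, instance_hv)
# -> {'kernel_path': '/boot/vmlinuz-2.6-xenU', 'acpi': False}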
@@ -3479,12 +4726,63 @@ class LUReplaceDisks(LogicalUnit):
   HPATH = "mirrors-replace"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "mode", "disks"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
+
+    # check for valid parameter combination
+    cnt = [self.op.remote_node, self.op.iallocator].count(None)
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      if cnt == 2:
+        raise errors.OpPrereqError("When changing the secondary either an"
+                                   " iallocator script must be used or the"
+                                   " new node given")
+      elif cnt == 0:
+        raise errors.OpPrereqError("Give either the iallocator or the new"
+                                   " secondary, not both")
+    else: # not replacing the secondary
+      if cnt != 2:
+        raise errors.OpPrereqError("The iallocator and new node options can"
+                                   " be used only when changing the"
+                                   " secondary node")
+
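The argument checks above reduce to a small rule: when changing the secondary (REPLACE_DISK_CHG) exactly one of remote_node/iallocator must be given, while the other modes accept neither. A minimal sketch of that rule as a standalone predicate, assuming ganeti.constants is importable; the node and allocator names are purely illustrative:

from ganeti import constants

def replace_disks_args_ok(mode, remote_node, iallocator):
  # Mirrors the CheckArguments logic above: count how many selectors are unset.
  unset = [remote_node, iallocator].count(None)
  if mode == constants.REPLACE_DISK_CHG:
    return unset == 1     # need exactly one way to pick the new secondary
  return unset == 2       # PRI/SEC replacement accepts neither option

assert replace_disks_args_ok(constants.REPLACE_DISK_CHG, "node3.example.com", None)
assert replace_disks_args_ok(constants.REPLACE_DISK_CHG, None, "hail")
assert not replace_disks_args_ok(constants.REPLACE_DISK_CHG, None, None)
assert replace_disks_args_ok(constants.REPLACE_DISK_PRI, None, None)
assert not replace_disks_args_ok(constants.REPLACE_DISK_SEC, "node3.example.com", None)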
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+    if self.op.iallocator is not None:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    elif self.op.remote_node is not None:
+      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
+      if remote_node is None:
+        raise errors.OpPrereqError("Node '%s' not known" %
+                                   self.op.remote_node)
+      self.op.remote_node = remote_node
+      # Warning: do not remove the locking of the new secondary here
+      # unless DRBD8.AddChildren is changed to work in parallel;
+      # currently it doesn't since parallel invocations of
+      # FindUnusedMinor will conflict
+      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    # If we're not already locking all nodes in the set we have to declare the
+    # instance's primary/secondary nodes.
+    if (level == locking.LEVEL_NODE and
+        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+      self._LockInstancesNodes()
 
   def _RunAllocator(self):
     """Compute a new secondary node using an IAllocator.
 
     """
-    ial = IAllocator(self.cfg, self.sstore,
+    ial = IAllocator(self,
                      mode=constants.IALLOCATOR_MODE_RELOC,
                      name=self.op.instance_name,
                      relocate_from=[self.sec_node])
@@ -3500,8 +4798,8 @@ class LUReplaceDisks(LogicalUnit):
                                  " of nodes (%s), required %s" %
                                  (len(ial.nodes), ial.required_nodes))
     self.op.remote_node = ial.nodes[0]
-    logger.ToStdout("Selected new secondary for the instance: %s" %
-                    self.op.remote_node)
+    self.LogInfo("Selected new secondary for the instance: %s",
+                 self.op.remote_node)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3514,9 +4812,9 @@ class LUReplaceDisks(LogicalUnit):
       "NEW_SECONDARY": self.op.remote_node,
       "OLD_SECONDARY": self.instance.secondary_nodes[0],
       }
-    env.update(_BuildInstanceHookEnvByObject(self.instance))
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [
-      self.sstore.GetMasterNode(),
+      self.cfg.GetMasterNode(),
       self.instance.primary_node,
       ]
     if self.op.remote_node is not None:
@@ -3529,20 +4827,14 @@ class LUReplaceDisks(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    if not hasattr(self.op, "remote_node"):
-      self.op.remote_node = None
-
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = instance
-    self.op.instance_name = instance.name
 
-    if instance.disk_template not in constants.DTS_NET_MIRROR:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " network mirrored.")
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
+                                 " instances")
 
     if len(instance.secondary_nodes) != 1:
       raise errors.OpPrereqError("The instance has a strange layout,"
@@ -3551,75 +4843,63 @@ class LUReplaceDisks(LogicalUnit):
 
     self.sec_node = instance.secondary_nodes[0]
 
-    ia_name = getattr(self.op, "iallocator", None)
-    if ia_name is not None:
-      if self.op.remote_node is not None:
-        raise errors.OpPrereqError("Give either the iallocator or the new"
-                                   " secondary, not both")
-      self.op.remote_node = self._RunAllocator()
+    if self.op.iallocator is not None:
+      self._RunAllocator()
 
     remote_node = self.op.remote_node
     if remote_node is not None:
-      remote_node = self.cfg.ExpandNodeName(remote_node)
-      if remote_node is None:
-        raise errors.OpPrereqError("Node '%s' not known" %
-                                   self.op.remote_node)
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
+      assert self.remote_node_info is not None, \
+        "Cannot retrieve locked node %s" % remote_node
     else:
       self.remote_node_info = None
     if remote_node == instance.primary_node:
       raise errors.OpPrereqError("The specified node is the primary node of"
                                  " the instance.")
     elif remote_node == self.sec_node:
-      if self.op.mode == constants.REPLACE_DISK_SEC:
-        # this is for DRBD8, where we can't execute the same mode of
-        # replacement as for drbd7 (no different port allocated)
-        raise errors.OpPrereqError("Same secondary given, cannot execute"
-                                   " replacement")
-    if instance.disk_template == constants.DT_DRBD8:
-      if (self.op.mode == constants.REPLACE_DISK_ALL and
-          remote_node is not None):
-        # switch to replace secondary mode
-        self.op.mode = constants.REPLACE_DISK_SEC
-
-      if self.op.mode == constants.REPLACE_DISK_ALL:
-        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
-                                   " secondary disk replacement, not"
-                                   " both at once")
-      elif self.op.mode == constants.REPLACE_DISK_PRI:
-        if remote_node is not None:
-          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
-                                     " the secondary while doing a primary"
-                                     " node disk replacement")
-        self.tgt_node = instance.primary_node
-        self.oth_node = instance.secondary_nodes[0]
-      elif self.op.mode == constants.REPLACE_DISK_SEC:
-        self.new_node = remote_node # this can be None, in which case
-                                    # we don't change the secondary
-        self.tgt_node = instance.secondary_nodes[0]
-        self.oth_node = instance.primary_node
-      else:
-        raise errors.ProgrammerError("Unhandled disk replace mode")
+      raise errors.OpPrereqError("The specified node is already the"
+                                 " secondary node of the instance.")
+
+    if self.op.mode == constants.REPLACE_DISK_PRI:
+      n1 = self.tgt_node = instance.primary_node
+      n2 = self.oth_node = self.sec_node
+    elif self.op.mode == constants.REPLACE_DISK_SEC:
+      n1 = self.tgt_node = self.sec_node
+      n2 = self.oth_node = instance.primary_node
+    elif self.op.mode == constants.REPLACE_DISK_CHG:
+      n1 = self.new_node = remote_node
+      n2 = self.oth_node = instance.primary_node
+      self.tgt_node = self.sec_node
+    else:
+      raise errors.ProgrammerError("Unhandled disk replace mode")
 
-    for name in self.op.disks:
-      if instance.FindDisk(name) is None:
-        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
-                                   (name, instance.name))
-    self.op.remote_node = remote_node
+    _CheckNodeOnline(self, n1)
+    _CheckNodeOnline(self, n2)
+
+    if not self.op.disks:
+      self.op.disks = range(len(instance.disks))
+
+    for disk_idx in self.op.disks:
+      instance.FindDisk(disk_idx)
 
   def _ExecD8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for dbrd8.
 
     The algorithm for replace is quite complicated:
-      - for each disk to be replaced:
-        - create new LVs on the target node with unique names
-        - detach old LVs from the drbd device
-        - rename old LVs to name_replaced.<time_t>
-        - rename new LVs to old LVs
-        - attach the new LVs (with the old names now) to the drbd device
-      - wait for sync across all devices
-      - for each modified disk:
-        - remove old LVs (which have the name name_replaces.<time_t>)
+
+      1. for each disk to be replaced:
+
+        1. create new LVs on the target node with unique names
+        2. detach old LVs from the drbd device
+        3. rename old LVs to name_replaced.<time_t>
+        4. rename new LVs to old LVs
+        5. attach the new LVs (with the old names now) to the drbd device
+
+      2. wait for sync across all devices
+
+      3. for each modified disk:
+
+        1. remove old LVs (which have the name name_replaced.<time_t>)
 
     Failures are not very well handled.
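A toy, self-contained model of the rename-swap ordering described above (plain dicts stand in for LVs and the DRBD device; the volume group and names are illustrative), just to make the sequence concrete before the RPC-driven implementation that follows:

import time

def swap_backing_lvs(drbd_dev, old_lvs, new_lvs):
  orig_names = [lv["name"] for lv in old_lvs]
  drbd_dev["children"] = []                        # detach the old LVs from drbd
  suffix = "_replaced-%d" % time.time()
  for lv in old_lvs:                               # move the old LVs out of the way
    lv["name"] += suffix
  for lv, name in zip(new_lvs, orig_names):        # new LVs take over the old names
    lv["name"] = name
  drbd_dev["children"] = new_lvs                   # reattach; drbd resyncs onto them
  return drbd_dev

swap_backing_lvs({"children": None},
                 [{"name": "xenvg/.disk0_data"}, {"name": "xenvg/.disk0_meta"}],
                 [{"name": "xenvg/new_data"}, {"name": "xenvg/new_meta"}])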
 
@@ -3638,31 +4918,31 @@ class LUReplaceDisks(LogicalUnit):
     self.proc.LogStep(1, steps_total, "check device existence")
     info("checking volume groups")
     my_vg = cfg.GetVGName()
-    results = rpc.call_vg_list([oth_node, tgt_node])
+    results = self.rpc.call_vg_list([oth_node, tgt_node])
     if not results:
       raise errors.OpExecError("Can't list volume groups on the nodes")
     for node in oth_node, tgt_node:
-      res = results.get(node, False)
-      if not res or my_vg not in res:
+      res = results[node]
+      if res.failed or not res.data or my_vg not in res.data:
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
-    for dev in instance.disks:
-      if not dev.iv_name in self.op.disks:
+    for idx, dev in enumerate(instance.disks):
+      if idx not in self.op.disks:
         continue
       for node in tgt_node, oth_node:
-        info("checking %s on %s" % (dev.iv_name, node))
+        info("checking disk/%d on %s" % (idx, node))
         cfg.SetDiskID(dev, node)
-        if not rpc.call_blockdev_find(node, dev):
-          raise errors.OpExecError("Can't find device %s on node %s" %
-                                   (dev.iv_name, node))
+        if not self.rpc.call_blockdev_find(node, dev):
+          raise errors.OpExecError("Can't find disk/%d on node %s" %
+                                   (idx, node))
 
     # Step: check other node consistency
     self.proc.LogStep(2, steps_total, "check peer consistency")
-    for dev in instance.disks:
-      if not dev.iv_name in self.op.disks:
+    for idx, dev in enumerate(instance.disks):
+      if idx not in self.op.disks:
         continue
-      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
-      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
+      info("checking disk/%d consistency on %s" % (idx, oth_node))
+      if not _CheckDiskConsistency(self, dev, oth_node,
                                    oth_node==instance.primary_node):
         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                  " to replace disks on this node (%s)" %
@@ -3670,13 +4950,14 @@ class LUReplaceDisks(LogicalUnit):
 
     # Step: create new storage
     self.proc.LogStep(3, steps_total, "allocate new storage")
-    for dev in instance.disks:
-      if not dev.iv_name in self.op.disks:
+    for idx, dev in enumerate(instance.disks):
+      if idx not in self.op.disks:
         continue
       size = dev.size
       cfg.SetDiskID(dev, tgt_node)
-      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
-      names = _GenerateUniqueNames(cfg, lv_names)
+      lv_names = [".disk%d_%s" % (idx, suf)
+                  for suf in ["data", "meta"]]
+      names = _GenerateUniqueNames(self, lv_names)
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                              logical_id=(vgname, names[0]))
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
@@ -3686,21 +4967,18 @@ class LUReplaceDisks(LogicalUnit):
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
       info("creating new local storage on %s for %s" %
            (tgt_node, dev.iv_name))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+      # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
-        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], tgt_node))
+        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
 
     # Step: for each lv, detach+rename*2+attach
     self.proc.LogStep(4, steps_total, "change drbd configuration")
     for dev, old_lvs, new_lvs in iv_names.itervalues():
       info("detaching %s drbd from local storage" % dev.iv_name)
-      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
+      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't detach drbd from local storage on node"
                                  " %s for device %s" % (tgt_node, dev.iv_name))
       #dev.children = []
@@ -3719,17 +4997,21 @@ class LUReplaceDisks(LogicalUnit):
       # build the rename list based on what LVs exist on the node
       rlist = []
       for to_ren in old_lvs:
-        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
-        if find_res is not None: # device exists
+        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
+        if not find_res.failed and find_res.data is not None: # device exists
           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
 
       info("renaming the old LVs on the target node")
-      if not rpc.call_blockdev_rename(tgt_node, rlist):
+      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
       # now we rename the new LVs to the old LVs
       info("renaming the new LVs on the target node")
       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
-      if not rpc.call_blockdev_rename(tgt_node, rlist):
+      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
 
       for old, new in zip(old_lvs, new_lvs):
@@ -3742,9 +5024,11 @@ class LUReplaceDisks(LogicalUnit):
 
       # now that the new lvs have the old name, we can add them to the device
       info("adding new mirror component on %s" % tgt_node)
-      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
+      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+      if result.failed or not result.data:
         for new_lv in new_lvs:
-          if not rpc.call_blockdev_remove(tgt_node, new_lv):
+          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
+          if result.failed or not result.data:
             warning("Can't rollback device %s", hint="manually cleanup unused"
                     " logical volumes")
         raise errors.OpExecError("Can't add local storage to drbd")
@@ -3758,13 +5042,13 @@ class LUReplaceDisks(LogicalUnit):
     # does a combined result over all disks, so we don't check its
     # return value
     self.proc.LogStep(5, steps_total, "sync devices")
-    _WaitForSync(cfg, instance, self.proc, unlock=True)
+    _WaitForSync(self, instance, unlock=True)
 
     # so check manually all the devices
     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
       cfg.SetDiskID(dev, instance.primary_node)
-      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
-      if is_degr:
+      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
+      if result.failed or result.data[5]:
         raise errors.OpExecError("DRBD device %s is degraded!" % name)
 
     # Step: remove old storage
@@ -3773,7 +5057,8 @@ class LUReplaceDisks(LogicalUnit):
       info("remove logical volumes for %s" % name)
       for lv in old_lvs:
         cfg.SetDiskID(lv, tgt_node)
-        if not rpc.call_blockdev_remove(tgt_node, lv):
+        result = self.rpc.call_blockdev_remove(tgt_node, lv)
+        if result.failed or not result.data:
           warning("Can't remove old LV", hint="manually remove unused LVs")
           continue
 
@@ -3800,144 +5085,156 @@ class LUReplaceDisks(LogicalUnit):
     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
     instance = self.instance
     iv_names = {}
-    vgname = self.cfg.GetVGName()
     # start of work
     cfg = self.cfg
     old_node = self.tgt_node
     new_node = self.new_node
     pri_node = instance.primary_node
+    nodes_ip = {
+      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
+      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
+      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
+      }
 
     # Step: check device activation
     self.proc.LogStep(1, steps_total, "check device existence")
     info("checking volume groups")
     my_vg = cfg.GetVGName()
-    results = rpc.call_vg_list([pri_node, new_node])
-    if not results:
-      raise errors.OpExecError("Can't list volume groups on the nodes")
+    results = self.rpc.call_vg_list([pri_node, new_node])
     for node in pri_node, new_node:
-      res = results.get(node, False)
-      if not res or my_vg not in res:
+      res = results[node]
+      if res.failed or not res.data or my_vg not in res.data:
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
-    for dev in instance.disks:
-      if not dev.iv_name in self.op.disks:
+    for idx, dev in enumerate(instance.disks):
+      if idx not in self.op.disks:
         continue
-      info("checking %s on %s" % (dev.iv_name, pri_node))
+      info("checking disk/%d on %s" % (idx, pri_node))
       cfg.SetDiskID(dev, pri_node)
-      if not rpc.call_blockdev_find(pri_node, dev):
-        raise errors.OpExecError("Can't find device %s on node %s" %
-                                 (dev.iv_name, pri_node))
+      result = self.rpc.call_blockdev_find(pri_node, dev)
+      result.Raise()
+      if not result.data:
+        raise errors.OpExecError("Can't find disk/%d on node %s" %
+                                 (idx, pri_node))
 
     # Step: check other node consistency
     self.proc.LogStep(2, steps_total, "check peer consistency")
-    for dev in instance.disks:
-      if not dev.iv_name in self.op.disks:
+    for idx, dev in enumerate(instance.disks):
+      if idx not in self.op.disks:
         continue
-      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
-      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
+      info("checking disk/%d consistency on %s" % (idx, pri_node))
+      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
         raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                  " unsafe to replace the secondary" %
                                  pri_node)
 
     # Step: create new storage
     self.proc.LogStep(3, steps_total, "allocate new storage")
-    for dev in instance.disks:
-      size = dev.size
-      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+    for idx, dev in enumerate(instance.disks):
+      info("adding new local storage on %s for disk/%d" %
+           (new_node, idx))
+      # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
-        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], new_node))
-
-      iv_names[dev.iv_name] = (dev, dev.children)
-
+        _CreateBlockDev(self, new_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
+
+    # Step 4: drbd minors and drbd setup changes
+    # after this, we must manually remove the drbd minors on both the
+    # error and the success paths
+    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
+                                   instance.name)
+    logging.debug("Allocated minors %s" % (minors,))
     self.proc.LogStep(4, steps_total, "changing drbd configuration")
-    for dev in instance.disks:
+    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
       size = dev.size
-      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
-      # create new devices on new_node
+      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
+      # create new devices on new_node; note that we create two IDs:
+      # one without port, so the drbd will be activated without
+      # networking information on the new node at this stage, and one
+      # with network, for the latter activation in step 4
+      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
+      if pri_node == o_node1:
+        p_minor = o_minor1
+      else:
+        p_minor = o_minor2
+
+      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
+      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
+
+      iv_names[idx] = (dev, dev.children, new_net_id)
+      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
+                    new_net_id)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
-                              logical_id=(pri_node, new_node,
-                                          dev.logical_id[2]),
+                              logical_id=new_alone_id,
                               children=dev.children)
-      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
-                                        new_drbd, False,
-                                      _GetInstanceInfoText(instance)):
-        raise errors.OpExecError("Failed to create new DRBD on"
-                                 " node '%s'" % new_node)
+      try:
+        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
+                              _GetInstanceInfoText(instance), False)
+      except errors.BlockDeviceError:
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise
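A worked example of the two IDs built in the loop above, with made-up node names, port and minor numbers (the shared secret is elided); only new_net_id carries the port, which is why the new device first comes up standalone and is attached to the network later:

# Illustrative values only; the logical_id layout is
# (nodeA, nodeB, port, minorA, minorB, secret), as unpacked above.
old_id       = ("node1.example.com", "node2.example.com", 11000, 0, 0, "<secret>")
# pri_node is node1, so p_minor == 0; assume the new minor allocated on node3 is 2.
new_alone_id = ("node1.example.com", "node3.example.com", None,  0, 2, "<secret>")
new_net_id   = ("node1.example.com", "node3.example.com", 11000, 0, 2, "<secret>")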
 
-    for dev in instance.disks:
+    for idx, dev in enumerate(instance.disks):
       # we have new devices, shutdown the drbd on the old secondary
-      info("shutting down drbd for %s on old node" % dev.iv_name)
+      info("shutting down drbd for disk/%d on old node" % idx)
       cfg.SetDiskID(dev, old_node)
-      if not rpc.call_blockdev_shutdown(old_node, dev):
-        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
+      result = self.rpc.call_blockdev_shutdown(old_node, dev)
+      if result.failed or not result.data:
+        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                 hint="Please cleanup this device manually as soon as possible")
 
     info("detaching primary drbds from the network (=> standalone)")
-    done = 0
-    for dev in instance.disks:
-      cfg.SetDiskID(dev, pri_node)
-      # set the physical (unique in bdev terms) id to None, meaning
-      # detach from network
-      dev.physical_id = (None,) * len(dev.physical_id)
-      # and 'find' the device, which will 'fix' it to match the
-      # standalone state
-      if rpc.call_blockdev_find(pri_node, dev):
-        done += 1
-      else:
-        warning("Failed to detach drbd %s from network, unusual case" %
-                dev.iv_name)
+    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
+                                               instance.disks)[pri_node]
 
-    if not done:
-      # no detaches succeeded (very unlikely)
-      raise errors.OpExecError("Can't detach at least one DRBD from old node")
+    msg = result.RemoteFailMsg()
+    if msg:
+      # detaches didn't succeed (unlikely)
+      self.cfg.ReleaseDRBDMinors(instance.name)
+      raise errors.OpExecError("Can't detach the disks from the network on"
+                               " old node: %s" % (msg,))
 
     # if we managed to detach at least one, we update all the disks of
     # the instance to point to the new secondary
     info("updating instance configuration")
-    for dev in instance.disks:
-      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
+    for dev, _, new_logical_id in iv_names.itervalues():
+      dev.logical_id = new_logical_id
       cfg.SetDiskID(dev, pri_node)
     cfg.Update(instance)
 
     # and now perform the drbd attach
     info("attaching primary drbds to new secondary (standalone => connected)")
-    failures = []
-    for dev in instance.disks:
-      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
-      # since the attach is smart, it's enough to 'find' the device,
-      # it will automatically activate the network, if the physical_id
-      # is correct
-      cfg.SetDiskID(dev, pri_node)
-      if not rpc.call_blockdev_find(pri_node, dev):
-        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
-                "please do a gnt-instance info to see the status of disks")
+    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
+                                           instance.disks, instance.name,
+                                           False)
+    for to_node, to_result in result.items():
+      msg = to_result.RemoteFailMsg()
+      if msg:
+        warning("can't attach drbd disks on node %s: %s", to_node, msg,
+                hint="please do a gnt-instance info to see the"
+                " status of disks")
 
     # this can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its
     # return value
     self.proc.LogStep(5, steps_total, "sync devices")
-    _WaitForSync(cfg, instance, self.proc, unlock=True)
+    _WaitForSync(self, instance, unlock=True)
 
     # so check manually all the devices
-    for name, (dev, old_lvs) in iv_names.iteritems():
+    for idx, (dev, old_lvs, _) in iv_names.iteritems():
       cfg.SetDiskID(dev, pri_node)
-      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
-      if is_degr:
-        raise errors.OpExecError("DRBD device %s is degraded!" % name)
+      result = self.rpc.call_blockdev_find(pri_node, dev)
+      result.Raise()
+      if result.data[5]:
+        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
 
     self.proc.LogStep(6, steps_total, "removing old storage")
-    for name, (dev, old_lvs) in iv_names.iteritems():
-      info("remove logical volumes for %s" % name)
+    for idx, (dev, old_lvs, _) in iv_names.iteritems():
+      info("remove logical volumes for disk/%d" % idx)
       for lv in old_lvs:
         cfg.SetDiskID(lv, old_node)
-        if not rpc.call_blockdev_remove(old_node, lv):
+        result = self.rpc.call_blockdev_remove(old_node, lv)
+        if result.failed or not result.data:
           warning("Can't remove LV on old secondary",
                   hint="Cleanup stale volumes by hand")
 
@@ -3950,24 +5247,19 @@ class LUReplaceDisks(LogicalUnit):
     instance = self.instance
 
     # Activate the instance disks if we're replacing them on a down instance
-    if instance.status == "down":
-      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
-      self.proc.ChainOpCode(op)
+    if not instance.admin_up:
+      _StartInstanceDisks(self, instance, True)
 
-    if instance.disk_template == constants.DT_DRBD8:
-      if self.op.remote_node is None:
-        fn = self._ExecD8DiskOnly
-      else:
-        fn = self._ExecD8Secondary
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      fn = self._ExecD8Secondary
     else:
-      raise errors.ProgrammerError("Unhandled disk replacement case")
+      fn = self._ExecD8DiskOnly
 
     ret = fn(feedback_fn)
 
     # Deactivate the instance disks if we're replacing them on a down instance
-    if instance.status == "down":
-      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
-      self.proc.ChainOpCode(op)
+    if not instance.admin_up:
+      _SafeShutdownInstanceDisks(self, instance)
 
     return ret
 
@@ -3978,7 +5270,17 @@ class LUGrowDisk(LogicalUnit):
   """
   HPATH = "disk-grow"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "disk", "amount"]
+  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3990,9 +5292,9 @@ class LUGrowDisk(LogicalUnit):
       "DISK": self.op.disk,
       "AMOUNT": self.op.amount,
       }
-    env.update(_BuildInstanceHookEnvByObject(self.instance))
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [
-      self.sstore.GetMasterNode(),
+      self.cfg.GetMasterNode(),
       self.instance.primary_node,
       ]
     return env, nl, nl
@@ -4003,62 +5305,95 @@ class LUGrowDisk(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+    nodenames = list(instance.all_nodes)
+    for node in nodenames:
+      _CheckNodeOnline(self, node)
+
     self.instance = instance
-    self.op.instance_name = instance.name
 
     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
       raise errors.OpPrereqError("Instance's disk layout does not support"
                                  " growing.")
 
-    if instance.FindDisk(self.op.disk) is None:
-      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
-                                 (self.op.disk, instance.name))
+    self.disk = instance.FindDisk(self.op.disk)
 
-    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
-    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
+    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
+                                       instance.hypervisor)
     for node in nodenames:
-      info = nodeinfo.get(node, None)
-      if not info:
+      info = nodeinfo[node]
+      if info.failed or not info.data:
         raise errors.OpPrereqError("Cannot get current information"
                                    " from node '%s'" % node)
-      vg_free = info.get('vg_free', None)
+      vg_free = info.data.get('vg_free', None)
       if not isinstance(vg_free, int):
         raise errors.OpPrereqError("Can't compute free disk space on"
                                    " node %s" % node)
-      if self.op.amount > info['vg_free']:
+      if self.op.amount > vg_free:
         raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                    " %d MiB available, %d MiB required" %
-                                   (node, info['vg_free'], self.op.amount))
+                                   (node, vg_free, self.op.amount))
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
 
     """
     instance = self.instance
-    disk = instance.FindDisk(self.op.disk)
-    for node in (instance.secondary_nodes + (instance.primary_node,)):
+    disk = self.disk
+    for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
-      if not result or not isinstance(result, tuple) or len(result) != 2:
-        raise errors.OpExecError("grow request failed to node %s" % node)
-      elif not result[0]:
-        raise errors.OpExecError("grow request failed to node %s: %s" %
-                                 (node, result[1]))
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      result.Raise()
+      if (not result.data or not isinstance(result.data, (list, tuple)) or
+          len(result.data) != 2):
+        raise errors.OpExecError("Grow request failed to node %s" % node)
+      elif not result.data[0]:
+        raise errors.OpExecError("Grow request failed to node %s: %s" %
+                                 (node, result.data[1]))
     disk.RecordGrow(self.op.amount)
     self.cfg.Update(instance)
-    return
+    if self.op.wait_for_sync:
+      disk_abort = not _WaitForSync(self, instance)
+      if disk_abort:
+        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
+                             " status.\nPlease check the instance.")
 
 
 class LUQueryInstanceData(NoHooksLU):
   """Query runtime instance data.
 
   """
-  _OP_REQP = ["instances"]
+  _OP_REQP = ["instances", "static"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+
+    if not isinstance(self.op.instances, list):
+      raise errors.OpPrereqError("Invalid argument type 'instances'")
+
+    if self.op.instances:
+      self.wanted_names = []
+      for name in self.op.instances:
+        full_name = self.cfg.ExpandInstanceName(name)
+        if full_name is None:
+          raise errors.OpPrereqError("Instance '%s' not known" % name)
+        self.wanted_names.append(full_name)
+      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+    else:
+      self.wanted_names = None
+      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4066,28 +5401,26 @@ class LUQueryInstanceData(NoHooksLU):
     This only checks the optional instance list against the existing names.
 
     """
-    if not isinstance(self.op.instances, list):
-      raise errors.OpPrereqError("Invalid argument type 'instances'")
-    if self.op.instances:
-      self.wanted_instances = []
-      names = self.op.instances
-      for name in names:
-        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
-        if instance is None:
-          raise errors.OpPrereqError("No such instance name '%s'" % name)
-        self.wanted_instances.append(instance)
-    else:
-      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
-                               in self.cfg.GetInstanceList()]
-    return
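+    # self.wanted_names is None only when no specific instances were
+    # requested, in which case we query all the instances we hold locks for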
+    if self.wanted_names is None:
+      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
 
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
+                             in self.wanted_names]
+    return
 
   def _ComputeDiskStatus(self, instance, snode, dev):
     """Compute block device status.
 
     """
-    self.cfg.SetDiskID(dev, instance.primary_node)
-    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
+    static = self.op.static
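+    # in static mode we skip the (potentially slow) per-device RPC calls and
+    # return only configuration data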
+    if not static:
+      self.cfg.SetDiskID(dev, instance.primary_node)
+      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
+      dev_pstatus.Raise()
+      dev_pstatus = dev_pstatus.data
+    else:
+      dev_pstatus = None
+
     if dev.dev_type in constants.LDS_DRBD:
       # we change the snode then (otherwise we use the one passed in)
       if dev.logical_id[0] == instance.primary_node:
@@ -4095,9 +5428,11 @@ class LUQueryInstanceData(NoHooksLU):
       else:
         snode = dev.logical_id[0]
 
-    if snode:
+    if snode and not static:
       self.cfg.SetDiskID(dev, snode)
-      dev_sstatus = rpc.call_blockdev_find(snode, dev)
+      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
+      dev_sstatus.Raise()
+      dev_sstatus = dev_sstatus.data
     else:
       dev_sstatus = None
 
@@ -4115,6 +5450,7 @@ class LUQueryInstanceData(NoHooksLU):
       "pstatus": dev_pstatus,
       "sstatus": dev_sstatus,
       "children": dev_children,
+      "mode": dev.mode,
       }
 
     return data
@@ -4122,17 +5458,26 @@ class LUQueryInstanceData(NoHooksLU):
   def Exec(self, feedback_fn):
     """Gather and return data"""
     result = {}
+
+    cluster = self.cfg.GetClusterInfo()
+
     for instance in self.wanted_instances:
-      remote_info = rpc.call_instance_info(instance.primary_node,
-                                                instance.name)
-      if remote_info and "state" in remote_info:
-        remote_state = "up"
-      else:
-        remote_state = "down"
-      if instance.status == "down":
-        config_state = "down"
+      if not self.op.static:
+        remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                                  instance.name,
+                                                  instance.hypervisor)
+        remote_info.Raise()
+        remote_info = remote_info.data
+        if remote_info and "state" in remote_info:
+          remote_state = "up"
+        else:
+          remote_state = "down"
       else:
+        remote_state = None
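+        # without contacting the node we cannot tell whether the instance
+        # is actually running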
+      if instance.admin_up:
         config_state = "up"
+      else:
+        config_state = "down"
 
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
@@ -4144,46 +5489,16 @@ class LUQueryInstanceData(NoHooksLU):
         "pnode": instance.primary_node,
         "snodes": instance.secondary_nodes,
         "os": instance.os,
-        "memory": instance.memory,
         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
         "disks": disks,
-        "vcpus": instance.vcpus,
+        "hypervisor": instance.hypervisor,
+        "network_port": instance.network_port,
+        "hv_instance": instance.hvparams,
+        "hv_actual": cluster.FillHV(instance),
+        "be_instance": instance.beparams,
+        "be_actual": cluster.FillBE(instance),
         }
 
-      htkind = self.sstore.GetHypervisorType()
-      if htkind == constants.HT_XEN_PVM30:
-        idict["kernel_path"] = instance.kernel_path
-        idict["initrd_path"] = instance.initrd_path
-
-      if htkind == constants.HT_XEN_HVM31:
-        idict["hvm_boot_order"] = instance.hvm_boot_order
-        idict["hvm_acpi"] = instance.hvm_acpi
-        idict["hvm_pae"] = instance.hvm_pae
-        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
-        idict["hvm_nic_type"] = instance.hvm_nic_type
-        idict["hvm_disk_type"] = instance.hvm_disk_type
-
-      if htkind in constants.HTS_REQ_PORT:
-        if instance.vnc_bind_address is None:
-          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
-        else:
-          vnc_bind_address = instance.vnc_bind_address
-        if instance.network_port is None:
-          vnc_console_port = None
-        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
-          vnc_console_port = "%s:%s" % (instance.primary_node,
-                                       instance.network_port)
-        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
-          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
-                                                   instance.network_port,
-                                                   instance.primary_node)
-        else:
-          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
-                                        instance.network_port)
-        idict["vnc_console_port"] = vnc_console_port
-        idict["vnc_bind_address"] = vnc_bind_address
-        idict["network_port"] = instance.network_port
-
       result[instance.name] = idict
 
     return result
@@ -4198,8 +5513,101 @@ class LUSetInstanceParams(LogicalUnit):
   _OP_REQP = ["instance_name"]
   REQ_BGL = False
 
+  def CheckArguments(self):
+    if not hasattr(self.op, 'nics'):
+      self.op.nics = []
+    if not hasattr(self.op, 'disks'):
+      self.op.disks = []
+    if not hasattr(self.op, 'beparams'):
+      self.op.beparams = {}
+    if not hasattr(self.op, 'hvparams'):
+      self.op.hvparams = {}
+    self.op.force = getattr(self.op, "force", False)
+    if not (self.op.nics or self.op.disks or
+            self.op.hvparams or self.op.beparams):
+      raise errors.OpPrereqError("No changes submitted")
+
+    utils.CheckBEParams(self.op.beparams)
+
+    # Disk validation
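+    # each entry in self.op.disks is a (op, params) pair, where op is
+    # constants.DDM_ADD, constants.DDM_REMOVE or an existing disk index and
+    # params is a dict, e.g. {'size': 1024, 'mode': constants.DISK_RDWR}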
+    disk_addremove = 0
+    for disk_op, disk_dict in self.op.disks:
+      if disk_op == constants.DDM_REMOVE:
+        disk_addremove += 1
+        continue
+      elif disk_op == constants.DDM_ADD:
+        disk_addremove += 1
+      else:
+        if not isinstance(disk_op, int):
+          raise errors.OpPrereqError("Invalid disk index")
+      if disk_op == constants.DDM_ADD:
+        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
+        if mode not in constants.DISK_ACCESS_SET:
+          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
+        size = disk_dict.get('size', None)
+        if size is None:
+          raise errors.OpPrereqError("Required disk parameter size missing")
+        try:
+          size = int(size)
+        except ValueError, err:
+          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
+                                     str(err))
+        disk_dict['size'] = size
+      else:
+        # modification of disk
+        if 'size' in disk_dict:
+          raise errors.OpPrereqError("Disk size change not possible, use"
+                                     " grow-disk")
+
+    if disk_addremove > 1:
+      raise errors.OpPrereqError("Only one disk add or remove operation"
+                                 " supported at a time")
+
+    # NIC validation
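+    # self.op.nics uses the same (op, params) format, params being a dict
+    # with optional 'mac', 'ip' and 'bridge' keys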
+    nic_addremove = 0
+    for nic_op, nic_dict in self.op.nics:
+      if nic_op == constants.DDM_REMOVE:
+        nic_addremove += 1
+        continue
+      elif nic_op == constants.DDM_ADD:
+        nic_addremove += 1
+      else:
+        if not isinstance(nic_op, int):
+          raise errors.OpPrereqError("Invalid nic index")
+
+      # nic_dict should be a dict
+      nic_ip = nic_dict.get('ip', None)
+      if nic_ip is not None:
+        if nic_ip.lower() == "none":
+          nic_dict['ip'] = None
+        else:
+          if not utils.IsValidIP(nic_ip):
+            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
+      # we can only check None bridges and assign the default one
+      nic_bridge = nic_dict.get('bridge', None)
+      if nic_bridge is None:
+        nic_dict['bridge'] = self.cfg.GetDefBridge()
+      # but we can validate MACs
+      nic_mac = nic_dict.get('mac', None)
+      if nic_mac is not None:
+        if self.cfg.IsMacInUse(nic_mac):
+          raise errors.OpPrereqError("MAC address %s already in use"
+                                     " in cluster" % nic_mac)
+        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+          if not utils.IsValidMac(nic_mac):
+            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
+    if nic_addremove > 1:
+      raise errors.OpPrereqError("Only one NIC add or remove operation"
+                                 " supported at a time")
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4208,27 +5616,13 @@ class LUSetInstanceParams(LogicalUnit):
 
     """
     args = dict()
-    if self.mem:
-      args['memory'] = self.mem
-    if self.vcpus:
-      args['vcpus'] = self.vcpus
-    if self.do_ip or self.do_bridge or self.mac:
-      if self.do_ip:
-        ip = self.ip
-      else:
-        ip = self.instance.nics[0].ip
-      if self.bridge:
-        bridge = self.bridge
-      else:
-        bridge = self.instance.nics[0].bridge
-      if self.mac:
-        mac = self.mac
-      else:
-        mac = self.instance.nics[0].mac
-      args['nics'] = [(ip, bridge, mac)]
-    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
-    nl = [self.sstore.GetMasterNode(),
-          self.instance.primary_node] + list(self.instance.secondary_nodes)
+    if constants.BE_MEMORY in self.be_new:
+      args['memory'] = self.be_new[constants.BE_MEMORY]
+    if constants.BE_VCPUS in self.be_new:
+      args['vcpus'] = self.be_new[constants.BE_VCPUS]
+    # FIXME: readd disk/nic changes
+    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -4237,153 +5631,148 @@ class LUSetInstanceParams(LogicalUnit):
     This only checks the instance list against the existing names.
 
     """
-    # FIXME: all the parameters could be checked before, in ExpandNames, or in
-    # a separate CheckArguments function, if we implement one, so the operation
-    # can be aborted without waiting for any lock, should it have an error...
-    self.mem = getattr(self.op, "mem", None)
-    self.vcpus = getattr(self.op, "vcpus", None)
-    self.ip = getattr(self.op, "ip", None)
-    self.mac = getattr(self.op, "mac", None)
-    self.bridge = getattr(self.op, "bridge", None)
-    self.kernel_path = getattr(self.op, "kernel_path", None)
-    self.initrd_path = getattr(self.op, "initrd_path", None)
-    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
-    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
-    self.hvm_pae = getattr(self.op, "hvm_pae", None)
-    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
-    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
-    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
-    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
-    self.force = getattr(self.op, "force", None)
-    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
-                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
-                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
-                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
-    if all_parms.count(None) == len(all_parms):
-      raise errors.OpPrereqError("No changes submitted")
-    if self.mem is not None:
-      try:
-        self.mem = int(self.mem)
-      except ValueError, err:
-        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
-    if self.vcpus is not None:
-      try:
-        self.vcpus = int(self.vcpus)
-      except ValueError, err:
-        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
-    if self.ip is not None:
-      self.do_ip = True
-      if self.ip.lower() == "none":
-        self.ip = None
-      else:
-        if not utils.IsValidIP(self.ip):
-          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
-    else:
-      self.do_ip = False
-    self.do_bridge = (self.bridge is not None)
-    if self.mac is not None:
-      if self.cfg.IsMacInUse(self.mac):
-        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
-                                   self.mac)
-      if not utils.IsValidMac(self.mac):
-        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
-
-    if self.kernel_path is not None:
-      self.do_kernel_path = True
-      if self.kernel_path == constants.VALUE_NONE:
-        raise errors.OpPrereqError("Can't set instance to no kernel")
-
-      if self.kernel_path != constants.VALUE_DEFAULT:
-        if not os.path.isabs(self.kernel_path):
-          raise errors.OpPrereqError("The kernel path must be an absolute"
-                                    " filename")
-    else:
-      self.do_kernel_path = False
-
-    if self.initrd_path is not None:
-      self.do_initrd_path = True
-      if self.initrd_path not in (constants.VALUE_NONE,
-                                  constants.VALUE_DEFAULT):
-        if not os.path.isabs(self.initrd_path):
-          raise errors.OpPrereqError("The initrd path must be an absolute"
-                                    " filename")
-    else:
-      self.do_initrd_path = False
-
-    # boot order verification
-    if self.hvm_boot_order is not None:
-      if self.hvm_boot_order != constants.VALUE_DEFAULT:
-        if len(self.hvm_boot_order.strip("acdn")) != 0:
-          raise errors.OpPrereqError("invalid boot order specified,"
-                                     " must be one or more of [acdn]"
-                                     " or 'default'")
-
-    # hvm_cdrom_image_path verification
-    if self.op.hvm_cdrom_image_path is not None:
-      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
-              self.op.hvm_cdrom_image_path.lower() == "none"):
-        raise errors.OpPrereqError("The path to the HVM CDROM image must"
-                                   " be an absolute path or None, not %s" %
-                                   self.op.hvm_cdrom_image_path)
-      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
-              self.op.hvm_cdrom_image_path.lower() == "none"):
-        raise errors.OpPrereqError("The HVM CDROM image must either be a"
-                                   " regular file or a symlink pointing to"
-                                   " an existing regular file, not %s" %
-                                   self.op.hvm_cdrom_image_path)
-
-    # vnc_bind_address verification
-    if self.op.vnc_bind_address is not None:
-      if not utils.IsValidIP(self.op.vnc_bind_address):
-        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
-                                   " like a valid IP address" %
-                                   self.op.vnc_bind_address)
+    force = self.force = self.op.force
+
+    # checking the new params on the primary/secondary nodes
 
     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    pnode = instance.primary_node
+    nodelist = list(instance.all_nodes)
+
+    # hvparams processing
+    if self.op.hvparams:
+      i_hvdict = copy.deepcopy(instance.hvparams)
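+      # a value of VALUE_DEFAULT drops the per-instance override (reverting
+      # to the cluster default), VALUE_NONE stores an explicit None and any
+      # other value overrides the cluster-level setting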
+      for key, val in self.op.hvparams.iteritems():
+        if val == constants.VALUE_DEFAULT:
+          try:
+            del i_hvdict[key]
+          except KeyError:
+            pass
+        elif val == constants.VALUE_NONE:
+          i_hvdict[key] = None
+        else:
+          i_hvdict[key] = val
+      cluster = self.cfg.GetClusterInfo()
+      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
+                                i_hvdict)
+      # local check
+      hypervisor.GetHypervisor(
+        instance.hypervisor).CheckParameterSyntax(hv_new)
+      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
+      self.hv_new = hv_new # the new actual values
+      self.hv_inst = i_hvdict # the new dict (without defaults)
+    else:
+      self.hv_new = self.hv_inst = {}
+
+    # beparams processing
+    if self.op.beparams:
+      i_bedict = copy.deepcopy(instance.beparams)
+      for key, val in self.op.beparams.iteritems():
+        if val == constants.VALUE_DEFAULT:
+          try:
+            del i_bedict[key]
+          except KeyError:
+            pass
+        else:
+          i_bedict[key] = val
+      cluster = self.cfg.GetClusterInfo()
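+      # merge the cluster defaults with the updated instance-level values to
+      # compute the effective backend parameters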
+      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
+                                i_bedict)
+      self.be_new = be_new # the new actual values
+      self.be_inst = i_bedict # the new dict (without defaults)
+    else:
+      self.be_new = self.be_inst = {}
+
     self.warn = []
-    if self.mem is not None and not self.force:
-      pnode = self.instance.primary_node
-      nodelist = [pnode]
-      nodelist.extend(instance.secondary_nodes)
-      instance_info = rpc.call_instance_info(pnode, instance.name)
-      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
-
-      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
+
+    if constants.BE_MEMORY in self.op.beparams and not self.force:
+      mem_check_list = [pnode]
+      if be_new[constants.BE_AUTO_BALANCE]:
+        # either we changed auto_balance to yes or it was already set
+        mem_check_list.extend(instance.secondary_nodes)
+      instance_info = self.rpc.call_instance_info(pnode, instance.name,
+                                                  instance.hypervisor)
+      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
+                                         instance.hypervisor)
+      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
         # Assume the primary node is unreachable and go ahead
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
-        if instance_info:
-          current_mem = instance_info['memory']
+        if not instance_info.failed and instance_info.data:
+          current_mem = instance_info.data['memory']
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
           # and we have no other way to check)
           current_mem = 0
-        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
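+        # the new memory size must fit within the instance's currently used
+        # memory plus whatever is still free on the primary node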
+        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
+                    nodeinfo[pnode].data['memory_free'])
         if miss_mem > 0:
           raise errors.OpPrereqError("This change will prevent the instance"
                                      " from starting, due to %d MB of memory"
                                      " missing on its primary node" % miss_mem)
 
-      for node in instance.secondary_nodes:
-        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
-          self.warn.append("Can't get info from secondary node %s" % node)
-        elif self.mem > nodeinfo[node]['memory_free']:
-          self.warn.append("Not enough memory to failover instance to secondary"
-                           " node %s" % node)
-
-    # Xen HVM device type checks
-    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
-      if self.op.hvm_nic_type is not None:
-        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
-          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
-                                     " HVM  hypervisor" % self.op.hvm_nic_type)
-      if self.op.hvm_disk_type is not None:
-        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
-          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
-                                     " HVM hypervisor" % self.op.hvm_disk_type)
+      if be_new[constants.BE_AUTO_BALANCE]:
+        for node, nres in nodeinfo.iteritems():
+          if node not in instance.secondary_nodes:
+            continue
+          if nres.failed or not isinstance(nres.data, dict):
+            self.warn.append("Can't get info from secondary node %s" % node)
+          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
+            self.warn.append("Not enough memory to failover instance to"
+                             " secondary node %s" % node)
+
+    # NIC processing
+    for nic_op, nic_dict in self.op.nics:
+      if nic_op == constants.DDM_REMOVE:
+        if not instance.nics:
+          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
+        continue
+      if nic_op != constants.DDM_ADD:
+        # an existing nic
+        if nic_op < 0 or nic_op >= len(instance.nics):
+          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
+                                     " are 0 to %d" %
+                                     (nic_op, len(instance.nics)))
+      nic_bridge = nic_dict.get('bridge', None)
+      if nic_bridge is not None:
+        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
+          msg = ("Bridge '%s' doesn't exist on one of"
+                 " the instance nodes" % nic_bridge)
+          if self.force:
+            self.warn.append(msg)
+          else:
+            raise errors.OpPrereqError(msg)
+
+    # DISK processing
+    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
+      raise errors.OpPrereqError("Disk operations not supported for"
+                                 " diskless instances")
+    for disk_op, disk_dict in self.op.disks:
+      if disk_op == constants.DDM_REMOVE:
+        if len(instance.disks) == 1:
+          raise errors.OpPrereqError("Cannot remove the last disk of"
+                                     " an instance")
+        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
+        ins_l = ins_l[pnode]
+        if ins_l.failed or not isinstance(ins_l.data, list):
+          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
+        if instance.name in ins_l.data:
+          raise errors.OpPrereqError("Instance is running, can't remove"
+                                     " disks.")
+
+      if (disk_op == constants.DDM_ADD and
+          len(instance.disks) >= constants.MAX_DISKS):
+        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
+                                   " add more" % constants.MAX_DISKS)
+      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
+        # an existing disk
+        if disk_op < 0 or disk_op >= len(instance.disks):
+          raise errors.OpPrereqError("Invalid disk index %s, valid values"
+                                     " are 0 to %d" %
+                                     (disk_op, len(instance.disks)))
 
     return
 
@@ -4391,6 +5780,7 @@ class LUSetInstanceParams(LogicalUnit):
     """Modifies an instance.
 
     All parameters take effect only at the next restart of the instance.
+
     """
     # Process here the warnings from CheckPrereq, as we don't have a
     # feedback_fn there.
@@ -4399,54 +5789,95 @@ class LUSetInstanceParams(LogicalUnit):
 
     result = []
     instance = self.instance
-    if self.mem:
-      instance.memory = self.mem
-      result.append(("mem", self.mem))
-    if self.vcpus:
-      instance.vcpus = self.vcpus
-      result.append(("vcpus",  self.vcpus))
-    if self.do_ip:
-      instance.nics[0].ip = self.ip
-      result.append(("ip", self.ip))
-    if self.bridge:
-      instance.nics[0].bridge = self.bridge
-      result.append(("bridge", self.bridge))
-    if self.mac:
-      instance.nics[0].mac = self.mac
-      result.append(("mac", self.mac))
-    if self.do_kernel_path:
-      instance.kernel_path = self.kernel_path
-      result.append(("kernel_path", self.kernel_path))
-    if self.do_initrd_path:
-      instance.initrd_path = self.initrd_path
-      result.append(("initrd_path", self.initrd_path))
-    if self.hvm_boot_order:
-      if self.hvm_boot_order == constants.VALUE_DEFAULT:
-        instance.hvm_boot_order = None
+    # disk changes
+    for disk_op, disk_dict in self.op.disks:
+      if disk_op == constants.DDM_REMOVE:
+        # remove the last disk
+        device = instance.disks.pop()
+        device_idx = len(instance.disks)
+        for node, disk in device.ComputeNodeTree(instance.primary_node):
+          self.cfg.SetDiskID(disk, node)
+          rpc_result = self.rpc.call_blockdev_remove(node, disk)
+          if rpc_result.failed or not rpc_result.data:
+            self.proc.LogWarning("Could not remove disk/%d on node %s,"
+                                 " continuing anyway", device_idx, node)
+        result.append(("disk/%d" % device_idx, "remove"))
+      elif disk_op == constants.DDM_ADD:
+        # add a new disk
+        if instance.disk_template == constants.DT_FILE:
+          file_driver, file_path = instance.disks[0].logical_id
+          file_path = os.path.dirname(file_path)
+        else:
+          file_driver = file_path = None
+        disk_idx_base = len(instance.disks)
+        new_disk = _GenerateDiskTemplate(self,
+                                         instance.disk_template,
+                                         instance.name, instance.primary_node,
+                                         instance.secondary_nodes,
+                                         [disk_dict],
+                                         file_path,
+                                         file_driver,
+                                         disk_idx_base)[0]
+        instance.disks.append(new_disk)
+        info = _GetInstanceInfoText(instance)
+
+        logging.info("Creating volume %s for instance %s",
+                     new_disk.iv_name, instance.name)
+        # Note: this needs to be kept in sync with _CreateDisks
+        #HARDCODE
+        for node in instance.all_nodes:
+          f_create = node == instance.primary_node
+          try:
+            _CreateBlockDev(self, node, instance, new_disk,
+                            f_create, info, f_create)
+          except errors.OpExecError, err:
+            self.LogWarning("Failed to create volume %s (%s) on"
+                            " node %s: %s",
+                            new_disk.iv_name, new_disk, node, err)
+        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
+                       (new_disk.size, new_disk.mode)))
       else:
-        instance.hvm_boot_order = self.hvm_boot_order
-      result.append(("hvm_boot_order", self.hvm_boot_order))
-    if self.hvm_acpi is not None:
-      instance.hvm_acpi = self.hvm_acpi
-      result.append(("hvm_acpi", self.hvm_acpi))
-    if self.hvm_pae is not None:
-      instance.hvm_pae = self.hvm_pae
-      result.append(("hvm_pae", self.hvm_pae))
-    if self.hvm_nic_type is not None:
-      instance.hvm_nic_type = self.hvm_nic_type
-      result.append(("hvm_nic_type", self.hvm_nic_type))
-    if self.hvm_disk_type is not None:
-      instance.hvm_disk_type = self.hvm_disk_type
-      result.append(("hvm_disk_type", self.hvm_disk_type))
-    if self.hvm_cdrom_image_path:
-      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
-        instance.hvm_cdrom_image_path = None
+        # change a given disk
+        instance.disks[disk_op].mode = disk_dict['mode']
+        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+    # NIC changes
+    for nic_op, nic_dict in self.op.nics:
+      if nic_op == constants.DDM_REMOVE:
+        # remove the last nic
+        del instance.nics[-1]
+        result.append(("nic.%d" % len(instance.nics), "remove"))
+      elif nic_op == constants.DDM_ADD:
+        # add a new nic
+        if 'mac' not in nic_dict:
+          mac = constants.VALUE_GENERATE
+        else:
+          mac = nic_dict['mac']
+        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+          mac = self.cfg.GenerateMAC()
+        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
+                              bridge=nic_dict.get('bridge', None))
+        instance.nics.append(new_nic)
+        result.append(("nic.%d" % (len(instance.nics) - 1),
+                       "add:mac=%s,ip=%s,bridge=%s" %
+                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
       else:
-        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
-      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
-    if self.vnc_bind_address:
-      instance.vnc_bind_address = self.vnc_bind_address
-      result.append(("vnc_bind_address", self.vnc_bind_address))
+        # change a given nic
+        for key in 'mac', 'ip', 'bridge':
+          if key in nic_dict:
+            setattr(instance.nics[nic_op], key, nic_dict[key])
+            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
+
+    # hvparams changes
+    if self.op.hvparams:
+      instance.hvparams = self.hv_new
+      for key, val in self.op.hvparams.iteritems():
+        result.append(("hv/%s" % key, val))
+
+    # beparams changes
+    if self.op.beparams:
+      instance.beparams = self.be_inst
+      for key, val in self.op.beparams.iteritems():
+        result.append(("be/%s" % key, val))
 
     self.cfg.Update(instance)
 
@@ -4457,24 +5888,42 @@ class LUQueryExports(NoHooksLU):
   """Query the exports list
 
   """
-  _OP_REQP = []
+  _OP_REQP = ['nodes']
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+    if not self.op.nodes:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = \
+        _GetWantedNodes(self, self.op.nodes)
 
   def CheckPrereq(self):
-    """Check that the nodelist contains only existing nodes.
+    """Check prerequisites.
 
     """
-    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
+    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
 
   def Exec(self, feedback_fn):
     """Compute the list of all the exported system images.
 
-    Returns:
-      a dictionary with the structure node->(export-list)
-      where export-list is a list of the instances exported on
-      that node.
+    @rtype: dict
+    @return: a dictionary with the structure node->(export-list)
+        where export-list is a list of the instances exported on
+        that node.
 
     """
-    return rpc.call_export_list(self.nodes)
+    rpcresult = self.rpc.call_export_list(self.nodes)
+    result = {}
+    for node in rpcresult:
+      if rpcresult[node].failed:
+        result[node] = False
+      else:
+        result[node] = rpcresult[node].data
+
+    return result
 
 
 class LUExportInstance(LogicalUnit):
@@ -4484,6 +5933,23 @@ class LUExportInstance(LogicalUnit):
   HPATH = "instance-export"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "target_node", "shutdown"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    # FIXME: lock only instance primary and destination node
+    #
+    # Sad but true, for now we have do lock all nodes, as we don't know where
+    # the previous export might be, and and in this LU we search for it and
+    # remove it from its current node. In the future we could fix this by:
+    #  - making a tasklet to search (share-lock all), then create the new one,
+    #    then one to remove, after
+    #  - removing the removal operation altoghether
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    """Last minute lock declaration."""
+    # All nodes are locked anyway, so nothing to do here.
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4495,8 +5961,8 @@ class LUExportInstance(LogicalUnit):
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       }
-    env.update(_BuildInstanceHookEnvByObject(self.instance))
-    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
           self.op.target_node]
     return env, nl, nl
 
@@ -4506,20 +5972,19 @@ class LUExportInstance(LogicalUnit):
     This checks that the instance and node names are valid.
 
     """
-    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    instance_name = self.op.instance_name
     self.instance = self.cfg.GetInstanceInfo(instance_name)
-    if self.instance is None:
-      raise errors.OpPrereqError("Instance '%s' not found" %
-                                 self.op.instance_name)
+    assert self.instance is not None, \
+          "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
-    # node verification
-    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
-    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
+    self.dst_node = self.cfg.GetNodeInfo(
+      self.cfg.ExpandNodeName(self.op.target_node))
 
     if self.dst_node is None:
-      raise errors.OpPrereqError("Destination node '%s' is unknown." %
-                                 self.op.target_node)
-    self.op.target_node = self.dst_node.name
+      # This is a wrong node name, not a non-locked node
+      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
+    _CheckNodeOnline(self, self.dst_node.name)
 
     # instance disk type verification
     for disk in self.instance.disks:
@@ -4536,7 +6001,9 @@ class LUExportInstance(LogicalUnit):
     src_node = instance.primary_node
     if self.op.shutdown:
       # shutdown the instance, but not the disks
-      if not rpc.call_instance_shutdown(src_node, instance):
+      result = self.rpc.call_instance_shutdown(src_node, instance)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                  (instance.name, src_node))
 
@@ -4544,41 +6011,54 @@ class LUExportInstance(LogicalUnit):
 
     snap_disks = []
 
+    # set the disks ID correctly since call_instance_start needs the
+    # correct drbd minor to create the symlinks
+    for disk in instance.disks:
+      self.cfg.SetDiskID(disk, src_node)
+
     try:
       for disk in instance.disks:
-        if disk.iv_name == "sda":
-          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
-          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
-
-          if not new_dev_name:
-            logger.Error("could not snapshot block device %s on node %s" %
-                         (disk.logical_id[1], src_node))
-          else:
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                      logical_id=(vgname, new_dev_name),
-                                      physical_id=(vgname, new_dev_name),
-                                      iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
+        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
+        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
+        if new_dev_name.failed or not new_dev_name.data:
+          self.LogWarning("Could not snapshot block device %s on node %s",
+                          disk.logical_id[1], src_node)
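+          # keep a placeholder entry so the export loop below still has one
+          # element per instance disk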
+          snap_disks.append(False)
+        else:
+          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+                                 logical_id=(vgname, new_dev_name.data),
+                                 physical_id=(vgname, new_dev_name.data),
+                                 iv_name=disk.iv_name)
+          snap_disks.append(new_dev)
 
     finally:
-      if self.op.shutdown and instance.status == "up":
-        if not rpc.call_instance_start(src_node, instance, None):
-          _ShutdownInstanceDisks(instance, self.cfg)
-          raise errors.OpExecError("Could not start instance")
+      if self.op.shutdown and instance.admin_up:
+        result = self.rpc.call_instance_start(src_node, instance, None)
+        msg = result.RemoteFailMsg()
+        if msg:
+          _ShutdownInstanceDisks(self, instance)
+          raise errors.OpExecError("Could not start instance: %s" % msg)
 
     # TODO: check for size
 
-    for dev in snap_disks:
-      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
-        logger.Error("could not export block device %s from node %s to node %s"
-                     % (dev.logical_id[1], src_node, dst_node.name))
-      if not rpc.call_blockdev_remove(src_node, dev):
-        logger.Error("could not remove snapshot block device %s from node %s" %
-                     (dev.logical_id[1], src_node))
-
-    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
-      logger.Error("could not finalize export for instance %s on node %s" %
-                   (instance.name, dst_node.name))
+    cluster_name = self.cfg.GetClusterName()
+    for idx, dev in enumerate(snap_disks):
+      if dev:
+        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
+                                               instance, cluster_name, idx)
+        if result.failed or not result.data:
+          self.LogWarning("Could not export block device %s from node %s to"
+                          " node %s", dev.logical_id[1], src_node,
+                          dst_node.name)
+        result = self.rpc.call_blockdev_remove(src_node, dev)
+        if result.failed or not result.data:
+          self.LogWarning("Could not remove snapshot block device %s from node"
+                          " %s", dev.logical_id[1], src_node)
+
+    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+    if result.failed or not result.data:
+      self.LogWarning("Could not finalize export for instance %s on node %s",
+                      instance.name, dst_node.name)
 
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(dst_node.name)
@@ -4587,12 +6067,14 @@ class LUExportInstance(LogicalUnit):
     # if we proceed the backup would be removed because OpQueryExports
     # substitutes an empty list with the full cluster node list.
     if nodelist:
-      exportlist = rpc.call_export_list(nodelist)
+      exportlist = self.rpc.call_export_list(nodelist)
       for node in exportlist:
-        if instance.name in exportlist[node]:
-          if not rpc.call_export_remove(node, instance.name):
-            logger.Error("could not remove older export for instance %s"
-                         " on node %s" % (instance.name, node))
+        if exportlist[node].failed:
+          continue
+        if instance.name in exportlist[node].data:
+          if not self.rpc.call_export_remove(node, instance.name):
+            self.LogWarning("Could not remove older export for instance %s"
+                            " on node %s", instance.name, node)
 
 
 class LURemoveExport(NoHooksLU):
@@ -4600,6 +6082,14 @@ class LURemoveExport(NoHooksLU):
 
   """
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+    # We need all nodes to be locked in order for RemoveExport to work, but we
+    # don't need to lock the instance itself, as nothing will happen to it (and
+    # we can remove exports even for an already-removed instance)
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4618,14 +6108,19 @@ class LURemoveExport(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
+    exportlist = self.rpc.call_export_list(self.acquired_locks[
+      locking.LEVEL_NODE])
     found = False
     for node in exportlist:
-      if instance_name in exportlist[node]:
+      if exportlist[node].failed:
+        self.LogWarning("Failed to query node %s, continuing" % node)
+        continue
+      if instance_name in exportlist[node].data:
         found = True
-        if not rpc.call_export_remove(node, instance_name):
-          logger.Error("could not remove export for instance %s"
-                       " on node %s" % (instance_name, node))
+        result = self.rpc.call_export_remove(node, instance_name)
+        if result.failed or not result.data:
+          logging.error("Could not remove export for instance %s"
+                        " on node %s", instance_name, node)
 
     if fqdn_warn and not found:
       feedback_fn("Export not found. If trying to remove an export belonging"
@@ -4639,26 +6134,34 @@ class TagsLU(NoHooksLU):
   This is an abstract class which is the parent of all the other tags LUs.
 
   """
-  def CheckPrereq(self):
-    """Check prerequisites.
 
-    """
-    if self.op.kind == constants.TAG_CLUSTER:
-      self.target = self.cfg.GetClusterInfo()
-    elif self.op.kind == constants.TAG_NODE:
+  def ExpandNames(self):
+    self.needed_locks = {}
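+    # cluster tags need no extra locks; node and instance tags lock the
+    # affected object below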
+    if self.op.kind == constants.TAG_NODE:
       name = self.cfg.ExpandNodeName(self.op.name)
       if name is None:
         raise errors.OpPrereqError("Invalid node name (%s)" %
                                    (self.op.name,))
       self.op.name = name
-      self.target = self.cfg.GetNodeInfo(name)
+      self.needed_locks[locking.LEVEL_NODE] = name
     elif self.op.kind == constants.TAG_INSTANCE:
       name = self.cfg.ExpandInstanceName(self.op.name)
       if name is None:
         raise errors.OpPrereqError("Invalid instance name (%s)" %
                                    (self.op.name,))
       self.op.name = name
-      self.target = self.cfg.GetInstanceInfo(name)
+      self.needed_locks[locking.LEVEL_INSTANCE] = name
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    if self.op.kind == constants.TAG_CLUSTER:
+      self.target = self.cfg.GetClusterInfo()
+    elif self.op.kind == constants.TAG_NODE:
+      self.target = self.cfg.GetNodeInfo(self.op.name)
+    elif self.op.kind == constants.TAG_INSTANCE:
+      self.target = self.cfg.GetInstanceInfo(self.op.name)
     else:
       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                  str(self.op.kind))
@@ -4669,6 +6172,7 @@ class LUGetTags(TagsLU):
 
   """
   _OP_REQP = ["kind", "name"]
+  REQ_BGL = False
 
   def Exec(self, feedback_fn):
     """Returns the tag list.
@@ -4682,6 +6186,10 @@ class LUSearchTags(NoHooksLU):
 
   """
   _OP_REQP = ["pattern"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4701,9 +6209,9 @@ class LUSearchTags(NoHooksLU):
     """
     cfg = self.cfg
     tgts = [("/cluster", cfg.GetClusterInfo())]
-    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
+    ilist = cfg.GetAllInstancesInfo().values()
     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
-    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
+    nlist = cfg.GetAllNodesInfo().values()
     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
     results = []
     for path, target in tgts:
@@ -4718,6 +6226,7 @@ class LUAddTags(TagsLU):
 
   """
   _OP_REQP = ["kind", "name", "tags"]
+  REQ_BGL = False
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4751,6 +6260,7 @@ class LUDelTags(TagsLU):
 
   """
   _OP_REQP = ["kind", "name", "tags"]
+  REQ_BGL = False
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4821,20 +6331,21 @@ class LUTestDelay(NoHooksLU):
       if not utils.TestDelay(self.op.duration):
         raise errors.OpExecError("Error during master delay test")
     if self.op.on_nodes:
-      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
+      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
       if not result:
         raise errors.OpExecError("Complete failure from rpc call")
       for node, node_result in result.items():
-        if not node_result:
+        node_result.Raise()
+        if not node_result.data:
           raise errors.OpExecError("Failure during rpc call to node %s,"
-                                   " result: %s" % (node, node_result))
+                                   " result: %s" % (node, node_result.data))
 
 
 class IAllocator(object):
   """IAllocator framework.
 
   An IAllocator instance has three sets of attributes:
-    - cfg/sstore that are needed to query the cluster
+    - cfg that is needed to query the cluster
     - input data (all members of the _KEYS class attribute are required)
     - four buffer attributes (in|out_data|text), that represent the
       input (to the external script) in text and data structure format,
@@ -4845,15 +6356,14 @@ class IAllocator(object):
   """
   _ALLO_KEYS = [
     "mem_size", "disks", "disk_template",
-    "os", "tags", "nics", "vcpus",
+    "os", "tags", "nics", "vcpus", "hypervisor",
     ]
   _RELO_KEYS = [
     "relocate_from",
     ]
 
-  def __init__(self, cfg, sstore, mode, name, **kwargs):
-    self.cfg = cfg
-    self.sstore = sstore
+  def __init__(self, lu, mode, name, **kwargs):
+    self.lu = lu
     # init buffer variables
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy
@@ -4861,6 +6371,7 @@ class IAllocator(object):
     self.name = name
     self.mem_size = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
+    self.hypervisor = None
     self.relocate_from = None
     # computed fields
     self.required_nodes = None
@@ -4890,77 +6401,105 @@ class IAllocator(object):
     This is the data that is independent of the actual operation.
 
     """
-    cfg = self.cfg
+    cfg = self.lu.cfg
+    cluster_info = cfg.GetClusterInfo()
     # cluster data
     data = {
       "version": 1,
-      "cluster_name": self.sstore.GetClusterName(),
-      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
-      "hypervisor_type": self.sstore.GetHypervisorType(),
+      "cluster_name": cfg.GetClusterName(),
+      "cluster_tags": list(cluster_info.GetTags()),
+      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
       # we don't have job IDs
       }
-
-    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
+    iinfo = cfg.GetAllInstancesInfo().values()
+    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
 
     # node data
     node_results = {}
     node_list = cfg.GetNodeList()
-    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
-    for nname in node_list:
+
+    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
+      hypervisor_name = self.hypervisor
+    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
+      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
+
+    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
+                                           hypervisor_name)
+    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
+                       cluster_info.enabled_hypervisors)
+    for nname, nresult in node_data.items():
+      # first fill in static (config-based) values
       ninfo = cfg.GetNodeInfo(nname)
-      if nname not in node_data or not isinstance(node_data[nname], dict):
-        raise errors.OpExecError("Can't get data for node %s" % nname)
-      remote_info = node_data[nname]
-      for attr in ['memory_total', 'memory_free', 'memory_dom0',
-                   'vg_size', 'vg_free', 'cpu_total']:
-        if attr not in remote_info:
-          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
-                                   (nname, attr))
-        try:
-          remote_info[attr] = int(remote_info[attr])
-        except ValueError, err:
-          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
-                                   " %s" % (nname, attr, str(err)))
-      # compute memory used by primary instances
-      i_p_mem = i_p_up_mem = 0
-      for iinfo in i_list:
-        if iinfo.primary_node == nname:
-          i_p_mem += iinfo.memory
-          if iinfo.status == "up":
-            i_p_up_mem += iinfo.memory
-
-      # compute memory used by instances
       pnr = {
         "tags": list(ninfo.GetTags()),
-        "total_memory": remote_info['memory_total'],
-        "reserved_memory": remote_info['memory_dom0'],
-        "free_memory": remote_info['memory_free'],
-        "i_pri_memory": i_p_mem,
-        "i_pri_up_memory": i_p_up_mem,
-        "total_disk": remote_info['vg_size'],
-        "free_disk": remote_info['vg_free'],
         "primary_ip": ninfo.primary_ip,
         "secondary_ip": ninfo.secondary_ip,
-        "total_cpus": remote_info['cpu_total'],
+        "offline": ninfo.offline,
+        "master_candidate": ninfo.master_candidate,
         }
+
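+      # live (RPC) data can only be gathered from online nodes; offline
+      # nodes only get the static configuration values above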
+      if not ninfo.offline:
+        nresult.Raise()
+        if not isinstance(nresult.data, dict):
+          raise errors.OpExecError("Can't get data for node %s" % nname)
+        remote_info = nresult.data
+        for attr in ['memory_total', 'memory_free', 'memory_dom0',
+                     'vg_size', 'vg_free', 'cpu_total']:
+          if attr not in remote_info:
+            raise errors.OpExecError("Node '%s' didn't return attribute"
+                                     " '%s'" % (nname, attr))
+          try:
+            remote_info[attr] = int(remote_info[attr])
+          except ValueError, err:
+            raise errors.OpExecError("Node '%s' returned invalid value"
+                                     " for '%s': %s" % (nname, attr, err))
+        # compute memory used by primary instances
+        i_p_mem = i_p_up_mem = 0
+        for iinfo, beinfo in i_list:
+          if iinfo.primary_node == nname:
+            i_p_mem += beinfo[constants.BE_MEMORY]
+            if iinfo.name not in node_iinfo[nname].data:
+              i_used_mem = 0
+            else:
+              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
+            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
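+            # reserve any memory the instance could still grow into, so the
+            # allocator doesn't over-commit the node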
+            remote_info['memory_free'] -= max(0, i_mem_diff)
+
+            if iinfo.admin_up:
+              i_p_up_mem += beinfo[constants.BE_MEMORY]
+
+        # compute memory used by instances
+        pnr_dyn = {
+          "total_memory": remote_info['memory_total'],
+          "reserved_memory": remote_info['memory_dom0'],
+          "free_memory": remote_info['memory_free'],
+          "total_disk": remote_info['vg_size'],
+          "free_disk": remote_info['vg_free'],
+          "total_cpus": remote_info['cpu_total'],
+          "i_pri_memory": i_p_mem,
+          "i_pri_up_memory": i_p_up_mem,
+          }
+        pnr.update(pnr_dyn)
+
       node_results[nname] = pnr
     data["nodes"] = node_results
 
     # instance data
     instance_data = {}
-    for iinfo in i_list:
+    for iinfo, beinfo in i_list:
       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                   for n in iinfo.nics]
       pir = {
         "tags": list(iinfo.GetTags()),
-        "should_run": iinfo.status == "up",
-        "vcpus": iinfo.vcpus,
-        "memory": iinfo.memory,
+        "admin_up": iinfo.admin_up,
+        "vcpus": beinfo[constants.BE_VCPUS],
+        "memory": beinfo[constants.BE_MEMORY],
         "os": iinfo.os,
         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
         "nics": nic_data,
-        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
+        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,
+        "hypervisor": iinfo.hypervisor,
         }
       instance_data[iinfo.name] = pir
 
@@ -4979,11 +6518,8 @@ class IAllocator(object):
 
     """
     data = self.in_data
-    if len(self.disks) != 2:
-      raise errors.OpExecError("Only two-disk configurations supported")
 
-    disk_space = _ComputeDiskSize(self.disk_template,
-                                  self.disks[0]["size"], self.disks[1]["size"])
+    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
 
     if self.disk_template in constants.DTS_NET_MIRROR:
       self.required_nodes = 2
@@ -5014,7 +6550,7 @@ class IAllocator(object):
     done.
 
     """
-    instance = self.cfg.GetInstanceInfo(self.name)
+    instance = self.lu.cfg.GetInstanceInfo(self.name)
     if instance is None:
       raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                    " IAllocator" % self.name)
@@ -5026,10 +6562,8 @@ class IAllocator(object):
       raise errors.OpPrereqError("Instance has not exactly one secondary node")
 
     self.required_nodes = 1
-
-    disk_space = _ComputeDiskSize(instance.disk_template,
-                                  instance.disks[0].size,
-                                  instance.disks[1].size)
+    disk_sizes = [{'size': disk.size} for disk in instance.disks]
+    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
 
     request = {
       "type": "relocate",
@@ -5053,18 +6587,21 @@ class IAllocator(object):
 
     self.in_text = serializer.Dump(self.in_data)
 
-  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
+  def Run(self, name, validate=True, call_fn=None):
     """Run an instance allocator and return the results.
 
     """
+    if call_fn is None:
+      call_fn = self.lu.rpc.call_iallocator_runner
     data = self.in_text
 
-    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
+    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
+    result.Raise()
 
-    if not isinstance(result, tuple) or len(result) != 4:
+    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
       raise errors.OpExecError("Invalid result from master iallocator runner")
 
-    rcode, stdout, stderr, fail = result
+    rcode, stdout, stderr, fail = result.data
 
     if rcode == constants.IARUN_NOTFOUND:
       raise errors.OpExecError("Can't find allocator '%s'" % name)
@@ -5137,8 +6674,6 @@ class LUTestAllocator(NoHooksLU):
                                      " 'nics' parameter")
       if not isinstance(self.op.disks, list):
         raise errors.OpPrereqError("Invalid parameter 'disks'")
-      if len(self.op.disks) != 2:
-        raise errors.OpPrereqError("Only two-disk configurations supported")
       for row in self.op.disks:
         if (not isinstance(row, dict) or
             "size" not in row or
@@ -5147,6 +6682,8 @@ class LUTestAllocator(NoHooksLU):
             row["mode"] not in ['r', 'w']):
           raise errors.OpPrereqError("Invalid contents of the"
                                      " 'disks' parameter")
+      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
+        self.op.hypervisor = self.cfg.GetHypervisorType()
     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
       if not hasattr(self.op, "name"):
         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
@@ -5172,7 +6709,7 @@ class LUTestAllocator(NoHooksLU):
 
     """
     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
-      ial = IAllocator(self.cfg, self.sstore,
+      ial = IAllocator(self,
                        mode=self.op.mode,
                        name=self.op.name,
                        mem_size=self.op.mem_size,
@@ -5182,9 +6719,10 @@ class LUTestAllocator(NoHooksLU):
                        tags=self.op.tags,
                        nics=self.op.nics,
                        vcpus=self.op.vcpus,
+                       hypervisor=self.op.hypervisor,
                        )
     else:
-      ial = IAllocator(self.cfg, self.sstore,
+      ial = IAllocator(self,
                        mode=self.op.mode,
                        name=self.op.name,
                        relocate_from=list(self.relocate_from),