Improve LUQueryNodes for lockless case
[ganeti-local] / lib / cmdlib.py
index 52c08f4..87ce76d 100644 (file)
@@ -25,9 +25,7 @@
 
 import os
 import os.path
-import sha
 import time
-import tempfile
 import re
 import platform
 import logging
@@ -40,8 +38,8 @@ from ganeti import hypervisor
 from ganeti import locking
 from ganeti import constants
 from ganeti import objects
-from ganeti import opcodes
 from ganeti import serializer
+from ganeti import ssconf
 
 
 class LogicalUnit(object):
@@ -67,7 +65,7 @@ class LogicalUnit(object):
   def __init__(self, processor, op, context, rpc):
     """Constructor for LogicalUnit.
 
-    This needs to be overriden in derived classes in order to check op
+    This needs to be overridden in derived classes in order to check op
     validity.
 
     """
@@ -115,7 +113,7 @@ class LogicalUnit(object):
     CheckPrereq, doing these separate is better because:
 
       - ExpandNames is left as as purely a lock-related function
-      - CheckPrereq is run after we have aquired locks (and possible
+      - CheckPrereq is run after we have acquired locks (and possible
         waited for them)
 
     The function is allowed to change the self.op attribute so that
@@ -390,8 +388,8 @@ def _GetWantedInstances(lu, instances):
       wanted.append(instance)
 
   else:
-    wanted = lu.cfg.GetInstanceList()
-  return utils.NiceSort(wanted)
+    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
+  return wanted
 
 
 def _CheckOutputFields(static, dynamic, selected):
@@ -413,8 +411,47 @@ def _CheckOutputFields(static, dynamic, selected):
                                % ",".join(delta))
 
 
+def _CheckBooleanOpField(op, name):
+  """Validate a boolean opcode parameter and make sure it is set.
+
+  The attribute must hold either a boolean or None. A missing attribute
+  is treated as None and is created on the opcode, so code running
+  afterwards can read it unconditionally.
+
+  @param op: the opcode object to check; it is modified in place
+  @type name: string
+  @param name: the name of the attribute to validate
+  @raise errors.OpPrereqError: if the attribute is neither None nor
+      a boolean
+
+  """
+  value = getattr(op, name, None)
+  if value is not None and not isinstance(value, bool):
+    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
+                               (name, str(value)))
+  setattr(op, name, value)
+
+
+def _CheckNodeOnline(lu, node):
+  """Refuse to continue if the given node is marked offline.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check, as accepted by lu.cfg.GetNodeInfo
+  @raise errors.OpPrereqError: if the node is offline
+
+  """
+  node_info = lu.cfg.GetNodeInfo(node)
+  if node_info.offline:
+    raise errors.OpPrereqError("Can't use offline node %s" % node)
+
+
+def _CheckNodeNotDrained(lu, node):
+  """Refuse to continue if the given node is marked drained.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check, as accepted by lu.cfg.GetNodeInfo
+  @raise errors.OpPrereqError: if the node is drained
+
+  """
+  node_info = lu.cfg.GetNodeInfo(node)
+  if node_info.drained:
+    raise errors.OpPrereqError("Can't use drained node %s" % node)
+
+
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
-                          memory, vcpus, nics):
+                          memory, vcpus, nics, disk_template, disks,
+                          bep, hvp, hypervisor_name):
   """Builds instance related env variables for hooks
 
   This builds the hook environment from individual variables.
@@ -427,8 +464,8 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @param secondary_nodes: list of secondary nodes as strings
   @type os_type: string
   @param os_type: the name of the instance's OS
-  @type status: string
-  @param status: the desired status of the instances
+  @type status: boolean
+  @param status: the should_run status of the instance
   @type memory: string
   @param memory: the memory size of the instance
   @type vcpus: string
@@ -436,19 +473,35 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @type nics: list
   @param nics: list of tuples (ip, bridge, mac) representing
       the NICs the instance  has
+  @type disk_template: string
+  @param disk_template: the disk template of the instance
+  @type disks: list
+  @param disks: the list of (size, mode) pairs
+  @type bep: dict
+  @param bep: the backend parameters for the instance
+  @type hvp: dict
+  @param hvp: the hypervisor parameters for the instance
+  @type hypervisor_name: string
+  @param hypervisor_name: the hypervisor for the instance
   @rtype: dict
   @return: the hook environment for this instance
 
   """
+  if status:
+    str_status = "up"
+  else:
+    str_status = "down"
   env = {
     "OP_TARGET": name,
     "INSTANCE_NAME": name,
     "INSTANCE_PRIMARY": primary_node,
     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
     "INSTANCE_OS_TYPE": os_type,
-    "INSTANCE_STATUS": status,
+    "INSTANCE_STATUS": str_status,
     "INSTANCE_MEMORY": memory,
     "INSTANCE_VCPUS": vcpus,
+    "INSTANCE_DISK_TEMPLATE": disk_template,
+    "INSTANCE_HYPERVISOR": hypervisor_name,
   }
 
   if nics:
@@ -458,12 +511,26 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
         ip = ""
       env["INSTANCE_NIC%d_IP" % idx] = ip
       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
-      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
+      env["INSTANCE_NIC%d_MAC" % idx] = mac
   else:
     nic_count = 0
 
   env["INSTANCE_NIC_COUNT"] = nic_count
 
+  if disks:
+    disk_count = len(disks)
+    for idx, (size, mode) in enumerate(disks):
+      env["INSTANCE_DISK%d_SIZE" % idx] = size
+      env["INSTANCE_DISK%d_MODE" % idx] = mode
+  else:
+    disk_count = 0
+
+  env["INSTANCE_DISK_COUNT"] = disk_count
+
+  for source, kind in [(bep, "BE"), (hvp, "HV")]:
+    for key, value in source.items():
+      env["INSTANCE_%s_%s" % (kind, key)] = value
+
   return env
 
 
@@ -482,30 +549,55 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   @return: the hook environment dictionary
 
   """
-  bep = lu.cfg.GetClusterInfo().FillBE(instance)
+  cluster = lu.cfg.GetClusterInfo()
+  bep = cluster.FillBE(instance)
+  hvp = cluster.FillHV(instance)
   args = {
     'name': instance.name,
     'primary_node': instance.primary_node,
     'secondary_nodes': instance.secondary_nodes,
     'os_type': instance.os,
-    'status': instance.os,
+    'status': instance.admin_up,
     'memory': bep[constants.BE_MEMORY],
     'vcpus': bep[constants.BE_VCPUS],
     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
+    'disk_template': instance.disk_template,
+    'disks': [(disk.size, disk.mode) for disk in instance.disks],
+    'bep': bep,
+    'hvp': hvp,
+    'hypervisor_name': instance.hypervisor,
   }
   if override:
     args.update(override)
   return _BuildInstanceHookEnv(**args)
 
 
+def _AdjustCandidatePool(lu):
+  """Adjust the candidate pool after node operations.
+
+  Asks the configuration to maintain the candidate pool, logs the nodes
+  it reports back as promoted and re-adds each of them to the LU's
+  context. Finally, a note is logged if there are more master
+  candidates than desired.
+
+  @param lu: the LU on behalf of which we adjust the pool
+
+  """
+  promoted = lu.cfg.MaintainCandidatePool()
+  if promoted:
+    promoted_names = ", ".join(node.name for node in promoted)
+    lu.LogInfo("Promoted nodes to master candidate role: %s", promoted_names)
+    # the entries are node objects (they expose .name), not plain names
+    for node in promoted:
+      lu.context.ReaddNode(node)
+  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
+  if mc_now > mc_max:
+    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
+               (mc_now, mc_max))
+
+
 def _CheckInstanceBridgesExist(lu, instance):
-  """Check that the brigdes needed by an instance exist.
+  """Check that the bridges needed by an instance exist.
 
   """
-  # check bridges existance
+  # check bridges existence
   brlist = [nic.bridge for nic in instance.nics]
-  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
-    raise errors.OpPrereqError("one or more target bridges %s does not"
+  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
+  result.Raise()
+  if not result.data:
+    raise errors.OpPrereqError("One or more target bridges %s does not"
                                " exist on destination node '%s'" %
                                (brlist, instance.primary_node))
 
@@ -521,7 +613,7 @@ class LUDestroyCluster(NoHooksLU):
 
     This checks whether the cluster is empty.
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     master = self.cfg.GetMasterNode()
@@ -540,7 +632,9 @@ class LUDestroyCluster(NoHooksLU):
 
     """
     master = self.cfg.GetMasterNode()
-    if not self.rpc.call_node_stop_master(master, False):
+    result = self.rpc.call_node_stop_master(master, False)
+    result.Raise()
+    if not result.data:
       raise errors.OpExecError("Could not disable the master role")
     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
     utils.CreateBackup(priv_key)
@@ -564,103 +658,156 @@ class LUVerifyCluster(LogicalUnit):
     }
     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
 
-  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
-                  remote_version, feedback_fn):
+  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
+                  node_result, feedback_fn, master_files,
+                  drbd_map, vg_name):
     """Run multiple tests against a node.
 
-    Test list::
+    Test list:
 
       - compares ganeti version
-      - checks vg existance and size > 20G
+      - checks vg existence and size > 20G
       - checks config file checksum
       - checks ssh to other nodes
 
-    @type node: string
-    @param node: the name of the node to check
+    @type nodeinfo: L{objects.Node}
+    @param nodeinfo: the node to check
     @param file_list: required list of files
     @param local_cksum: dictionary of local files and their checksums
-    @type vglist: dict
-    @param vglist: dictionary of volume group names and their size
     @param node_result: the results from the node
-    @param remote_version: the RPC version from the remote node
     @param feedback_fn: function used to accumulate results
+    @param master_files: list of files that only masters should have
+    @param drbd_map: the used drbd minors for this node, in
+        form of minor: (instance, must_exist) which correspond to instances
+        and their running status
+    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
 
     """
+    node = nodeinfo.name
+
+    # main result, node_result should be a non-empty dict
+    if not node_result or not isinstance(node_result, dict):
+      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
+      return True
+
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
-    if not remote_version:
+    remote_version = node_result.get('version', None)
+    if not (remote_version and isinstance(remote_version, (list, tuple)) and
+            len(remote_version) == 2):
       feedback_fn("  - ERROR: connection to %s failed" % (node))
       return True
 
-    if local_version != remote_version:
-      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
-                      (local_version, node, remote_version))
+    if local_version != remote_version[0]:
+      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
+                  " node %s %s" % (local_version, node, remote_version[0]))
       return True
 
-    # checks vg existance and size > 20G
+    # node seems compatible, we can actually try to look into its results
 
     bad = False
-    if not vglist:
-      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
-                      (node,))
-      bad = True
-    else:
-      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
-                                            constants.MIN_VG_SIZE)
-      if vgstatus:
-        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
-        bad = True
 
-    if not node_result:
-      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
-      return True
+    # full package version
+    if constants.RELEASE_VERSION != remote_version[1]:
+      feedback_fn("  - WARNING: software version mismatch: master %s,"
+                  " node %s %s" %
+                  (constants.RELEASE_VERSION, node, remote_version[1]))
+
+    # checks vg existence and size > 20G
+    if vg_name is not None:
+      vglist = node_result.get(constants.NV_VGLIST, None)
+      if not vglist:
+        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
+                        (node,))
+        bad = True
+      else:
+        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
+                                              constants.MIN_VG_SIZE)
+        if vgstatus:
+          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
+          bad = True
 
     # checks config file checksum
-    # checks ssh to any
 
-    if 'filelist' not in node_result:
+    remote_cksum = node_result.get(constants.NV_FILELIST, None)
+    if not isinstance(remote_cksum, dict):
       bad = True
       feedback_fn("  - ERROR: node hasn't returned file checksum data")
     else:
-      remote_cksum = node_result['filelist']
       for file_name in file_list:
+        node_is_mc = nodeinfo.master_candidate
+        must_have_file = file_name not in master_files
         if file_name not in remote_cksum:
-          bad = True
-          feedback_fn("  - ERROR: file '%s' missing" % file_name)
+          if node_is_mc or must_have_file:
+            bad = True
+            feedback_fn("  - ERROR: file '%s' missing" % file_name)
         elif remote_cksum[file_name] != local_cksum[file_name]:
-          bad = True
-          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
+          if node_is_mc or must_have_file:
+            bad = True
+            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
+          else:
+            # not candidate and this is not a must-have file
+            bad = True
+            feedback_fn("  - ERROR: file '%s' should not exist on non master"
+                        " candidates (and the file is outdated)" % file_name)
+        else:
+          # all good, except non-master/non-must have combination
+          if not node_is_mc and not must_have_file:
+            feedback_fn("  - ERROR: file '%s' should not exist on non master"
+                        " candidates" % file_name)
+
+    # checks ssh to any
 
-    if 'nodelist' not in node_result:
+    if constants.NV_NODELIST not in node_result:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
     else:
-      if node_result['nodelist']:
+      if node_result[constants.NV_NODELIST]:
         bad = True
-        for node in node_result['nodelist']:
+        for node in node_result[constants.NV_NODELIST]:
           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
-                          (node, node_result['nodelist'][node]))
-    if 'node-net-test' not in node_result:
+                          (node, node_result[constants.NV_NODELIST][node]))
+
+    if constants.NV_NODENETTEST not in node_result:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
     else:
-      if node_result['node-net-test']:
+      if node_result[constants.NV_NODENETTEST]:
         bad = True
-        nlist = utils.NiceSort(node_result['node-net-test'].keys())
+        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
         for node in nlist:
           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
-                          (node, node_result['node-net-test'][node]))
+                          (node, node_result[constants.NV_NODENETTEST][node]))
 
-    hyp_result = node_result.get('hypervisor', None)
+    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
     if isinstance(hyp_result, dict):
       for hv_name, hv_result in hyp_result.iteritems():
         if hv_result is not None:
           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                       (hv_name, hv_result))
+
+    # check used drbd list
+    if vg_name is not None:
+      used_minors = node_result.get(constants.NV_DRBDLIST, [])
+      if not isinstance(used_minors, (tuple, list)):
+        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
+                    str(used_minors))
+      else:
+        for minor, (iname, must_exist) in drbd_map.items():
+          if minor not in used_minors and must_exist:
+            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
+                        " not active" % (minor, iname))
+            bad = True
+        for minor in used_minors:
+          if minor not in drbd_map:
+            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
+                        minor)
+            bad = True
+
     return bad
 
   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
-                      node_instance, feedback_fn):
+                      node_instance, feedback_fn, n_offline):
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -675,15 +822,19 @@ class LUVerifyCluster(LogicalUnit):
     instanceconfig.MapLVsByNode(node_vol_should)
 
     for node in node_vol_should:
+      if node in n_offline:
+        # ignore missing volumes on offline nodes
+        continue
       for volume in node_vol_should[node]:
         if node not in node_vol_is or volume not in node_vol_is[node]:
           feedback_fn("  - ERROR: volume %s missing on node %s" %
                           (volume, node))
           bad = True
 
-    if not instanceconfig.status == 'down':
-      if (node_current not in node_instance or
-          not instance in node_instance[node_current]):
+    if instanceconfig.admin_up:
+      if ((node_current not in node_instance or
+          not instance in node_instance[node_current]) and
+          node_current not in n_offline):
         feedback_fn("  - ERROR: instance %s not running on node %s" %
                         (instance, node_current))
         bad = True
@@ -754,7 +905,7 @@ class LUVerifyCluster(LogicalUnit):
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
         if nodeinfo['mfree'] < needed_mem:
-          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
+          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                       " failovers should node %s fail" % (node, prinode))
           bad = True
     return bad
@@ -773,13 +924,17 @@ class LUVerifyCluster(LogicalUnit):
   def BuildHooksEnv(self):
     """Build hooks env.
 
-    Cluster-Verify hooks just rone in the post phase and their failure makes
+    Cluster-Verify hooks just run in the post phase and their failure makes
     the output be logged in the verify output and the verification to fail.
 
     """
     all_nodes = self.cfg.GetNodeList()
-    # TODO: populate the environment with useful information for verify hooks
-    env = {}
+    env = {
+      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
+      }
+    for node in self.cfg.GetAllNodesInfo().values():
+      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
+
     return env, [], all_nodes
 
   def Exec(self, feedback_fn):
@@ -796,8 +951,12 @@ class LUVerifyCluster(LogicalUnit):
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
+    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
+                        for iname in instancelist)
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
+    n_offline = [] # List of offline nodes
+    n_drained = [] # List of nodes being drained
     node_volume = {}
     node_instance = {}
     node_info = {}
@@ -805,71 +964,116 @@ class LUVerifyCluster(LogicalUnit):
 
     # FIXME: verify OS list
     # do local checksums
-    file_names = []
+    master_files = [constants.CLUSTER_CONF_FILE]
+
+    file_names = ssconf.SimpleStore().GetFileList()
     file_names.append(constants.SSL_CERT_FILE)
-    file_names.append(constants.CLUSTER_CONF_FILE)
+    file_names.append(constants.RAPI_CERT_FILE)
+    file_names.extend(master_files)
+
     local_checksums = utils.FingerprintFiles(file_names)
 
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
-    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
-    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
-    all_vglist = self.rpc.call_vg_list(nodelist)
     node_verify_param = {
-      'filelist': file_names,
-      'nodelist': nodelist,
-      'hypervisor': hypervisors,
-      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
-                        for node in nodeinfo]
+      constants.NV_FILELIST: file_names,
+      constants.NV_NODELIST: [node.name for node in nodeinfo
+                              if not node.offline],
+      constants.NV_HYPERVISOR: hypervisors,
+      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
+                                  node.secondary_ip) for node in nodeinfo
+                                 if not node.offline],
+      constants.NV_INSTANCELIST: hypervisors,
+      constants.NV_VERSION: None,
+      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
       }
+    if vg_name is not None:
+      node_verify_param[constants.NV_VGLIST] = None
+      node_verify_param[constants.NV_LVLIST] = vg_name
+      node_verify_param[constants.NV_DRBDLIST] = None
     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                            self.cfg.GetClusterName())
-    all_rversion = self.rpc.call_version(nodelist)
-    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
-                                        self.cfg.GetHypervisorType())
 
     cluster = self.cfg.GetClusterInfo()
-    for node in nodelist:
-      feedback_fn("* Verifying node %s" % node)
-      result = self._VerifyNode(node, file_names, local_checksums,
-                                all_vglist[node], all_nvinfo[node],
-                                all_rversion[node], feedback_fn)
-      bad = bad or result
+    master_node = self.cfg.GetMasterNode()
+    all_drbd_map = self.cfg.ComputeDRBDMap()
 
-      # node_volume
-      volumeinfo = all_volumeinfo[node]
+    for node_i in nodeinfo:
+      node = node_i.name
+      nresult = all_nvinfo[node].data
+
+      if node_i.offline:
+        feedback_fn("* Skipping offline node %s" % (node,))
+        n_offline.append(node)
+        continue
+
+      if node == master_node:
+        ntype = "master"
+      elif node_i.master_candidate:
+        ntype = "master candidate"
+      elif node_i.drained:
+        ntype = "drained"
+        n_drained.append(node)
+      else:
+        ntype = "regular"
+      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
 
-      if isinstance(volumeinfo, basestring):
+      if all_nvinfo[node].failed or not isinstance(nresult, dict):
+        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+        bad = True
+        continue
+
+      node_drbd = {}
+      for minor, instance in all_drbd_map[node].items():
+        if instance not in instanceinfo:
+          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
+                      instance)
+          # ghost instance should not be running, but otherwise we
+          # don't give double warnings (both ghost instance and
+          # unallocated minor in use)
+          node_drbd[minor] = (instance, False)
+        else:
+          instance = instanceinfo[instance]
+          node_drbd[minor] = (instance.name, instance.admin_up)
+      result = self._VerifyNode(node_i, file_names, local_checksums,
+                                nresult, feedback_fn, master_files,
+                                node_drbd, vg_name)
+      bad = bad or result
+
+      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
+      if vg_name is None:
+        node_volume[node] = {}
+      elif isinstance(lvdata, basestring):
         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
-                    (node, volumeinfo[-400:].encode('string_escape')))
+                    (node, utils.SafeEncode(lvdata)))
         bad = True
         node_volume[node] = {}
-      elif not isinstance(volumeinfo, dict):
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+      elif not isinstance(lvdata, dict):
+        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
         bad = True
         continue
       else:
-        node_volume[node] = volumeinfo
+        node_volume[node] = lvdata
 
       # node_instance
-      nodeinstance = all_instanceinfo[node]
-      if type(nodeinstance) != list:
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+      idata = nresult.get(constants.NV_INSTANCELIST, None)
+      if not isinstance(idata, list):
+        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
+                    (node,))
         bad = True
         continue
 
-      node_instance[node] = nodeinstance
+      node_instance[node] = idata
 
       # node_info
-      nodeinfo = all_ninfo[node]
+      nodeinfo = nresult.get(constants.NV_HVINFO, None)
       if not isinstance(nodeinfo, dict):
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
         bad = True
         continue
 
       try:
         node_info[node] = {
           "mfree": int(nodeinfo['memory_free']),
-          "dfree": int(nodeinfo['vg_free']),
           "pinst": [],
           "sinst": [],
           # dictionary holding all instances this node is secondary for,
@@ -880,8 +1084,19 @@ class LUVerifyCluster(LogicalUnit):
           # secondary.
           "sinst-by-pnode": {},
         }
-      except ValueError:
-        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
+        # FIXME: devise a free space model for file based instances as well
+        if vg_name is not None:
+          if (constants.NV_VGLIST not in nresult or
+              vg_name not in nresult[constants.NV_VGLIST]):
+            feedback_fn("  - ERROR: node %s didn't return data for the"
+                        " volume group '%s' - it is either missing or broken" %
+                        (node, vg_name))
+            bad = True
+            continue
+          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
+      except (ValueError, KeyError):
+        feedback_fn("  - ERROR: invalid nodeinfo value returned"
+                    " from node %s" % (node,))
         bad = True
         continue
 
@@ -889,10 +1104,11 @@ class LUVerifyCluster(LogicalUnit):
 
     for instance in instancelist:
       feedback_fn("* Verifying instance %s" % instance)
-      inst_config = self.cfg.GetInstanceInfo(instance)
+      inst_config = instanceinfo[instance]
       result =  self._VerifyInstance(instance, inst_config, node_volume,
-                                     node_instance, feedback_fn)
+                                     node_instance, feedback_fn, n_offline)
       bad = bad or result
+      inst_nodes_offline = []
 
       inst_config.MapLVsByNode(node_vol_should)
 
@@ -901,11 +1117,14 @@ class LUVerifyCluster(LogicalUnit):
       pnode = inst_config.primary_node
       if pnode in node_info:
         node_info[pnode]['pinst'].append(instance)
-      else:
+      elif pnode not in n_offline:
         feedback_fn("  - ERROR: instance %s, connection to primary node"
                     " %s failed" % (instance, pnode))
         bad = True
 
+      if pnode in n_offline:
+        inst_nodes_offline.append(pnode)
+
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
       # templates with more than one secondary so that situation is not well
@@ -926,9 +1145,18 @@ class LUVerifyCluster(LogicalUnit):
           if pnode not in node_info[snode]['sinst-by-pnode']:
             node_info[snode]['sinst-by-pnode'][pnode] = []
           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
-        else:
+        elif snode not in n_offline:
           feedback_fn("  - ERROR: instance %s, connection to secondary node"
                       " %s failed" % (instance, snode))
+          bad = True
+        if snode in n_offline:
+          inst_nodes_offline.append(snode)
+
+      if inst_nodes_offline:
+        # warn that the instance lives on offline nodes, and set bad=True
+        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
+                    ", ".join(inst_nodes_offline))
+        bad = True
 
     feedback_fn("* Verifying orphan volumes")
     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
@@ -954,10 +1182,16 @@ class LUVerifyCluster(LogicalUnit):
       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                   % len(i_non_a_balanced))
 
+    if n_offline:
+      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
+
+    if n_drained:
+      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
+
     return not bad
 
   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
-    """Analize the post-hooks' result
+    """Analyze the post-hooks' result
 
     This method analyses the hook result, handles it, and sends some
     nicely-formatted feedback back to the user.
@@ -984,11 +1218,14 @@ class LUVerifyCluster(LogicalUnit):
         for node_name in hooks_results:
           show_node_header = True
           res = hooks_results[node_name]
-          if res is False or not isinstance(res, list):
-            feedback_fn("    Communication failure")
+          if res.failed or res.data is False or not isinstance(res.data, list):
+            if res.offline:
+              # no need to warn or set fail return value
+              continue
+            feedback_fn("    Communication failure in hooks execution")
             lu_result = 1
             continue
-          for script, hkr, output in res:
+          for script, hkr, output in res.data:
             if hkr == constants.HKR_FAIL:
               # The node header is only shown once, if there are
               # failing hooks on that node
@@ -1039,7 +1276,7 @@ class LUVerifyDisks(NoHooksLU):
     nv_dict = {}
     for inst in instances:
       inst_lvs = {}
-      if (inst.status != "up" or
+      if (not inst.admin_up or
           inst.disk_template not in constants.DTS_NET_MIRROR):
         continue
       inst.MapLVsByNode(inst_lvs)
@@ -1053,14 +1290,19 @@ class LUVerifyDisks(NoHooksLU):
 
     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
 
-    to_act = set()
     for node in nodes:
       # node_volume
       lvs = node_lvs[node]
-
+      if lvs.failed:
+        if not lvs.offline:
+          self.LogWarning("Connection to node %s failed: %s" %
+                          (node, lvs.data))
+        continue
+      lvs = lvs.data
       if isinstance(lvs, basestring):
         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
         res_nlvm[node] = lvs
+        continue
       elif not isinstance(lvs, dict):
         logging.warning("Connection to node %s failed or invalid data"
                         " returned", node)
@@ -1083,6 +1325,128 @@ class LUVerifyDisks(NoHooksLU):
     return result
 
 
+class LURepairDiskSizes(NoHooksLU):
+  """Reconcile recorded disk sizes with the sizes reported by the nodes.
+
+  """
+  _OP_REQP = ["instances"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Expand the instance names and declare the needed (shared) locks.
+
+    """
+    requested = self.op.instances
+    if not isinstance(requested, list):
+      raise errors.OpPrereqError("Invalid argument type 'instances'")
+
+    if not requested:
+      self.wanted_names = None
+      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET,
+                           locking.LEVEL_INSTANCE: locking.ALL_SET}
+    else:
+      self.wanted_names = expanded = []
+      for short_name in requested:
+        long_name = self.cfg.ExpandInstanceName(short_name)
+        if long_name is None:
+          raise errors.OpPrereqError("Instance '%s' not known" % short_name)
+        expanded.append(long_name)
+      # the node locks are filled in later, by DeclareLocks
+      self.needed_locks = {locking.LEVEL_NODE: [],
+                           locking.LEVEL_INSTANCE: expanded}
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+  def DeclareLocks(self, level):
+    # only explicitly named instances need their node locks computed
+    if self.wanted_names is not None and level == locking.LEVEL_NODE:
+      self._LockInstancesNodes(primary_only=True)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Without an explicit instance list, all locked instances are used.
+
+    """
+    if self.wanted_names is None:
+      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+
+    get_info = self.cfg.GetInstanceInfo
+    self.wanted_instances = [get_info(iname) for iname in self.wanted_names]
+
+  def _EnsureChildSizes(self, disk):
+    """Grow a too-small first child of a DRBD8 disk to the parent's size.
+
+    The check recurses into the first child only, never into the metadev.
+
+    @param disk: an L{ganeti.objects.Disk} object
+    @return: whether any child size was changed
+
+    """
+    if disk.dev_type != constants.LD_DRBD8:
+      return False
+
+    assert disk.children, "Empty children for DRBD8?"
+    first_child = disk.children[0]
+    too_small = first_child.size < disk.size
+    if too_small:
+      self.LogInfo("Child disk has size %d, parent %d, fixing",
+                   first_child.size, disk.size)
+      first_child.size = disk.size
+
+    # and we recurse on this child only, not on the metadev
+    return self._EnsureChildSizes(first_child) or too_small
+
+  def Exec(self, feedback_fn):
+    """Verify the size of cluster disks.
+
+    @return: list of (instance name, disk index, new size) tuples
+
+    """
+    # TODO: check child disks too
+    # TODO: check differences in size between primary/secondary nodes
+    disks_by_pnode = {}
+    for inst in self.wanted_instances:
+      entries = disks_by_pnode.setdefault(inst.primary_node, [])
+      entries.extend((inst, idx, dsk) for idx, dsk in enumerate(inst.disks))
+
+    changed = []
+    for node, entries in disks_by_pnode.items():
+      probes = [entry[2].Copy() for entry in entries]
+      for probe in probes:
+        self.cfg.SetDiskID(probe, node)
+      result = self.rpc.call_blockdev_getsizes(node, probes)
+      if result.failed:
+        self.LogWarning("Failure in blockdev_getsizes call to node"
+                        " %s, ignoring", node)
+        continue
+      if len(result.data) != len(entries):
+        self.LogWarning("Invalid result from node %s, ignoring node results",
+                        node)
+        continue
+      for (inst, idx, disk), size in zip(entries, result.data):
+        if size is None:
+          self.LogWarning("Disk %d of instance %s did not return size"
+                          " information, ignoring", idx, inst.name)
+          continue
+        if not isinstance(size, (int, long)):
+          self.LogWarning("Disk %d of instance %s did not return valid"
+                          " size information, ignoring", idx, inst.name)
+          continue
+        size >>= 20
+        if size != disk.size:
+          self.LogInfo("Disk %d of instance %s has mismatched size,"
+                       " correcting: recorded %d, actual %d", idx,
+                       inst.name, disk.size, size)
+          disk.size = size
+          self.cfg.Update(inst)
+          changed.append((inst.name, idx, size))
+        if self._EnsureChildSizes(disk):
+          self.cfg.Update(inst)
+          changed.append((inst.name, idx, disk.size))
+    return changed
+
+
 class LURenameCluster(LogicalUnit):
   """Rename the cluster.
 
@@ -1132,31 +1496,33 @@ class LURenameCluster(LogicalUnit):
 
     # shutdown the master IP
     master = self.cfg.GetMasterNode()
-    if not self.rpc.call_node_stop_master(master, False):
+    result = self.rpc.call_node_stop_master(master, False)
+    if result.failed or not result.data:
       raise errors.OpExecError("Could not disable the master role")
 
     try:
-      # modify the sstore
-      # TODO: sstore
-      ss.SetKey(ss.SS_MASTER_IP, ip)
-      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
-
-      # Distribute updated ss config to all nodes
-      myself = self.cfg.GetNodeInfo(master)
-      dist_nodes = self.cfg.GetNodeList()
-      if myself.name in dist_nodes:
-        dist_nodes.remove(myself.name)
-
-      logging.debug("Copying updated ssconf data to all nodes")
-      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
-        fname = ss.KeyToFilename(keyname)
-        result = self.rpc.call_upload_file(dist_nodes, fname)
-        for to_node in dist_nodes:
-          if not result[to_node]:
-            self.LogWarning("Copy of file %s to node %s failed",
-                            fname, to_node)
+      cluster = self.cfg.GetClusterInfo()
+      cluster.cluster_name = clustername
+      cluster.master_ip = ip
+      self.cfg.Update(cluster)
+
+      # update the known hosts file
+      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
+      node_list = self.cfg.GetNodeList()
+      try:
+        node_list.remove(master)
+      except ValueError:
+        pass
+      result = self.rpc.call_upload_file(node_list,
+                                         constants.SSH_KNOWN_HOSTS_FILE)
+      for to_node, to_result in result.iteritems():
+        if to_result.failed or not to_result.data:
+          logging.error("Copy of file %s to node %s failed",
+                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
+
     finally:
-      if not self.rpc.call_node_start_master(master, False):
+      result = self.rpc.call_node_start_master(master, False, False)
+      if result.failed or not result.data:
         self.LogWarning("Could not re-enable the master role on"
                         " the master, please restart manually.")
 
@@ -1166,7 +1532,7 @@ def _RecursiveCheckIfLVMBased(disk):
 
   @type disk: L{objects.Disk}
   @param disk: the disk to check
-  @rtype: booleean
+  @rtype: boolean
   @return: boolean indicating whether a LD_LV dev_type was found or not
 
   """
@@ -1186,6 +1552,21 @@ class LUSetClusterParams(LogicalUnit):
   _OP_REQP = []
   REQ_BGL = False
 
+  def CheckArguments(self):
+    """Default and validate the candidate_pool_size parameter.
+
+    """
+    pool_size = getattr(self.op, "candidate_pool_size", None)
+    if pool_size is not None:
+      try:
+        pool_size = int(pool_size)
+      except (ValueError, TypeError), err:
+        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
+                                   str(err))
+    self.op.candidate_pool_size = pool_size
+    if pool_size is not None and pool_size < 1:
+      raise errors.OpPrereqError("At least one master candidate needed")
+
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
     # all nodes to be modified.
@@ -1212,8 +1593,6 @@ class LUSetClusterParams(LogicalUnit):
     if the given volume group is valid.
 
     """
-    # FIXME: This only works because there is only one parameter that can be
-    # changed or removed.
     if self.op.vg_name is not None and not self.op.vg_name:
       instances = self.cfg.GetAllInstancesInfo().values()
       for inst in instances:
@@ -1228,16 +1607,21 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.vg_name:
       vglist = self.rpc.call_vg_list(node_list)
       for node in node_list:
-        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
+        if vglist[node].failed:
+          # ignoring down node
+          self.LogWarning("Node %s unreachable/error, ignoring" % node)
+          continue
+        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
+                                              self.op.vg_name,
                                               constants.MIN_VG_SIZE)
         if vgstatus:
           raise errors.OpPrereqError("Error on node '%s': %s" %
                                      (node, vgstatus))
 
     self.cluster = cluster = self.cfg.GetClusterInfo()
-    # beparams changes do not need validation (we can't validate?),
-    # but we still process here
+    # validate beparams changes
     if self.op.beparams:
+      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
       self.new_beparams = cluster.FillDict(
         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
 
@@ -1254,6 +1638,13 @@ class LUSetClusterParams(LogicalUnit):
 
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
+      if not self.hv_list:
+        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+                                   " least one member")
+      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
+      if invalid_hvs:
+        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+                                   " entries: %s" % invalid_hvs)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -1265,6 +1656,7 @@ class LUSetClusterParams(LogicalUnit):
              hv_name in self.op.enabled_hypervisors)):
           # either this is a new hypervisor, or its parameters have changed
           hv_class = hypervisor.GetHypervisor(hv_name)
+          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
           hv_class.CheckParameterSyntax(hv_params)
           _CheckHVParams(self, node_list, hv_name, hv_params)
 
@@ -1273,8 +1665,11 @@ class LUSetClusterParams(LogicalUnit):
 
     """
     if self.op.vg_name is not None:
-      if self.op.vg_name != self.cfg.GetVGName():
-        self.cfg.SetVGName(self.op.vg_name)
+      new_volume = self.op.vg_name
+      if not new_volume:
+        new_volume = None
+      if new_volume != self.cfg.GetVGName():
+        self.cfg.SetVGName(new_volume)
       else:
         feedback_fn("Cluster LVM configuration already in desired"
                     " state, not changing")
@@ -1284,9 +1679,41 @@ class LUSetClusterParams(LogicalUnit):
       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
     if self.op.beparams:
       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
+    if self.op.candidate_pool_size is not None:
+      self.cluster.candidate_pool_size = self.op.candidate_pool_size
+      # we need to update the pool size here, otherwise the save will fail
+      _AdjustCandidatePool(self)
+
     self.cfg.Update(self.cluster)
 
 
+class LURedistributeConfig(NoHooksLU):
+  """Push the current cluster configuration out again.
+
+  The LU changes nothing itself; it only re-saves the cluster object.
+
+  """
+  _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # all nodes, locked in shared mode
+    self.share_locks[locking.LEVEL_NODE] = 1
+    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
+
+  def CheckPrereq(self):
+    """Check prerequisites (there are none).
+
+    """
+
+  def Exec(self, feedback_fn):
+    """Redistribute the configuration.
+
+    """
+    cluster = self.cfg.GetClusterInfo()
+    self.cfg.Update(cluster)
+
+
 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
   """Sleep and poll for an instance's disk to sync.
 
@@ -1303,12 +1730,13 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
     lu.cfg.SetDiskID(dev, node)
 
   retries = 0
+  degr_retries = 10 # in seconds, as we sleep 1 second each time
   while True:
     max_time = 0
     done = True
     cumul_degraded = False
     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
-    if not rstats:
+    if rstats.failed or not rstats.data:
       lu.LogWarning("Can't get any data from node %s", node)
       retries += 1
       if retries >= 10:
@@ -1316,9 +1744,9 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
                                  " aborting." % node)
       time.sleep(6)
       continue
+    rstats = rstats.data
     retries = 0
-    for i in range(len(rstats)):
-      mstat = rstats[i]
+    for i, mstat in enumerate(rstats):
       if mstat is None:
         lu.LogWarning("Can't compute data for node %s/%s",
                            node, instance.disks[i].iv_name)
@@ -1335,6 +1763,16 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
           rem_time = "no time estimate"
         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                         (instance.disks[i].iv_name, perc_done, rem_time))
+
+    # if we're done but degraded, let's do a few small retries, to
+    # make sure we see a stable and not transient situation; therefore
+    # we force restart of the loop
+    if (done or oneshot) and cumul_degraded and degr_retries > 0:
+      logging.info("Degraded disks found, %d retries left", degr_retries)
+      degr_retries -= 1
+      time.sleep(1)
+      continue
+
     if done or oneshot:
       break
 
@@ -1362,11 +1800,15 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
   result = True
   if on_primary or dev.AssembleOnSecondary():
     rstats = lu.rpc.call_blockdev_find(node, dev)
-    if not rstats:
-      logging.warning("Node %s: disk degraded, not found or node down", node)
+    msg = rstats.RemoteFailMsg()
+    if msg:
+      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
+      result = False
+    elif not rstats.payload:
+      lu.LogWarning("Can't find disk on node %s", node)
       result = False
     else:
-      result = result and (not rstats[idx])
+      result = result and (not rstats.payload[idx])
   if dev.children:
     for child in dev.children:
       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
@@ -1392,9 +1834,11 @@ class LUDiagnoseOS(NoHooksLU):
                        selected=self.op.output_fields)
 
     # Lock all nodes, in shared mode
+    # Temporary removal of locks, should be reverted later
+    # TODO: reintroduce locks when they are lighter-weight
     self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
-    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    #self.share_locks[locking.LEVEL_NODE] = 1
+    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1409,7 +1853,7 @@ class LUDiagnoseOS(NoHooksLU):
     @param rlist: a map with node names as keys and OS objects as values
 
     @rtype: dict
-    @returns: a dictionary with osnames as keys and as value another map, with
+    @return: a dictionary with osnames as keys and as value another map, with
         nodes as keys and list of OS objects as values, eg::
 
           {"debian-etch": {"node1": [<object>,...],
@@ -1418,15 +1862,20 @@ class LUDiagnoseOS(NoHooksLU):
 
     """
     all_os = {}
+    # we build here the list of nodes that didn't fail the RPC (at RPC
+    # level), so that nodes with a non-responding node daemon don't
+    # make all OSes invalid
+    good_nodes = [node_name for node_name in rlist
+                  if not rlist[node_name].failed]
     for node_name, nr in rlist.iteritems():
-      if not nr:
+      if nr.failed or not nr.data:
         continue
-      for os_obj in nr:
+      for os_obj in nr.data:
         if os_obj.name not in all_os:
           # build a list of nodes for this os containing empty lists
           # for each node in node_list
           all_os[os_obj.name] = {}
-          for nname in node_list:
+          for nname in good_nodes:
             all_os[os_obj.name][nname] = []
         all_os[os_obj.name][node_name].append(os_obj)
     return all_os
@@ -1435,11 +1884,11 @@ class LUDiagnoseOS(NoHooksLU):
     """Compute the list of OSes.
 
     """
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
-    node_data = self.rpc.call_os_diagnose(node_list)
+    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
+    node_data = self.rpc.call_os_diagnose(valid_nodes)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
-    pol = self._DiagnoseByOS(node_list, node_data)
+    pol = self._DiagnoseByOS(valid_nodes, node_data)
     output = []
     for os_name, os_data in pol.iteritems():
       row = []
@@ -1491,7 +1940,7 @@ class LURemoveNode(LogicalUnit):
      - it does not have primary or secondary instances
      - it's not the master
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
@@ -1507,11 +1956,8 @@ class LURemoveNode(LogicalUnit):
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
-      if node.name == instance.primary_node:
-        raise errors.OpPrereqError("Instance %s still running on the node,"
-                                   " please remove first." % instance_name)
-      if node.name in instance.secondary_nodes:
-        raise errors.OpPrereqError("Instance %s has node as a secondary,"
+      if node.name in instance.all_nodes:
+        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                    " please remove first." % instance_name)
     self.op.node_name = node.name
     self.node = node
@@ -1528,18 +1974,21 @@ class LURemoveNode(LogicalUnit):
 
     self.rpc.call_node_leave_cluster(node.name)
 
+    # Promote nodes to master candidate as needed
+    _AdjustCandidatePool(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
 
   """
-  _OP_REQP = ["output_fields", "names"]
+  _OP_REQP = ["output_fields", "names", "use_locking"]
   REQ_BGL = False
   _FIELDS_DYNAMIC = utils.FieldSet(
     "dtotal", "dfree",
     "mtotal", "mnode", "mfree",
     "bootid",
-    "ctotal",
+    "ctotal", "cnodes", "csockets",
     )
 
   _FIELDS_STATIC = utils.FieldSet(
@@ -1547,6 +1996,11 @@ class LUQueryNodes(NoHooksLU):
     "pinst_list", "sinst_list",
     "pip", "sip", "tags",
     "serial_no",
+    "master_candidate",
+    "master",
+    "offline",
+    "drained",
+    "role",
     )
 
   def ExpandNames(self):
@@ -1562,7 +2016,8 @@ class LUQueryNodes(NoHooksLU):
     else:
       self.wanted = locking.ALL_SET
 
-    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
+    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
+    self.do_locking = self.do_node_query and self.op.use_locking
     if self.do_locking:
       # if we don't request only static fields, we need to lock the nodes
       self.needed_locks[locking.LEVEL_NODE] = self.wanted
@@ -1597,21 +2052,25 @@ class LUQueryNodes(NoHooksLU):
 
     # begin data gathering
 
-    if self.do_locking:
+    if self.do_node_query:
       live_data = {}
       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                           self.cfg.GetHypervisorType())
       for name in nodenames:
-        nodeinfo = node_data.get(name, None)
-        if nodeinfo:
+        nodeinfo = node_data[name]
+        if not nodeinfo.failed and nodeinfo.data:
+          nodeinfo = nodeinfo.data
+          fn = utils.TryConvert
           live_data[name] = {
-            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
-            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
-            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
-            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
-            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
-            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
-            "bootid": nodeinfo['bootid'],
+            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
+            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
+            "mfree": fn(int, nodeinfo.get('memory_free', None)),
+            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
+            "dfree": fn(int, nodeinfo.get('vg_free', None)),
+            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
+            "bootid": nodeinfo.get('bootid', None),
+            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
+            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
             }
         else:
           live_data[name] = {}
@@ -1624,16 +2083,17 @@ class LUQueryNodes(NoHooksLU):
     inst_fields = frozenset(("pinst_cnt", "pinst_list",
                              "sinst_cnt", "sinst_list"))
     if inst_fields & frozenset(self.op.output_fields):
-      instancelist = self.cfg.GetInstanceList()
+      inst_data = self.cfg.GetAllInstancesInfo()
 
-      for instance_name in instancelist:
-        inst = self.cfg.GetInstanceInfo(instance_name)
+      for instance_name, inst in inst_data.items():
         if inst.primary_node in node_to_primary:
           node_to_primary[inst.primary_node].add(inst.name)
         for secnode in inst.secondary_nodes:
           if secnode in node_to_secondary:
             node_to_secondary[secnode].add(inst.name)
 
+    master_node = self.cfg.GetMasterNode()
+
     # end data gathering
 
     output = []
@@ -1658,8 +2118,27 @@ class LUQueryNodes(NoHooksLU):
           val = list(node.GetTags())
         elif field == "serial_no":
           val = node.serial_no
+        elif field == "master_candidate":
+          val = node.master_candidate
+        elif field == "master":
+          val = node.name == master_node
+        elif field == "offline":
+          val = node.offline
+        elif field == "drained":
+          val = node.drained
         elif self._FIELDS_DYNAMIC.Matches(field):
           val = live_data[node.name].get(field, None)
+        elif field == "role":
+          if node.name == master_node:
+            val = "M"
+          elif node.master_candidate:
+            val = "C"
+          elif node.drained:
+            val = "D"
+          elif node.offline:
+            val = "O"
+          else:
+            val = "R"
         else:
           raise errors.ParameterError(field)
         node_output.append(val)
@@ -1712,10 +2191,10 @@ class LUQueryNodeVolumes(NoHooksLU):
 
     output = []
     for node in nodenames:
-      if node not in volumes or not volumes[node]:
+      if node not in volumes or volumes[node].failed or not volumes[node].data:
         continue
 
-      node_vols = volumes[node][:]
+      node_vols = volumes[node].data[:]
       node_vols.sort(key=lambda vol: vol['dev'])
 
       for vol in node_vols:
@@ -1781,7 +2260,7 @@ class LUAddNode(LogicalUnit):
      - it is resolvable
      - its parameters (single/dual homed) matches the cluster
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     node_name = self.op.node_name
@@ -1835,7 +2314,7 @@ class LUAddNode(LogicalUnit):
         raise errors.OpPrereqError("The master has a private ip but the"
                                    " new node doesn't have one")
 
-    # checks reachablity
+    # checks reachability
     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
       raise errors.OpPrereqError("Node not reachable by ping")
 
@@ -1846,9 +2325,25 @@ class LUAddNode(LogicalUnit):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                    " based ping to noded port")
 
-    self.new_node = objects.Node(name=node,
-                                 primary_ip=primary_ip,
-                                 secondary_ip=secondary_ip)
+    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+    if self.op.readd:
+      exceptions = [node]
+    else:
+      exceptions = []
+    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
+    # the new node will increase mc_max with one, so:
+    mc_max = min(mc_max + 1, cp_size)
+    self.master_candidate = mc_now < mc_max
+
+    if self.op.readd:
+      self.new_node = self.cfg.GetNodeInfo(node)
+      assert self.new_node is not None, "Can't retrieve locked node %s" % node
+    else:
+      self.new_node = objects.Node(name=node,
+                                   primary_ip=primary_ip,
+                                   secondary_ip=secondary_ip,
+                                   master_candidate=self.master_candidate,
+                                   offline=False, drained=False)
 
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
@@ -1857,16 +2352,31 @@ class LUAddNode(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
+    # for re-adds, reset the offline/drained/master-candidate flags;
+    # we need to reset here, otherwise offline would prevent RPC calls
+    # later in the procedure; this also means that if the re-add
+    # fails, we are left with a non-offlined, broken node
+    if self.op.readd:
+      new_node.drained = new_node.offline = False
+      self.LogInfo("Readding a node, the offline/drained flags were reset")
+      # if we demote the node, we do cleanup later in the procedure
+      new_node.master_candidate = self.master_candidate
+
+    # notify the user about any possible mc promotion
+    if new_node.master_candidate:
+      self.LogInfo("Node will be a master candidate")
+
     # check connectivity
     result = self.rpc.call_version([node])[node]
-    if result:
-      if constants.PROTOCOL_VERSION == result:
+    result.Raise()
+    if result.data:
+      if constants.PROTOCOL_VERSION == result.data:
         logging.info("Communication to node %s fine, sw version %s match",
-                     node, result)
+                     node, result.data)
       else:
         raise errors.OpExecError("Version mismatch master version %s,"
                                  " node version %s" %
-                                 (constants.PROTOCOL_VERSION, result))
+                                 (constants.PROTOCOL_VERSION, result.data))
     else:
       raise errors.OpExecError("Cannot get version from the new node")
 
@@ -1889,15 +2399,19 @@ class LUAddNode(LogicalUnit):
                                     keyarray[2],
                                     keyarray[3], keyarray[4], keyarray[5])
 
-    if not result:
-      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
+    msg = result.RemoteFailMsg()
+    if msg:
+      raise errors.OpExecError("Cannot transfer ssh keys to the"
+                               " new node: %s" % msg)
 
     # Add node to our /etc/hosts, and add key to known_hosts
-    utils.AddHostToEtcHosts(new_node.name)
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
-      if not self.rpc.call_node_has_ip_address(new_node.name,
-                                               new_node.secondary_ip):
+      result = self.rpc.call_node_has_ip_address(new_node.name,
+                                                 new_node.secondary_ip)
+      if result.failed or not result.data:
         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                  " you gave (%s). Please fix and re-run this"
                                  " command." % new_node.secondary_ip)
@@ -1911,13 +2425,14 @@ class LUAddNode(LogicalUnit):
     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                        self.cfg.GetClusterName())
     for verifier in node_verify_list:
-      if not result[verifier]:
+      if result[verifier].failed or not result[verifier].data:
         raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                  " for remote verification" % verifier)
-      if result[verifier]['nodelist']:
-        for failed in result[verifier]['nodelist']:
-          feedback_fn("ssh/hostname verification failed %s -> %s" %
-                      (verifier, result[verifier]['nodelist'][failed]))
+      if result[verifier].data['nodelist']:
+        for failed in result[verifier].data['nodelist']:
+          feedback_fn("ssh/hostname verification failed"
+                      " (checking from %s): %s" %
+                      (verifier, result[verifier].data['nodelist'][failed]))
         raise errors.OpExecError("ssh/hostname verification failed.")
 
     # Distribute updated /etc/hosts and known_hosts to all nodes,
@@ -1932,24 +2447,170 @@ class LUAddNode(LogicalUnit):
     logging.debug("Copying hosts and known_hosts to all nodes")
     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
       result = self.rpc.call_upload_file(dist_nodes, fname)
-      for to_node in dist_nodes:
-        if not result[to_node]:
+      for to_node, to_result in result.iteritems():
+        if to_result.failed or not to_result.data:
           logging.error("Copy of file %s to node %s failed", fname, to_node)
 
     to_copy = []
-    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
+    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
       to_copy.append(constants.VNC_PASSWORD_FILE)
+
     for fname in to_copy:
       result = self.rpc.call_upload_file([node], fname)
-      if not result[node]:
+      if result[node].failed or not result[node]:
         logging.error("Could not copy file %s to node %s", fname, node)
 
     if self.op.readd:
       self.context.ReaddNode(new_node)
+      # make sure we redistribute the config
+      self.cfg.Update(new_node)
+      # and make sure the new node will not have old files around
+      if not new_node.master_candidate:
+        result = self.rpc.call_node_demote_from_mc(new_node.name)
+        msg = result.RemoteFailMsg()
+        if msg:
+          self.LogWarning("Node failed to demote itself from master"
+                          " candidate status: %s" % msg)
     else:
       self.context.AddNode(new_node)
 
 
+class LUSetNodeParams(LogicalUnit):
+  """Modifies the parameters of a node.
+
+  """
+  HPATH = "node-modify"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+    self.op.node_name = node_name
+    _CheckBooleanOpField(self.op, 'master_candidate')
+    _CheckBooleanOpField(self.op, 'offline')
+    _CheckBooleanOpField(self.op, 'drained')
+    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
+    if all_mods.count(None) == 3:
+      raise errors.OpPrereqError("Please pass at least one modification")
+    if all_mods.count(True) > 1:
+      raise errors.OpPrereqError("Can't set the node into more than one"
+                                 " state at the same time")
+
+  def ExpandNames(self):
+    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master node.
+
+    """
+    env = {
+      "OP_TARGET": self.op.node_name,
+      "MASTER_CANDIDATE": str(self.op.master_candidate),
+      "OFFLINE": str(self.op.offline),
+      "DRAINED": str(self.op.drained),
+      }
+    nl = [self.cfg.GetMasterNode(),
+          self.op.node_name]
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This validates the requested flag changes against the node's state.
+
+    """
+    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+
+    if (self.op.master_candidate is not None or
+        self.op.drained is not None or
+        self.op.offline is not None):
+      # we can't change the master's node flags
+      if self.op.node_name == self.cfg.GetMasterNode():
+        raise errors.OpPrereqError("The master role can be changed"
+                                   " only via masterfailover")
+
+    if ((self.op.master_candidate == False or self.op.offline == True or
+         self.op.drained == True) and node.master_candidate):
+      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+      num_candidates, _ = self.cfg.GetMasterCandidateStats()
+      if num_candidates <= cp_size:
+        msg = ("Not enough master candidates (desired"
+               " %d, new value will be %d)" % (cp_size, num_candidates-1))
+        if self.op.force:
+          self.LogWarning(msg)
+        else:
+          raise errors.OpPrereqError(msg)
+
+    if (self.op.master_candidate == True and
+        ((node.offline and not self.op.offline == False) or
+         (node.drained and not self.op.drained == False))):
+      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
+                                 " to master_candidate" % node.name)
+
+    return
+
+  def Exec(self, feedback_fn):
+    """Modifies a node.
+
+    """
+    node = self.node
+
+    result = []
+    changed_mc = False
+
+    if self.op.offline is not None:
+      node.offline = self.op.offline
+      result.append(("offline", str(self.op.offline)))
+      if self.op.offline == True:
+        if node.master_candidate:
+          node.master_candidate = False
+          changed_mc = True
+          result.append(("master_candidate", "auto-demotion due to offline"))
+        if node.drained:
+          node.drained = False
+          result.append(("drained", "clear drained status due to offline"))
+
+    if self.op.master_candidate is not None:
+      node.master_candidate = self.op.master_candidate
+      changed_mc = True
+      result.append(("master_candidate", str(self.op.master_candidate)))
+      if self.op.master_candidate == False:
+        rrc = self.rpc.call_node_demote_from_mc(node.name)
+        msg = rrc.RemoteFailMsg()
+        if msg:
+          self.LogWarning("Node failed to demote itself: %s" % msg)
+
+    if self.op.drained is not None:
+      node.drained = self.op.drained
+      result.append(("drained", str(self.op.drained)))
+      if self.op.drained == True:
+        if node.master_candidate:
+          node.master_candidate = False
+          changed_mc = True
+          result.append(("master_candidate", "auto-demotion due to drain"))
+          rrc = self.rpc.call_node_demote_from_mc(node.name)
+          msg = rrc.RemoteFailMsg()
+          if msg:
+            self.LogWarning("Node failed to demote itself: %s" % msg)
+        if node.offline:
+          node.offline = False
+          result.append(("offline", "clear offline status due to drain"))
+
+    # this will trigger configuration file update, if needed
+    self.cfg.Update(node)
+    # this will trigger job queue propagation or cleanup
+    if changed_mc:
+      self.context.ReaddNode(node)
+
+    return result
+
+
 class LUQueryClusterInfo(NoHooksLU):
   """Query cluster configuration.
 
@@ -1982,8 +2643,15 @@ class LUQueryClusterInfo(NoHooksLU):
       "master": cluster.master_node,
       "default_hypervisor": cluster.default_hypervisor,
       "enabled_hypervisors": cluster.enabled_hypervisors,
-      "hvparams": cluster.hvparams,
+      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
+                        for hypervisor_name in cluster.enabled_hypervisors]),
       "beparams": cluster.beparams,
+      "candidate_pool_size": cluster.candidate_pool_size,
+      "default_bridge": cluster.default_bridge,
+      "master_netdev": cluster.master_netdev,
+      "volume_group_name": cluster.volume_group_name,
+      "file_storage_dir": cluster.file_storage_dir,
+      "tags": list(cluster.GetTags()),
       }
 
     return result
@@ -2054,19 +2722,25 @@ class LUActivateInstanceDisks(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
+    if not hasattr(self.op, "ignore_size"):
+      self.op.ignore_size = False
 
   def Exec(self, feedback_fn):
     """Activate the disks.
 
     """
-    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
+    disks_ok, disks_info = \
+              _AssembleInstanceDisks(self, self.instance,
+                                     ignore_size=self.op.ignore_size)
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block devices")
 
     return disks_info
 
 
-def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
+def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
+                           ignore_size=False):
   """Prepare the block devices for an instance.
 
   This sets up the block devices on all nodes.
@@ -2078,6 +2752,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
   @type ignore_secondaries: boolean
   @param ignore_secondaries: if true, errors on secondary nodes
       won't result in an error return from the function
+  @type ignore_size: boolean
+  @param ignore_size: if true, the current known size of the disk
+      will not be used during the disk activation, useful for cases
+      when the size is wrong
   @return: False if the operation failed, otherwise a list of
       (host, instance_visible_name, node_visible_name)
       with the mapping from node devices to instance devices
@@ -2098,12 +2776,16 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
   # 1st pass, assemble on all nodes in secondary mode
   for inst_disk in instance.disks:
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+      if ignore_size:
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
-      if not result:
+      msg = result.RemoteFailMsg()
+      if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
-                           " (is_primary=False, pass=1)",
-                           inst_disk.iv_name, node)
+                           " (is_primary=False, pass=1): %s",
+                           inst_disk.iv_name, node, msg)
         if not ignore_secondaries:
           disks_ok = False
 
@@ -2114,14 +2796,19 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       if node != instance.primary_node:
         continue
+      if ignore_size:
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
-      if not result:
+      msg = result.RemoteFailMsg()
+      if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
-                           " (is_primary=True, pass=2)",
-                           inst_disk.iv_name, node)
+                           " (is_primary=True, pass=2): %s",
+                           inst_disk.iv_name, node, msg)
         disks_ok = False
-    device_info.append((instance.primary_node, inst_disk.iv_name, result))
+    device_info.append((instance.primary_node, inst_disk.iv_name,
+                        result.payload))
 
   # leave the disks configured for the primary node
   # this is a workaround that would be fixed better by
@@ -2136,7 +2823,7 @@ def _StartInstanceDisks(lu, instance, force):
   """Start the disks of an instance.
 
   """
-  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
+  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                            ignore_secondaries=force)
   if not disks_ok:
     _ShutdownInstanceDisks(lu, instance)
@@ -2191,11 +2878,11 @@ def _SafeShutdownInstanceDisks(lu, instance):
   ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                       [instance.hypervisor])
   ins_l = ins_l[instance.primary_node]
-  if not type(ins_l) is list:
+  if ins_l.failed or not isinstance(ins_l.data, list):
     raise errors.OpExecError("Can't contact node '%s'" %
                              instance.primary_node)
 
-  if instance.name in ins_l:
+  if instance.name in ins_l.data:
     raise errors.OpExecError("Instance is running, can't shutdown"
                              " block devices.")
 
@@ -2211,19 +2898,21 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
   ignored.
 
   """
-  result = True
+  all_result = True
   for disk in instance.disks:
     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(top_disk, node)
-      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
-        logging.error("Could not shutdown block device %s on node %s",
-                      disk.iv_name, node)
+      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+      msg = result.RemoteFailMsg()
+      if msg:
+        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
+                      disk.iv_name, node, msg)
         if not ignore_primary or node != instance.primary_node:
-          result = False
-  return result
+          all_result = False
+  return all_result
 
 
-def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
+def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
   """Checks if a node has enough free memory.
 
   This function check if a given node has the needed amount of free
@@ -2239,18 +2928,15 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
   @param reason: string to use in the error message
   @type requested: C{int}
   @param requested: the amount of memory in MiB to check for
-  @type hypervisor: C{str}
-  @param hypervisor: the hypervisor to ask for memory stats
+  @type hypervisor_name: C{str}
+  @param hypervisor_name: the hypervisor to ask for memory stats
   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
       we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
-  if not nodeinfo or not isinstance(nodeinfo, dict):
-    raise errors.OpPrereqError("Could not contact node %s for resource"
-                             " information" % (node,))
-
-  free_mem = nodeinfo[node].get('memory_free')
+  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
+  nodeinfo[node].Raise()
+  free_mem = nodeinfo[node].data.get('memory_free')
   if not isinstance(free_mem, int):
     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                              " was '%s'" % (node, free_mem))
@@ -2282,8 +2968,7 @@ class LUStartupInstance(LogicalUnit):
       "FORCE": self.op.force,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2296,13 +2981,48 @@ class LUStartupInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    # extra beparams
+    self.beparams = getattr(self.op, "beparams", {})
+    if self.beparams:
+      if not isinstance(self.beparams, dict):
+        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
+                                   " dict" % (type(self.beparams), ))
+      # fill the beparams dict
+      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
+      self.op.beparams = self.beparams
+
+    # extra hvparams
+    self.hvparams = getattr(self.op, "hvparams", {})
+    if self.hvparams:
+      if not isinstance(self.hvparams, dict):
+        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
+                                   " dict" % (type(self.hvparams), ))
+
+      # check hypervisor parameter syntax (locally)
+      cluster = self.cfg.GetClusterInfo()
+      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
+      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
+                                    instance.hvparams)
+      filled_hvp.update(self.hvparams)
+      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
+      hv_type.CheckParameterSyntax(filled_hvp)
+      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
+      self.op.hvparams = self.hvparams
+
+    _CheckNodeOnline(self, instance.primary_node)
+
     bep = self.cfg.GetClusterInfo().FillBE(instance)
-    # check bridges existance
+    # check bridges existence
     _CheckInstanceBridgesExist(self, instance)
 
-    _CheckNodeFreeMemory(self, instance.primary_node,
-                         "starting instance %s" % instance.name,
-                         bep[constants.BE_MEMORY], instance.hypervisor)
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise()
+    if not remote_info.data:
+      _CheckNodeFreeMemory(self, instance.primary_node,
+                           "starting instance %s" % instance.name,
+                           bep[constants.BE_MEMORY], instance.hypervisor)
 
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -2310,7 +3030,6 @@ class LUStartupInstance(LogicalUnit):
     """
     instance = self.instance
     force = self.op.force
-    extra_args = getattr(self.op, "extra_args", "")
 
     self.cfg.MarkInstanceUp(instance.name)
 
@@ -2318,9 +3037,12 @@ class LUStartupInstance(LogicalUnit):
 
     _StartInstanceDisks(self, instance, force)
 
-    if not self.rpc.call_instance_start(node_current, instance, extra_args):
+    result = self.rpc.call_instance_start(node_current, instance,
+                                          self.hvparams, self.beparams)
+    msg = result.RemoteFailMsg()
+    if msg:
       _ShutdownInstanceDisks(self, instance)
-      raise errors.OpExecError("Could not start instance")
+      raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LURebootInstance(LogicalUnit):
@@ -2350,10 +3072,10 @@ class LURebootInstance(LogicalUnit):
     """
     env = {
       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
+      "REBOOT_TYPE": self.op.reboot_type,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2366,7 +3088,9 @@ class LURebootInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
-    # check bridges existance
+    _CheckNodeOnline(self, instance.primary_node)
+
+    # check bridges existence
     _CheckInstanceBridgesExist(self, instance)
 
   def Exec(self, feedback_fn):
@@ -2376,23 +3100,32 @@ class LURebootInstance(LogicalUnit):
     instance = self.instance
     ignore_secondaries = self.op.ignore_secondaries
     reboot_type = self.op.reboot_type
-    extra_args = getattr(self.op, "extra_args", "")
 
     node_current = instance.primary_node
 
     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                        constants.INSTANCE_REBOOT_HARD]:
-      if not self.rpc.call_instance_reboot(node_current, instance,
-                                           reboot_type, extra_args):
-        raise errors.OpExecError("Could not reboot instance")
+      for disk in instance.disks:
+        self.cfg.SetDiskID(disk, node_current)
+      result = self.rpc.call_instance_reboot(node_current, instance,
+                                             reboot_type)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Could not reboot instance: %s" % msg)
     else:
-      if not self.rpc.call_instance_shutdown(node_current, instance):
-        raise errors.OpExecError("could not shutdown instance for full reboot")
+      result = self.rpc.call_instance_shutdown(node_current, instance)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Could not shutdown instance for"
+                                 " full reboot: %s" % msg)
       _ShutdownInstanceDisks(self, instance)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      if not self.rpc.call_instance_start(node_current, instance, extra_args):
+      result = self.rpc.call_instance_start(node_current, instance, None, None)
+      msg = result.RemoteFailMsg()
+      if msg:
         _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance for full reboot")
+        raise errors.OpExecError("Could not start instance for"
+                                 " full reboot: %s" % msg)
 
     self.cfg.MarkInstanceUp(instance.name)
 
@@ -2416,8 +3149,7 @@ class LUShutdownInstance(LogicalUnit):
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2429,6 +3161,7 @@ class LUShutdownInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -2437,8 +3170,10 @@ class LUShutdownInstance(LogicalUnit):
     instance = self.instance
     node_current = instance.primary_node
     self.cfg.MarkInstanceDown(instance.name)
-    if not self.rpc.call_instance_shutdown(node_current, instance):
-      self.proc.LogWarning("Could not shutdown instance")
+    result = self.rpc.call_instance_shutdown(node_current, instance)
+    msg = result.RemoteFailMsg()
+    if msg:
+      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
 
     _ShutdownInstanceDisks(self, instance)
 
@@ -2462,8 +3197,7 @@ class LUReinstallInstance(LogicalUnit):
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2475,17 +3209,19 @@ class LUReinstallInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, instance.primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name)
-    if instance.status != "down":
+    if instance.admin_up:
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
-    if remote_info:
+    remote_info.Raise()
+    if remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -2498,8 +3234,9 @@ class LUReinstallInstance(LogicalUnit):
       if pnode is None:
         raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                    self.op.pnode)
-      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
-      if not os_obj:
+      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+      result.Raise()
+      if not isinstance(result.data, objects.OS):
         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                    " primary node"  % self.op.os_type)
 
@@ -2519,10 +3256,12 @@ class LUReinstallInstance(LogicalUnit):
     _StartInstanceDisks(self, inst, None)
     try:
       feedback_fn("Running the instance OS create scripts...")
-      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
+      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
+      msg = result.RemoteFailMsg()
+      if msg:
         raise errors.OpExecError("Could not install OS for instance %s"
-                                 " on node %s" %
-                                 (inst.name, inst.primary_node))
+                                 " on node %s: %s" %
+                                 (inst.name, inst.primary_node, msg))
     finally:
       _ShutdownInstanceDisks(self, inst)
 
@@ -2543,8 +3282,7 @@ class LURenameInstance(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["INSTANCE_NEW_NAME"] = self.op.new_name
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2558,13 +3296,16 @@ class LURenameInstance(LogicalUnit):
     if instance is None:
       raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
-    if instance.status != "down":
+    _CheckNodeOnline(self, instance.primary_node)
+
+    if instance.admin_up:
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
-    if remote_info:
+    remote_info.Raise()
+    if remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -2608,15 +3349,15 @@ class LURenameInstance(LogicalUnit):
       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                      old_file_storage_dir,
                                                      new_file_storage_dir)
-
-      if not result:
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Could not connect to node '%s' to rename"
                                  " directory '%s' to '%s' (but the instance"
                                  " has been renamed in Ganeti)" % (
                                  inst.primary_node, old_file_storage_dir,
                                  new_file_storage_dir))
 
-      if not result[0]:
+      if not result.data[0]:
         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                  " (but the instance has been renamed in"
                                  " Ganeti)" % (old_file_storage_dir,
@@ -2624,11 +3365,13 @@ class LURenameInstance(LogicalUnit):
 
     _StartInstanceDisks(self, inst, None)
     try:
-      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
-                                               old_name):
+      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
+                                                 old_name)
+      msg = result.RemoteFailMsg()
+      if msg:
         msg = ("Could not run OS rename script for instance %s on node %s"
-               " (but the instance has been renamed in Ganeti)" %
-               (inst.name, inst.primary_node))
+               " (but the instance has been renamed in Ganeti): %s" %
+               (inst.name, inst.primary_node, msg))
         self.proc.LogWarning(msg)
     finally:
       _ShutdownInstanceDisks(self, inst)
@@ -2680,12 +3423,15 @@ class LURemoveInstance(LogicalUnit):
     logging.info("Shutting down instance %s on node %s",
                  instance.name, instance.primary_node)
 
-    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
+    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
+    msg = result.RemoteFailMsg()
+    if msg:
       if self.op.ignore_failures:
-        feedback_fn("Warning: can't shutdown instance")
+        feedback_fn("Warning: can't shutdown instance: %s" % msg)
       else:
-        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
-                                 (instance.name, instance.primary_node))
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, instance.primary_node, msg))
 
     logging.info("Removing block devices for instance %s", instance.name)
 
@@ -2705,18 +3451,18 @@ class LUQueryInstances(NoHooksLU):
   """Logical unit for querying instances.
 
   """
-  _OP_REQP = ["output_fields", "names"]
+  _OP_REQP = ["output_fields", "names", "use_locking"]
   REQ_BGL = False
   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
-                                    "admin_state", "admin_ram",
+                                    "admin_state",
                                     "disk_template", "ip", "mac", "bridge",
                                     "sda_size", "sdb_size", "vcpus", "tags",
                                     "network_port", "beparams",
-                                    "(disk).(size)/([0-9]+)",
-                                    "(disk).(sizes)",
-                                    "(nic).(mac|ip|bridge)/([0-9]+)",
-                                    "(nic).(macs|ips|bridges)",
-                                    "(disk|nic).(count)",
+                                    r"(disk)\.(size)/([0-9]+)",
+                                    r"(disk)\.(sizes)", "disk_usage",
+                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
+                                    r"(nic)\.(macs|ips|bridges)",
+                                    r"(disk|nic)\.(count)",
                                     "serial_no", "hypervisor", "hvparams",] +
                                   ["hv/%s" % name
                                    for name in constants.HVS_PARAMETERS] +
@@ -2739,7 +3485,8 @@ class LUQueryInstances(NoHooksLU):
     else:
       self.wanted = locking.ALL_SET
 
-    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
+    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
+    self.do_locking = self.do_node_query and self.op.use_locking
     if self.do_locking:
       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
       self.needed_locks[locking.LEVEL_NODE] = []
@@ -2760,19 +3507,25 @@ class LUQueryInstances(NoHooksLU):
 
     """
     all_info = self.cfg.GetAllInstancesInfo()
-    if self.do_locking:
-      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
-    elif self.wanted != locking.ALL_SET:
-      instance_names = self.wanted
-      missing = set(instance_names).difference(all_info.keys())
-      if missing:
-        raise errors.OpExecError(
-          "Some instances were removed before retrieving their data: %s"
-          % missing)
+    if self.wanted == locking.ALL_SET:
+      # caller didn't specify instance names, so ordering is not important
+      if self.do_locking:
+        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      else:
+        instance_names = all_info.keys()
+      instance_names = utils.NiceSort(instance_names)
     else:
-      instance_names = all_info.keys()
+      # caller did specify names, so we must keep the ordering
+      if self.do_locking:
+        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
+      else:
+        tgt_set = all_info.keys()
+      missing = set(self.wanted).difference(tgt_set)
+      if missing:
+        raise errors.OpExecError("Some instances were removed before"
+                                 " retrieving their data: %s" % missing)
+      instance_names = self.wanted
 
-    instance_names = utils.NiceSort(instance_names)
     instance_list = [all_info[iname] for iname in instance_names]
 
     # begin data gathering
@@ -2781,16 +3534,21 @@ class LUQueryInstances(NoHooksLU):
     hv_list = list(set([inst.hypervisor for inst in instance_list]))
 
     bad_nodes = []
-    if self.do_locking:
+    off_nodes = []
+    if self.do_node_query:
       live_data = {}
       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
       for name in nodes:
         result = node_data[name]
-        if result:
-          live_data.update(result)
-        elif result == False:
+        if result.offline:
+          # offline nodes will be in both lists
+          off_nodes.append(name)
+        if result.failed:
           bad_nodes.append(name)
-        # else no instance is alive
+        else:
+          if result.data:
+            live_data.update(result.data)
+            # else no instance is alive
     else:
       live_data = dict([(name, {}) for name in instance_names])
 
@@ -2814,24 +3572,26 @@ class LUQueryInstances(NoHooksLU):
         elif field == "snodes":
           val = list(instance.secondary_nodes)
         elif field == "admin_state":
-          val = (instance.status != "down")
+          val = instance.admin_up
         elif field == "oper_state":
           if instance.primary_node in bad_nodes:
             val = None
           else:
             val = bool(live_data.get(instance.name))
         elif field == "status":
-          if instance.primary_node in bad_nodes:
+          if instance.primary_node in off_nodes:
+            val = "ERROR_nodeoffline"
+          elif instance.primary_node in bad_nodes:
             val = "ERROR_nodedown"
           else:
             running = bool(live_data.get(instance.name))
             if running:
-              if instance.status != "down":
+              if instance.admin_up:
                 val = "running"
               else:
                 val = "ERROR_up"
             else:
-              if instance.status != "down":
+              if instance.admin_up:
                 val = "ERROR_down"
               else:
                 val = "ADMIN_down"
@@ -2842,20 +3602,34 @@ class LUQueryInstances(NoHooksLU):
             val = live_data[instance.name].get("memory", "?")
           else:
             val = "-"
+        elif field == "vcpus":
+          val = i_be[constants.BE_VCPUS]
         elif field == "disk_template":
           val = instance.disk_template
         elif field == "ip":
-          val = instance.nics[0].ip
+          if instance.nics:
+            val = instance.nics[0].ip
+          else:
+            val = None
         elif field == "bridge":
-          val = instance.nics[0].bridge
+          if instance.nics:
+            val = instance.nics[0].bridge
+          else:
+            val = None
         elif field == "mac":
-          val = instance.nics[0].mac
+          if instance.nics:
+            val = instance.nics[0].mac
+          else:
+            val = None
         elif field == "sda_size" or field == "sdb_size":
           idx = ord(field[2]) - ord('a')
           try:
             val = instance.FindDisk(idx).size
           except errors.OpPrereqError:
             val = None
+        elif field == "disk_usage": # total disk usage per node
+          disk_sizes = [{'size': disk.size} for disk in instance.disks]
+          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
         elif field == "tags":
           val = list(instance.GetTags())
         elif field == "serial_no":
@@ -2913,9 +3687,10 @@ class LUQueryInstances(NoHooksLU):
                 else:
                   assert False, "Unhandled NIC parameter"
           else:
-            assert False, "Unhandled variable parameter"
+            assert False, ("Declared but unhandled variable parameter '%s'" %
+                           field)
         else:
-          raise errors.ParameterError(field)
+          assert False, "Declared but unhandled parameter '%s'" % field
         iout.append(val)
       output.append(iout)
 
@@ -2974,14 +3749,23 @@ class LUFailoverInstance(LogicalUnit):
                                    "a mirrored disk template")
 
     target_node = secondary_nodes[0]
-    # check memory requirements on the secondary node
-    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
-                         instance.name, bep[constants.BE_MEMORY],
-                         instance.hypervisor)
+    _CheckNodeOnline(self, target_node)
+    _CheckNodeNotDrained(self, target_node)
+
+    if instance.admin_up:
+      # check memory requirements on the secondary node
+      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
+                           instance.name, bep[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.LogInfo("Not checking memory on the secondary node as"
+                   " instance will not be started")
 
-    # check bridge existance
+    # check bridge existence
     brlist = [nic.bridge for nic in instance.nics]
-    if not self.rpc.call_bridges_exist(target_node, brlist):
+    result = self.rpc.call_bridges_exist(target_node, brlist)
+    result.Raise()
+    if not result.data:
       raise errors.OpPrereqError("One or more target bridges %s does not"
                                  " exist on destination node '%s'" %
                                  (brlist, target_node))
@@ -3002,7 +3786,7 @@ class LUFailoverInstance(LogicalUnit):
     for dev in instance.disks:
       # for drbd, these are drbd over lvm
       if not _CheckDiskConsistency(self, dev, target_node, False):
-        if instance.status == "up" and not self.op.ignore_consistency:
+        if instance.admin_up and not self.op.ignore_consistency:
           raise errors.OpExecError("Disk %s is degraded on target node,"
                                    " aborting failover." % dev.iv_name)
 
@@ -3010,15 +3794,18 @@ class LUFailoverInstance(LogicalUnit):
     logging.info("Shutting down instance %s on node %s",
                  instance.name, source_node)
 
-    if not self.rpc.call_instance_shutdown(source_node, instance):
+    result = self.rpc.call_instance_shutdown(source_node, instance)
+    msg = result.RemoteFailMsg()
+    if msg:
       if self.op.ignore_consistency:
         self.proc.LogWarning("Could not shutdown instance %s on node %s."
-                             " Proceeding"
-                             " anyway. Please make sure node %s is down",
-                             instance.name, source_node, source_node)
+                             " Proceeding anyway. Please make sure node"
+                             " %s is down. Error details: %s",
+                             instance.name, source_node, source_node, msg)
       else:
-        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
-                                 (instance.name, source_node))
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, source_node, msg))
 
     feedback_fn("* deactivating the instance's disks on source node")
     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
@@ -3029,72 +3816,470 @@ class LUFailoverInstance(LogicalUnit):
     self.cfg.Update(instance)
 
     # Only start the instance if it's marked as up
-    if instance.status == "up":
+    if instance.admin_up:
       feedback_fn("* activating the instance's disks on target node")
       logging.info("Starting instance %s on node %s",
                    instance.name, target_node)
 
-      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
+      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                                ignore_secondaries=True)
       if not disks_ok:
         _ShutdownInstanceDisks(self, instance)
         raise errors.OpExecError("Can't activate the instance's disks")
 
       feedback_fn("* starting the instance on the target node")
-      if not self.rpc.call_instance_start(target_node, instance, None):
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      msg = result.RemoteFailMsg()
+      if msg:
         _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance %s on node %s." %
-                                 (instance.name, target_node))
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
 
 
-def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
-  """Create a tree of block devices on the primary node.
+class LUMigrateInstance(LogicalUnit):
+  """Migrate an instance.
 
-  This always creates all devices.
+  This is migration without shutting down, compared to the failover,
+  which is done with shutdown.
 
   """
-  if device.children:
-    for child in device.children:
-      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
-        return False
+  HPATH = "instance-migrate"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "live", "cleanup"]
 
-  lu.cfg.SetDiskID(device, node)
-  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, True, info)
-  if not new_id:
-    return False
-  if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    env["MIGRATE_LIVE"] = self.op.live
+    env["MIGRATE_CLEANUP"] = self.op.cleanup
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    """
+    instance = self.cfg.GetInstanceInfo(
+      self.cfg.ExpandInstanceName(self.op.instance_name))
+    if instance is None:
+      raise errors.OpPrereqError("Instance '%s' not known" %
+                                 self.op.instance_name)
+
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Instance's disk layout is not"
+                                 " drbd8, cannot migrate.")
 
+    secondary_nodes = instance.secondary_nodes
+    if not secondary_nodes:
+      raise errors.ConfigurationError("No secondary node but using"
+                                      " drbd8 disk template")
+
+    i_be = self.cfg.GetClusterInfo().FillBE(instance)
+
+    target_node = secondary_nodes[0]
+    # check memory requirements on the secondary node
+    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
+                         instance.name, i_be[constants.BE_MEMORY],
+                         instance.hypervisor)
+
+    # check bridge existence
+    brlist = [nic.bridge for nic in instance.nics]
+    result = self.rpc.call_bridges_exist(target_node, brlist)
+    if result.failed or not result.data:
+      raise errors.OpPrereqError("One or more target bridges %s does not"
+                                 " exist on destination node '%s'" %
+                                 (brlist, target_node))
+
+    if not self.op.cleanup:
+      _CheckNodeNotDrained(self, target_node)
+      result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                 instance)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
+                                   msg)
+
+    self.instance = instance
+
+  def _WaitUntilSync(self):
+    """Poll with custom rpc for disk sync.
+
+    This uses our own step-based rpc call.
+
+    """
+    self.feedback_fn("* wait until resync is done")
+    all_done = False
+    while not all_done:
+      all_done = True
+      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
+                                            self.nodes_ip,
+                                            self.instance.disks)
+      min_percent = 100
+      for node, nres in result.items():
+        msg = nres.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
+                                   (node, msg))
+        node_done, node_percent = nres.payload
+        all_done = all_done and node_done
+        if node_percent is not None:
+          min_percent = min(min_percent, node_percent)
+      if not all_done:
+        if min_percent < 100:
+          self.feedback_fn("   - progress: %.1f%%" % min_percent)
+        time.sleep(2)
+
+  def _EnsureSecondary(self, node):
+    """Demote a node to secondary.
+
+    """
+    self.feedback_fn("* switching node %s to secondary mode" % node)
+
+    for dev in self.instance.disks:
+      self.cfg.SetDiskID(dev, node)
+
+    result = self.rpc.call_blockdev_close(node, self.instance.name,
+                                          self.instance.disks)
+    msg = result.RemoteFailMsg()
+    if msg:
+      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
+                               " error %s" % (node, msg))
+
+  def _GoStandalone(self):
+    """Disconnect from the network.
+
+    """
+    self.feedback_fn("* changing into standalone mode")
+    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
+                                               self.instance.disks)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot disconnect disks node %s,"
+                                 " error %s" % (node, msg))
+
+  def _GoReconnect(self, multimaster):
+    """Reconnect to the network.
+
+    """
+    if multimaster:
+      msg = "dual-master"
+    else:
+      msg = "single-master"
+    self.feedback_fn("* changing disks into %s mode" % msg)
+    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
+                                           self.instance.disks,
+                                           self.instance.name, multimaster)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot change disks config on node %s,"
+                                 " error: %s" % (node, msg))
+
+  def _ExecCleanup(self):
+    """Try to cleanup after a failed migration.
+
+    The cleanup is done by:
+      - check that the instance is running only on one node
+        (and update the config if needed)
+      - change disks on its secondary node to secondary
+      - wait until disks are fully synchronized
+      - disconnect from the network
+      - change disks into single-master mode
+      - wait again until disks are fully synchronized
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    # check running on only one node
+    self.feedback_fn("* checking where the instance actually runs"
+                     " (if this hangs, the hypervisor might be in"
+                     " a bad state)")
+    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
+    for node, result in ins_l.items():
+      result.Raise()
+      if not isinstance(result.data, list):
+        raise errors.OpExecError("Can't contact node '%s'" % node)
+
+    runningon_source = instance.name in ins_l[source_node].data
+    runningon_target = instance.name in ins_l[target_node].data
+
+    if runningon_source and runningon_target:
+      raise errors.OpExecError("Instance seems to be running on two nodes,"
+                               " or the hypervisor is confused. You will have"
+                               " to ensure manually that it runs only on one"
+                               " and restart this operation.")
+
+    if not (runningon_source or runningon_target):
+      raise errors.OpExecError("Instance does not seem to be running at all."
+                               " In this case, it's safer to repair by"
+                               " running 'gnt-instance stop' to ensure disk"
+                               " shutdown, and then restarting it.")
+
+    if runningon_target:
+      # the migration has actually succeeded, we need to update the config
+      self.feedback_fn("* instance running on secondary node (%s),"
+                       " updating config" % target_node)
+      instance.primary_node = target_node
+      self.cfg.Update(instance)
+      demoted_node = source_node
+    else:
+      self.feedback_fn("* instance confirmed to be running on its"
+                       " primary node (%s)" % source_node)
+      demoted_node = target_node
+
+    self._EnsureSecondary(demoted_node)
+    try:
+      self._WaitUntilSync()
+    except errors.OpExecError:
+      # we ignore errors here, since if the device is standalone, it
+      # won't be able to sync
+      pass
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def _RevertDiskStatus(self):
+    """Try to revert the disk status after a failed migration.
+
+    """
+    target_node = self.target_node
+    try:
+      self._EnsureSecondary(target_node)
+      self._GoStandalone()
+      self._GoReconnect(False)
+      self._WaitUntilSync()
+    except errors.OpExecError, err:
+      self.LogWarning("Migration failed and I can't reconnect the"
+                      " drives: error '%s'\n"
+                      "Please look and recover the instance status" %
+                      str(err))
+
+  def _AbortMigration(self):
+    """Call the hypervisor code to abort a started migration.
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    migration_info = self.migration_info
+
+    abort_result = self.rpc.call_finalize_migration(target_node,
+                                                    instance,
+                                                    migration_info,
+                                                    False)
+    abort_msg = abort_result.RemoteFailMsg()
+    if abort_msg:
+      logging.error("Aborting migration failed on target node %s: %s" %
+                    (target_node, abort_msg))
+      # Don't raise an exception here, as we still have to try to revert the
+      # disk status, even if this step failed.
+
+  def _ExecMigration(self):
+    """Migrate an instance.
+
+    The migrate is done by:
+      - change the disks into dual-master mode
+      - wait until disks are fully synchronized again
+      - migrate the instance
+      - change disks on the new secondary node (the old primary) to secondary
+      - wait until disks are fully synchronized
+      - change disks into single-master mode
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    self.feedback_fn("* checking disk consistency between source and target")
+    for dev in instance.disks:
+      if not _CheckDiskConsistency(self, dev, target_node, False):
+        raise errors.OpExecError("Disk %s is degraded or not fully"
+                                 " synchronized on target node,"
+                                 " aborting migrate." % dev.iv_name)
+
+    # First get the migration information from the remote node
+    result = self.rpc.call_migration_info(source_node, instance)
+    msg = result.RemoteFailMsg()
+    if msg:
+      log_err = ("Failed fetching source migration information from %s: %s" %
+                 (source_node, msg))
+      logging.error(log_err)
+      raise errors.OpExecError(log_err)
+
+    self.migration_info = migration_info = result.payload
+
+    # Then switch the disks to master/master mode
+    self._EnsureSecondary(target_node)
+    self._GoStandalone()
+    self._GoReconnect(True)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* preparing %s to accept the instance" % target_node)
+    result = self.rpc.call_accept_instance(target_node,
+                                           instance,
+                                           migration_info,
+                                           self.nodes_ip[target_node])
+
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance pre-migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
+                               (instance.name, msg))
+
+    self.feedback_fn("* migrating instance to %s" % target_node)
+    time.sleep(10)
+    result = self.rpc.call_instance_migrate(source_node, instance,
+                                            self.nodes_ip[target_node],
+                                            self.op.live)
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not migrate instance %s: %s" %
+                               (instance.name, msg))
+    time.sleep(10)
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance)
+
+    result = self.rpc.call_finalize_migration(target_node,
+                                              instance,
+                                              migration_info,
+                                              True)
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance migration succeeded, but finalization failed:"
+                    " %s" % msg)
+      raise errors.OpExecError("Could not finalize instance migration: %s" %
+                               msg)
+
+    self._EnsureSecondary(source_node)
+    self._WaitUntilSync()
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def Exec(self, feedback_fn):
+    """Perform the migration.
+
+    """
+    self.feedback_fn = feedback_fn
+
+    self.source_node = self.instance.primary_node
+    self.target_node = self.instance.secondary_nodes[0]
+    self.all_nodes = [self.source_node, self.target_node]
+    self.nodes_ip = {
+      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
+      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
+      }
+    if self.op.cleanup:
+      return self._ExecCleanup()
+    else:
+      return self._ExecMigration()
 
-def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
-  """Create a tree of block devices on a secondary node.
+
+def _CreateBlockDev(lu, node, instance, device, force_create,
+                    info, force_open):
+  """Create a tree of block devices on a given node.
 
   If this device type has to be created on secondaries, create it and
   all its children.
 
   If not, just recurse to children keeping the same 'force' value.
 
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @type force_create: boolean
+  @param force_create: whether to force creation of this device; this
+      will be changed to True whenever we find a device whose
+      CreateOnSecondary() method returns true
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passed to the
+      L{backend.BlockdevCreate} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device's own Open() execution
+
   """
   if device.CreateOnSecondary():
-    force = True
+    force_create = True
+
   if device.children:
     for child in device.children:
-      if not _CreateBlockDevOnSecondary(lu, node, instance,
-                                        child, force, info):
-        return False
+      _CreateBlockDev(lu, node, instance, child, force_create,
+                      info, force_open)
 
-  if not force:
-    return True
+  if not force_create:
+    return
+
+  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
+
+
+def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
+  """Create a single block device on a given node.
+
+  This will not recurse over children of the device, so they must be
+  created in advance.
+
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passed to the
+      L{backend.BlockdevCreate} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device's own Open() execution
+
+  """
   lu.cfg.SetDiskID(device, node)
-  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, False, info)
-  if not new_id:
-    return False
+  result = lu.rpc.call_blockdev_create(node, device, device.size,
+                                       instance.name, force_open, info)
+  msg = result.RemoteFailMsg()
+  if msg:
+    raise errors.OpExecError("Can't create block device %s on"
+                             " node %s for instance %s: %s" %
+                             (device, node, instance.name, msg))
   if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+    device.physical_id = result.payload
 
 
 def _GenerateUniqueNames(lu, exts):
@@ -3150,13 +4335,14 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
-    names = _GenerateUniqueNames(lu, [".disk%d" % i
+    names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                       for i in range(disk_count)])
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                               logical_id=(vgname, names[idx]),
-                              iv_name="disk/%d" % disk_index)
+                              iv_name="disk/%d" % disk_index,
+                              mode=disk["mode"])
       disks.append(disk_dev)
   elif template_name == constants.DT_DRBD8:
     if len(secondary_nodes) != 1:
@@ -3165,17 +4351,18 @@ def _GenerateDiskTemplate(lu, template_name,
     minors = lu.cfg.AllocateDRBDMinor(
       [primary_node, remote_node] * len(disk_info), instance_name)
 
-    names = _GenerateUniqueNames(lu,
-                                 [".disk%d_%s" % (i, s)
-                                  for i in range(disk_count)
-                                  for s in ("data", "meta")
-                                  ])
+    names = []
+    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
+                                               for i in range(disk_count)]):
+      names.append(lv_prefix + "_data")
+      names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                       disk["size"], names[idx*2:idx*2+2],
                                       "disk/%d" % disk_index,
                                       minors[idx*2], minors[idx*2+1])
+      disk_dev.mode = disk["mode"]
       disks.append(disk_dev)
   elif template_name == constants.DT_FILE:
     if len(secondary_nodes) != 0:
@@ -3187,7 +4374,8 @@ def _GenerateDiskTemplate(lu, template_name,
                               iv_name="disk/%d" % disk_index,
                               logical_id=(file_driver,
                                           "%s/disk%d" % (file_storage_dir,
-                                                         idx)))
+                                                         disk_index)),
+                              mode=disk["mode"])
       disks.append(disk_dev)
   else:
     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
@@ -3215,19 +4403,18 @@ def _CreateDisks(lu, instance):
 
   """
   info = _GetInstanceInfoText(instance)
+  pnode = instance.primary_node
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
-                                                 file_storage_dir)
+    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
 
-    if not result:
-      logging.error("Could not connect to node '%s'", instance.primary_node)
-      return False
+    if result.failed or not result.data:
+      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
 
-    if not result[0]:
-      logging.error("Failed to create directory '%s'", file_storage_dir)
-      return False
+    if not result.data[0]:
+      raise errors.OpExecError("Failed to create directory '%s'" %
+                               file_storage_dir)
 
   # Note: this needs to be kept in sync with adding of disks in
   # LUSetInstanceParams
@@ -3235,19 +4422,9 @@ def _CreateDisks(lu, instance):
     logging.info("Creating volume %s for instance %s",
                  device.iv_name, instance.name)
     #HARDCODE
-    for secondary_node in instance.secondary_nodes:
-      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
-                                        device, False, info):
-        logging.error("Failed to create volume %s (%s) on secondary node %s!",
-                      device.iv_name, device, secondary_node)
-        return False
-    #HARDCODE
-    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
-                                    instance, device, info):
-      logging.error("Failed to create volume %s on primary!", device.iv_name)
-      return False
-
-  return True
+    for node in instance.all_nodes:
+      f_create = node == pnode
+      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
 def _RemoveDisks(lu, instance):
@@ -3268,23 +4445,25 @@ def _RemoveDisks(lu, instance):
   """
   logging.info("Removing block devices for instance %s", instance.name)
 
-  result = True
+  all_result = True
   for device in instance.disks:
     for node, disk in device.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(disk, node)
-      if not lu.rpc.call_blockdev_remove(node, disk):
-        lu.proc.LogWarning("Could not remove block device %s on node %s,"
-                           " continuing anyway", device.iv_name, node)
-        result = False
+      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
+      if msg:
+        lu.LogWarning("Could not remove block device %s on node %s,"
+                      " continuing anyway: %s", device.iv_name, node, msg)
+        all_result = False
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
-                                               file_storage_dir):
+    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
+                                                 file_storage_dir)
+    if result.failed or not result.data:
       logging.error("Could not remove directory '%s'", file_storage_dir)
-      result = False
+      all_result = False
 
-  return result
+  return all_result
 
 
 def _ComputeDiskSize(disk_template, disks):
@@ -3328,13 +4507,13 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
                                                   hvname,
                                                   hvparams)
   for node in nodenames:
-    info = hvinfo.get(node, None)
-    if not info or not isinstance(info, (tuple, list)):
-      raise errors.OpPrereqError("Cannot get current information"
-                                 " from node '%s' (%s)" % (node, info))
-    if not info[0]:
-      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
-                                 " %s" % info[1])
+    info = hvinfo[node]
+    if info.offline:
+      continue
+    msg = info.RemoteFailMsg()
+    if msg:
+      raise errors.OpPrereqError("Hypervisor parameter validation"
+                                 " failed on node %s: %s" % (node, msg))
 
 
 class LUCreateInstance(LogicalUnit):
@@ -3394,13 +4573,15 @@ class LUCreateInstance(LogicalUnit):
                                   ",".join(enabled_hvs)))
 
     # check hypervisor parameter syntax (locally)
-
+    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                   self.op.hvparams)
     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
     hv_type.CheckParameterSyntax(filled_hvp)
+    self.hv_full = filled_hvp
 
     # fill and remember the beparams dict
+    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                     self.op.beparams)
 
@@ -3439,8 +4620,16 @@ class LUCreateInstance(LogicalUnit):
         if not utils.IsValidMac(mac.lower()):
           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                      mac)
+        else:
+          # or validate/reserve the current one
+          if self.cfg.IsMacInUse(mac):
+            raise errors.OpPrereqError("MAC address %s already in use"
+                                       " in cluster" % mac)
+
       # bridge verification
-      bridge = nic.get("bridge", self.cfg.GetDefBridge())
+      bridge = nic.get("bridge", None)
+      if bridge is None:
+        bridge = self.cfg.GetDefBridge()
       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
 
     # disk checks/pre-build
@@ -3491,16 +4680,22 @@ class LUCreateInstance(LogicalUnit):
       src_node = getattr(self.op, "src_node", None)
       src_path = getattr(self.op, "src_path", None)
 
-      if src_node is None or src_path is None:
-        raise errors.OpPrereqError("Importing an instance requires source"
-                                   " node and path options")
-
-      if not os.path.isabs(src_path):
-        raise errors.OpPrereqError("The source path must be absolute")
+      if src_path is None:
+        self.op.src_path = src_path = self.op.instance_name
 
-      self.op.src_node = src_node = self._ExpandNode(src_node)
-      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
-        self.needed_locks[locking.LEVEL_NODE].append(src_node)
+      if src_node is None:
+        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+        self.op.src_node = None
+        if os.path.isabs(src_path):
+          raise errors.OpPrereqError("Importing an instance from an absolute"
+                                     " path requires a source node option.")
+      else:
+        self.op.src_node = src_node = self._ExpandNode(src_node)
+        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+          self.needed_locks[locking.LEVEL_NODE].append(src_node)
+        if not os.path.isabs(src_path):
+          self.op.src_path = src_path = \
+            os.path.join(constants.EXPORT_DIR, src_path)
 
     else: # INSTANCE_CREATE
       if getattr(self.op, "os_type", None) is None:
@@ -3549,23 +4744,27 @@ class LUCreateInstance(LogicalUnit):
 
     """
     env = {
-      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
-      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
-      "INSTANCE_ADD_MODE": self.op.mode,
+      "ADD_MODE": self.op.mode,
       }
     if self.op.mode == constants.INSTANCE_IMPORT:
-      env["INSTANCE_SRC_NODE"] = self.op.src_node
-      env["INSTANCE_SRC_PATH"] = self.op.src_path
-      env["INSTANCE_SRC_IMAGES"] = self.src_images
+      env["SRC_NODE"] = self.op.src_node
+      env["SRC_PATH"] = self.op.src_path
+      env["SRC_IMAGES"] = self.src_images
 
-    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
+    env.update(_BuildInstanceHookEnv(
+      name=self.op.instance_name,
       primary_node=self.op.pnode,
       secondary_nodes=self.secondaries,
-      status=self.instance_status,
+      status=self.op.start,
       os_type=self.op.os_type,
       memory=self.be_full[constants.BE_MEMORY],
       vcpus=self.be_full[constants.BE_VCPUS],
       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
+      disk_template=self.op.disk_template,
+      disks=[(d["size"], d["mode"]) for d in self.disks],
+      bep=self.be_full,
+      hvp=self.hv_full,
+      hypervisor_name=self.op.hypervisor,
     ))
 
     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
@@ -3582,16 +4781,32 @@ class LUCreateInstance(LogicalUnit):
       raise errors.OpPrereqError("Cluster does not support lvm-based"
                                  " instances")
 
-
     if self.op.mode == constants.INSTANCE_IMPORT:
       src_node = self.op.src_node
       src_path = self.op.src_path
 
-      export_info = self.rpc.call_export_info(src_node, src_path)
-
-      if not export_info:
+      if src_node is None:
+        exp_list = self.rpc.call_export_list(
+          self.acquired_locks[locking.LEVEL_NODE])
+        found = False
+        for node in exp_list:
+          if not exp_list[node].failed and src_path in exp_list[node].data:
+            found = True
+            self.op.src_node = src_node = node
+            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
+                                                       src_path)
+            break
+        if not found:
+          raise errors.OpPrereqError("No export found for relative path %s" %
+                                      src_path)
+
+      _CheckNodeOnline(self, src_node)
+      result = self.rpc.call_export_info(src_node, src_path)
+      result.Raise()
+      if not result.data:
         raise errors.OpPrereqError("No export found in dir %s" % src_path)
 
+      export_info = result.data
       if not export_info.has_section(constants.INISECT_EXP):
         raise errors.ProgrammerError("Corrupted export config")
 
@@ -3606,7 +4821,7 @@ class LUCreateInstance(LogicalUnit):
       if instance_disks < export_disks:
         raise errors.OpPrereqError("Not enough disks to import."
                                    " (instance: %d, export: %d)" %
-                                   (2, export_disks))
+                                   (instance_disks, export_disks))
 
       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
       disk_images = []
@@ -3631,6 +4846,7 @@ class LUCreateInstance(LogicalUnit):
             nic_mac_ini = 'nic%d_mac' % idx
             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
 
+    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
     # ip ping checks (we use the same ip that was resolved in ExpandNames)
     if self.op.start and not self.op.ip_check:
       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
@@ -3641,6 +4857,18 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                    (self.check_ip, self.op.instance_name))
 
+    #### mac address generation
+    # By generating the mac address here, both the allocator and the hooks get
+    # the real final mac address rather than the 'auto' or 'generate' value.
+    # There is a race condition between the generation and the instance object
+    # creation, which means that we know the mac is valid now, but we're not
+    # sure it will be when we actually add the instance. If things go bad,
+    # adding the instance will abort because of a duplicate mac, and the
+    # creation job will fail.
+    for nic in self.nics:
+      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+        nic.mac = self.cfg.GenerateMAC()
+
     #### allocator run
 
     if self.op.iallocator is not None:
@@ -3652,6 +4880,13 @@ class LUCreateInstance(LogicalUnit):
     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
     assert self.pnode is not None, \
       "Cannot retrieve locked node %s" % self.op.pnode
+    if pnode.offline:
+      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
+                                 pnode.name)
+    if pnode.drained:
+      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
+                                 pnode.name)
+
     self.secondaries = []
 
     # mirror node verification
@@ -3662,6 +4897,8 @@ class LUCreateInstance(LogicalUnit):
       if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be"
                                    " the primary node.")
+      _CheckNodeOnline(self, self.op.snode)
+      _CheckNodeNotDrained(self, self.op.snode)
       self.secondaries.append(self.op.snode)
 
     nodenames = [pnode.name] + self.secondaries
@@ -3674,7 +4911,9 @@ class LUCreateInstance(LogicalUnit):
       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.op.hypervisor)
       for node in nodenames:
-        info = nodeinfo.get(node, None)
+        info = nodeinfo[node]
+        info.Raise()
+        info = info.data
         if not info:
           raise errors.OpPrereqError("Cannot get current information"
                                      " from node '%s'" % node)
@@ -3690,17 +4929,19 @@ class LUCreateInstance(LogicalUnit):
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
     # os verification
-    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
-    if not os_obj:
+    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+    result.Raise()
+    if not isinstance(result.data, objects.OS) or not result.data:
       raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                  " primary node"  % self.op.os_type)
 
     # bridge check on primary node
     bridges = [n.bridge for n in self.nics]
-    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
-      raise errors.OpPrereqError("one of the target bridges '%s' does not"
-                                 " exist on"
-                                 " destination node '%s'" %
+    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
+    result.Raise()
+    if not result.data:
+      raise errors.OpPrereqError("One of the target bridges '%s' does not"
+                                 " exist on destination node '%s'" %
                                  (",".join(bridges), pnode.name))
 
     # memory check on primary node
@@ -3710,11 +4951,6 @@ class LUCreateInstance(LogicalUnit):
                            self.be_full[constants.BE_MEMORY],
                            self.op.hypervisor)
 
-    if self.op.start:
-      self.instance_status = 'up'
-    else:
-      self.instance_status = 'down'
-
   def Exec(self, feedback_fn):
     """Create and add the instance to the cluster.
 
@@ -3722,10 +4958,6 @@ class LUCreateInstance(LogicalUnit):
     instance = self.op.instance_name
     pnode_name = self.pnode.name
 
-    for nic in self.nics:
-      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-        nic.mac = self.cfg.GenerateMAC()
-
     ht_kind = self.op.hypervisor
     if ht_kind in constants.HTS_REQ_PORT:
       network_port = self.cfg.AllocatePort()
@@ -3760,7 +4992,7 @@ class LUCreateInstance(LogicalUnit):
                             primary_node=pnode_name,
                             nics=self.nics, disks=disks,
                             disk_template=self.op.disk_template,
-                            status=self.instance_status,
+                            admin_up=False,
                             network_port=network_port,
                             beparams=self.op.beparams,
                             hvparams=self.op.hvparams,
@@ -3768,10 +5000,15 @@ class LUCreateInstance(LogicalUnit):
                             )
 
     feedback_fn("* creating instance disks...")
-    if not _CreateDisks(self, iobj):
-      _RemoveDisks(self, iobj)
-      self.cfg.ReleaseDRBDMinors(instance)
-      raise errors.OpExecError("Device creation failed, reverting...")
+    try:
+      _CreateDisks(self, iobj)
+    except errors.OpExecError:
+      self.LogWarning("Device creation failed, reverting...")
+      try:
+        _RemoveDisks(self, iobj)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance)
+        raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
@@ -3779,11 +5016,16 @@ class LUCreateInstance(LogicalUnit):
     # Declare that we don't want to remove the instance lock anymore, as we've
     # added the instance to the config
     del self.remove_locks[locking.LEVEL_INSTANCE]
-    # Remove the temp. assignements for the instance's drbds
-    self.cfg.ReleaseDRBDMinors(instance)
     # Unlock all the nodes
-    self.context.glm.release(locking.LEVEL_NODE)
-    del self.acquired_locks[locking.LEVEL_NODE]
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      nodes_keep = [self.op.src_node]
+      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
+                       if node != self.op.src_node]
+      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
+      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+    else:
+      self.context.glm.release(locking.LEVEL_NODE)
+      del self.acquired_locks[locking.LEVEL_NODE]
 
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, iobj)
@@ -3809,10 +5051,12 @@ class LUCreateInstance(LogicalUnit):
     if iobj.disk_template != constants.DT_DISKLESS:
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
-        if not self.rpc.call_instance_os_add(pnode_name, iobj):
-          raise errors.OpExecError("could not add os for instance %s"
-                                   " on node %s" %
-                                   (instance, pnode_name))
+        result = self.rpc.call_instance_os_add(pnode_name, iobj)
+        msg = result.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Could not add os for instance %s"
+                                   " on node %s: %s" %
+                                   (instance, pnode_name, msg))
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
@@ -3822,21 +5066,26 @@ class LUCreateInstance(LogicalUnit):
         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                          src_node, src_images,
                                                          cluster_name)
-        for idx, result in enumerate(import_result):
+        import_result.Raise()
+        for idx, result in enumerate(import_result.data):
           if not result:
-            self.LogWarning("Could not image %s for on instance %s, disk %d,"
-                            " on node %s" % (src_images[idx], instance, idx,
-                                             pnode_name))
+            self.LogWarning("Could not import the image %s for instance"
+                            " %s, disk %d, on node %s" %
+                            (src_images[idx], instance, idx, pnode_name))
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                      % self.op.mode)
 
     if self.op.start:
+      iobj.admin_up = True
+      self.cfg.Update(iobj)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      if not self.rpc.call_instance_start(pnode_name, iobj, None):
-        raise errors.OpExecError("Could not start instance")
+      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LUConnectConsole(NoHooksLU):
@@ -3862,6 +5111,7 @@ class LUConnectConsole(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Connect to the console of an instance
@@ -3872,16 +5122,20 @@ class LUConnectConsole(NoHooksLU):
 
     node_insts = self.rpc.call_instance_list([node],
                                              [instance.hypervisor])[node]
-    if node_insts is False:
-      raise errors.OpExecError("Can't connect to node %s." % node)
+    node_insts.Raise()
 
-    if instance.name not in node_insts:
+    if instance.name not in node_insts.data:
       raise errors.OpExecError("Instance %s is not running." % instance.name)
 
     logging.debug("Connecting to console of %s on %s", instance.name, node)
 
     hyper = hypervisor.GetHypervisor(instance.hypervisor)
-    console_cmd = hyper.GetShellCommandForConsole(instance)
+    cluster = self.cfg.GetClusterInfo()
+    # beparams and hvparams are passed separately, to avoid editing the
+    # instance and then saving the defaults in the instance itself.
+    hvparams = cluster.FillHV(instance)
+    beparams = cluster.FillBE(instance)
+    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
 
     # build ssh cmdline
     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
@@ -3896,17 +5150,32 @@ class LUReplaceDisks(LogicalUnit):
   _OP_REQP = ["instance_name", "mode", "disks"]
   REQ_BGL = False
 
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-
+  def CheckArguments(self):
     if not hasattr(self.op, "remote_node"):
       self.op.remote_node = None
-
-    ia_name = getattr(self.op, "iallocator", None)
-    if ia_name is not None:
-      if self.op.remote_node is not None:
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
+
+    # check for valid parameter combination
+    cnt = [self.op.remote_node, self.op.iallocator].count(None)
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      if cnt == 2:
+        raise errors.OpPrereqError("When changing the secondary either an"
+                                   " iallocator script must be used or the"
+                                   " new node given")
+      elif cnt == 0:
         raise errors.OpPrereqError("Give either the iallocator or the new"
                                    " secondary, not both")
+    else: # not replacing the secondary
+      if cnt != 2:
+        raise errors.OpPrereqError("The iallocator and new node options can"
+                                   " be used only when changing the"
+                                   " secondary node")
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+    if self.op.iallocator is not None:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     elif self.op.remote_node is not None:
       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
@@ -3914,6 +5183,10 @@ class LUReplaceDisks(LogicalUnit):
         raise errors.OpPrereqError("Node '%s' not known" %
                                    self.op.remote_node)
       self.op.remote_node = remote_node
+      # Warning: do not remove the locking of the new secondary here
+      # unless DRBD8.AddChildren is changed to work in parallel;
+      # currently it doesn't since parallel invocations of
+      # FindUnusedMinor will conflict
       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
     else:
@@ -3981,9 +5254,9 @@ class LUReplaceDisks(LogicalUnit):
       "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = instance
 
-    if instance.disk_template not in constants.DTS_NET_MIRROR:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " network mirrored.")
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
+                                 " instances")
 
     if len(instance.secondary_nodes) != 1:
       raise errors.OpPrereqError("The instance has a strange layout,"
@@ -3992,8 +5265,7 @@ class LUReplaceDisks(LogicalUnit):
 
     self.sec_node = instance.secondary_nodes[0]
 
-    ia_name = getattr(self.op, "iallocator", None)
-    if ia_name is not None:
+    if self.op.iallocator is not None:
       self._RunAllocator()
 
     remote_node = self.op.remote_node
@@ -4007,35 +5279,25 @@ class LUReplaceDisks(LogicalUnit):
       raise errors.OpPrereqError("The specified node is the primary node of"
                                  " the instance.")
     elif remote_node == self.sec_node:
-      if self.op.mode == constants.REPLACE_DISK_SEC:
-        # this is for DRBD8, where we can't execute the same mode of
-        # replacement as for drbd7 (no different port allocated)
-        raise errors.OpPrereqError("Same secondary given, cannot execute"
-                                   " replacement")
-    if instance.disk_template == constants.DT_DRBD8:
-      if (self.op.mode == constants.REPLACE_DISK_ALL and
-          remote_node is not None):
-        # switch to replace secondary mode
-        self.op.mode = constants.REPLACE_DISK_SEC
-
-      if self.op.mode == constants.REPLACE_DISK_ALL:
-        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
-                                   " secondary disk replacement, not"
-                                   " both at once")
-      elif self.op.mode == constants.REPLACE_DISK_PRI:
-        if remote_node is not None:
-          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
-                                     " the secondary while doing a primary"
-                                     " node disk replacement")
-        self.tgt_node = instance.primary_node
-        self.oth_node = instance.secondary_nodes[0]
-      elif self.op.mode == constants.REPLACE_DISK_SEC:
-        self.new_node = remote_node # this can be None, in which case
-                                    # we don't change the secondary
-        self.tgt_node = instance.secondary_nodes[0]
-        self.oth_node = instance.primary_node
-      else:
-        raise errors.ProgrammerError("Unhandled disk replace mode")
+      raise errors.OpPrereqError("The specified node is already the"
+                                 " secondary node of the instance.")
+
+    if self.op.mode == constants.REPLACE_DISK_PRI:
+      n1 = self.tgt_node = instance.primary_node
+      n2 = self.oth_node = self.sec_node
+    elif self.op.mode == constants.REPLACE_DISK_SEC:
+      n1 = self.tgt_node = self.sec_node
+      n2 = self.oth_node = instance.primary_node
+    elif self.op.mode == constants.REPLACE_DISK_CHG:
+      n1 = self.new_node = remote_node
+      n2 = self.oth_node = instance.primary_node
+      self.tgt_node = self.sec_node
+      _CheckNodeNotDrained(self, remote_node)
+    else:
+      raise errors.ProgrammerError("Unhandled disk replace mode")
+
+    _CheckNodeOnline(self, n1)
+    _CheckNodeOnline(self, n2)
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -4083,8 +5345,8 @@ class LUReplaceDisks(LogicalUnit):
     if not results:
       raise errors.OpExecError("Can't list volume groups on the nodes")
     for node in oth_node, tgt_node:
-      res = results.get(node, False)
-      if not res or my_vg not in res:
+      res = results[node]
+      if res.failed or not res.data or my_vg not in res.data:
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
     for idx, dev in enumerate(instance.disks):
@@ -4093,9 +5355,13 @@ class LUReplaceDisks(LogicalUnit):
       for node in tgt_node, oth_node:
         info("checking disk/%d on %s" % (idx, node))
         cfg.SetDiskID(dev, node)
-        if not self.rpc.call_blockdev_find(node, dev):
-          raise errors.OpExecError("Can't find disk/%d on node %s" %
-                                   (idx, node))
+        result = self.rpc.call_blockdev_find(node, dev)
+        msg = result.RemoteFailMsg()
+        if not msg and not result.payload:
+          msg = "disk not found"
+        if msg:
+          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
+                                   (idx, node, msg))
 
     # Step: check other node consistency
     self.proc.LogStep(2, steps_total, "check peer consistency")
@@ -4128,21 +5394,18 @@ class LUReplaceDisks(LogicalUnit):
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
       info("creating new local storage on %s for %s" %
            (tgt_node, dev.iv_name))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+      # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
-        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], tgt_node))
+        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
 
     # Step: for each lv, detach+rename*2+attach
     self.proc.LogStep(4, steps_total, "change drbd configuration")
     for dev, old_lvs, new_lvs in iv_names.itervalues():
       info("detaching %s drbd from local storage" % dev.iv_name)
-      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
+      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't detach drbd from local storage on node"
                                  " %s for device %s" % (tgt_node, dev.iv_name))
       #dev.children = []
@@ -4161,17 +5424,22 @@ class LUReplaceDisks(LogicalUnit):
       # build the rename list based on what LVs exist on the node
       rlist = []
       for to_ren in old_lvs:
-        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
-        if find_res is not None: # device exists
+        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
+        if not result.RemoteFailMsg() and result.payload:
+          # device exists
           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
 
       info("renaming the old LVs on the target node")
-      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
       # now we rename the new LVs to the old LVs
       info("renaming the new LVs on the target node")
       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
-      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
 
       for old, new in zip(old_lvs, new_lvs):
@@ -4184,11 +5452,13 @@ class LUReplaceDisks(LogicalUnit):
 
       # now that the new lvs have the old name, we can add them to the device
       info("adding new mirror component on %s" % tgt_node)
-      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
+      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+      if result.failed or not result.data:
         for new_lv in new_lvs:
-          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
-            warning("Can't rollback device %s", hint="manually cleanup unused"
-                    " logical volumes")
+          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
+          if msg:
+            warning("Can't rollback device %s: %s", dev, msg,
+                    hint="cleanup manually the unused logical volumes")
         raise errors.OpExecError("Can't add local storage to drbd")
 
       dev.children = new_lvs
@@ -4205,8 +5475,14 @@ class LUReplaceDisks(LogicalUnit):
     # so check manually all the devices
     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
       cfg.SetDiskID(dev, instance.primary_node)
-      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
-      if is_degr:
+      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
+      msg = result.RemoteFailMsg()
+      if not msg and not result.payload:
+        msg = "disk not found"
+      if msg:
+        raise errors.OpExecError("Can't find DRBD device %s: %s" %
+                                 (name, msg))
+      if result.payload[5]:
         raise errors.OpExecError("DRBD device %s is degraded!" % name)
 
     # Step: remove old storage
@@ -4215,8 +5491,10 @@ class LUReplaceDisks(LogicalUnit):
       info("remove logical volumes for %s" % name)
       for lv in old_lvs:
         cfg.SetDiskID(lv, tgt_node)
-        if not self.rpc.call_blockdev_remove(tgt_node, lv):
-          warning("Can't remove old LV", hint="manually remove unused LVs")
+        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
+        if msg:
+          warning("Can't remove old LV: %s" % msg,
+                  hint="manually remove unused LVs")
           continue
 
   def _ExecD8Secondary(self, feedback_fn):
@@ -4242,23 +5520,25 @@ class LUReplaceDisks(LogicalUnit):
     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
     instance = self.instance
     iv_names = {}
-    vgname = self.cfg.GetVGName()
     # start of work
     cfg = self.cfg
     old_node = self.tgt_node
     new_node = self.new_node
     pri_node = instance.primary_node
+    nodes_ip = {
+      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
+      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
+      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
+      }
 
     # Step: check device activation
     self.proc.LogStep(1, steps_total, "check device existence")
     info("checking volume groups")
     my_vg = cfg.GetVGName()
     results = self.rpc.call_vg_list([pri_node, new_node])
-    if not results:
-      raise errors.OpExecError("Can't list volume groups on the nodes")
     for node in pri_node, new_node:
-      res = results.get(node, False)
-      if not res or my_vg not in res:
+      res = results[node]
+      if res.failed or not res.data or my_vg not in res.data:
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
     for idx, dev in enumerate(instance.disks):
@@ -4266,9 +5546,13 @@ class LUReplaceDisks(LogicalUnit):
         continue
       info("checking disk/%d on %s" % (idx, pri_node))
       cfg.SetDiskID(dev, pri_node)
-      if not self.rpc.call_blockdev_find(pri_node, dev):
-        raise errors.OpExecError("Can't find disk/%d on node %s" %
-                                 (idx, pri_node))
+      result = self.rpc.call_blockdev_find(pri_node, dev)
+      msg = result.RemoteFailMsg()
+      if not msg and not result.payload:
+        msg = "disk not found"
+      if msg:
+        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
+                                 (idx, pri_node, msg))
 
     # Step: check other node consistency
     self.proc.LogStep(2, steps_total, "check peer consistency")
@@ -4284,18 +5568,12 @@ class LUReplaceDisks(LogicalUnit):
     # Step: create new storage
     self.proc.LogStep(3, steps_total, "allocate new storage")
     for idx, dev in enumerate(instance.disks):
-      size = dev.size
       info("adding new local storage on %s for disk/%d" %
            (new_node, idx))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+      # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
-        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], new_node))
+        _CreateBlockDev(self, new_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
 
     # Step 4: dbrd minors and drbd setups changes
     # after this, we must manually remove the drbd minors on both the
@@ -4305,57 +5583,54 @@ class LUReplaceDisks(LogicalUnit):
     logging.debug("Allocated minors %s" % (minors,))
     self.proc.LogStep(4, steps_total, "changing drbd configuration")
     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
-      size = dev.size
       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
-      # create new devices on new_node
-      if pri_node == dev.logical_id[0]:
-        new_logical_id = (pri_node, new_node,
-                          dev.logical_id[2], dev.logical_id[3], new_minor,
-                          dev.logical_id[5])
+      # create new devices on new_node; note that we create two IDs:
+      # one without port, so the drbd will be activated without
+      # networking information on the new node at this stage, and one
+      # with network, for the later activation in step 4
+      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
+      if pri_node == o_node1:
+        p_minor = o_minor1
       else:
-        new_logical_id = (new_node, pri_node,
-                          dev.logical_id[2], new_minor, dev.logical_id[4],
-                          dev.logical_id[5])
-      iv_names[idx] = (dev, dev.children, new_logical_id)
+        p_minor = o_minor2
+
+      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
+      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
+
+      iv_names[idx] = (dev, dev.children, new_net_id)
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
-                    new_logical_id)
+                    new_net_id)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
-                              logical_id=new_logical_id,
-                              children=dev.children)
-      if not _CreateBlockDevOnSecondary(self, new_node, instance,
-                                        new_drbd, False,
-                                        _GetInstanceInfoText(instance)):
+                              logical_id=new_alone_id,
+                              children=dev.children,
+                              size=dev.size)
+      try:
+        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
+                              _GetInstanceInfoText(instance), False)
+      except errors.GenericError:
         self.cfg.ReleaseDRBDMinors(instance.name)
-        raise errors.OpExecError("Failed to create new DRBD on"
-                                 " node '%s'" % new_node)
+        raise
 
     for idx, dev in enumerate(instance.disks):
       # we have new devices, shutdown the drbd on the old secondary
       info("shutting down drbd for disk/%d on old node" % idx)
       cfg.SetDiskID(dev, old_node)
-      if not self.rpc.call_blockdev_shutdown(old_node, dev):
-        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
+      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
+      if msg:
+        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
+                (idx, msg),
                 hint="Please cleanup this device manually as soon as possible")
 
     info("detaching primary drbds from the network (=> standalone)")
-    done = 0
-    for idx, dev in enumerate(instance.disks):
-      cfg.SetDiskID(dev, pri_node)
-      # set the network part of the physical (unique in bdev terms) id
-      # to None, meaning detach from network
-      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
-      # and 'find' the device, which will 'fix' it to match the
-      # standalone state
-      if self.rpc.call_blockdev_find(pri_node, dev):
-        done += 1
-      else:
-        warning("Failed to detach drbd disk/%d from network, unusual case" %
-                idx)
+    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
+                                               instance.disks)[pri_node]
 
-    if not done:
-      # no detaches succeeded (very unlikely)
+    msg = result.RemoteFailMsg()
+    if msg:
+      # detaches didn't succeed (unlikely)
       self.cfg.ReleaseDRBDMinors(instance.name)
-      raise errors.OpExecError("Can't detach at least one DRBD from old node")
+      raise errors.OpExecError("Can't detach the disks from the network on"
+                               " old node: %s" % (msg,))
 
     # if we managed to detach at least one, we update all the disks of
     # the instance to point to the new secondary
@@ -4364,23 +5639,18 @@ class LUReplaceDisks(LogicalUnit):
       dev.logical_id = new_logical_id
       cfg.SetDiskID(dev, pri_node)
     cfg.Update(instance)
-    # we can remove now the temp minors as now the new values are
-    # written to the config file (and therefore stable)
-    self.cfg.ReleaseDRBDMinors(instance.name)
 
     # and now perform the drbd attach
     info("attaching primary drbds to new secondary (standalone => connected)")
-    failures = []
-    for idx, dev in enumerate(instance.disks):
-      info("attaching primary drbd for disk/%d to new secondary node" % idx)
-      # since the attach is smart, it's enough to 'find' the device,
-      # it will automatically activate the network, if the physical_id
-      # is correct
-      cfg.SetDiskID(dev, pri_node)
-      logging.debug("Disk to attach: %s", dev)
-      if not self.rpc.call_blockdev_find(pri_node, dev):
-        warning("can't attach drbd disk/%d to new secondary!" % idx,
-                "please do a gnt-instance info to see the status of disks")
+    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
+                                           instance.disks, instance.name,
+                                           False)
+    for to_node, to_result in result.items():
+      msg = to_result.RemoteFailMsg()
+      if msg:
+        warning("can't attach drbd disks on node %s: %s", to_node, msg,
+                hint="please do a gnt-instance info to see the"
+                " status of disks")
 
     # this can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its
@@ -4391,8 +5661,14 @@ class LUReplaceDisks(LogicalUnit):
     # so check manually all the devices
     for idx, (dev, old_lvs, _) in iv_names.iteritems():
       cfg.SetDiskID(dev, pri_node)
-      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
-      if is_degr:
+      result = self.rpc.call_blockdev_find(pri_node, dev)
+      msg = result.RemoteFailMsg()
+      if not msg and not result.payload:
+        msg = "disk not found"
+      if msg:
+        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
+                                 (idx, msg))
+      if result.payload[5]:
         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
 
     self.proc.LogStep(6, steps_total, "removing old storage")
@@ -4400,8 +5676,9 @@ class LUReplaceDisks(LogicalUnit):
       info("remove logical volumes for disk/%d" % idx)
       for lv in old_lvs:
         cfg.SetDiskID(lv, old_node)
-        if not self.rpc.call_blockdev_remove(old_node, lv):
-          warning("Can't remove LV on old secondary",
+        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
+        if msg:
+          warning("Can't remove LV on old secondary: %s", msg,
                   hint="Cleanup stale volumes by hand")
 
   def Exec(self, feedback_fn):
@@ -4413,21 +5690,18 @@ class LUReplaceDisks(LogicalUnit):
     instance = self.instance
 
     # Activate the instance disks if we're replacing them on a down instance
-    if instance.status == "down":
+    if not instance.admin_up:
       _StartInstanceDisks(self, instance, True)
 
-    if instance.disk_template == constants.DT_DRBD8:
-      if self.op.remote_node is None:
-        fn = self._ExecD8DiskOnly
-      else:
-        fn = self._ExecD8Secondary
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      fn = self._ExecD8Secondary
     else:
-      raise errors.ProgrammerError("Unhandled disk replacement case")
+      fn = self._ExecD8DiskOnly
 
     ret = fn(feedback_fn)
 
     # Deactivate the instance disks if we're replacing them on a down instance
-    if instance.status == "down":
+    if not instance.admin_up:
       _SafeShutdownInstanceDisks(self, instance)
 
     return ret
@@ -4477,6 +5751,10 @@ class LUGrowDisk(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    nodenames = list(instance.all_nodes)
+    for node in nodenames:
+      _CheckNodeOnline(self, node)
+
 
     self.instance = instance
 
@@ -4486,22 +5764,21 @@ class LUGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                        instance.hypervisor)
     for node in nodenames:
-      info = nodeinfo.get(node, None)
-      if not info:
+      info = nodeinfo[node]
+      if info.failed or not info.data:
         raise errors.OpPrereqError("Cannot get current information"
                                    " from node '%s'" % node)
-      vg_free = info.get('vg_free', None)
+      vg_free = info.data.get('vg_free', None)
       if not isinstance(vg_free, int):
         raise errors.OpPrereqError("Can't compute free disk space on"
                                    " node %s" % node)
-      if self.op.amount > info['vg_free']:
+      if self.op.amount > vg_free:
         raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                    " %d MiB available, %d MiB required" %
-                                   (node, info['vg_free'], self.op.amount))
+                                   (node, vg_free, self.op.amount))
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -4509,15 +5786,13 @@ class LUGrowDisk(LogicalUnit):
     """
     instance = self.instance
     disk = self.disk
-    for node in (instance.secondary_nodes + (instance.primary_node,)):
+    for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
-      if (not result or not isinstance(result, (list, tuple)) or
-          len(result) != 2):
-        raise errors.OpExecError("grow request failed to node %s" % node)
-      elif not result[0]:
-        raise errors.OpExecError("grow request failed to node %s: %s" %
-                                 (node, result[1]))
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Grow request failed to node %s: %s" %
+                                 (node, msg))
     disk.RecordGrow(self.op.amount)
     self.cfg.Update(instance)
     if self.op.wait_for_sync:
@@ -4546,8 +5821,7 @@ class LUQueryInstanceData(NoHooksLU):
       for name in self.op.instances:
         full_name = self.cfg.ExpandInstanceName(name)
         if full_name is None:
-          raise errors.OpPrereqError("Instance '%s' not known" %
-                                     self.op.instance_name)
+          raise errors.OpPrereqError("Instance '%s' not known" % name)
         self.wanted_names.append(full_name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
     else:
@@ -4582,6 +5856,14 @@ class LUQueryInstanceData(NoHooksLU):
     if not static:
       self.cfg.SetDiskID(dev, instance.primary_node)
       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
+      if dev_pstatus.offline:
+        dev_pstatus = None
+      else:
+        msg = dev_pstatus.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Can't compute disk status for %s: %s" %
+                                   (instance.name, msg))
+        dev_pstatus = dev_pstatus.payload
     else:
       dev_pstatus = None
 
@@ -4595,6 +5877,14 @@ class LUQueryInstanceData(NoHooksLU):
     if snode and not static:
       self.cfg.SetDiskID(dev, snode)
       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
+      if dev_sstatus.offline:
+        dev_sstatus = None
+      else:
+        msg = dev_sstatus.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Can't compute disk status for %s: %s" %
+                                   (instance.name, msg))
+        dev_sstatus = dev_sstatus.payload
     else:
       dev_sstatus = None
 
@@ -4613,6 +5903,7 @@ class LUQueryInstanceData(NoHooksLU):
       "sstatus": dev_sstatus,
       "children": dev_children,
       "mode": dev.mode,
+      "size": dev.size,
       }
 
     return data
@@ -4628,16 +5919,18 @@ class LUQueryInstanceData(NoHooksLU):
         remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                   instance.name,
                                                   instance.hypervisor)
+        remote_info.Raise()
+        remote_info = remote_info.data
         if remote_info and "state" in remote_info:
           remote_state = "up"
         else:
           remote_state = "down"
       else:
         remote_state = None
-      if instance.status == "down":
-        config_state = "down"
-      else:
+      if instance.admin_up:
         config_state = "up"
+      else:
+        config_state = "down"
 
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
@@ -4687,14 +5980,6 @@ class LUSetInstanceParams(LogicalUnit):
             self.op.hvparams or self.op.beparams):
       raise errors.OpPrereqError("No changes submitted")
 
-    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
-      val = self.op.beparams.get(item, None)
-      if val is not None:
-        try:
-          val = int(val)
-        except ValueError, err:
-          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
-        self.op.beparams[item] = val
     # Disk validation
     disk_addremove = 0
     for disk_op, disk_dict in self.op.disks:
@@ -4708,7 +5993,7 @@ class LUSetInstanceParams(LogicalUnit):
           raise errors.OpPrereqError("Invalid disk index")
       if disk_op == constants.DDM_ADD:
         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
-        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
+        if mode not in constants.DISK_ACCESS_SET:
           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
         size = disk_dict.get('size', None)
         if size is None:
@@ -4744,24 +6029,29 @@ class LUSetInstanceParams(LogicalUnit):
       # nic_dict should be a dict
       nic_ip = nic_dict.get('ip', None)
       if nic_ip is not None:
-        if nic_ip.lower() == "none":
+        if nic_ip.lower() == constants.VALUE_NONE:
           nic_dict['ip'] = None
         else:
           if not utils.IsValidIP(nic_ip):
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
-      # we can only check None bridges and assign the default one
-      nic_bridge = nic_dict.get('bridge', None)
-      if nic_bridge is None:
-        nic_dict['bridge'] = self.cfg.GetDefBridge()
-      # but we can validate MACs
-      nic_mac = nic_dict.get('mac', None)
-      if nic_mac is not None:
-        if self.cfg.IsMacInUse(nic_mac):
-          raise errors.OpPrereqError("MAC address %s already in use"
-                                     " in cluster" % nic_mac)
+
+      if nic_op == constants.DDM_ADD:
+        nic_bridge = nic_dict.get('bridge', None)
+        if nic_bridge is None:
+          nic_dict['bridge'] = self.cfg.GetDefBridge()
+        nic_mac = nic_dict.get('mac', None)
+        if nic_mac is None:
+          nic_dict['mac'] = constants.VALUE_AUTO
+
+      if 'mac' in nic_dict:
+        nic_mac = nic_dict['mac']
         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
           if not utils.IsValidMac(nic_mac):
             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
+        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
+          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
+                                     " modifying an existing nic")
+
     if nic_addremove > 1:
       raise errors.OpPrereqError("Only one NIC add or remove operation"
                                  " supported at a time")
@@ -4786,10 +6076,39 @@ class LUSetInstanceParams(LogicalUnit):
       args['memory'] = self.be_new[constants.BE_MEMORY]
     if constants.BE_VCPUS in self.be_new:
       args['vcpus'] = self.be_new[constants.BE_VCPUS]
-    # FIXME: readd disk/nic changes
+    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
+    # information at all.
+    if self.op.nics:
+      args['nics'] = []
+      nic_override = dict(self.op.nics)
+      for idx, nic in enumerate(self.instance.nics):
+        if idx in nic_override:
+          this_nic_override = nic_override[idx]
+        else:
+          this_nic_override = {}
+        if 'ip' in this_nic_override:
+          ip = this_nic_override['ip']
+        else:
+          ip = nic.ip
+        if 'bridge' in this_nic_override:
+          bridge = this_nic_override['bridge']
+        else:
+          bridge = nic.bridge
+        if 'mac' in this_nic_override:
+          mac = this_nic_override['mac']
+        else:
+          mac = nic.mac
+        args['nics'].append((ip, bridge, mac))
+      if constants.DDM_ADD in nic_override:
+        ip = nic_override[constants.DDM_ADD].get('ip', None)
+        bridge = nic_override[constants.DDM_ADD]['bridge']
+        mac = nic_override[constants.DDM_ADD]['mac']
+        args['nics'].append((ip, bridge, mac))
+      elif constants.DDM_REMOVE in nic_override:
+        del args['nics'][-1]
+
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
-    nl = [self.cfg.GetMasterNode(),
-          self.instance.primary_node] + list(self.instance.secondary_nodes)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -4798,22 +6117,21 @@ class LUSetInstanceParams(LogicalUnit):
     This only checks the instance list against the existing names.
 
     """
-    force = self.force = self.op.force
+    self.force = self.op.force
 
     # checking the new params on the primary/secondary nodes
 
     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    pnode = self.instance.primary_node
-    nodelist = [pnode]
-    nodelist.extend(instance.secondary_nodes)
+    pnode = instance.primary_node
+    nodelist = list(instance.all_nodes)
 
     # hvparams processing
     if self.op.hvparams:
       i_hvdict = copy.deepcopy(instance.hvparams)
       for key, val in self.op.hvparams.iteritems():
-        if val is None:
+        if val == constants.VALUE_DEFAULT:
           try:
             del i_hvdict[key]
           except KeyError:
@@ -4821,6 +6139,7 @@ class LUSetInstanceParams(LogicalUnit):
         else:
           i_hvdict[key] = val
       cluster = self.cfg.GetClusterInfo()
+      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                 i_hvdict)
       # local check
@@ -4836,7 +6155,7 @@ class LUSetInstanceParams(LogicalUnit):
     if self.op.beparams:
       i_bedict = copy.deepcopy(instance.beparams)
       for key, val in self.op.beparams.iteritems():
-        if val is None:
+        if val == constants.VALUE_DEFAULT:
           try:
             del i_bedict[key]
           except KeyError:
@@ -4844,6 +6163,7 @@ class LUSetInstanceParams(LogicalUnit):
         else:
           i_bedict[key] = val
       cluster = self.cfg.GetClusterInfo()
+      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                 i_bedict)
       self.be_new = be_new # the new actual values
@@ -4862,30 +6182,31 @@ class LUSetInstanceParams(LogicalUnit):
                                                   instance.hypervisor)
       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                          instance.hypervisor)
-
-      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
+      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
         # Assume the primary node is unreachable and go ahead
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
-        if instance_info:
-          current_mem = instance_info['memory']
+        if not instance_info.failed and instance_info.data:
+          current_mem = int(instance_info.data['memory'])
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
           # and we have no other way to check)
           current_mem = 0
         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
-                    nodeinfo[pnode]['memory_free'])
+                    nodeinfo[pnode].data['memory_free'])
         if miss_mem > 0:
           raise errors.OpPrereqError("This change will prevent the instance"
                                      " from starting, due to %d MB of memory"
                                      " missing on its primary node" % miss_mem)
 
       if be_new[constants.BE_AUTO_BALANCE]:
-        for node in instance.secondary_nodes:
-          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
+        for node, nres in nodeinfo.iteritems():
+          if node not in instance.secondary_nodes:
+            continue
+          if nres.failed or not isinstance(nres.data, dict):
             self.warn.append("Can't get info from secondary node %s" % node)
-          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
+          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
             self.warn.append("Not enough memory to failover instance to"
                              " secondary node %s" % node)
 
@@ -4901,8 +6222,10 @@ class LUSetInstanceParams(LogicalUnit):
           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                      " are 0 to %d" %
                                      (nic_op, len(instance.nics)))
-      nic_bridge = nic_dict.get('bridge', None)
-      if nic_bridge is not None:
+      if 'bridge' in nic_dict:
+        nic_bridge = nic_dict['bridge']
+        if nic_bridge is None:
+          raise errors.OpPrereqError('Cannot set the nic bridge to None')
         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
           msg = ("Bridge '%s' doesn't exist on one of"
                  " the instance nodes" % nic_bridge)
@@ -4910,6 +6233,18 @@ class LUSetInstanceParams(LogicalUnit):
             self.warn.append(msg)
           else:
             raise errors.OpPrereqError(msg)
+      if 'mac' in nic_dict:
+        nic_mac = nic_dict['mac']
+        if nic_mac is None:
+          raise errors.OpPrereqError('Cannot set the nic mac to None')
+        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+          # otherwise generate the mac
+          nic_dict['mac'] = self.cfg.GenerateMAC()
+        else:
+          # or validate/reserve the current one
+          if self.cfg.IsMacInUse(nic_mac):
+            raise errors.OpPrereqError("MAC address %s already in use"
+                                       " in cluster" % nic_mac)
 
     # DISK processing
     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
@@ -4922,9 +6257,9 @@ class LUSetInstanceParams(LogicalUnit):
                                      " an instance")
         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
         ins_l = ins_l[pnode]
-        if not type(ins_l) is list:
+        if ins_l.failed or not isinstance(ins_l.data, list):
           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
-        if instance.name in ins_l:
+        if instance.name in ins_l.data:
           raise errors.OpPrereqError("Instance is running, can't remove"
                                      " disks.")
 
@@ -4962,9 +6297,10 @@ class LUSetInstanceParams(LogicalUnit):
         device_idx = len(instance.disks)
         for node, disk in device.ComputeNodeTree(instance.primary_node):
           self.cfg.SetDiskID(disk, node)
-          if not self.rpc.call_blockdev_remove(node, disk):
-            self.proc.LogWarning("Could not remove disk/%d on node %s,"
-                                 " continuing anyway", device_idx, node)
+          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
+          if msg:
+            self.LogWarning("Could not remove disk/%d on node %s: %s,"
+                            " continuing anyway", device_idx, node, msg)
         result.append(("disk/%d" % device_idx, "remove"))
       elif disk_op == constants.DDM_ADD:
         # add a new disk
@@ -4976,13 +6312,12 @@ class LUSetInstanceParams(LogicalUnit):
         disk_idx_base = len(instance.disks)
         new_disk = _GenerateDiskTemplate(self,
                                          instance.disk_template,
-                                         instance, instance.primary_node,
+                                         instance.name, instance.primary_node,
                                          instance.secondary_nodes,
                                          [disk_dict],
                                          file_path,
                                          file_driver,
                                          disk_idx_base)[0]
-        new_disk.mode = disk_dict['mode']
         instance.disks.append(new_disk)
         info = _GetInstanceInfoText(instance)
 
@@ -4990,17 +6325,15 @@ class LUSetInstanceParams(LogicalUnit):
                      new_disk.iv_name, instance.name)
         # Note: this needs to be kept in sync with _CreateDisks
         #HARDCODE
-        for secondary_node in instance.secondary_nodes:
-          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
-                                            new_disk, False, info):
+        for node in instance.all_nodes:
+          f_create = node == instance.primary_node
+          try:
+            _CreateBlockDev(self, node, instance, new_disk,
+                            f_create, info, f_create)
+          except errors.OpExecError, err:
             self.LogWarning("Failed to create volume %s (%s) on"
-                            " secondary node %s!",
-                            new_disk.iv_name, new_disk, secondary_node)
-        #HARDCODE
-        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
-                                        instance, new_disk, info):
-          self.LogWarning("Failed to create volume %s on primary!",
-                          new_disk.iv_name)
+                            " node %s: %s",
+                            new_disk.iv_name, new_disk, node, err)
         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                        (new_disk.size, new_disk.mode)))
       else:
@@ -5014,15 +6347,11 @@ class LUSetInstanceParams(LogicalUnit):
         del instance.nics[-1]
         result.append(("nic.%d" % len(instance.nics), "remove"))
       elif nic_op == constants.DDM_ADD:
-        # add a new nic
-        if 'mac' not in nic_dict:
-          mac = constants.VALUE_GENERATE
-        else:
-          mac = nic_dict['mac']
-        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-          mac = self.cfg.GenerateMAC()
+        # mac and bridge should be set, by now
+        mac = nic_dict['mac']
+        bridge = nic_dict['bridge']
         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
-                              bridge=nic_dict.get('bridge', None))
+                              bridge=bridge)
         instance.nics.append(new_nic)
         result.append(("nic.%d" % (len(instance.nics) - 1),
                        "add:mac=%s,ip=%s,bridge=%s" %
@@ -5036,7 +6365,7 @@ class LUSetInstanceParams(LogicalUnit):
 
     # hvparams changes
     if self.op.hvparams:
-      instance.hvparams = self.hv_new
+      instance.hvparams = self.hv_inst
       for key, val in self.op.hvparams.iteritems():
         result.append(("hv/%s" % key, val))
 
@@ -5082,7 +6411,15 @@ class LUQueryExports(NoHooksLU):
         that node.
 
     """
-    return self.rpc.call_export_list(self.nodes)
+    rpcresult = self.rpc.call_export_list(self.nodes)
+    result = {}
+    for node in rpcresult:
+      if rpcresult[node].failed:
+        result[node] = False
+      else:
+        result[node] = rpcresult[node].data
+
+    return result
 
 
 class LUExportInstance(LogicalUnit):
@@ -5103,7 +6440,7 @@ class LUExportInstance(LogicalUnit):
     # remove it from its current node. In the future we could fix this by:
     #  - making a tasklet to search (share-lock all), then create the new one,
     #    then one to remove, after
-    #  - removing the removal operation altoghether
+    #  - removing the removal operation altogether
     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def DeclareLocks(self, level):
@@ -5135,12 +6472,16 @@ class LUExportInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(instance_name)
     assert self.instance is not None, \
           "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
     self.dst_node = self.cfg.GetNodeInfo(
       self.cfg.ExpandNodeName(self.op.target_node))
 
-    assert self.dst_node is not None, \
-          "Cannot retrieve locked node %s" % self.op.target_node
+    if self.dst_node is None:
+      # This is a wrong node name, not a non-locked node
+      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
+    _CheckNodeOnline(self, self.dst_node.name)
+    _CheckNodeNotDrained(self, self.dst_node.name)
 
     # instance disk type verification
     for disk in self.instance.disks:
@@ -5157,53 +6498,73 @@ class LUExportInstance(LogicalUnit):
     src_node = instance.primary_node
     if self.op.shutdown:
       # shutdown the instance, but not the disks
-      if not self.rpc.call_instance_shutdown(src_node, instance):
-        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
-                                 (instance.name, src_node))
+      result = self.rpc.call_instance_shutdown(src_node, instance)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, src_node, msg))
 
     vgname = self.cfg.GetVGName()
 
     snap_disks = []
 
+    # set the disk IDs correctly, since call_instance_start needs the
+    # correct drbd minor to create the symlinks
+    for disk in instance.disks:
+      self.cfg.SetDiskID(disk, src_node)
+
+    # per-disk results
+    dresults = []
     try:
-      for disk in instance.disks:
+      for idx, disk in enumerate(instance.disks):
         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
-
-        if not new_dev_name:
-          self.LogWarning("Could not snapshot block device %s on node %s",
-                          disk.logical_id[1], src_node)
+        if new_dev_name.failed or not new_dev_name.data:
+          self.LogWarning("Could not snapshot disk/%d on node %s",
+                          idx, src_node)
           snap_disks.append(False)
         else:
           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                 logical_id=(vgname, new_dev_name),
-                                 physical_id=(vgname, new_dev_name),
+                                 logical_id=(vgname, new_dev_name.data),
+                                 physical_id=(vgname, new_dev_name.data),
                                  iv_name=disk.iv_name)
           snap_disks.append(new_dev)
 
     finally:
-      if self.op.shutdown and instance.status == "up":
-        if not self.rpc.call_instance_start(src_node, instance, None):
+      if self.op.shutdown and instance.admin_up:
+        result = self.rpc.call_instance_start(src_node, instance, None, None)
+        msg = result.RemoteFailMsg()
+        if msg:
           _ShutdownInstanceDisks(self, instance)
-          raise errors.OpExecError("Could not start instance")
+          raise errors.OpExecError("Could not start instance: %s" % msg)
 
     # TODO: check for size
 
     cluster_name = self.cfg.GetClusterName()
     for idx, dev in enumerate(snap_disks):
       if dev:
-        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                             instance, cluster_name, idx):
-          self.LogWarning("Could not export block device %s from node %s to"
-                          " node %s", dev.logical_id[1], src_node,
-                          dst_node.name)
-        if not self.rpc.call_blockdev_remove(src_node, dev):
-          self.LogWarning("Could not remove snapshot block device %s from node"
-                          " %s", dev.logical_id[1], src_node)
-
-    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
+        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
+                                               instance, cluster_name, idx)
+        if result.failed or not result.data:
+          self.LogWarning("Could not export disk/%d from node %s to"
+                          " node %s", idx, src_node, dst_node.name)
+          dresults.append(False)
+        else:
+          dresults.append(True)
+        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
+        if msg:
+          self.LogWarning("Could not remove snapshot for disk/%d from node"
+                          " %s: %s", idx, src_node, msg)
+      else:
+        dresults.append(False)
+
+    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+    fin_resu = True
+    if result.failed or not result.data:
       self.LogWarning("Could not finalize export for instance %s on node %s",
                       instance.name, dst_node.name)
+      fin_resu = False
 
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(dst_node.name)
@@ -5214,10 +6575,13 @@ class LUExportInstance(LogicalUnit):
     if nodelist:
       exportlist = self.rpc.call_export_list(nodelist)
       for node in exportlist:
-        if instance.name in exportlist[node]:
+        if exportlist[node].failed:
+          continue
+        if instance.name in exportlist[node].data:
           if not self.rpc.call_export_remove(node, instance.name):
             self.LogWarning("Could not remove older export for instance %s"
                             " on node %s", instance.name, node)
+    return fin_resu, dresults
 
 
 class LURemoveExport(NoHooksLU):
@@ -5255,9 +6619,13 @@ class LURemoveExport(NoHooksLU):
       locking.LEVEL_NODE])
     found = False
     for node in exportlist:
-      if instance_name in exportlist[node]:
+      if exportlist[node].failed:
+        self.LogWarning("Failed to query node %s, continuing" % node)
+        continue
+      if instance_name in exportlist[node].data:
         found = True
-        if not self.rpc.call_export_remove(node, instance_name):
+        result = self.rpc.call_export_remove(node, instance_name)
+        if result.failed or not result.data:
           logging.error("Could not remove export for instance %s"
                         " on node %s", instance_name, node)
 
@@ -5474,9 +6842,10 @@ class LUTestDelay(NoHooksLU):
       if not result:
         raise errors.OpExecError("Complete failure from rpc call")
       for node, node_result in result.items():
-        if not node_result:
+        node_result.Raise()
+        if not node_result.data:
           raise errors.OpExecError("Failure during rpc call to node %s,"
-                                   " result: %s" % (node, node_result))
+                                   " result: %s" % (node, node_result.data))
 
 
 class IAllocator(object):
@@ -5509,6 +6878,7 @@ class IAllocator(object):
     self.name = name
     self.mem_size = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
+    self.hypervisor = None
     self.relocate_from = None
     # computed fields
     self.required_nodes = None
@@ -5542,10 +6912,10 @@ class IAllocator(object):
     cluster_info = cfg.GetClusterInfo()
     # cluster data
     data = {
-      "version": 1,
+      "version": constants.IALLOCATOR_VERSION,
       "cluster_name": cfg.GetClusterName(),
       "cluster_tags": list(cluster_info.GetTags()),
-      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
+      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
       # we don't have job IDs
       }
     iinfo = cfg.GetAllInstancesInfo().values()
@@ -5556,58 +6926,69 @@ class IAllocator(object):
     node_list = cfg.GetNodeList()
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      hypervisor = self.hypervisor
+      hypervisor_name = self.hypervisor
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
-      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
+      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
 
     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
-                                           hypervisor)
+                                           hypervisor_name)
     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                        cluster_info.enabled_hypervisors)
-    for nname in node_list:
+    for nname, nresult in node_data.items():
+      # first fill in static (config-based) values
       ninfo = cfg.GetNodeInfo(nname)
-      if nname not in node_data or not isinstance(node_data[nname], dict):
-        raise errors.OpExecError("Can't get data for node %s" % nname)
-      remote_info = node_data[nname]
-      for attr in ['memory_total', 'memory_free', 'memory_dom0',
-                   'vg_size', 'vg_free', 'cpu_total']:
-        if attr not in remote_info:
-          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
-                                   (nname, attr))
-        try:
-          remote_info[attr] = int(remote_info[attr])
-        except ValueError, err:
-          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
-                                   " %s" % (nname, attr, str(err)))
-      # compute memory used by primary instances
-      i_p_mem = i_p_up_mem = 0
-      for iinfo, beinfo in i_list:
-        if iinfo.primary_node == nname:
-          i_p_mem += beinfo[constants.BE_MEMORY]
-          if iinfo.name not in node_iinfo[nname]:
-            i_used_mem = 0
-          else:
-            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
-          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
-          remote_info['memory_free'] -= max(0, i_mem_diff)
-
-          if iinfo.status == "up":
-            i_p_up_mem += beinfo[constants.BE_MEMORY]
-
-      # compute memory used by instances
       pnr = {
         "tags": list(ninfo.GetTags()),
-        "total_memory": remote_info['memory_total'],
-        "reserved_memory": remote_info['memory_dom0'],
-        "free_memory": remote_info['memory_free'],
-        "i_pri_memory": i_p_mem,
-        "i_pri_up_memory": i_p_up_mem,
-        "total_disk": remote_info['vg_size'],
-        "free_disk": remote_info['vg_free'],
         "primary_ip": ninfo.primary_ip,
         "secondary_ip": ninfo.secondary_ip,
-        "total_cpus": remote_info['cpu_total'],
+        "offline": ninfo.offline,
+        "drained": ninfo.drained,
+        "master_candidate": ninfo.master_candidate,
         }
+
+      if not (ninfo.offline or ninfo.drained):
+        nresult.Raise()
+        if not isinstance(nresult.data, dict):
+          raise errors.OpExecError("Can't get data for node %s" % nname)
+        remote_info = nresult.data
+        for attr in ['memory_total', 'memory_free', 'memory_dom0',
+                     'vg_size', 'vg_free', 'cpu_total']:
+          if attr not in remote_info:
+            raise errors.OpExecError("Node '%s' didn't return attribute"
+                                     " '%s'" % (nname, attr))
+          try:
+            remote_info[attr] = int(remote_info[attr])
+          except ValueError, err:
+            raise errors.OpExecError("Node '%s' returned invalid value"
+                                     " for '%s': %s" % (nname, attr, err))
+        # compute memory used by primary instances
+        i_p_mem = i_p_up_mem = 0
+        for iinfo, beinfo in i_list:
+          if iinfo.primary_node == nname:
+            i_p_mem += beinfo[constants.BE_MEMORY]
+            if iinfo.name not in node_iinfo[nname].data:
+              i_used_mem = 0
+            else:
+              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
+            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
+            remote_info['memory_free'] -= max(0, i_mem_diff)
+
+            if iinfo.admin_up:
+              i_p_up_mem += beinfo[constants.BE_MEMORY]
+
+        # compute memory used by instances
+        pnr_dyn = {
+          "total_memory": remote_info['memory_total'],
+          "reserved_memory": remote_info['memory_dom0'],
+          "free_memory": remote_info['memory_free'],
+          "total_disk": remote_info['vg_size'],
+          "free_disk": remote_info['vg_free'],
+          "total_cpus": remote_info['cpu_total'],
+          "i_pri_memory": i_p_mem,
+          "i_pri_up_memory": i_p_up_mem,
+          }
+        pnr.update(pnr_dyn)
+
       node_results[nname] = pnr
     data["nodes"] = node_results
 
@@ -5618,16 +6999,18 @@ class IAllocator(object):
                   for n in iinfo.nics]
       pir = {
         "tags": list(iinfo.GetTags()),
-        "should_run": iinfo.status == "up",
+        "admin_up": iinfo.admin_up,
         "vcpus": beinfo[constants.BE_VCPUS],
         "memory": beinfo[constants.BE_MEMORY],
         "os": iinfo.os,
         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
         "nics": nic_data,
-        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
+        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,
         "hypervisor": iinfo.hypervisor,
         }
+      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
+                                                 pir["disks"])
       instance_data[iinfo.name] = pir
 
     data["instances"] = instance_data
@@ -5645,8 +7028,6 @@ class IAllocator(object):
 
     """
     data = self.in_data
-    if len(self.disks) != 2:
-      raise errors.OpExecError("Only two-disk configurations supported")
 
     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
 
@@ -5722,14 +7103,14 @@ class IAllocator(object):
     """
     if call_fn is None:
       call_fn = self.lu.rpc.call_iallocator_runner
-    data = self.in_text
 
     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
+    result.Raise()
 
-    if not isinstance(result, (list, tuple)) or len(result) != 4:
+    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
       raise errors.OpExecError("Invalid result from master iallocator runner")
 
-    rcode, stdout, stderr, fail = result
+    rcode, stdout, stderr, fail = result.data
 
     if rcode == constants.IARUN_NOTFOUND:
       raise errors.OpExecError("Can't find allocator '%s'" % name)
@@ -5802,8 +7183,6 @@ class LUTestAllocator(NoHooksLU):
                                      " 'nics' parameter")
       if not isinstance(self.op.disks, list):
         raise errors.OpPrereqError("Invalid parameter 'disks'")
-      if len(self.op.disks) != 2:
-        raise errors.OpPrereqError("Only two-disk configurations supported")
       for row in self.op.disks:
         if (not isinstance(row, dict) or
             "size" not in row or
@@ -5812,7 +7191,7 @@ class LUTestAllocator(NoHooksLU):
             row["mode"] not in ['r', 'w']):
           raise errors.OpPrereqError("Invalid contents of the"
                                      " 'disks' parameter")
-      if self.op.hypervisor is None:
+      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
         self.op.hypervisor = self.cfg.GetHypervisorType()
     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
       if not hasattr(self.op, "name"):