A couple of small fixes to iallocator
[ganeti-local] / lib / cmdlib.py
index ae50552..7da0f87 100644 (file)
@@ -415,6 +415,32 @@ def _CheckOutputFields(static, dynamic, selected):
                                % ",".join(delta))
 
 
                                % ",".join(delta))
 
 
+def _CheckBooleanOpField(op, name):
+  """Validates boolean opcode parameters.
+
+  This will ensure that an opcode parameter is either a boolean value,
+  or None (but that it always exists).
+
+  """
+  val = getattr(op, name, None)
+  if not (val is None or isinstance(val, bool)):
+    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
+                               (name, str(val)))
+  setattr(op, name, val)
+
+
+def _CheckNodeOnline(lu, node):
+  """Ensure that a given node is online.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @raise errors.OpPrereqError: if the nodes is offline
+
+  """
+  if lu.cfg.GetNodeInfo(node).offline:
+    raise errors.OpPrereqError("Can't use offline node %s" % node)
+
+
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           memory, vcpus, nics):
   """Builds instance related env variables for hooks
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           memory, vcpus, nics):
   """Builds instance related env variables for hooks
@@ -429,8 +455,8 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @param secondary_nodes: list of secondary nodes as strings
   @type os_type: string
   @param os_type: the name of the instance's OS
   @param secondary_nodes: list of secondary nodes as strings
   @type os_type: string
   @param os_type: the name of the instance's OS
-  @type status: string
-  @param status: the desired status of the instances
+  @type status: boolean
+  @param status: the should_run status of the instance
   @type memory: string
   @param memory: the memory size of the instance
   @type vcpus: string
   @type memory: string
   @param memory: the memory size of the instance
   @type vcpus: string
@@ -442,13 +468,17 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @return: the hook environment for this instance
 
   """
   @return: the hook environment for this instance
 
   """
+  if status:
+    str_status = "up"
+  else:
+    str_status = "down"
   env = {
     "OP_TARGET": name,
     "INSTANCE_NAME": name,
     "INSTANCE_PRIMARY": primary_node,
     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
     "INSTANCE_OS_TYPE": os_type,
   env = {
     "OP_TARGET": name,
     "INSTANCE_NAME": name,
     "INSTANCE_PRIMARY": primary_node,
     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
     "INSTANCE_OS_TYPE": os_type,
-    "INSTANCE_STATUS": status,
+    "INSTANCE_STATUS": str_status,
     "INSTANCE_MEMORY": memory,
     "INSTANCE_VCPUS": vcpus,
   }
     "INSTANCE_MEMORY": memory,
     "INSTANCE_VCPUS": vcpus,
   }
@@ -490,7 +520,7 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
     'primary_node': instance.primary_node,
     'secondary_nodes': instance.secondary_nodes,
     'os_type': instance.os,
     'primary_node': instance.primary_node,
     'secondary_nodes': instance.secondary_nodes,
     'os_type': instance.os,
-    'status': instance.os,
+    'status': instance.admin_up,
     'memory': bep[constants.BE_MEMORY],
     'vcpus': bep[constants.BE_VCPUS],
     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
     'memory': bep[constants.BE_MEMORY],
     'vcpus': bep[constants.BE_VCPUS],
     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
@@ -500,14 +530,32 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   return _BuildInstanceHookEnv(**args)
 
 
   return _BuildInstanceHookEnv(**args)
 
 
+def _AdjustCandidatePool(lu):
+  """Adjust the candidate pool after node operations.
+
+  """
+  mod_list = lu.cfg.MaintainCandidatePool()
+  if mod_list:
+    lu.LogInfo("Promoted nodes to master candidate role: %s",
+               ", ".join(node.name for node in mod_list))
+    for name in mod_list:
+      lu.context.ReaddNode(name)
+  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
+  if mc_now > mc_max:
+    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
+               (mc_now, mc_max))
+
+
 def _CheckInstanceBridgesExist(lu, instance):
   """Check that the brigdes needed by an instance exist.
 
   """
   # check bridges existance
   brlist = [nic.bridge for nic in instance.nics]
 def _CheckInstanceBridgesExist(lu, instance):
   """Check that the brigdes needed by an instance exist.
 
   """
   # check bridges existance
   brlist = [nic.bridge for nic in instance.nics]
-  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
-    raise errors.OpPrereqError("one or more target bridges %s does not"
+  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
+  result.Raise()
+  if not result.data:
+    raise errors.OpPrereqError("One or more target bridges %s does not"
                                " exist on destination node '%s'" %
                                (brlist, instance.primary_node))
 
                                " exist on destination node '%s'" %
                                (brlist, instance.primary_node))
 
@@ -542,7 +590,9 @@ class LUDestroyCluster(NoHooksLU):
 
     """
     master = self.cfg.GetMasterNode()
 
     """
     master = self.cfg.GetMasterNode()
-    if not self.rpc.call_node_stop_master(master, False):
+    result = self.rpc.call_node_stop_master(master, False)
+    result.Raise()
+    if not result.data:
       raise errors.OpExecError("Could not disable the master role")
     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
     utils.CreateBackup(priv_key)
       raise errors.OpExecError("Could not disable the master role")
     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
     utils.CreateBackup(priv_key)
@@ -566,8 +616,9 @@ class LUVerifyCluster(LogicalUnit):
     }
     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
 
     }
     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
 
-  def _VerifyNode(self, nodeinfo, file_list, local_cksum, vglist, node_result,
-                  remote_version, feedback_fn, master_files):
+  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
+                  node_result, feedback_fn, master_files,
+                  drbd_map):
     """Run multiple tests against a node.
 
     Test list:
     """Run multiple tests against a node.
 
     Test list:
@@ -581,17 +632,24 @@ class LUVerifyCluster(LogicalUnit):
     @param nodeinfo: the node to check
     @param file_list: required list of files
     @param local_cksum: dictionary of local files and their checksums
     @param nodeinfo: the node to check
     @param file_list: required list of files
     @param local_cksum: dictionary of local files and their checksums
-    @type vglist: dict
-    @param vglist: dictionary of volume group names and their size
     @param node_result: the results from the node
     @param node_result: the results from the node
-    @param remote_version: the RPC version from the remote node
     @param feedback_fn: function used to accumulate results
     @param master_files: list of files that only masters should have
     @param feedback_fn: function used to accumulate results
     @param master_files: list of files that only masters should have
+    @param drbd_map: the useddrbd minors for this node, in
+        form of minor: (instance, must_exist) which correspond to instances
+        and their running status
 
     """
     node = nodeinfo.name
 
     """
     node = nodeinfo.name
+
+    # main result, node_result should be a non-empty dict
+    if not node_result or not isinstance(node_result, dict):
+      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
+      return True
+
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
+    remote_version = node_result.get('version', None)
     if not remote_version:
       feedback_fn("  - ERROR: connection to %s failed" % (node))
       return True
     if not remote_version:
       feedback_fn("  - ERROR: connection to %s failed" % (node))
       return True
@@ -604,6 +662,7 @@ class LUVerifyCluster(LogicalUnit):
     # checks vg existance and size > 20G
 
     bad = False
     # checks vg existance and size > 20G
 
     bad = False
+    vglist = node_result.get(constants.NV_VGLIST, None)
     if not vglist:
       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                       (node,))
     if not vglist:
       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                       (node,))
@@ -615,18 +674,13 @@ class LUVerifyCluster(LogicalUnit):
         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
         bad = True
 
         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
         bad = True
 
-    if not node_result:
-      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
-      return True
-
     # checks config file checksum
     # checks config file checksum
-    # checks ssh to any
 
 
-    if 'filelist' not in node_result:
+    remote_cksum = node_result.get(constants.NV_FILELIST, None)
+    if not isinstance(remote_cksum, dict):
       bad = True
       feedback_fn("  - ERROR: node hasn't returned file checksum data")
     else:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned file checksum data")
     else:
-      remote_cksum = node_result['filelist']
       for file_name in file_list:
         node_is_mc = nodeinfo.master_candidate
         must_have_file = file_name not in master_files
       for file_name in file_list:
         node_is_mc = nodeinfo.master_candidate
         must_have_file = file_name not in master_files
@@ -649,36 +703,52 @@ class LUVerifyCluster(LogicalUnit):
             feedback_fn("  - ERROR: file '%s' should not exist on non master"
                         " candidates" % file_name)
 
             feedback_fn("  - ERROR: file '%s' should not exist on non master"
                         " candidates" % file_name)
 
-    if 'nodelist' not in node_result:
+    # checks ssh to any
+
+    if constants.NV_NODELIST not in node_result:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
     else:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
     else:
-      if node_result['nodelist']:
+      if node_result[constants.NV_NODELIST]:
         bad = True
         bad = True
-        for node in node_result['nodelist']:
+        for node in node_result[constants.NV_NODELIST]:
           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
-                          (node, node_result['nodelist'][node]))
-    if 'node-net-test' not in node_result:
+                          (node, node_result[constants.NV_NODELIST][node]))
+
+    if constants.NV_NODENETTEST not in node_result:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
     else:
       bad = True
       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
     else:
-      if node_result['node-net-test']:
+      if node_result[constants.NV_NODENETTEST]:
         bad = True
         bad = True
-        nlist = utils.NiceSort(node_result['node-net-test'].keys())
+        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
         for node in nlist:
           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
         for node in nlist:
           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
-                          (node, node_result['node-net-test'][node]))
+                          (node, node_result[constants.NV_NODENETTEST][node]))
 
 
-    hyp_result = node_result.get('hypervisor', None)
+    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
     if isinstance(hyp_result, dict):
       for hv_name, hv_result in hyp_result.iteritems():
         if hv_result is not None:
           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                       (hv_name, hv_result))
     if isinstance(hyp_result, dict):
       for hv_name, hv_result in hyp_result.iteritems():
         if hv_result is not None:
           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                       (hv_name, hv_result))
+
+    # check used drbd list
+    used_minors = node_result.get(constants.NV_DRBDLIST, [])
+    for minor, (iname, must_exist) in drbd_map.items():
+      if minor not in used_minors and must_exist:
+        feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
+                    (minor, iname))
+        bad = True
+    for minor in used_minors:
+      if minor not in drbd_map:
+        feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
+        bad = True
+
     return bad
 
   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
     return bad
 
   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
-                      node_instance, feedback_fn):
+                      node_instance, feedback_fn, n_offline):
     """Verify an instance.
 
     This function checks to see if the required block devices are
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -693,15 +763,19 @@ class LUVerifyCluster(LogicalUnit):
     instanceconfig.MapLVsByNode(node_vol_should)
 
     for node in node_vol_should:
     instanceconfig.MapLVsByNode(node_vol_should)
 
     for node in node_vol_should:
+      if node in n_offline:
+        # ignore missing volumes on offline nodes
+        continue
       for volume in node_vol_should[node]:
         if node not in node_vol_is or volume not in node_vol_is[node]:
           feedback_fn("  - ERROR: volume %s missing on node %s" %
                           (volume, node))
           bad = True
 
       for volume in node_vol_should[node]:
         if node not in node_vol_is or volume not in node_vol_is[node]:
           feedback_fn("  - ERROR: volume %s missing on node %s" %
                           (volume, node))
           bad = True
 
-    if not instanceconfig.status == 'down':
-      if (node_current not in node_instance or
-          not instance in node_instance[node_current]):
+    if instanceconfig.admin_up:
+      if ((node_current not in node_instance or
+          not instance in node_instance[node_current]) and
+          node_current not in n_offline):
         feedback_fn("  - ERROR: instance %s not running on node %s" %
                         (instance, node_current))
         bad = True
         feedback_fn("  - ERROR: instance %s not running on node %s" %
                         (instance, node_current))
         bad = True
@@ -814,8 +888,11 @@ class LUVerifyCluster(LogicalUnit):
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
+    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
+                        for iname in instancelist)
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
+    n_offline = [] # List of offline nodes
     node_volume = {}
     node_instance = {}
     node_info = {}
     node_volume = {}
     node_instance = {}
     node_info = {}
@@ -827,78 +904,99 @@ class LUVerifyCluster(LogicalUnit):
 
     file_names = ssconf.SimpleStore().GetFileList()
     file_names.append(constants.SSL_CERT_FILE)
 
     file_names = ssconf.SimpleStore().GetFileList()
     file_names.append(constants.SSL_CERT_FILE)
+    file_names.append(constants.RAPI_CERT_FILE)
     file_names.extend(master_files)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     file_names.extend(master_files)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
-    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
-    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
-    all_vglist = self.rpc.call_vg_list(nodelist)
     node_verify_param = {
     node_verify_param = {
-      'filelist': file_names,
-      'nodelist': nodelist,
-      'hypervisor': hypervisors,
-      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
-                        for node in nodeinfo]
+      constants.NV_FILELIST: file_names,
+      constants.NV_NODELIST: [node.name for node in nodeinfo
+                              if not node.offline],
+      constants.NV_HYPERVISOR: hypervisors,
+      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
+                                  node.secondary_ip) for node in nodeinfo
+                                 if not node.offline],
+      constants.NV_LVLIST: vg_name,
+      constants.NV_INSTANCELIST: hypervisors,
+      constants.NV_VGLIST: None,
+      constants.NV_VERSION: None,
+      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
+      constants.NV_DRBDLIST: None,
       }
     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                            self.cfg.GetClusterName())
       }
     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                            self.cfg.GetClusterName())
-    all_rversion = self.rpc.call_version(nodelist)
-    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
-                                        self.cfg.GetHypervisorType())
 
     cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
 
     cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
+    all_drbd_map = self.cfg.ComputeDRBDMap()
+
     for node_i in nodeinfo:
       node = node_i.name
     for node_i in nodeinfo:
       node = node_i.name
+      nresult = all_nvinfo[node].data
+
+      if node_i.offline:
+        feedback_fn("* Skipping offline node %s" % (node,))
+        n_offline.append(node)
+        continue
+
       if node == master_node:
       if node == master_node:
-        ntype="master"
+        ntype = "master"
       elif node_i.master_candidate:
       elif node_i.master_candidate:
-        ntype="master candidate"
+        ntype = "master candidate"
       else:
       else:
-        ntype="regular"
+        ntype = "regular"
       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
+
+      if all_nvinfo[node].failed or not isinstance(nresult, dict):
+        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+        bad = True
+        continue
+
+      node_drbd = {}
+      for minor, instance in all_drbd_map[node].items():
+        instance = instanceinfo[instance]
+        node_drbd[minor] = (instance.name, instance.admin_up)
       result = self._VerifyNode(node_i, file_names, local_checksums,
       result = self._VerifyNode(node_i, file_names, local_checksums,
-                                all_vglist[node], all_nvinfo[node],
-                                all_rversion[node], feedback_fn, master_files)
+                                nresult, feedback_fn, master_files,
+                                node_drbd)
       bad = bad or result
 
       bad = bad or result
 
-      # node_volume
-      volumeinfo = all_volumeinfo[node]
-
-      if isinstance(volumeinfo, basestring):
+      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
+      if isinstance(lvdata, basestring):
         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
-                    (node, volumeinfo[-400:].encode('string_escape')))
+                    (node, lvdata.encode('string_escape')))
         bad = True
         node_volume[node] = {}
         bad = True
         node_volume[node] = {}
-      elif not isinstance(volumeinfo, dict):
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+      elif not isinstance(lvdata, dict):
+        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
         bad = True
         continue
       else:
         bad = True
         continue
       else:
-        node_volume[node] = volumeinfo
+        node_volume[node] = lvdata
 
       # node_instance
 
       # node_instance
-      nodeinstance = all_instanceinfo[node]
-      if type(nodeinstance) != list:
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+      idata = nresult.get(constants.NV_INSTANCELIST, None)
+      if not isinstance(idata, list):
+        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
+                    (node,))
         bad = True
         continue
 
         bad = True
         continue
 
-      node_instance[node] = nodeinstance
+      node_instance[node] = idata
 
       # node_info
 
       # node_info
-      nodeinfo = all_ninfo[node]
+      nodeinfo = nresult.get(constants.NV_HVINFO, None)
       if not isinstance(nodeinfo, dict):
       if not isinstance(nodeinfo, dict):
-        feedback_fn("  - ERROR: connection to %s failed" % (node,))
+        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
         bad = True
         continue
 
       try:
         node_info[node] = {
           "mfree": int(nodeinfo['memory_free']),
         bad = True
         continue
 
       try:
         node_info[node] = {
           "mfree": int(nodeinfo['memory_free']),
-          "dfree": int(nodeinfo['vg_free']),
+          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
           "pinst": [],
           "sinst": [],
           # dictionary holding all instances this node is secondary for,
           "pinst": [],
           "sinst": [],
           # dictionary holding all instances this node is secondary for,
@@ -918,10 +1016,11 @@ class LUVerifyCluster(LogicalUnit):
 
     for instance in instancelist:
       feedback_fn("* Verifying instance %s" % instance)
 
     for instance in instancelist:
       feedback_fn("* Verifying instance %s" % instance)
-      inst_config = self.cfg.GetInstanceInfo(instance)
+      inst_config = instanceinfo[instance]
       result =  self._VerifyInstance(instance, inst_config, node_volume,
       result =  self._VerifyInstance(instance, inst_config, node_volume,
-                                     node_instance, feedback_fn)
+                                     node_instance, feedback_fn, n_offline)
       bad = bad or result
       bad = bad or result
+      inst_nodes_offline = []
 
       inst_config.MapLVsByNode(node_vol_should)
 
 
       inst_config.MapLVsByNode(node_vol_should)
 
@@ -930,11 +1029,14 @@ class LUVerifyCluster(LogicalUnit):
       pnode = inst_config.primary_node
       if pnode in node_info:
         node_info[pnode]['pinst'].append(instance)
       pnode = inst_config.primary_node
       if pnode in node_info:
         node_info[pnode]['pinst'].append(instance)
-      else:
+      elif pnode not in n_offline:
         feedback_fn("  - ERROR: instance %s, connection to primary node"
                     " %s failed" % (instance, pnode))
         bad = True
 
         feedback_fn("  - ERROR: instance %s, connection to primary node"
                     " %s failed" % (instance, pnode))
         bad = True
 
+      if pnode in n_offline:
+        inst_nodes_offline.append(pnode)
+
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
       # templates with more than one secondary so that situation is not well
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
       # templates with more than one secondary so that situation is not well
@@ -955,9 +1057,18 @@ class LUVerifyCluster(LogicalUnit):
           if pnode not in node_info[snode]['sinst-by-pnode']:
             node_info[snode]['sinst-by-pnode'][pnode] = []
           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
           if pnode not in node_info[snode]['sinst-by-pnode']:
             node_info[snode]['sinst-by-pnode'][pnode] = []
           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
-        else:
+        elif snode not in n_offline:
           feedback_fn("  - ERROR: instance %s, connection to secondary node"
                       " %s failed" % (instance, snode))
           feedback_fn("  - ERROR: instance %s, connection to secondary node"
                       " %s failed" % (instance, snode))
+          bad = True
+        if snode in n_offline:
+          inst_nodes_offline.append(snode)
+
+      if inst_nodes_offline:
+        # warn that the instance lives on offline nodes, and set bad=True
+        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
+                    ", ".join(inst_nodes_offline))
+        bad = True
 
     feedback_fn("* Verifying orphan volumes")
     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
 
     feedback_fn("* Verifying orphan volumes")
     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
@@ -983,6 +1094,9 @@ class LUVerifyCluster(LogicalUnit):
       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                   % len(i_non_a_balanced))
 
       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                   % len(i_non_a_balanced))
 
+    if n_offline:
+      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
+
     return not bad
 
   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
     return not bad
 
   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
@@ -1013,11 +1127,14 @@ class LUVerifyCluster(LogicalUnit):
         for node_name in hooks_results:
           show_node_header = True
           res = hooks_results[node_name]
         for node_name in hooks_results:
           show_node_header = True
           res = hooks_results[node_name]
-          if res is False or not isinstance(res, list):
-            feedback_fn("    Communication failure")
+          if res.failed or res.data is False or not isinstance(res.data, list):
+            if res.offline:
+              # no need to warn or set fail return value
+              continue
+            feedback_fn("    Communication failure in hooks execution")
             lu_result = 1
             continue
             lu_result = 1
             continue
-          for script, hkr, output in res:
+          for script, hkr, output in res.data:
             if hkr == constants.HKR_FAIL:
               # The node header is only shown once, if there are
               # failing hooks on that node
             if hkr == constants.HKR_FAIL:
               # The node header is only shown once, if there are
               # failing hooks on that node
@@ -1068,7 +1185,7 @@ class LUVerifyDisks(NoHooksLU):
     nv_dict = {}
     for inst in instances:
       inst_lvs = {}
     nv_dict = {}
     for inst in instances:
       inst_lvs = {}
-      if (inst.status != "up" or
+      if (not inst.admin_up or
           inst.disk_template not in constants.DTS_NET_MIRROR):
         continue
       inst.MapLVsByNode(inst_lvs)
           inst.disk_template not in constants.DTS_NET_MIRROR):
         continue
       inst.MapLVsByNode(inst_lvs)
@@ -1086,7 +1203,12 @@ class LUVerifyDisks(NoHooksLU):
     for node in nodes:
       # node_volume
       lvs = node_lvs[node]
     for node in nodes:
       # node_volume
       lvs = node_lvs[node]
-
+      if lvs.failed:
+        if not lvs.offline:
+          self.LogWarning("Connection to node %s failed: %s" %
+                          (node, lvs.data))
+        continue
+      lvs = lvs.data
       if isinstance(lvs, basestring):
         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
         res_nlvm[node] = lvs
       if isinstance(lvs, basestring):
         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
         res_nlvm[node] = lvs
@@ -1161,31 +1283,33 @@ class LURenameCluster(LogicalUnit):
 
     # shutdown the master IP
     master = self.cfg.GetMasterNode()
 
     # shutdown the master IP
     master = self.cfg.GetMasterNode()
-    if not self.rpc.call_node_stop_master(master, False):
+    result = self.rpc.call_node_stop_master(master, False)
+    if result.failed or not result.data:
       raise errors.OpExecError("Could not disable the master role")
 
     try:
       raise errors.OpExecError("Could not disable the master role")
 
     try:
-      # modify the sstore
-      # TODO: sstore
-      ss.SetKey(ss.SS_MASTER_IP, ip)
-      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
-
-      # Distribute updated ss config to all nodes
-      myself = self.cfg.GetNodeInfo(master)
-      dist_nodes = self.cfg.GetNodeList()
-      if myself.name in dist_nodes:
-        dist_nodes.remove(myself.name)
-
-      logging.debug("Copying updated ssconf data to all nodes")
-      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
-        fname = ss.KeyToFilename(keyname)
-        result = self.rpc.call_upload_file(dist_nodes, fname)
-        for to_node in dist_nodes:
-          if not result[to_node]:
-            self.LogWarning("Copy of file %s to node %s failed",
-                            fname, to_node)
+      cluster = self.cfg.GetClusterInfo()
+      cluster.cluster_name = clustername
+      cluster.master_ip = ip
+      self.cfg.Update(cluster)
+
+      # update the known hosts file
+      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
+      node_list = self.cfg.GetNodeList()
+      try:
+        node_list.remove(master)
+      except ValueError:
+        pass
+      result = self.rpc.call_upload_file(node_list,
+                                         constants.SSH_KNOWN_HOSTS_FILE)
+      for to_node, to_result in result.iteritems():
+        if to_result.failed or not to_result.data:
+          logging.error("Copy of file %s to node %s failed",
+                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
+
     finally:
     finally:
-      if not self.rpc.call_node_start_master(master, False):
+      result = self.rpc.call_node_start_master(master, False)
+      if result.failed or not result.data:
         self.LogWarning("Could not re-enable the master role on"
                         " the master, please restart manually.")
 
         self.LogWarning("Could not re-enable the master role on"
                         " the master, please restart manually.")
 
@@ -1272,16 +1396,21 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.vg_name:
       vglist = self.rpc.call_vg_list(node_list)
       for node in node_list:
     if self.op.vg_name:
       vglist = self.rpc.call_vg_list(node_list)
       for node in node_list:
-        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
+        if vglist[node].failed:
+          # ignoring down node
+          self.LogWarning("Node %s unreachable/error, ignoring" % node)
+          continue
+        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
+                                              self.op.vg_name,
                                               constants.MIN_VG_SIZE)
         if vgstatus:
           raise errors.OpPrereqError("Error on node '%s': %s" %
                                      (node, vgstatus))
 
     self.cluster = cluster = self.cfg.GetClusterInfo()
                                               constants.MIN_VG_SIZE)
         if vgstatus:
           raise errors.OpPrereqError("Error on node '%s': %s" %
                                      (node, vgstatus))
 
     self.cluster = cluster = self.cfg.GetClusterInfo()
-    # beparams changes do not need validation (we can't validate?),
-    # but we still process here
+    # validate beparams changes
     if self.op.beparams:
     if self.op.beparams:
+      utils.CheckBEParams(self.op.beparams)
       self.new_beparams = cluster.FillDict(
         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
 
       self.new_beparams = cluster.FillDict(
         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
 
@@ -1336,26 +1465,34 @@ class LUSetClusterParams(LogicalUnit):
     # we want to update nodes after the cluster so that if any errors
     # happen, we have recorded and saved the cluster info
     if self.op.candidate_pool_size is not None:
     # we want to update nodes after the cluster so that if any errors
     # happen, we have recorded and saved the cluster info
     if self.op.candidate_pool_size is not None:
-      node_info = self.cfg.GetAllNodesInfo().values()
-      num_candidates = len([node for node in node_info
-                            if node.master_candidate])
-      num_nodes = len(node_info)
-      if num_candidates < self.op.candidate_pool_size:
-        random.shuffle(node_info)
-        for node in node_info:
-          if num_candidates >= self.op.candidate_pool_size:
-            break
-          if node.master_candidate:
-            continue
-          node.master_candidate = True
-          self.LogInfo("Promoting node %s to master candidate", node.name)
-          self.cfg.Update(node)
-          self.context.ReaddNode(node)
-          num_candidates += 1
-      elif num_candidates > self.op.candidate_pool_size:
-        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
-                     " of candidate_pool_size (%d)" %
-                     (num_candidates, self.op.candidate_pool_size))
+      _AdjustCandidatePool(self)
+
+
+class LURedistributeConfig(NoHooksLU):
+  """Force the redistribution of cluster configuration.
+
+  This is a very simple LU.
+
+  """
+  _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+    }
+    self.share_locks[locking.LEVEL_NODE] = 1
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+
+  def Exec(self, feedback_fn):
+    """Redistribute the configuration.
+
+    """
+    self.cfg.Update(self.cfg.GetClusterInfo())
 
 
 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
 
 
 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
@@ -1379,7 +1516,7 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
     done = True
     cumul_degraded = False
     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
     done = True
     cumul_degraded = False
     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
-    if not rstats:
+    if rstats.failed or not rstats.data:
       lu.LogWarning("Can't get any data from node %s", node)
       retries += 1
       if retries >= 10:
       lu.LogWarning("Can't get any data from node %s", node)
       retries += 1
       if retries >= 10:
@@ -1387,9 +1524,9 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
                                  " aborting." % node)
       time.sleep(6)
       continue
                                  " aborting." % node)
       time.sleep(6)
       continue
+    rstats = rstats.data
     retries = 0
     retries = 0
-    for i in range(len(rstats)):
-      mstat = rstats[i]
+    for i, mstat in enumerate(rstats):
       if mstat is None:
         lu.LogWarning("Can't compute data for node %s/%s",
                            node, instance.disks[i].iv_name)
       if mstat is None:
         lu.LogWarning("Can't compute data for node %s/%s",
                            node, instance.disks[i].iv_name)
@@ -1433,11 +1570,11 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
   result = True
   if on_primary or dev.AssembleOnSecondary():
     rstats = lu.rpc.call_blockdev_find(node, dev)
   result = True
   if on_primary or dev.AssembleOnSecondary():
     rstats = lu.rpc.call_blockdev_find(node, dev)
-    if not rstats:
+    if rstats.failed or not rstats.data:
       logging.warning("Node %s: disk degraded, not found or node down", node)
       result = False
     else:
       logging.warning("Node %s: disk degraded, not found or node down", node)
       result = False
     else:
-      result = result and (not rstats[idx])
+      result = result and (not rstats.data[idx])
   if dev.children:
     for child in dev.children:
       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
   if dev.children:
     for child in dev.children:
       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
@@ -1490,9 +1627,9 @@ class LUDiagnoseOS(NoHooksLU):
     """
     all_os = {}
     for node_name, nr in rlist.iteritems():
     """
     all_os = {}
     for node_name, nr in rlist.iteritems():
-      if not nr:
+      if nr.failed or not nr.data:
         continue
         continue
-      for os_obj in nr:
+      for os_obj in nr.data:
         if os_obj.name not in all_os:
           # build a list of nodes for this os containing empty lists
           # for each node in node_list
         if os_obj.name not in all_os:
           # build a list of nodes for this os containing empty lists
           # for each node in node_list
@@ -1507,10 +1644,12 @@ class LUDiagnoseOS(NoHooksLU):
 
     """
     node_list = self.acquired_locks[locking.LEVEL_NODE]
 
     """
     node_list = self.acquired_locks[locking.LEVEL_NODE]
-    node_data = self.rpc.call_os_diagnose(node_list)
+    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
+                   if node in node_list]
+    node_data = self.rpc.call_os_diagnose(valid_nodes)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
-    pol = self._DiagnoseByOS(node_list, node_data)
+    pol = self._DiagnoseByOS(valid_nodes, node_data)
     output = []
     for os_name, os_data in pol.iteritems():
       row = []
     output = []
     for os_name, os_data in pol.iteritems():
       row = []
@@ -1578,11 +1717,8 @@ class LURemoveNode(LogicalUnit):
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
-      if node.name == instance.primary_node:
-        raise errors.OpPrereqError("Instance %s still running on the node,"
-                                   " please remove first." % instance_name)
-      if node.name in instance.secondary_nodes:
-        raise errors.OpPrereqError("Instance %s has node as a secondary,"
+      if node.name in instance.all_nodes:
+        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                    " please remove first." % instance_name)
     self.op.node_name = node.name
     self.node = node
                                    " please remove first." % instance_name)
     self.op.node_name = node.name
     self.node = node
@@ -1599,6 +1735,9 @@ class LURemoveNode(LogicalUnit):
 
     self.rpc.call_node_leave_cluster(node.name)
 
 
     self.rpc.call_node_leave_cluster(node.name)
 
+    # Promote nodes to master candidate as needed
+    _AdjustCandidatePool(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
@@ -1620,6 +1759,7 @@ class LUQueryNodes(NoHooksLU):
     "serial_no",
     "master_candidate",
     "master",
     "serial_no",
     "master_candidate",
     "master",
+    "offline",
     )
 
   def ExpandNames(self):
     )
 
   def ExpandNames(self):
@@ -1675,8 +1815,9 @@ class LUQueryNodes(NoHooksLU):
       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                           self.cfg.GetHypervisorType())
       for name in nodenames:
       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                           self.cfg.GetHypervisorType())
       for name in nodenames:
-        nodeinfo = node_data.get(name, None)
-        if nodeinfo:
+        nodeinfo = node_data[name]
+        if not nodeinfo.failed and nodeinfo.data:
+          nodeinfo = nodeinfo.data
           fn = utils.TryConvert
           live_data[name] = {
             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
           fn = utils.TryConvert
           live_data[name] = {
             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
@@ -1738,6 +1879,8 @@ class LUQueryNodes(NoHooksLU):
           val = node.master_candidate
         elif field == "master":
           val = node.name == master_node
           val = node.master_candidate
         elif field == "master":
           val = node.name == master_node
+        elif field == "offline":
+          val = node.offline
         elif self._FIELDS_DYNAMIC.Matches(field):
           val = live_data[node.name].get(field, None)
         else:
         elif self._FIELDS_DYNAMIC.Matches(field):
           val = live_data[node.name].get(field, None)
         else:
@@ -1792,10 +1935,10 @@ class LUQueryNodeVolumes(NoHooksLU):
 
     output = []
     for node in nodenames:
 
     output = []
     for node in nodenames:
-      if node not in volumes or not volumes[node]:
+      if node not in volumes or volumes[node].failed or not volumes[node].data:
         continue
 
         continue
 
-      node_vols = volumes[node][:]
+      node_vols = volumes[node].data[:]
       node_vols.sort(key=lambda vol: vol['dev'])
 
       for vol in node_vols:
       node_vols.sort(key=lambda vol: vol['dev'])
 
       for vol in node_vols:
@@ -1926,9 +2069,15 @@ class LUAddNode(LogicalUnit):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                    " based ping to noded port")
 
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                    " based ping to noded port")
 
+    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
+    mc_now, _ = self.cfg.GetMasterCandidateStats()
+    master_candidate = mc_now < cp_size
+
     self.new_node = objects.Node(name=node,
                                  primary_ip=primary_ip,
     self.new_node = objects.Node(name=node,
                                  primary_ip=primary_ip,
-                                 secondary_ip=secondary_ip)
+                                 secondary_ip=secondary_ip,
+                                 master_candidate=master_candidate,
+                                 offline=False)
 
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
 
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
@@ -1939,14 +2088,15 @@ class LUAddNode(LogicalUnit):
 
     # check connectivity
     result = self.rpc.call_version([node])[node]
 
     # check connectivity
     result = self.rpc.call_version([node])[node]
-    if result:
-      if constants.PROTOCOL_VERSION == result:
+    result.Raise()
+    if result.data:
+      if constants.PROTOCOL_VERSION == result.data:
         logging.info("Communication to node %s fine, sw version %s match",
         logging.info("Communication to node %s fine, sw version %s match",
-                     node, result)
+                     node, result.data)
       else:
         raise errors.OpExecError("Version mismatch master version %s,"
                                  " node version %s" %
       else:
         raise errors.OpExecError("Version mismatch master version %s,"
                                  " node version %s" %
-                                 (constants.PROTOCOL_VERSION, result))
+                                 (constants.PROTOCOL_VERSION, result.data))
     else:
       raise errors.OpExecError("Cannot get version from the new node")
 
     else:
       raise errors.OpExecError("Cannot get version from the new node")
 
@@ -1969,15 +2119,16 @@ class LUAddNode(LogicalUnit):
                                     keyarray[2],
                                     keyarray[3], keyarray[4], keyarray[5])
 
                                     keyarray[2],
                                     keyarray[3], keyarray[4], keyarray[5])
 
-    if not result:
+    if result.failed or not result.data:
       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
 
     # Add node to our /etc/hosts, and add key to known_hosts
     utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
 
     # Add node to our /etc/hosts, and add key to known_hosts
     utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
-      if not self.rpc.call_node_has_ip_address(new_node.name,
-                                               new_node.secondary_ip):
+      result = self.rpc.call_node_has_ip_address(new_node.name,
+                                                 new_node.secondary_ip)
+      if result.failed or not result.data:
         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                  " you gave (%s). Please fix and re-run this"
                                  " command." % new_node.secondary_ip)
         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                  " you gave (%s). Please fix and re-run this"
                                  " command." % new_node.secondary_ip)
@@ -1991,11 +2142,11 @@ class LUAddNode(LogicalUnit):
     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                        self.cfg.GetClusterName())
     for verifier in node_verify_list:
     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                        self.cfg.GetClusterName())
     for verifier in node_verify_list:
-      if not result[verifier]:
+      if result[verifier].failed or not result[verifier].data:
         raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                  " for remote verification" % verifier)
         raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                  " for remote verification" % verifier)
-      if result[verifier]['nodelist']:
-        for failed in result[verifier]['nodelist']:
+      if result[verifier].data['nodelist']:
+        for failed in result[verifier].data['nodelist']:
           feedback_fn("ssh/hostname verification failed %s -> %s" %
                       (verifier, result[verifier]['nodelist'][failed]))
         raise errors.OpExecError("ssh/hostname verification failed.")
           feedback_fn("ssh/hostname verification failed %s -> %s" %
                       (verifier, result[verifier]['nodelist'][failed]))
         raise errors.OpExecError("ssh/hostname verification failed.")
@@ -2012,8 +2163,8 @@ class LUAddNode(LogicalUnit):
     logging.debug("Copying hosts and known_hosts to all nodes")
     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
       result = self.rpc.call_upload_file(dist_nodes, fname)
     logging.debug("Copying hosts and known_hosts to all nodes")
     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
       result = self.rpc.call_upload_file(dist_nodes, fname)
-      for to_node in dist_nodes:
-        if not result[to_node]:
+      for to_node, to_result in result.iteritems():
+        if to_result.failed or not to_result.data:
           logging.error("Copy of file %s to node %s failed", fname, to_node)
 
     to_copy = []
           logging.error("Copy of file %s to node %s failed", fname, to_node)
 
     to_copy = []
@@ -2021,7 +2172,7 @@ class LUAddNode(LogicalUnit):
       to_copy.append(constants.VNC_PASSWORD_FILE)
     for fname in to_copy:
       result = self.rpc.call_upload_file([node], fname)
       to_copy.append(constants.VNC_PASSWORD_FILE)
     for fname in to_copy:
       result = self.rpc.call_upload_file([node], fname)
-      if not result[node]:
+      if result[node].failed or not result[node]:
         logging.error("Could not copy file %s to node %s", fname, node)
 
     if self.op.readd:
         logging.error("Could not copy file %s to node %s", fname, node)
 
     if self.op.readd:
@@ -2044,9 +2195,13 @@ class LUSetNodeParams(LogicalUnit):
     if node_name is None:
       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
     self.op.node_name = node_name
     if node_name is None:
       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
     self.op.node_name = node_name
-    if not hasattr(self.op, 'master_candidate'):
+    _CheckBooleanOpField(self.op, 'master_candidate')
+    _CheckBooleanOpField(self.op, 'offline')
+    if self.op.master_candidate is None and self.op.offline is None:
       raise errors.OpPrereqError("Please pass at least one modification")
       raise errors.OpPrereqError("Please pass at least one modification")
-    self.op.master_candidate = bool(self.op.master_candidate)
+    if self.op.offline == True and self.op.master_candidate == True:
+      raise errors.OpPrereqError("Can't set the node into offline and"
+                                 " master_candidate at the same time")
 
   def ExpandNames(self):
     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
   def ExpandNames(self):
     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
@@ -2060,6 +2215,7 @@ class LUSetNodeParams(LogicalUnit):
     env = {
       "OP_TARGET": self.op.node_name,
       "MASTER_CANDIDATE": str(self.op.master_candidate),
     env = {
       "OP_TARGET": self.op.node_name,
       "MASTER_CANDIDATE": str(self.op.master_candidate),
+      "OFFLINE": str(self.op.offline),
       }
     nl = [self.cfg.GetMasterNode(),
           self.op.node_name]
       }
     nl = [self.cfg.GetMasterNode(),
           self.op.node_name]
@@ -2071,39 +2227,62 @@ class LUSetNodeParams(LogicalUnit):
     This only checks the instance list against the existing names.
 
     """
     This only checks the instance list against the existing names.
 
     """
-    force = self.force = self.op.force
-
-    if self.op.master_candidate == False:
+    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+
+    if ((self.op.master_candidate == False or self.op.offline == True)
+        and node.master_candidate):
+      # we will demote the node from master_candidate
+      if self.op.node_name == self.cfg.GetMasterNode():
+        raise errors.OpPrereqError("The master node has to be a"
+                                   " master candidate and online")
       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
-      node_info = self.cfg.GetAllNodesInfo().values()
-      num_candidates = len([node for node in node_info
-                            if node.master_candidate])
+      num_candidates, _ = self.cfg.GetMasterCandidateStats()
       if num_candidates <= cp_size:
         msg = ("Not enough master candidates (desired"
                " %d, new value will be %d)" % (cp_size, num_candidates-1))
       if num_candidates <= cp_size:
         msg = ("Not enough master candidates (desired"
                " %d, new value will be %d)" % (cp_size, num_candidates-1))
-        if force:
+        if self.op.force:
           self.LogWarning(msg)
         else:
           raise errors.OpPrereqError(msg)
 
           self.LogWarning(msg)
         else:
           raise errors.OpPrereqError(msg)
 
+    if (self.op.master_candidate == True and node.offline and
+        not self.op.offline == False):
+      raise errors.OpPrereqError("Can't set an offline node to"
+                                 " master_candidate")
+
     return
 
   def Exec(self, feedback_fn):
     """Modifies a node.
 
     """
     return
 
   def Exec(self, feedback_fn):
     """Modifies a node.
 
     """
-    node = self.cfg.GetNodeInfo(self.op.node_name)
+    node = self.node
 
     result = []
 
 
     result = []
 
+    if self.op.offline is not None:
+      node.offline = self.op.offline
+      result.append(("offline", str(self.op.offline)))
+      if self.op.offline == True and node.master_candidate:
+        node.master_candidate = False
+        result.append(("master_candidate", "auto-demotion due to offline"))
+
     if self.op.master_candidate is not None:
       node.master_candidate = self.op.master_candidate
       result.append(("master_candidate", str(self.op.master_candidate)))
     if self.op.master_candidate is not None:
       node.master_candidate = self.op.master_candidate
       result.append(("master_candidate", str(self.op.master_candidate)))
+      if self.op.master_candidate == False:
+        rrc = self.rpc.call_node_demote_from_mc(node.name)
+        if (rrc.failed or not isinstance(rrc.data, (tuple, list))
+            or len(rrc.data) != 2):
+          self.LogWarning("Node rpc error: %s" % rrc.error)
+        elif not rrc.data[0]:
+          self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
 
     # this will trigger configuration file update, if needed
     self.cfg.Update(node)
     # this will trigger job queue propagation or cleanup
 
     # this will trigger configuration file update, if needed
     self.cfg.Update(node)
     # this will trigger job queue propagation or cleanup
-    self.context.ReaddNode(node)
+    if self.op.node_name != self.cfg.GetMasterNode():
+      self.context.ReaddNode(node)
 
     return result
 
 
     return result
 
@@ -2213,6 +2392,7 @@ class LUActivateInstanceDisks(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Activate the disks.
 
   def Exec(self, feedback_fn):
     """Activate the disks.
@@ -2259,7 +2439,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
-      if not result:
+      if result.failed or not result:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
                            " (is_primary=False, pass=1)",
                            inst_disk.iv_name, node)
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
                            " (is_primary=False, pass=1)",
                            inst_disk.iv_name, node)
@@ -2275,12 +2455,12 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
         continue
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
         continue
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
-      if not result:
+      if result.failed or not result:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
                            " (is_primary=True, pass=2)",
                            inst_disk.iv_name, node)
         disks_ok = False
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
                            " (is_primary=True, pass=2)",
                            inst_disk.iv_name, node)
         disks_ok = False
-    device_info.append((instance.primary_node, inst_disk.iv_name, result))
+    device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
 
   # leave the disks configured for the primary node
   # this is a workaround that would be fixed better by
 
   # leave the disks configured for the primary node
   # this is a workaround that would be fixed better by
@@ -2350,11 +2530,11 @@ def _SafeShutdownInstanceDisks(lu, instance):
   ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                       [instance.hypervisor])
   ins_l = ins_l[instance.primary_node]
   ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                       [instance.hypervisor])
   ins_l = ins_l[instance.primary_node]
-  if not type(ins_l) is list:
+  if ins_l.failed or not isinstance(ins_l.data, list):
     raise errors.OpExecError("Can't contact node '%s'" %
                              instance.primary_node)
 
     raise errors.OpExecError("Can't contact node '%s'" %
                              instance.primary_node)
 
-  if instance.name in ins_l:
+  if instance.name in ins_l.data:
     raise errors.OpExecError("Instance is running, can't shutdown"
                              " block devices.")
 
     raise errors.OpExecError("Instance is running, can't shutdown"
                              " block devices.")
 
@@ -2374,7 +2554,8 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
   for disk in instance.disks:
     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(top_disk, node)
   for disk in instance.disks:
     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(top_disk, node)
-      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
+      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+      if result.failed or not result.data:
         logging.error("Could not shutdown block device %s on node %s",
                       disk.iv_name, node)
         if not ignore_primary or node != instance.primary_node:
         logging.error("Could not shutdown block device %s on node %s",
                       disk.iv_name, node)
         if not ignore_primary or node != instance.primary_node:
@@ -2382,7 +2563,7 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
   return result
 
 
   return result
 
 
-def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
+def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
   """Checks if a node has enough free memory.
 
   This function check if a given node has the needed amount of free
   """Checks if a node has enough free memory.
 
   This function check if a given node has the needed amount of free
@@ -2398,18 +2579,15 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
   @param reason: string to use in the error message
   @type requested: C{int}
   @param requested: the amount of memory in MiB to check for
   @param reason: string to use in the error message
   @type requested: C{int}
   @param requested: the amount of memory in MiB to check for
-  @type hypervisor: C{str}
-  @param hypervisor: the hypervisor to ask for memory stats
+  @type hypervisor_name: C{str}
+  @param hypervisor_name: the hypervisor to ask for memory stats
   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
       we cannot check the node
 
   """
   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
       we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
-  if not nodeinfo or not isinstance(nodeinfo, dict):
-    raise errors.OpPrereqError("Could not contact node %s for resource"
-                             " information" % (node,))
-
-  free_mem = nodeinfo[node].get('memory_free')
+  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
+  nodeinfo[node].Raise()
+  free_mem = nodeinfo[node].data.get('memory_free')
   if not isinstance(free_mem, int):
     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                              " was '%s'" % (node, free_mem))
   if not isinstance(free_mem, int):
     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                              " was '%s'" % (node, free_mem))
@@ -2441,8 +2619,7 @@ class LUStartupInstance(LogicalUnit):
       "FORCE": self.op.force,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
       "FORCE": self.op.force,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2455,6 +2632,8 @@ class LUStartupInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    _CheckNodeOnline(self, instance.primary_node)
+
     bep = self.cfg.GetClusterInfo().FillBE(instance)
     # check bridges existance
     _CheckInstanceBridgesExist(self, instance)
     bep = self.cfg.GetClusterInfo().FillBE(instance)
     # check bridges existance
     _CheckInstanceBridgesExist(self, instance)
@@ -2477,9 +2656,11 @@ class LUStartupInstance(LogicalUnit):
 
     _StartInstanceDisks(self, instance, force)
 
 
     _StartInstanceDisks(self, instance, force)
 
-    if not self.rpc.call_instance_start(node_current, instance, extra_args):
+    result = self.rpc.call_instance_start(node_current, instance, extra_args)
+    msg = result.RemoteFailMsg()
+    if msg:
       _ShutdownInstanceDisks(self, instance)
       _ShutdownInstanceDisks(self, instance)
-      raise errors.OpExecError("Could not start instance")
+      raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LURebootInstance(LogicalUnit):
 
 
 class LURebootInstance(LogicalUnit):
@@ -2511,8 +2692,7 @@ class LURebootInstance(LogicalUnit):
       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2525,6 +2705,8 @@ class LURebootInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    _CheckNodeOnline(self, instance.primary_node)
+
     # check bridges existance
     _CheckInstanceBridgesExist(self, instance)
 
     # check bridges existance
     _CheckInstanceBridgesExist(self, instance)
 
@@ -2541,17 +2723,21 @@ class LURebootInstance(LogicalUnit):
 
     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                        constants.INSTANCE_REBOOT_HARD]:
 
     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                        constants.INSTANCE_REBOOT_HARD]:
-      if not self.rpc.call_instance_reboot(node_current, instance,
-                                           reboot_type, extra_args):
+      result = self.rpc.call_instance_reboot(node_current, instance,
+                                             reboot_type, extra_args)
+      if result.failed or not result.data:
         raise errors.OpExecError("Could not reboot instance")
     else:
       if not self.rpc.call_instance_shutdown(node_current, instance):
         raise errors.OpExecError("could not shutdown instance for full reboot")
       _ShutdownInstanceDisks(self, instance)
       _StartInstanceDisks(self, instance, ignore_secondaries)
         raise errors.OpExecError("Could not reboot instance")
     else:
       if not self.rpc.call_instance_shutdown(node_current, instance):
         raise errors.OpExecError("could not shutdown instance for full reboot")
       _ShutdownInstanceDisks(self, instance)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      if not self.rpc.call_instance_start(node_current, instance, extra_args):
+      result = self.rpc.call_instance_start(node_current, instance, extra_args)
+      msg = result.RemoteFailMsg()
+      if msg:
         _ShutdownInstanceDisks(self, instance)
         _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance for full reboot")
+        raise errors.OpExecError("Could not start instance for"
+                                 " full reboot: %s" % msg)
 
     self.cfg.MarkInstanceUp(instance.name)
 
 
     self.cfg.MarkInstanceUp(instance.name)
 
@@ -2575,8 +2761,7 @@ class LUShutdownInstance(LogicalUnit):
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2588,6 +2773,7 @@ class LUShutdownInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -2596,7 +2782,8 @@ class LUShutdownInstance(LogicalUnit):
     instance = self.instance
     node_current = instance.primary_node
     self.cfg.MarkInstanceDown(instance.name)
     instance = self.instance
     node_current = instance.primary_node
     self.cfg.MarkInstanceDown(instance.name)
-    if not self.rpc.call_instance_shutdown(node_current, instance):
+    result = self.rpc.call_instance_shutdown(node_current, instance)
+    if result.failed or not result.data:
       self.proc.LogWarning("Could not shutdown instance")
 
     _ShutdownInstanceDisks(self, instance)
       self.proc.LogWarning("Could not shutdown instance")
 
     _ShutdownInstanceDisks(self, instance)
@@ -2621,8 +2808,7 @@ class LUReinstallInstance(LogicalUnit):
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2634,17 +2820,18 @@ class LUReinstallInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, instance.primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name)
-    if instance.status != "down":
+    if instance.admin_up:
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
-    if remote_info:
+    if remote_info.failed or remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -2657,8 +2844,9 @@ class LUReinstallInstance(LogicalUnit):
       if pnode is None:
         raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                    self.op.pnode)
       if pnode is None:
         raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                    self.op.pnode)
-      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
-      if not os_obj:
+      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+      result.Raise()
+      if not isinstance(result.data, objects.OS):
         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                    " primary node"  % self.op.os_type)
 
         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                    " primary node"  % self.op.os_type)
 
@@ -2678,10 +2866,12 @@ class LUReinstallInstance(LogicalUnit):
     _StartInstanceDisks(self, inst, None)
     try:
       feedback_fn("Running the instance OS create scripts...")
     _StartInstanceDisks(self, inst, None)
     try:
       feedback_fn("Running the instance OS create scripts...")
-      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
+      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
+      msg = result.RemoteFailMsg()
+      if msg:
         raise errors.OpExecError("Could not install OS for instance %s"
         raise errors.OpExecError("Could not install OS for instance %s"
-                                 " on node %s" %
-                                 (inst.name, inst.primary_node))
+                                 " on node %s: %s" %
+                                 (inst.name, inst.primary_node, msg))
     finally:
       _ShutdownInstanceDisks(self, inst)
 
     finally:
       _ShutdownInstanceDisks(self, inst)
 
@@ -2702,8 +2892,7 @@ class LURenameInstance(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["INSTANCE_NEW_NAME"] = self.op.new_name
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["INSTANCE_NEW_NAME"] = self.op.new_name
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2717,13 +2906,16 @@ class LURenameInstance(LogicalUnit):
     if instance is None:
       raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
     if instance is None:
       raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
-    if instance.status != "down":
+    _CheckNodeOnline(self, instance.primary_node)
+
+    if instance.admin_up:
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                  self.op.instance_name)
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
-    if remote_info:
+    remote_info.Raise()
+    if remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -2767,15 +2959,15 @@ class LURenameInstance(LogicalUnit):
       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                      old_file_storage_dir,
                                                      new_file_storage_dir)
       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                      old_file_storage_dir,
                                                      new_file_storage_dir)
-
-      if not result:
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Could not connect to node '%s' to rename"
                                  " directory '%s' to '%s' (but the instance"
                                  " has been renamed in Ganeti)" % (
                                  inst.primary_node, old_file_storage_dir,
                                  new_file_storage_dir))
 
         raise errors.OpExecError("Could not connect to node '%s' to rename"
                                  " directory '%s' to '%s' (but the instance"
                                  " has been renamed in Ganeti)" % (
                                  inst.primary_node, old_file_storage_dir,
                                  new_file_storage_dir))
 
-      if not result[0]:
+      if not result.data[0]:
         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                  " (but the instance has been renamed in"
                                  " Ganeti)" % (old_file_storage_dir,
         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                  " (but the instance has been renamed in"
                                  " Ganeti)" % (old_file_storage_dir,
@@ -2783,11 +2975,13 @@ class LURenameInstance(LogicalUnit):
 
     _StartInstanceDisks(self, inst, None)
     try:
 
     _StartInstanceDisks(self, inst, None)
     try:
-      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
-                                               old_name):
+      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
+                                                 old_name)
+      msg = result.RemoteFailMsg()
+      if msg:
         msg = ("Could not run OS rename script for instance %s on node %s"
         msg = ("Could not run OS rename script for instance %s on node %s"
-               " (but the instance has been renamed in Ganeti)" %
-               (inst.name, inst.primary_node))
+               " (but the instance has been renamed in Ganeti): %s" %
+               (inst.name, inst.primary_node, msg))
         self.proc.LogWarning(msg)
     finally:
       _ShutdownInstanceDisks(self, inst)
         self.proc.LogWarning(msg)
     finally:
       _ShutdownInstanceDisks(self, inst)
@@ -2839,7 +3033,8 @@ class LURemoveInstance(LogicalUnit):
     logging.info("Shutting down instance %s on node %s",
                  instance.name, instance.primary_node)
 
     logging.info("Shutting down instance %s on node %s",
                  instance.name, instance.primary_node)
 
-    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
+    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
+    if result.failed or not result.data:
       if self.op.ignore_failures:
         feedback_fn("Warning: can't shutdown instance")
       else:
       if self.op.ignore_failures:
         feedback_fn("Warning: can't shutdown instance")
       else:
@@ -2940,16 +3135,21 @@ class LUQueryInstances(NoHooksLU):
     hv_list = list(set([inst.hypervisor for inst in instance_list]))
 
     bad_nodes = []
     hv_list = list(set([inst.hypervisor for inst in instance_list]))
 
     bad_nodes = []
+    off_nodes = []
     if self.do_locking:
       live_data = {}
       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
       for name in nodes:
         result = node_data[name]
     if self.do_locking:
       live_data = {}
       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
       for name in nodes:
         result = node_data[name]
-        if result:
-          live_data.update(result)
-        elif result == False:
+        if result.offline:
+          # offline nodes will be in both lists
+          off_nodes.append(name)
+        if result.failed:
           bad_nodes.append(name)
           bad_nodes.append(name)
-        # else no instance is alive
+        else:
+          if result.data:
+            live_data.update(result.data)
+            # else no instance is alive
     else:
       live_data = dict([(name, {}) for name in instance_names])
 
     else:
       live_data = dict([(name, {}) for name in instance_names])
 
@@ -2973,24 +3173,26 @@ class LUQueryInstances(NoHooksLU):
         elif field == "snodes":
           val = list(instance.secondary_nodes)
         elif field == "admin_state":
         elif field == "snodes":
           val = list(instance.secondary_nodes)
         elif field == "admin_state":
-          val = (instance.status != "down")
+          val = instance.admin_up
         elif field == "oper_state":
           if instance.primary_node in bad_nodes:
             val = None
           else:
             val = bool(live_data.get(instance.name))
         elif field == "status":
         elif field == "oper_state":
           if instance.primary_node in bad_nodes:
             val = None
           else:
             val = bool(live_data.get(instance.name))
         elif field == "status":
-          if instance.primary_node in bad_nodes:
+          if instance.primary_node in off_nodes:
+            val = "ERROR_nodeoffline"
+          elif instance.primary_node in bad_nodes:
             val = "ERROR_nodedown"
           else:
             running = bool(live_data.get(instance.name))
             if running:
             val = "ERROR_nodedown"
           else:
             running = bool(live_data.get(instance.name))
             if running:
-              if instance.status != "down":
+              if instance.admin_up:
                 val = "running"
               else:
                 val = "ERROR_up"
             else:
                 val = "running"
               else:
                 val = "ERROR_up"
             else:
-              if instance.status != "down":
+              if instance.admin_up:
                 val = "ERROR_down"
               else:
                 val = "ADMIN_down"
                 val = "ERROR_down"
               else:
                 val = "ADMIN_down"
@@ -3133,6 +3335,7 @@ class LUFailoverInstance(LogicalUnit):
                                    "a mirrored disk template")
 
     target_node = secondary_nodes[0]
                                    "a mirrored disk template")
 
     target_node = secondary_nodes[0]
+    _CheckNodeOnline(self, target_node)
     # check memory requirements on the secondary node
     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                          instance.name, bep[constants.BE_MEMORY],
     # check memory requirements on the secondary node
     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                          instance.name, bep[constants.BE_MEMORY],
@@ -3140,7 +3343,9 @@ class LUFailoverInstance(LogicalUnit):
 
     # check bridge existance
     brlist = [nic.bridge for nic in instance.nics]
 
     # check bridge existance
     brlist = [nic.bridge for nic in instance.nics]
-    if not self.rpc.call_bridges_exist(target_node, brlist):
+    result = self.rpc.call_bridges_exist(target_node, brlist)
+    result.Raise()
+    if not result.data:
       raise errors.OpPrereqError("One or more target bridges %s does not"
                                  " exist on destination node '%s'" %
                                  (brlist, target_node))
       raise errors.OpPrereqError("One or more target bridges %s does not"
                                  " exist on destination node '%s'" %
                                  (brlist, target_node))
@@ -3161,7 +3366,7 @@ class LUFailoverInstance(LogicalUnit):
     for dev in instance.disks:
       # for drbd, these are drbd over lvm
       if not _CheckDiskConsistency(self, dev, target_node, False):
     for dev in instance.disks:
       # for drbd, these are drbd over lvm
       if not _CheckDiskConsistency(self, dev, target_node, False):
-        if instance.status == "up" and not self.op.ignore_consistency:
+        if instance.admin_up and not self.op.ignore_consistency:
           raise errors.OpExecError("Disk %s is degraded on target node,"
                                    " aborting failover." % dev.iv_name)
 
           raise errors.OpExecError("Disk %s is degraded on target node,"
                                    " aborting failover." % dev.iv_name)
 
@@ -3169,7 +3374,8 @@ class LUFailoverInstance(LogicalUnit):
     logging.info("Shutting down instance %s on node %s",
                  instance.name, source_node)
 
     logging.info("Shutting down instance %s on node %s",
                  instance.name, source_node)
 
-    if not self.rpc.call_instance_shutdown(source_node, instance):
+    result = self.rpc.call_instance_shutdown(source_node, instance)
+    if result.failed or not result.data:
       if self.op.ignore_consistency:
         self.proc.LogWarning("Could not shutdown instance %s on node %s."
                              " Proceeding"
       if self.op.ignore_consistency:
         self.proc.LogWarning("Could not shutdown instance %s on node %s."
                              " Proceeding"
@@ -3188,7 +3394,7 @@ class LUFailoverInstance(LogicalUnit):
     self.cfg.Update(instance)
 
     # Only start the instance if it's marked as up
     self.cfg.Update(instance)
 
     # Only start the instance if it's marked as up
-    if instance.status == "up":
+    if instance.admin_up:
       feedback_fn("* activating the instance's disks on target node")
       logging.info("Starting instance %s on node %s",
                    instance.name, target_node)
       feedback_fn("* activating the instance's disks on target node")
       logging.info("Starting instance %s on node %s",
                    instance.name, target_node)
@@ -3200,60 +3406,455 @@ class LUFailoverInstance(LogicalUnit):
         raise errors.OpExecError("Can't activate the instance's disks")
 
       feedback_fn("* starting the instance on the target node")
         raise errors.OpExecError("Can't activate the instance's disks")
 
       feedback_fn("* starting the instance on the target node")
-      if not self.rpc.call_instance_start(target_node, instance, None):
+      result = self.rpc.call_instance_start(target_node, instance, None)
+      msg = result.RemoteFailMsg()
+      if msg:
         _ShutdownInstanceDisks(self, instance)
         _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance %s on node %s." %
-                                 (instance.name, target_node))
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
 
 
 
 
-def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
-  """Create a tree of block devices on the primary node.
+class LUMigrateInstance(LogicalUnit):
+  """Migrate an instance.
 
 
-  This always creates all devices.
+  This is migration without shutting down, compared to the failover,
+  which is done with shutdown.
 
   """
 
   """
-  if device.children:
-    for child in device.children:
-      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
-        return False
+  HPATH = "instance-migrate"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "live", "cleanup"]
 
 
-  lu.cfg.SetDiskID(device, node)
-  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, True, info)
-  if not new_id:
-    return False
-  if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    """
+    instance = self.cfg.GetInstanceInfo(
+      self.cfg.ExpandInstanceName(self.op.instance_name))
+    if instance is None:
+      raise errors.OpPrereqError("Instance '%s' not known" %
+                                 self.op.instance_name)
 
 
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Instance's disk layout is not"
+                                 " drbd8, cannot migrate.")
 
 
-def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
-  """Create a tree of block devices on a secondary node.
+    secondary_nodes = instance.secondary_nodes
+    if not secondary_nodes:
+      raise errors.ProgrammerError("no secondary node but using "
+                                   "drbd8 disk template")
+
+    i_be = self.cfg.GetClusterInfo().FillBE(instance)
+
+    target_node = secondary_nodes[0]
+    # check memory requirements on the secondary node
+    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
+                         instance.name, i_be[constants.BE_MEMORY],
+                         instance.hypervisor)
+
+    # check bridge existance
+    brlist = [nic.bridge for nic in instance.nics]
+    result = self.rpc.call_bridges_exist(target_node, brlist)
+    if result.failed or not result.data:
+      raise errors.OpPrereqError("One or more target bridges %s does not"
+                                 " exist on destination node '%s'" %
+                                 (brlist, target_node))
+
+    if not self.op.cleanup:
+      result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                 instance)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
+                                   msg)
+
+    self.instance = instance
+
+  def _WaitUntilSync(self):
+    """Poll with custom rpc for disk sync.
+
+    This uses our own step-based rpc call.
+
+    """
+    self.feedback_fn("* wait until resync is done")
+    all_done = False
+    while not all_done:
+      all_done = True
+      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
+                                            self.nodes_ip,
+                                            self.instance.disks)
+      min_percent = 100
+      for node, nres in result.items():
+        msg = nres.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
+                                   (node, msg))
+        node_done, node_percent = nres.data[1]
+        all_done = all_done and node_done
+        if node_percent is not None:
+          min_percent = min(min_percent, node_percent)
+      if not all_done:
+        if min_percent < 100:
+          self.feedback_fn("   - progress: %.1f%%" % min_percent)
+        time.sleep(2)
+
+  def _EnsureSecondary(self, node):
+    """Demote a node to secondary.
+
+    """
+    self.feedback_fn("* switching node %s to secondary mode" % node)
+
+    for dev in self.instance.disks:
+      self.cfg.SetDiskID(dev, node)
+
+    result = self.rpc.call_blockdev_close(node, self.instance.name,
+                                          self.instance.disks)
+    msg = result.RemoteFailMsg()
+    if msg:
+      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
+                               " error %s" % (node, msg))
+
+  def _GoStandalone(self):
+    """Disconnect from the network.
+
+    """
+    self.feedback_fn("* changing into standalone mode")
+    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
+                                               self.instance.disks)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot disconnect disks node %s,"
+                                 " error %s" % (node, msg))
+
+  def _GoReconnect(self, multimaster):
+    """Reconnect to the network.
+
+    """
+    if multimaster:
+      msg = "dual-master"
+    else:
+      msg = "single-master"
+    self.feedback_fn("* changing disks into %s mode" % msg)
+    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
+                                           self.instance.disks,
+                                           self.instance.name, multimaster)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot change disks config on node %s,"
+                                 " error: %s" % (node, msg))
+
+  def _ExecCleanup(self):
+    """Try to cleanup after a failed migration.
+
+    The cleanup is done by:
+      - check that the instance is running only on one node
+        (and update the config if needed)
+      - change disks on its secondary node to secondary
+      - wait until disks are fully synchronized
+      - disconnect from the network
+      - change disks into single-master mode
+      - wait again until disks are fully synchronized
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    # check running on only one node
+    self.feedback_fn("* checking where the instance actually runs"
+                     " (if this hangs, the hypervisor might be in"
+                     " a bad state)")
+    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
+    for node, result in ins_l.items():
+      result.Raise()
+      if not isinstance(result.data, list):
+        raise errors.OpExecError("Can't contact node '%s'" % node)
+
+    runningon_source = instance.name in ins_l[source_node].data
+    runningon_target = instance.name in ins_l[target_node].data
+
+    if runningon_source and runningon_target:
+      raise errors.OpExecError("Instance seems to be running on two nodes,"
+                               " or the hypervisor is confused. You will have"
+                               " to ensure manually that it runs only on one"
+                               " and restart this operation.")
+
+    if not (runningon_source or runningon_target):
+      raise errors.OpExecError("Instance does not seem to be running at all."
+                               " In this case, it's safer to repair by"
+                               " running 'gnt-instance stop' to ensure disk"
+                               " shutdown, and then restarting it.")
+
+    if runningon_target:
+      # the migration has actually succeeded, we need to update the config
+      self.feedback_fn("* instance running on secondary node (%s),"
+                       " updating config" % target_node)
+      instance.primary_node = target_node
+      self.cfg.Update(instance)
+      demoted_node = source_node
+    else:
+      self.feedback_fn("* instance confirmed to be running on its"
+                       " primary node (%s)" % source_node)
+      demoted_node = target_node
+
+    self._EnsureSecondary(demoted_node)
+    try:
+      self._WaitUntilSync()
+    except errors.OpExecError:
+      # we ignore here errors, since if the device is standalone, it
+      # won't be able to sync
+      pass
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def _RevertDiskStatus(self):
+    """Try to revert the disk status after a failed migration.
+
+    """
+    target_node = self.target_node
+    try:
+      self._EnsureSecondary(target_node)
+      self._GoStandalone()
+      self._GoReconnect(False)
+      self._WaitUntilSync()
+    except errors.OpExecError, err:
+      self.LogWarning("Migration failed and I can't reconnect the"
+                      " drives: error '%s'\n"
+                      "Please look and recover the instance status" %
+                      str(err))
+
+  def _AbortMigration(self):
+    """Call the hypervisor code to abort a started migration.
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    migration_info = self.migration_info
+
+    abort_result = self.rpc.call_finalize_migration(target_node,
+                                                    instance,
+                                                    migration_info,
+                                                    False)
+    abort_msg = abort_result.RemoteFailMsg()
+    if abort_msg:
+      logging.error("Aborting migration failed on target node %s: %s" %
+                    (target_node, abort_msg))
+      # Don't raise an exception here, as we stil have to try to revert the
+      # disk status, even if this step failed.
+
+  def _ExecMigration(self):
+    """Migrate an instance.
+
+    The migrate is done by:
+      - change the disks into dual-master mode
+      - wait until disks are fully synchronized again
+      - migrate the instance
+      - change disks on the new secondary node (the old primary) to secondary
+      - wait until disks are fully synchronized
+      - change disks into single-master mode
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    self.feedback_fn("* checking disk consistency between source and target")
+    for dev in instance.disks:
+      if not _CheckDiskConsistency(self, dev, target_node, False):
+        raise errors.OpExecError("Disk %s is degraded or not fully"
+                                 " synchronized on target node,"
+                                 " aborting migrate." % dev.iv_name)
+
+    # First get the migration information from the remote node
+    result = self.rpc.call_migration_info(source_node, instance)
+    msg = result.RemoteFailMsg()
+    if msg:
+      log_err = ("Failed fetching source migration information from %s: %s" %
+                  (source_node, msg))
+      logging.error(log_err)
+      raise errors.OpExecError(log_err)
+
+    self.migration_info = migration_info = result.data[1]
+
+    # Then switch the disks to master/master mode
+    self._EnsureSecondary(target_node)
+    self._GoStandalone()
+    self._GoReconnect(True)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* preparing %s to accept the instance" % target_node)
+    result = self.rpc.call_accept_instance(target_node,
+                                           instance,
+                                           migration_info,
+                                           self.nodes_ip[target_node])
+
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance pre-migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
+                               (instance.name, msg))
+
+    self.feedback_fn("* migrating instance to %s" % target_node)
+    time.sleep(10)
+    result = self.rpc.call_instance_migrate(source_node, instance,
+                                            self.nodes_ip[target_node],
+                                            self.op.live)
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not migrate instance %s: %s" %
+                               (instance.name, msg))
+    time.sleep(10)
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance)
+
+    result = self.rpc.call_finalize_migration(target_node,
+                                              instance,
+                                              migration_info,
+                                              True)
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance migration succeeded, but finalization failed:"
+                    " %s" % msg)
+      raise errors.OpExecError("Could not finalize instance migration: %s" %
+                               msg)
+
+    self._EnsureSecondary(source_node)
+    self._WaitUntilSync()
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def Exec(self, feedback_fn):
+    """Perform the migration.
+
+    """
+    self.feedback_fn = feedback_fn
+
+    self.source_node = self.instance.primary_node
+    self.target_node = self.instance.secondary_nodes[0]
+    self.all_nodes = [self.source_node, self.target_node]
+    self.nodes_ip = {
+      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
+      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
+      }
+    if self.op.cleanup:
+      return self._ExecCleanup()
+    else:
+      return self._ExecMigration()
+
+
+def _CreateBlockDev(lu, node, instance, device, force_create,
+                    info, force_open):
+  """Create a tree of block devices on a given node.
 
   If this device type has to be created on secondaries, create it and
   all its children.
 
   If not, just recurse to children keeping the same 'force' value.
 
 
   If this device type has to be created on secondaries, create it and
   all its children.
 
   If not, just recurse to children keeping the same 'force' value.
 
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @type force_create: boolean
+  @param force_create: whether to force creation of this device; this
+      will be change to True whenever we find a device which has
+      CreateOnSecondary() attribute
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passes to the
+      L{backend.CreateBlockDevice} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device own Open() execution
+
   """
   if device.CreateOnSecondary():
   """
   if device.CreateOnSecondary():
-    force = True
+    force_create = True
+
   if device.children:
     for child in device.children:
   if device.children:
     for child in device.children:
-      if not _CreateBlockDevOnSecondary(lu, node, instance,
-                                        child, force, info):
-        return False
+      _CreateBlockDev(lu, node, instance, child, force_create,
+                      info, force_open)
 
 
-  if not force:
-    return True
+  if not force_create:
+    return
+
+  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
+
+
+def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
+  """Create a single block device on a given node.
+
+  This will not recurse over children of the device, so they must be
+  created in advance.
+
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passes to the
+      L{backend.CreateBlockDevice} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device own Open() execution
+
+  """
   lu.cfg.SetDiskID(device, node)
   lu.cfg.SetDiskID(device, node)
-  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, False, info)
-  if not new_id:
-    return False
+  result = lu.rpc.call_blockdev_create(node, device, device.size,
+                                       instance.name, force_open, info)
+  msg = result.RemoteFailMsg()
+  if msg:
+    raise errors.OpExecError("Can't create block device %s on"
+                             " node %s for instance %s: %s" %
+                             (device, node, instance.name, msg))
   if device.physical_id is None:
   if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+    device.physical_id = result.data[1]
 
 
 def _GenerateUniqueNames(lu, exts):
 
 
 def _GenerateUniqueNames(lu, exts):
@@ -3324,11 +3925,11 @@ def _GenerateDiskTemplate(lu, template_name,
     minors = lu.cfg.AllocateDRBDMinor(
       [primary_node, remote_node] * len(disk_info), instance_name)
 
     minors = lu.cfg.AllocateDRBDMinor(
       [primary_node, remote_node] * len(disk_info), instance_name)
 
-    names = _GenerateUniqueNames(lu,
-                                 [".disk%d_%s" % (i, s)
-                                  for i in range(disk_count)
-                                  for s in ("data", "meta")
-                                  ])
+    names = []
+    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
+                                               for i in range(disk_count)]):
+      names.append(lv_prefix + "_data")
+      names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
@@ -3374,19 +3975,18 @@ def _CreateDisks(lu, instance):
 
   """
   info = _GetInstanceInfoText(instance)
 
   """
   info = _GetInstanceInfoText(instance)
+  pnode = instance.primary_node
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
-                                                 file_storage_dir)
+    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
 
 
-    if not result:
-      logging.error("Could not connect to node '%s'", instance.primary_node)
-      return False
+    if result.failed or not result.data:
+      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
 
 
-    if not result[0]:
-      logging.error("Failed to create directory '%s'", file_storage_dir)
-      return False
+    if not result.data[0]:
+      raise errors.OpExecError("Failed to create directory '%s'" %
+                               file_storage_dir)
 
   # Note: this needs to be kept in sync with adding of disks in
   # LUSetInstanceParams
 
   # Note: this needs to be kept in sync with adding of disks in
   # LUSetInstanceParams
@@ -3394,19 +3994,9 @@ def _CreateDisks(lu, instance):
     logging.info("Creating volume %s for instance %s",
                  device.iv_name, instance.name)
     #HARDCODE
     logging.info("Creating volume %s for instance %s",
                  device.iv_name, instance.name)
     #HARDCODE
-    for secondary_node in instance.secondary_nodes:
-      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
-                                        device, False, info):
-        logging.error("Failed to create volume %s (%s) on secondary node %s!",
-                      device.iv_name, device, secondary_node)
-        return False
-    #HARDCODE
-    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
-                                    instance, device, info):
-      logging.error("Failed to create volume %s on primary!", device.iv_name)
-      return False
-
-  return True
+    for node in instance.all_nodes:
+      f_create = node == pnode
+      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
 def _RemoveDisks(lu, instance):
 
 
 def _RemoveDisks(lu, instance):
@@ -3431,15 +4021,17 @@ def _RemoveDisks(lu, instance):
   for device in instance.disks:
     for node, disk in device.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(disk, node)
   for device in instance.disks:
     for node, disk in device.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(disk, node)
-      if not lu.rpc.call_blockdev_remove(node, disk):
+      result = lu.rpc.call_blockdev_remove(node, disk)
+      if result.failed or not result.data:
         lu.proc.LogWarning("Could not remove block device %s on node %s,"
                            " continuing anyway", device.iv_name, node)
         result = False
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
         lu.proc.LogWarning("Could not remove block device %s on node %s,"
                            " continuing anyway", device.iv_name, node)
         result = False
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
-                                               file_storage_dir):
+    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
+                                                 file_storage_dir)
+    if result.failed or not result.data:
       logging.error("Could not remove directory '%s'", file_storage_dir)
       result = False
 
       logging.error("Could not remove directory '%s'", file_storage_dir)
       result = False
 
@@ -3487,13 +4079,14 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
                                                   hvname,
                                                   hvparams)
   for node in nodenames:
                                                   hvname,
                                                   hvparams)
   for node in nodenames:
-    info = hvinfo.get(node, None)
-    if not info or not isinstance(info, (tuple, list)):
+    info = hvinfo[node]
+    info.Raise()
+    if not info.data or not isinstance(info.data, (tuple, list)):
       raise errors.OpPrereqError("Cannot get current information"
       raise errors.OpPrereqError("Cannot get current information"
-                                 " from node '%s' (%s)" % (node, info))
-    if not info[0]:
+                                 " from node '%s' (%s)" % (node, info.data))
+    if not info.data[0]:
       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
-                                 " %s" % info[1])
+                                 " %s" % info.data[1])
 
 
 class LUCreateInstance(LogicalUnit):
 
 
 class LUCreateInstance(LogicalUnit):
@@ -3560,6 +4153,7 @@ class LUCreateInstance(LogicalUnit):
     hv_type.CheckParameterSyntax(filled_hvp)
 
     # fill and remember the beparams dict
     hv_type.CheckParameterSyntax(filled_hvp)
 
     # fill and remember the beparams dict
+    utils.CheckBEParams(self.op.beparams)
     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                     self.op.beparams)
 
     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                     self.op.beparams)
 
@@ -3650,16 +4244,22 @@ class LUCreateInstance(LogicalUnit):
       src_node = getattr(self.op, "src_node", None)
       src_path = getattr(self.op, "src_path", None)
 
       src_node = getattr(self.op, "src_node", None)
       src_path = getattr(self.op, "src_path", None)
 
-      if src_node is None or src_path is None:
-        raise errors.OpPrereqError("Importing an instance requires source"
-                                   " node and path options")
-
-      if not os.path.isabs(src_path):
-        raise errors.OpPrereqError("The source path must be absolute")
+      if src_path is None:
+        self.op.src_path = src_path = self.op.instance_name
 
 
-      self.op.src_node = src_node = self._ExpandNode(src_node)
-      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
-        self.needed_locks[locking.LEVEL_NODE].append(src_node)
+      if src_node is None:
+        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+        self.op.src_node = None
+        if os.path.isabs(src_path):
+          raise errors.OpPrereqError("Importing an instance from an absolute"
+                                     " path requires a source node option.")
+      else:
+        self.op.src_node = src_node = self._ExpandNode(src_node)
+        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+          self.needed_locks[locking.LEVEL_NODE].append(src_node)
+        if not os.path.isabs(src_path):
+          self.op.src_path = src_path = \
+            os.path.join(constants.EXPORT_DIR, src_path)
 
     else: # INSTANCE_CREATE
       if getattr(self.op, "os_type", None) is None:
 
     else: # INSTANCE_CREATE
       if getattr(self.op, "os_type", None) is None:
@@ -3746,11 +4346,28 @@ class LUCreateInstance(LogicalUnit):
       src_node = self.op.src_node
       src_path = self.op.src_path
 
       src_node = self.op.src_node
       src_path = self.op.src_path
 
-      export_info = self.rpc.call_export_info(src_node, src_path)
-
-      if not export_info:
+      if src_node is None:
+        exp_list = self.rpc.call_export_list(
+          self.acquired_locks[locking.LEVEL_NODE])
+        found = False
+        for node in exp_list:
+          if not exp_list[node].failed and src_path in exp_list[node].data:
+            found = True
+            self.op.src_node = src_node = node
+            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
+                                                       src_path)
+            break
+        if not found:
+          raise errors.OpPrereqError("No export found for relative path %s" %
+                                      src_path)
+
+      _CheckNodeOnline(self, src_node)
+      result = self.rpc.call_export_info(src_node, src_path)
+      result.Raise()
+      if not result.data:
         raise errors.OpPrereqError("No export found in dir %s" % src_path)
 
         raise errors.OpPrereqError("No export found in dir %s" % src_path)
 
+      export_info = result.data
       if not export_info.has_section(constants.INISECT_EXP):
         raise errors.ProgrammerError("Corrupted export config")
 
       if not export_info.has_section(constants.INISECT_EXP):
         raise errors.ProgrammerError("Corrupted export config")
 
@@ -3811,6 +4428,10 @@ class LUCreateInstance(LogicalUnit):
     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
     assert self.pnode is not None, \
       "Cannot retrieve locked node %s" % self.op.pnode
     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
     assert self.pnode is not None, \
       "Cannot retrieve locked node %s" % self.op.pnode
+    if pnode.offline:
+      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
+                                 pnode.name)
+
     self.secondaries = []
 
     # mirror node verification
     self.secondaries = []
 
     # mirror node verification
@@ -3822,6 +4443,7 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("The secondary node cannot be"
                                    " the primary node.")
       self.secondaries.append(self.op.snode)
         raise errors.OpPrereqError("The secondary node cannot be"
                                    " the primary node.")
       self.secondaries.append(self.op.snode)
+      _CheckNodeOnline(self, self.op.snode)
 
     nodenames = [pnode.name] + self.secondaries
 
 
     nodenames = [pnode.name] + self.secondaries
 
@@ -3833,7 +4455,9 @@ class LUCreateInstance(LogicalUnit):
       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.op.hypervisor)
       for node in nodenames:
       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.op.hypervisor)
       for node in nodenames:
-        info = nodeinfo.get(node, None)
+        info = nodeinfo[node]
+        info.Raise()
+        info = info.data
         if not info:
           raise errors.OpPrereqError("Cannot get current information"
                                      " from node '%s'" % node)
         if not info:
           raise errors.OpPrereqError("Cannot get current information"
                                      " from node '%s'" % node)
@@ -3849,17 +4473,19 @@ class LUCreateInstance(LogicalUnit):
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
     # os verification
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
     # os verification
-    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
-    if not os_obj:
+    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
+    result.Raise()
+    if not isinstance(result.data, objects.OS):
       raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                  " primary node"  % self.op.os_type)
 
     # bridge check on primary node
     bridges = [n.bridge for n in self.nics]
       raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                  " primary node"  % self.op.os_type)
 
     # bridge check on primary node
     bridges = [n.bridge for n in self.nics]
-    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
-      raise errors.OpPrereqError("one of the target bridges '%s' does not"
-                                 " exist on"
-                                 " destination node '%s'" %
+    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
+    result.Raise()
+    if not result.data:
+      raise errors.OpPrereqError("One of the target bridges '%s' does not"
+                                 " exist on destination node '%s'" %
                                  (",".join(bridges), pnode.name))
 
     # memory check on primary node
                                  (",".join(bridges), pnode.name))
 
     # memory check on primary node
@@ -3869,10 +4495,7 @@ class LUCreateInstance(LogicalUnit):
                            self.be_full[constants.BE_MEMORY],
                            self.op.hypervisor)
 
                            self.be_full[constants.BE_MEMORY],
                            self.op.hypervisor)
 
-    if self.op.start:
-      self.instance_status = 'up'
-    else:
-      self.instance_status = 'down'
+    self.instance_status = self.op.start
 
   def Exec(self, feedback_fn):
     """Create and add the instance to the cluster.
 
   def Exec(self, feedback_fn):
     """Create and add the instance to the cluster.
@@ -3919,7 +4542,7 @@ class LUCreateInstance(LogicalUnit):
                             primary_node=pnode_name,
                             nics=self.nics, disks=disks,
                             disk_template=self.op.disk_template,
                             primary_node=pnode_name,
                             nics=self.nics, disks=disks,
                             disk_template=self.op.disk_template,
-                            status=self.instance_status,
+                            admin_up=self.instance_status,
                             network_port=network_port,
                             beparams=self.op.beparams,
                             hvparams=self.op.hvparams,
                             network_port=network_port,
                             beparams=self.op.beparams,
                             hvparams=self.op.hvparams,
@@ -3927,10 +4550,15 @@ class LUCreateInstance(LogicalUnit):
                             )
 
     feedback_fn("* creating instance disks...")
                             )
 
     feedback_fn("* creating instance disks...")
-    if not _CreateDisks(self, iobj):
-      _RemoveDisks(self, iobj)
-      self.cfg.ReleaseDRBDMinors(instance)
-      raise errors.OpExecError("Device creation failed, reverting...")
+    try:
+      _CreateDisks(self, iobj)
+    except errors.OpExecError:
+      self.LogWarning("Device creation failed, reverting...")
+      try:
+        _RemoveDisks(self, iobj)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance)
+        raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
@@ -3938,8 +4566,6 @@ class LUCreateInstance(LogicalUnit):
     # Declare that we don't want to remove the instance lock anymore, as we've
     # added the instance to the config
     del self.remove_locks[locking.LEVEL_INSTANCE]
     # Declare that we don't want to remove the instance lock anymore, as we've
     # added the instance to the config
     del self.remove_locks[locking.LEVEL_INSTANCE]
-    # Remove the temp. assignements for the instance's drbds
-    self.cfg.ReleaseDRBDMinors(instance)
     # Unlock all the nodes
     if self.op.mode == constants.INSTANCE_IMPORT:
       nodes_keep = [self.op.src_node]
     # Unlock all the nodes
     if self.op.mode == constants.INSTANCE_IMPORT:
       nodes_keep = [self.op.src_node]
@@ -3975,10 +4601,12 @@ class LUCreateInstance(LogicalUnit):
     if iobj.disk_template != constants.DT_DISKLESS:
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
     if iobj.disk_template != constants.DT_DISKLESS:
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
-        if not self.rpc.call_instance_os_add(pnode_name, iobj):
-          raise errors.OpExecError("could not add os for instance %s"
-                                   " on node %s" %
-                                   (instance, pnode_name))
+        result = self.rpc.call_instance_os_add(pnode_name, iobj)
+        msg = result.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Could not add os for instance %s"
+                                   " on node %s: %s" %
+                                   (instance, pnode_name, msg))
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
@@ -3988,7 +4616,8 @@ class LUCreateInstance(LogicalUnit):
         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                          src_node, src_images,
                                                          cluster_name)
         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                          src_node, src_images,
                                                          cluster_name)
-        for idx, result in enumerate(import_result):
+        import_result.Raise()
+        for idx, result in enumerate(import_result.data):
           if not result:
             self.LogWarning("Could not import the image %s for instance"
                             " %s, disk %d, on node %s" %
           if not result:
             self.LogWarning("Could not import the image %s for instance"
                             " %s, disk %d, on node %s" %
@@ -4001,8 +4630,10 @@ class LUCreateInstance(LogicalUnit):
     if self.op.start:
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
     if self.op.start:
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      if not self.rpc.call_instance_start(pnode_name, iobj, None):
-        raise errors.OpExecError("Could not start instance")
+      result = self.rpc.call_instance_start(pnode_name, iobj, None)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LUConnectConsole(NoHooksLU):
 
 
 class LUConnectConsole(NoHooksLU):
@@ -4028,6 +4659,7 @@ class LUConnectConsole(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Connect to the console of an instance
 
   def Exec(self, feedback_fn):
     """Connect to the console of an instance
@@ -4038,10 +4670,9 @@ class LUConnectConsole(NoHooksLU):
 
     node_insts = self.rpc.call_instance_list([node],
                                              [instance.hypervisor])[node]
 
     node_insts = self.rpc.call_instance_list([node],
                                              [instance.hypervisor])[node]
-    if node_insts is False:
-      raise errors.OpExecError("Can't connect to node %s." % node)
+    node_insts.Raise()
 
 
-    if instance.name not in node_insts:
+    if instance.name not in node_insts.data:
       raise errors.OpExecError("Instance %s is not running." % instance.name)
 
     logging.debug("Connecting to console of %s on %s", instance.name, node)
       raise errors.OpExecError("Instance %s is not running." % instance.name)
 
     logging.debug("Connecting to console of %s on %s", instance.name, node)
@@ -4062,17 +4693,32 @@ class LUReplaceDisks(LogicalUnit):
   _OP_REQP = ["instance_name", "mode", "disks"]
   REQ_BGL = False
 
   _OP_REQP = ["instance_name", "mode", "disks"]
   REQ_BGL = False
 
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-
+  def CheckArguments(self):
     if not hasattr(self.op, "remote_node"):
       self.op.remote_node = None
     if not hasattr(self.op, "remote_node"):
       self.op.remote_node = None
-
-    ia_name = getattr(self.op, "iallocator", None)
-    if ia_name is not None:
-      if self.op.remote_node is not None:
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
+
+    # check for valid parameter combination
+    cnt = [self.op.remote_node, self.op.iallocator].count(None)
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      if cnt == 2:
+        raise errors.OpPrereqError("When changing the secondary either an"
+                                   " iallocator script must be used or the"
+                                   " new node given")
+      elif cnt == 0:
         raise errors.OpPrereqError("Give either the iallocator or the new"
                                    " secondary, not both")
         raise errors.OpPrereqError("Give either the iallocator or the new"
                                    " secondary, not both")
+    else: # not replacing the secondary
+      if cnt != 2:
+        raise errors.OpPrereqError("The iallocator and new node options can"
+                                   " be used only when changing the"
+                                   " secondary node")
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+    if self.op.iallocator is not None:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     elif self.op.remote_node is not None:
       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     elif self.op.remote_node is not None:
       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
@@ -4147,9 +4793,9 @@ class LUReplaceDisks(LogicalUnit):
       "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = instance
 
       "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = instance
 
-    if instance.disk_template not in constants.DTS_NET_MIRROR:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " network mirrored.")
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
+                                 " instances")
 
     if len(instance.secondary_nodes) != 1:
       raise errors.OpPrereqError("The instance has a strange layout,"
 
     if len(instance.secondary_nodes) != 1:
       raise errors.OpPrereqError("The instance has a strange layout,"
@@ -4158,8 +4804,7 @@ class LUReplaceDisks(LogicalUnit):
 
     self.sec_node = instance.secondary_nodes[0]
 
 
     self.sec_node = instance.secondary_nodes[0]
 
-    ia_name = getattr(self.op, "iallocator", None)
-    if ia_name is not None:
+    if self.op.iallocator is not None:
       self._RunAllocator()
 
     remote_node = self.op.remote_node
       self._RunAllocator()
 
     remote_node = self.op.remote_node
@@ -4173,35 +4818,24 @@ class LUReplaceDisks(LogicalUnit):
       raise errors.OpPrereqError("The specified node is the primary node of"
                                  " the instance.")
     elif remote_node == self.sec_node:
       raise errors.OpPrereqError("The specified node is the primary node of"
                                  " the instance.")
     elif remote_node == self.sec_node:
-      if self.op.mode == constants.REPLACE_DISK_SEC:
-        # this is for DRBD8, where we can't execute the same mode of
-        # replacement as for drbd7 (no different port allocated)
-        raise errors.OpPrereqError("Same secondary given, cannot execute"
-                                   " replacement")
-    if instance.disk_template == constants.DT_DRBD8:
-      if (self.op.mode == constants.REPLACE_DISK_ALL and
-          remote_node is not None):
-        # switch to replace secondary mode
-        self.op.mode = constants.REPLACE_DISK_SEC
-
-      if self.op.mode == constants.REPLACE_DISK_ALL:
-        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
-                                   " secondary disk replacement, not"
-                                   " both at once")
-      elif self.op.mode == constants.REPLACE_DISK_PRI:
-        if remote_node is not None:
-          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
-                                     " the secondary while doing a primary"
-                                     " node disk replacement")
-        self.tgt_node = instance.primary_node
-        self.oth_node = instance.secondary_nodes[0]
-      elif self.op.mode == constants.REPLACE_DISK_SEC:
-        self.new_node = remote_node # this can be None, in which case
-                                    # we don't change the secondary
-        self.tgt_node = instance.secondary_nodes[0]
-        self.oth_node = instance.primary_node
-      else:
-        raise errors.ProgrammerError("Unhandled disk replace mode")
+      raise errors.OpPrereqError("The specified node is already the"
+                                 " secondary node of the instance.")
+
+    if self.op.mode == constants.REPLACE_DISK_PRI:
+      n1 = self.tgt_node = instance.primary_node
+      n2 = self.oth_node = self.sec_node
+    elif self.op.mode == constants.REPLACE_DISK_SEC:
+      n1 = self.tgt_node = self.sec_node
+      n2 = self.oth_node = instance.primary_node
+    elif self.op.mode == constants.REPLACE_DISK_CHG:
+      n1 = self.new_node = remote_node
+      n2 = self.oth_node = instance.primary_node
+      self.tgt_node = self.sec_node
+    else:
+      raise errors.ProgrammerError("Unhandled disk replace mode")
+
+    _CheckNodeOnline(self, n1)
+    _CheckNodeOnline(self, n2)
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -4249,8 +4883,8 @@ class LUReplaceDisks(LogicalUnit):
     if not results:
       raise errors.OpExecError("Can't list volume groups on the nodes")
     for node in oth_node, tgt_node:
     if not results:
       raise errors.OpExecError("Can't list volume groups on the nodes")
     for node in oth_node, tgt_node:
-      res = results.get(node, False)
-      if not res or my_vg not in res:
+      res = results[node]
+      if res.failed or not res.data or my_vg not in res.data:
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
     for idx, dev in enumerate(instance.disks):
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
     for idx, dev in enumerate(instance.disks):
@@ -4294,21 +4928,18 @@ class LUReplaceDisks(LogicalUnit):
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
       info("creating new local storage on %s for %s" %
            (tgt_node, dev.iv_name))
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
       info("creating new local storage on %s for %s" %
            (tgt_node, dev.iv_name))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+      # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
       for new_lv in new_lvs:
-        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], tgt_node))
+        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
 
     # Step: for each lv, detach+rename*2+attach
     self.proc.LogStep(4, steps_total, "change drbd configuration")
     for dev, old_lvs, new_lvs in iv_names.itervalues():
       info("detaching %s drbd from local storage" % dev.iv_name)
 
     # Step: for each lv, detach+rename*2+attach
     self.proc.LogStep(4, steps_total, "change drbd configuration")
     for dev, old_lvs, new_lvs in iv_names.itervalues():
       info("detaching %s drbd from local storage" % dev.iv_name)
-      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
+      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't detach drbd from local storage on node"
                                  " %s for device %s" % (tgt_node, dev.iv_name))
       #dev.children = []
         raise errors.OpExecError("Can't detach drbd from local storage on node"
                                  " %s for device %s" % (tgt_node, dev.iv_name))
       #dev.children = []
@@ -4328,16 +4959,20 @@ class LUReplaceDisks(LogicalUnit):
       rlist = []
       for to_ren in old_lvs:
         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
       rlist = []
       for to_ren in old_lvs:
         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
-        if find_res is not None: # device exists
+        if not find_res.failed and find_res.data is not None: # device exists
           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
 
       info("renaming the old LVs on the target node")
           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
 
       info("renaming the old LVs on the target node")
-      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
       # now we rename the new LVs to the old LVs
       info("renaming the new LVs on the target node")
       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
       # now we rename the new LVs to the old LVs
       info("renaming the new LVs on the target node")
       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
-      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
+      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
 
       for old, new in zip(old_lvs, new_lvs):
         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
 
       for old, new in zip(old_lvs, new_lvs):
@@ -4350,9 +4985,11 @@ class LUReplaceDisks(LogicalUnit):
 
       # now that the new lvs have the old name, we can add them to the device
       info("adding new mirror component on %s" % tgt_node)
 
       # now that the new lvs have the old name, we can add them to the device
       info("adding new mirror component on %s" % tgt_node)
-      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
+      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+      if result.failed or not result.data:
         for new_lv in new_lvs:
         for new_lv in new_lvs:
-          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
+          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
+          if result.failed or not result.data:
             warning("Can't rollback device %s", hint="manually cleanup unused"
                     " logical volumes")
         raise errors.OpExecError("Can't add local storage to drbd")
             warning("Can't rollback device %s", hint="manually cleanup unused"
                     " logical volumes")
         raise errors.OpExecError("Can't add local storage to drbd")
@@ -4371,8 +5008,8 @@ class LUReplaceDisks(LogicalUnit):
     # so check manually all the devices
     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
       cfg.SetDiskID(dev, instance.primary_node)
     # so check manually all the devices
     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
       cfg.SetDiskID(dev, instance.primary_node)
-      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
-      if is_degr:
+      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
+      if result.failed or result.data[5]:
         raise errors.OpExecError("DRBD device %s is degraded!" % name)
 
     # Step: remove old storage
         raise errors.OpExecError("DRBD device %s is degraded!" % name)
 
     # Step: remove old storage
@@ -4381,7 +5018,8 @@ class LUReplaceDisks(LogicalUnit):
       info("remove logical volumes for %s" % name)
       for lv in old_lvs:
         cfg.SetDiskID(lv, tgt_node)
       info("remove logical volumes for %s" % name)
       for lv in old_lvs:
         cfg.SetDiskID(lv, tgt_node)
-        if not self.rpc.call_blockdev_remove(tgt_node, lv):
+        result = self.rpc.call_blockdev_remove(tgt_node, lv)
+        if result.failed or not result.data:
           warning("Can't remove old LV", hint="manually remove unused LVs")
           continue
 
           warning("Can't remove old LV", hint="manually remove unused LVs")
           continue
 
@@ -4408,23 +5046,25 @@ class LUReplaceDisks(LogicalUnit):
     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
     instance = self.instance
     iv_names = {}
     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
     instance = self.instance
     iv_names = {}
-    vgname = self.cfg.GetVGName()
     # start of work
     cfg = self.cfg
     old_node = self.tgt_node
     new_node = self.new_node
     pri_node = instance.primary_node
     # start of work
     cfg = self.cfg
     old_node = self.tgt_node
     new_node = self.new_node
     pri_node = instance.primary_node
+    nodes_ip = {
+      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
+      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
+      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
+      }
 
     # Step: check device activation
     self.proc.LogStep(1, steps_total, "check device existence")
     info("checking volume groups")
     my_vg = cfg.GetVGName()
     results = self.rpc.call_vg_list([pri_node, new_node])
 
     # Step: check device activation
     self.proc.LogStep(1, steps_total, "check device existence")
     info("checking volume groups")
     my_vg = cfg.GetVGName()
     results = self.rpc.call_vg_list([pri_node, new_node])
-    if not results:
-      raise errors.OpExecError("Can't list volume groups on the nodes")
     for node in pri_node, new_node:
     for node in pri_node, new_node:
-      res = results.get(node, False)
-      if not res or my_vg not in res:
+      res = results[node]
+      if res.failed or not res.data or my_vg not in res.data:
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
     for idx, dev in enumerate(instance.disks):
         raise errors.OpExecError("Volume group '%s' not found on %s" %
                                  (my_vg, node))
     for idx, dev in enumerate(instance.disks):
@@ -4432,7 +5072,9 @@ class LUReplaceDisks(LogicalUnit):
         continue
       info("checking disk/%d on %s" % (idx, pri_node))
       cfg.SetDiskID(dev, pri_node)
         continue
       info("checking disk/%d on %s" % (idx, pri_node))
       cfg.SetDiskID(dev, pri_node)
-      if not self.rpc.call_blockdev_find(pri_node, dev):
+      result = self.rpc.call_blockdev_find(pri_node, dev)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Can't find disk/%d on node %s" %
                                  (idx, pri_node))
 
         raise errors.OpExecError("Can't find disk/%d on node %s" %
                                  (idx, pri_node))
 
@@ -4450,18 +5092,12 @@ class LUReplaceDisks(LogicalUnit):
     # Step: create new storage
     self.proc.LogStep(3, steps_total, "allocate new storage")
     for idx, dev in enumerate(instance.disks):
     # Step: create new storage
     self.proc.LogStep(3, steps_total, "allocate new storage")
     for idx, dev in enumerate(instance.disks):
-      size = dev.size
       info("adding new local storage on %s for disk/%d" %
            (new_node, idx))
       info("adding new local storage on %s for disk/%d" %
            (new_node, idx))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+      # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
       for new_lv in dev.children:
-        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], new_node))
+        _CreateBlockDev(self, new_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
 
     # Step 4: dbrd minors and drbd setups changes
     # after this, we must manually remove the drbd minors on both the
 
     # Step 4: dbrd minors and drbd setups changes
     # after this, we must manually remove the drbd minors on both the
@@ -4473,55 +5109,51 @@ class LUReplaceDisks(LogicalUnit):
     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
       size = dev.size
       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
       size = dev.size
       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
-      # create new devices on new_node
-      if pri_node == dev.logical_id[0]:
-        new_logical_id = (pri_node, new_node,
-                          dev.logical_id[2], dev.logical_id[3], new_minor,
-                          dev.logical_id[5])
+      # create new devices on new_node; note that we create two IDs:
+      # one without port, so the drbd will be activated without
+      # networking information on the new node at this stage, and one
+      # with network, for the latter activation in step 4
+      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
+      if pri_node == o_node1:
+        p_minor = o_minor1
       else:
       else:
-        new_logical_id = (new_node, pri_node,
-                          dev.logical_id[2], new_minor, dev.logical_id[4],
-                          dev.logical_id[5])
-      iv_names[idx] = (dev, dev.children, new_logical_id)
+        p_minor = o_minor2
+
+      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
+      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
+
+      iv_names[idx] = (dev, dev.children, new_net_id)
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
-                    new_logical_id)
+                    new_net_id)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
-                              logical_id=new_logical_id,
+                              logical_id=new_alone_id,
                               children=dev.children)
                               children=dev.children)
-      if not _CreateBlockDevOnSecondary(self, new_node, instance,
-                                        new_drbd, False,
-                                        _GetInstanceInfoText(instance)):
+      try:
+        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
+                              _GetInstanceInfoText(instance), False)
+      except errors.BlockDeviceError:
         self.cfg.ReleaseDRBDMinors(instance.name)
         self.cfg.ReleaseDRBDMinors(instance.name)
-        raise errors.OpExecError("Failed to create new DRBD on"
-                                 " node '%s'" % new_node)
+        raise
 
     for idx, dev in enumerate(instance.disks):
       # we have new devices, shutdown the drbd on the old secondary
       info("shutting down drbd for disk/%d on old node" % idx)
       cfg.SetDiskID(dev, old_node)
 
     for idx, dev in enumerate(instance.disks):
       # we have new devices, shutdown the drbd on the old secondary
       info("shutting down drbd for disk/%d on old node" % idx)
       cfg.SetDiskID(dev, old_node)
-      if not self.rpc.call_blockdev_shutdown(old_node, dev):
+      result = self.rpc.call_blockdev_shutdown(old_node, dev)
+      if result.failed or not result.data:
         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                 hint="Please cleanup this device manually as soon as possible")
 
     info("detaching primary drbds from the network (=> standalone)")
         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                 hint="Please cleanup this device manually as soon as possible")
 
     info("detaching primary drbds from the network (=> standalone)")
-    done = 0
-    for idx, dev in enumerate(instance.disks):
-      cfg.SetDiskID(dev, pri_node)
-      # set the network part of the physical (unique in bdev terms) id
-      # to None, meaning detach from network
-      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
-      # and 'find' the device, which will 'fix' it to match the
-      # standalone state
-      if self.rpc.call_blockdev_find(pri_node, dev):
-        done += 1
-      else:
-        warning("Failed to detach drbd disk/%d from network, unusual case" %
-                idx)
+    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
+                                               instance.disks)[pri_node]
 
 
-    if not done:
-      # no detaches succeeded (very unlikely)
+    msg = result.RemoteFailMsg()
+    if msg:
+      # detaches didn't succeed (unlikely)
       self.cfg.ReleaseDRBDMinors(instance.name)
       self.cfg.ReleaseDRBDMinors(instance.name)
-      raise errors.OpExecError("Can't detach at least one DRBD from old node")
+      raise errors.OpExecError("Can't detach the disks from the network on"
+                               " old node: %s" % (msg,))
 
     # if we managed to detach at least one, we update all the disks of
     # the instance to point to the new secondary
 
     # if we managed to detach at least one, we update all the disks of
     # the instance to point to the new secondary
@@ -4530,23 +5162,18 @@ class LUReplaceDisks(LogicalUnit):
       dev.logical_id = new_logical_id
       cfg.SetDiskID(dev, pri_node)
     cfg.Update(instance)
       dev.logical_id = new_logical_id
       cfg.SetDiskID(dev, pri_node)
     cfg.Update(instance)
-    # we can remove now the temp minors as now the new values are
-    # written to the config file (and therefore stable)
-    self.cfg.ReleaseDRBDMinors(instance.name)
 
     # and now perform the drbd attach
     info("attaching primary drbds to new secondary (standalone => connected)")
 
     # and now perform the drbd attach
     info("attaching primary drbds to new secondary (standalone => connected)")
-    failures = []
-    for idx, dev in enumerate(instance.disks):
-      info("attaching primary drbd for disk/%d to new secondary node" % idx)
-      # since the attach is smart, it's enough to 'find' the device,
-      # it will automatically activate the network, if the physical_id
-      # is correct
-      cfg.SetDiskID(dev, pri_node)
-      logging.debug("Disk to attach: %s", dev)
-      if not self.rpc.call_blockdev_find(pri_node, dev):
-        warning("can't attach drbd disk/%d to new secondary!" % idx,
-                "please do a gnt-instance info to see the status of disks")
+    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
+                                           instance.disks, instance.name,
+                                           False)
+    for to_node, to_result in result.items():
+      msg = to_result.RemoteFailMsg()
+      if msg:
+        warning("can't attach drbd disks on node %s: %s", to_node, msg,
+                hint="please do a gnt-instance info to see the"
+                " status of disks")
 
     # this can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its
 
     # this can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its
@@ -4557,8 +5184,9 @@ class LUReplaceDisks(LogicalUnit):
     # so check manually all the devices
     for idx, (dev, old_lvs, _) in iv_names.iteritems():
       cfg.SetDiskID(dev, pri_node)
     # so check manually all the devices
     for idx, (dev, old_lvs, _) in iv_names.iteritems():
       cfg.SetDiskID(dev, pri_node)
-      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
-      if is_degr:
+      result = self.rpc.call_blockdev_find(pri_node, dev)
+      result.Raise()
+      if result.data[5]:
         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
 
     self.proc.LogStep(6, steps_total, "removing old storage")
         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
 
     self.proc.LogStep(6, steps_total, "removing old storage")
@@ -4566,7 +5194,8 @@ class LUReplaceDisks(LogicalUnit):
       info("remove logical volumes for disk/%d" % idx)
       for lv in old_lvs:
         cfg.SetDiskID(lv, old_node)
       info("remove logical volumes for disk/%d" % idx)
       for lv in old_lvs:
         cfg.SetDiskID(lv, old_node)
-        if not self.rpc.call_blockdev_remove(old_node, lv):
+        result = self.rpc.call_blockdev_remove(old_node, lv)
+        if result.failed or not result.data:
           warning("Can't remove LV on old secondary",
                   hint="Cleanup stale volumes by hand")
 
           warning("Can't remove LV on old secondary",
                   hint="Cleanup stale volumes by hand")
 
@@ -4579,21 +5208,18 @@ class LUReplaceDisks(LogicalUnit):
     instance = self.instance
 
     # Activate the instance disks if we're replacing them on a down instance
     instance = self.instance
 
     # Activate the instance disks if we're replacing them on a down instance
-    if instance.status == "down":
+    if not instance.admin_up:
       _StartInstanceDisks(self, instance, True)
 
       _StartInstanceDisks(self, instance, True)
 
-    if instance.disk_template == constants.DT_DRBD8:
-      if self.op.remote_node is None:
-        fn = self._ExecD8DiskOnly
-      else:
-        fn = self._ExecD8Secondary
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      fn = self._ExecD8Secondary
     else:
     else:
-      raise errors.ProgrammerError("Unhandled disk replacement case")
+      fn = self._ExecD8DiskOnly
 
     ret = fn(feedback_fn)
 
     # Deactivate the instance disks if we're replacing them on a down instance
 
     ret = fn(feedback_fn)
 
     # Deactivate the instance disks if we're replacing them on a down instance
-    if instance.status == "down":
+    if not instance.admin_up:
       _SafeShutdownInstanceDisks(self, instance)
 
     return ret
       _SafeShutdownInstanceDisks(self, instance)
 
     return ret
@@ -4643,6 +5269,10 @@ class LUGrowDisk(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
+    nodenames = list(instance.all_nodes)
+    for node in nodenames:
+      _CheckNodeOnline(self, node)
+
 
     self.instance = instance
 
 
     self.instance = instance
 
@@ -4652,22 +5282,21 @@ class LUGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                        instance.hypervisor)
     for node in nodenames:
     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                        instance.hypervisor)
     for node in nodenames:
-      info = nodeinfo.get(node, None)
-      if not info:
+      info = nodeinfo[node]
+      if info.failed or not info.data:
         raise errors.OpPrereqError("Cannot get current information"
                                    " from node '%s'" % node)
         raise errors.OpPrereqError("Cannot get current information"
                                    " from node '%s'" % node)
-      vg_free = info.get('vg_free', None)
+      vg_free = info.data.get('vg_free', None)
       if not isinstance(vg_free, int):
         raise errors.OpPrereqError("Can't compute free disk space on"
                                    " node %s" % node)
       if not isinstance(vg_free, int):
         raise errors.OpPrereqError("Can't compute free disk space on"
                                    " node %s" % node)
-      if self.op.amount > info['vg_free']:
+      if self.op.amount > vg_free:
         raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                    " %d MiB available, %d MiB required" %
         raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                    " %d MiB available, %d MiB required" %
-                                   (node, info['vg_free'], self.op.amount))
+                                   (node, vg_free, self.op.amount))
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -4675,15 +5304,16 @@ class LUGrowDisk(LogicalUnit):
     """
     instance = self.instance
     disk = self.disk
     """
     instance = self.instance
     disk = self.disk
-    for node in (instance.secondary_nodes + (instance.primary_node,)):
+    for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
-      if (not result or not isinstance(result, (list, tuple)) or
-          len(result) != 2):
-        raise errors.OpExecError("grow request failed to node %s" % node)
-      elif not result[0]:
-        raise errors.OpExecError("grow request failed to node %s: %s" %
-                                 (node, result[1]))
+      result.Raise()
+      if (not result.data or not isinstance(result.data, (list, tuple)) or
+          len(result.data) != 2):
+        raise errors.OpExecError("Grow request failed to node %s" % node)
+      elif not result.data[0]:
+        raise errors.OpExecError("Grow request failed to node %s: %s" %
+                                 (node, result.data[1]))
     disk.RecordGrow(self.op.amount)
     self.cfg.Update(instance)
     if self.op.wait_for_sync:
     disk.RecordGrow(self.op.amount)
     self.cfg.Update(instance)
     if self.op.wait_for_sync:
@@ -4712,8 +5342,7 @@ class LUQueryInstanceData(NoHooksLU):
       for name in self.op.instances:
         full_name = self.cfg.ExpandInstanceName(name)
         if full_name is None:
       for name in self.op.instances:
         full_name = self.cfg.ExpandInstanceName(name)
         if full_name is None:
-          raise errors.OpPrereqError("Instance '%s' not known" %
-                                     self.op.instance_name)
+          raise errors.OpPrereqError("Instance '%s' not known" % name)
         self.wanted_names.append(full_name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
     else:
         self.wanted_names.append(full_name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
     else:
@@ -4748,6 +5377,8 @@ class LUQueryInstanceData(NoHooksLU):
     if not static:
       self.cfg.SetDiskID(dev, instance.primary_node)
       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
     if not static:
       self.cfg.SetDiskID(dev, instance.primary_node)
       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
+      dev_pstatus.Raise()
+      dev_pstatus = dev_pstatus.data
     else:
       dev_pstatus = None
 
     else:
       dev_pstatus = None
 
@@ -4761,6 +5392,8 @@ class LUQueryInstanceData(NoHooksLU):
     if snode and not static:
       self.cfg.SetDiskID(dev, snode)
       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
     if snode and not static:
       self.cfg.SetDiskID(dev, snode)
       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
+      dev_sstatus.Raise()
+      dev_sstatus = dev_sstatus.data
     else:
       dev_sstatus = None
 
     else:
       dev_sstatus = None
 
@@ -4794,16 +5427,18 @@ class LUQueryInstanceData(NoHooksLU):
         remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                   instance.name,
                                                   instance.hypervisor)
         remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                   instance.name,
                                                   instance.hypervisor)
+        remote_info.Raise()
+        remote_info = remote_info.data
         if remote_info and "state" in remote_info:
           remote_state = "up"
         else:
           remote_state = "down"
       else:
         remote_state = None
         if remote_info and "state" in remote_info:
           remote_state = "up"
         else:
           remote_state = "down"
       else:
         remote_state = None
-      if instance.status == "down":
-        config_state = "down"
-      else:
+      if instance.admin_up:
         config_state = "up"
         config_state = "up"
+      else:
+        config_state = "down"
 
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
 
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
@@ -4853,14 +5488,8 @@ class LUSetInstanceParams(LogicalUnit):
             self.op.hvparams or self.op.beparams):
       raise errors.OpPrereqError("No changes submitted")
 
             self.op.hvparams or self.op.beparams):
       raise errors.OpPrereqError("No changes submitted")
 
-    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
-      val = self.op.beparams.get(item, None)
-      if val is not None:
-        try:
-          val = int(val)
-        except ValueError, err:
-          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
-        self.op.beparams[item] = val
+    utils.CheckBEParams(self.op.beparams)
+
     # Disk validation
     disk_addremove = 0
     for disk_op, disk_dict in self.op.disks:
     # Disk validation
     disk_addremove = 0
     for disk_op, disk_dict in self.op.disks:
@@ -4954,8 +5583,7 @@ class LUSetInstanceParams(LogicalUnit):
       args['vcpus'] = self.be_new[constants.BE_VCPUS]
     # FIXME: readd disk/nic changes
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
       args['vcpus'] = self.be_new[constants.BE_VCPUS]
     # FIXME: readd disk/nic changes
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
-    nl = [self.cfg.GetMasterNode(),
-          self.instance.primary_node] + list(self.instance.secondary_nodes)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -4971,19 +5599,20 @@ class LUSetInstanceParams(LogicalUnit):
     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    pnode = self.instance.primary_node
-    nodelist = [pnode]
-    nodelist.extend(instance.secondary_nodes)
+    pnode = instance.primary_node
+    nodelist = list(instance.all_nodes)
 
     # hvparams processing
     if self.op.hvparams:
       i_hvdict = copy.deepcopy(instance.hvparams)
       for key, val in self.op.hvparams.iteritems():
 
     # hvparams processing
     if self.op.hvparams:
       i_hvdict = copy.deepcopy(instance.hvparams)
       for key, val in self.op.hvparams.iteritems():
-        if val is None:
+        if val == constants.VALUE_DEFAULT:
           try:
             del i_hvdict[key]
           except KeyError:
             pass
           try:
             del i_hvdict[key]
           except KeyError:
             pass
+        elif val == constants.VALUE_NONE:
+          i_hvdict[key] = None
         else:
           i_hvdict[key] = val
       cluster = self.cfg.GetClusterInfo()
         else:
           i_hvdict[key] = val
       cluster = self.cfg.GetClusterInfo()
@@ -5002,7 +5631,7 @@ class LUSetInstanceParams(LogicalUnit):
     if self.op.beparams:
       i_bedict = copy.deepcopy(instance.beparams)
       for key, val in self.op.beparams.iteritems():
     if self.op.beparams:
       i_bedict = copy.deepcopy(instance.beparams)
       for key, val in self.op.beparams.iteritems():
-        if val is None:
+        if val == constants.VALUE_DEFAULT:
           try:
             del i_bedict[key]
           except KeyError:
           try:
             del i_bedict[key]
           except KeyError:
@@ -5028,30 +5657,31 @@ class LUSetInstanceParams(LogicalUnit):
                                                   instance.hypervisor)
       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                          instance.hypervisor)
                                                   instance.hypervisor)
       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                          instance.hypervisor)
-
-      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
+      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
         # Assume the primary node is unreachable and go ahead
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
         # Assume the primary node is unreachable and go ahead
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
-        if instance_info:
-          current_mem = instance_info['memory']
+        if not instance_info.failed and instance_info.data:
+          current_mem = instance_info.data['memory']
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
           # and we have no other way to check)
           current_mem = 0
         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
           # and we have no other way to check)
           current_mem = 0
         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
-                    nodeinfo[pnode]['memory_free'])
+                    nodeinfo[pnode].data['memory_free'])
         if miss_mem > 0:
           raise errors.OpPrereqError("This change will prevent the instance"
                                      " from starting, due to %d MB of memory"
                                      " missing on its primary node" % miss_mem)
 
       if be_new[constants.BE_AUTO_BALANCE]:
         if miss_mem > 0:
           raise errors.OpPrereqError("This change will prevent the instance"
                                      " from starting, due to %d MB of memory"
                                      " missing on its primary node" % miss_mem)
 
       if be_new[constants.BE_AUTO_BALANCE]:
-        for node in instance.secondary_nodes:
-          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
+        for node, nres in nodeinfo.iteritems():
+          if node not in instance.secondary_nodes:
+            continue
+          if nres.failed or not isinstance(nres.data, dict):
             self.warn.append("Can't get info from secondary node %s" % node)
             self.warn.append("Can't get info from secondary node %s" % node)
-          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
+          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
             self.warn.append("Not enough memory to failover instance to"
                              " secondary node %s" % node)
 
             self.warn.append("Not enough memory to failover instance to"
                              " secondary node %s" % node)
 
@@ -5088,9 +5718,9 @@ class LUSetInstanceParams(LogicalUnit):
                                      " an instance")
         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
         ins_l = ins_l[pnode]
                                      " an instance")
         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
         ins_l = ins_l[pnode]
-        if not type(ins_l) is list:
+        if ins_l.failed or not isinstance(ins_l.data, list):
           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
-        if instance.name in ins_l:
+        if instance.name in ins_l.data:
           raise errors.OpPrereqError("Instance is running, can't remove"
                                      " disks.")
 
           raise errors.OpPrereqError("Instance is running, can't remove"
                                      " disks.")
 
@@ -5128,7 +5758,8 @@ class LUSetInstanceParams(LogicalUnit):
         device_idx = len(instance.disks)
         for node, disk in device.ComputeNodeTree(instance.primary_node):
           self.cfg.SetDiskID(disk, node)
         device_idx = len(instance.disks)
         for node, disk in device.ComputeNodeTree(instance.primary_node):
           self.cfg.SetDiskID(disk, node)
-          if not self.rpc.call_blockdev_remove(node, disk):
+          rpc_result = self.rpc.call_blockdev_remove(node, disk)
+          if rpc_result.failed or not rpc_result.data:
             self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                  " continuing anyway", device_idx, node)
         result.append(("disk/%d" % device_idx, "remove"))
             self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                  " continuing anyway", device_idx, node)
         result.append(("disk/%d" % device_idx, "remove"))
@@ -5142,7 +5773,7 @@ class LUSetInstanceParams(LogicalUnit):
         disk_idx_base = len(instance.disks)
         new_disk = _GenerateDiskTemplate(self,
                                          instance.disk_template,
         disk_idx_base = len(instance.disks)
         new_disk = _GenerateDiskTemplate(self,
                                          instance.disk_template,
-                                         instance, instance.primary_node,
+                                         instance.name, instance.primary_node,
                                          instance.secondary_nodes,
                                          [disk_dict],
                                          file_path,
                                          instance.secondary_nodes,
                                          [disk_dict],
                                          file_path,
@@ -5156,17 +5787,15 @@ class LUSetInstanceParams(LogicalUnit):
                      new_disk.iv_name, instance.name)
         # Note: this needs to be kept in sync with _CreateDisks
         #HARDCODE
                      new_disk.iv_name, instance.name)
         # Note: this needs to be kept in sync with _CreateDisks
         #HARDCODE
-        for secondary_node in instance.secondary_nodes:
-          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
-                                            new_disk, False, info):
+        for node in instance.all_nodes:
+          f_create = node == instance.primary_node
+          try:
+            _CreateBlockDev(self, node, instance, new_disk,
+                            f_create, info, f_create)
+          except errors.OpExecError, err:
             self.LogWarning("Failed to create volume %s (%s) on"
             self.LogWarning("Failed to create volume %s (%s) on"
-                            " secondary node %s!",
-                            new_disk.iv_name, new_disk, secondary_node)
-        #HARDCODE
-        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
-                                        instance, new_disk, info):
-          self.LogWarning("Failed to create volume %s on primary!",
-                          new_disk.iv_name)
+                            " node %s: %s",
+                            new_disk.iv_name, new_disk, node, err)
         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                        (new_disk.size, new_disk.mode)))
       else:
         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                        (new_disk.size, new_disk.mode)))
       else:
@@ -5248,7 +5877,15 @@ class LUQueryExports(NoHooksLU):
         that node.
 
     """
         that node.
 
     """
-    return self.rpc.call_export_list(self.nodes)
+    rpcresult = self.rpc.call_export_list(self.nodes)
+    result = {}
+    for node in rpcresult:
+      if rpcresult[node].failed:
+        result[node] = False
+      else:
+        result[node] = rpcresult[node].data
+
+    return result
 
 
 class LUExportInstance(LogicalUnit):
 
 
 class LUExportInstance(LogicalUnit):
@@ -5301,6 +5938,7 @@ class LUExportInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(instance_name)
     assert self.instance is not None, \
           "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = self.cfg.GetInstanceInfo(instance_name)
     assert self.instance is not None, \
           "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
 
     self.dst_node = self.cfg.GetNodeInfo(
       self.cfg.ExpandNodeName(self.op.target_node))
 
     self.dst_node = self.cfg.GetNodeInfo(
       self.cfg.ExpandNodeName(self.op.target_node))
@@ -5308,6 +5946,7 @@ class LUExportInstance(LogicalUnit):
     if self.dst_node is None:
       # This is wrong node name, not a non-locked node
       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
     if self.dst_node is None:
       # This is wrong node name, not a non-locked node
       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
+    _CheckNodeOnline(self, self.dst_node.name)
 
     # instance disk type verification
     for disk in self.instance.disks:
 
     # instance disk type verification
     for disk in self.instance.disks:
@@ -5324,7 +5963,9 @@ class LUExportInstance(LogicalUnit):
     src_node = instance.primary_node
     if self.op.shutdown:
       # shutdown the instance, but not the disks
     src_node = instance.primary_node
     if self.op.shutdown:
       # shutdown the instance, but not the disks
-      if not self.rpc.call_instance_shutdown(src_node, instance):
+      result = self.rpc.call_instance_shutdown(src_node, instance)
+      result.Raise()
+      if not result.data:
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                  (instance.name, src_node))
 
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                  (instance.name, src_node))
 
@@ -5332,43 +5973,52 @@ class LUExportInstance(LogicalUnit):
 
     snap_disks = []
 
 
     snap_disks = []
 
+    # set the disks ID correctly since call_instance_start needs the
+    # correct drbd minor to create the symlinks
+    for disk in instance.disks:
+      self.cfg.SetDiskID(disk, src_node)
+
     try:
       for disk in instance.disks:
         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
     try:
       for disk in instance.disks:
         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
-
-        if not new_dev_name:
+        if new_dev_name.failed or not new_dev_name.data:
           self.LogWarning("Could not snapshot block device %s on node %s",
                           disk.logical_id[1], src_node)
           snap_disks.append(False)
         else:
           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
           self.LogWarning("Could not snapshot block device %s on node %s",
                           disk.logical_id[1], src_node)
           snap_disks.append(False)
         else:
           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                 logical_id=(vgname, new_dev_name),
-                                 physical_id=(vgname, new_dev_name),
+                                 logical_id=(vgname, new_dev_name.data),
+                                 physical_id=(vgname, new_dev_name.data),
                                  iv_name=disk.iv_name)
           snap_disks.append(new_dev)
 
     finally:
                                  iv_name=disk.iv_name)
           snap_disks.append(new_dev)
 
     finally:
-      if self.op.shutdown and instance.status == "up":
-        if not self.rpc.call_instance_start(src_node, instance, None):
+      if self.op.shutdown and instance.admin_up:
+        result = self.rpc.call_instance_start(src_node, instance, None)
+        msg = result.RemoteFailMsg()
+        if msg:
           _ShutdownInstanceDisks(self, instance)
           _ShutdownInstanceDisks(self, instance)
-          raise errors.OpExecError("Could not start instance")
+          raise errors.OpExecError("Could not start instance: %s" % msg)
 
     # TODO: check for size
 
     cluster_name = self.cfg.GetClusterName()
     for idx, dev in enumerate(snap_disks):
       if dev:
 
     # TODO: check for size
 
     cluster_name = self.cfg.GetClusterName()
     for idx, dev in enumerate(snap_disks):
       if dev:
-        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                             instance, cluster_name, idx):
+        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
+                                               instance, cluster_name, idx)
+        if result.failed or not result.data:
           self.LogWarning("Could not export block device %s from node %s to"
                           " node %s", dev.logical_id[1], src_node,
                           dst_node.name)
           self.LogWarning("Could not export block device %s from node %s to"
                           " node %s", dev.logical_id[1], src_node,
                           dst_node.name)
-        if not self.rpc.call_blockdev_remove(src_node, dev):
+        result = self.rpc.call_blockdev_remove(src_node, dev)
+        if result.failed or not result.data:
           self.LogWarning("Could not remove snapshot block device %s from node"
                           " %s", dev.logical_id[1], src_node)
 
           self.LogWarning("Could not remove snapshot block device %s from node"
                           " %s", dev.logical_id[1], src_node)
 
-    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
+    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+    if result.failed or not result.data:
       self.LogWarning("Could not finalize export for instance %s on node %s",
                       instance.name, dst_node.name)
 
       self.LogWarning("Could not finalize export for instance %s on node %s",
                       instance.name, dst_node.name)
 
@@ -5381,7 +6031,9 @@ class LUExportInstance(LogicalUnit):
     if nodelist:
       exportlist = self.rpc.call_export_list(nodelist)
       for node in exportlist:
     if nodelist:
       exportlist = self.rpc.call_export_list(nodelist)
       for node in exportlist:
-        if instance.name in exportlist[node]:
+        if exportlist[node].failed:
+          continue
+        if instance.name in exportlist[node].data:
           if not self.rpc.call_export_remove(node, instance.name):
             self.LogWarning("Could not remove older export for instance %s"
                             " on node %s", instance.name, node)
           if not self.rpc.call_export_remove(node, instance.name):
             self.LogWarning("Could not remove older export for instance %s"
                             " on node %s", instance.name, node)
@@ -5422,9 +6074,13 @@ class LURemoveExport(NoHooksLU):
       locking.LEVEL_NODE])
     found = False
     for node in exportlist:
       locking.LEVEL_NODE])
     found = False
     for node in exportlist:
-      if instance_name in exportlist[node]:
+      if exportlist[node].failed:
+        self.LogWarning("Failed to query node %s, continuing" % node)
+        continue
+      if instance_name in exportlist[node].data:
         found = True
         found = True
-        if not self.rpc.call_export_remove(node, instance_name):
+        result = self.rpc.call_export_remove(node, instance_name)
+        if result.failed or not result.data:
           logging.error("Could not remove export for instance %s"
                         " on node %s", instance_name, node)
 
           logging.error("Could not remove export for instance %s"
                         " on node %s", instance_name, node)
 
@@ -5641,9 +6297,10 @@ class LUTestDelay(NoHooksLU):
       if not result:
         raise errors.OpExecError("Complete failure from rpc call")
       for node, node_result in result.items():
       if not result:
         raise errors.OpExecError("Complete failure from rpc call")
       for node, node_result in result.items():
-        if not node_result:
+        node_result.Raise()
+        if not node_result.data:
           raise errors.OpExecError("Failure during rpc call to node %s,"
           raise errors.OpExecError("Failure during rpc call to node %s,"
-                                   " result: %s" % (node, node_result))
+                                   " result: %s" % (node, node_result.data))
 
 
 class IAllocator(object):
 
 
 class IAllocator(object):
@@ -5676,6 +6333,7 @@ class IAllocator(object):
     self.name = name
     self.mem_size = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
     self.name = name
     self.mem_size = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
+    self.hypervisor = None
     self.relocate_from = None
     # computed fields
     self.required_nodes = None
     self.relocate_from = None
     # computed fields
     self.required_nodes = None
@@ -5723,19 +6381,20 @@ class IAllocator(object):
     node_list = cfg.GetNodeList()
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
     node_list = cfg.GetNodeList()
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      hypervisor = self.hypervisor
+      hypervisor_name = self.hypervisor
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
-      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
+      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
 
     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
 
     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
-                                           hypervisor)
+                                           hypervisor_name)
     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                        cluster_info.enabled_hypervisors)
     for nname in node_list:
       ninfo = cfg.GetNodeInfo(nname)
     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                        cluster_info.enabled_hypervisors)
     for nname in node_list:
       ninfo = cfg.GetNodeInfo(nname)
-      if nname not in node_data or not isinstance(node_data[nname], dict):
+      node_data[nname].Raise()
+      if not isinstance(node_data[nname].data, dict):
         raise errors.OpExecError("Can't get data for node %s" % nname)
         raise errors.OpExecError("Can't get data for node %s" % nname)
-      remote_info = node_data[nname]
+      remote_info = node_data[nname].data
       for attr in ['memory_total', 'memory_free', 'memory_dom0',
                    'vg_size', 'vg_free', 'cpu_total']:
         if attr not in remote_info:
       for attr in ['memory_total', 'memory_free', 'memory_dom0',
                    'vg_size', 'vg_free', 'cpu_total']:
         if attr not in remote_info:
@@ -5758,7 +6417,7 @@ class IAllocator(object):
           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
           remote_info['memory_free'] -= max(0, i_mem_diff)
 
           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
           remote_info['memory_free'] -= max(0, i_mem_diff)
 
-          if iinfo.status == "up":
+          if iinfo.admin_up:
             i_p_up_mem += beinfo[constants.BE_MEMORY]
 
       # compute memory used by instances
             i_p_up_mem += beinfo[constants.BE_MEMORY]
 
       # compute memory used by instances
@@ -5774,6 +6433,7 @@ class IAllocator(object):
         "primary_ip": ninfo.primary_ip,
         "secondary_ip": ninfo.secondary_ip,
         "total_cpus": remote_info['cpu_total'],
         "primary_ip": ninfo.primary_ip,
         "secondary_ip": ninfo.secondary_ip,
         "total_cpus": remote_info['cpu_total'],
+        "offline": ninfo.offline,
         }
       node_results[nname] = pnr
     data["nodes"] = node_results
         }
       node_results[nname] = pnr
     data["nodes"] = node_results
@@ -5785,11 +6445,11 @@ class IAllocator(object):
                   for n in iinfo.nics]
       pir = {
         "tags": list(iinfo.GetTags()),
                   for n in iinfo.nics]
       pir = {
         "tags": list(iinfo.GetTags()),
-        "should_run": iinfo.status == "up",
+        "should_run": iinfo.admin_up,
         "vcpus": beinfo[constants.BE_VCPUS],
         "memory": beinfo[constants.BE_MEMORY],
         "os": iinfo.os,
         "vcpus": beinfo[constants.BE_VCPUS],
         "memory": beinfo[constants.BE_MEMORY],
         "os": iinfo.os,
-        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
+        "nodes": list(iinfo.all_nodes),
         "nics": nic_data,
         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,
         "nics": nic_data,
         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,
@@ -5812,8 +6472,6 @@ class IAllocator(object):
 
     """
     data = self.in_data
 
     """
     data = self.in_data
-    if len(self.disks) != 2:
-      raise errors.OpExecError("Only two-disk configurations supported")
 
     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
 
 
     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
 
@@ -5892,11 +6550,12 @@ class IAllocator(object):
     data = self.in_text
 
     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
     data = self.in_text
 
     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
+    result.Raise()
 
 
-    if not isinstance(result, (list, tuple)) or len(result) != 4:
+    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
       raise errors.OpExecError("Invalid result from master iallocator runner")
 
       raise errors.OpExecError("Invalid result from master iallocator runner")
 
-    rcode, stdout, stderr, fail = result
+    rcode, stdout, stderr, fail = result.data
 
     if rcode == constants.IARUN_NOTFOUND:
       raise errors.OpExecError("Can't find allocator '%s'" % name)
 
     if rcode == constants.IARUN_NOTFOUND:
       raise errors.OpExecError("Can't find allocator '%s'" % name)
@@ -5969,8 +6628,6 @@ class LUTestAllocator(NoHooksLU):
                                      " 'nics' parameter")
       if not isinstance(self.op.disks, list):
         raise errors.OpPrereqError("Invalid parameter 'disks'")
                                      " 'nics' parameter")
       if not isinstance(self.op.disks, list):
         raise errors.OpPrereqError("Invalid parameter 'disks'")
-      if len(self.op.disks) != 2:
-        raise errors.OpPrereqError("Only two-disk configurations supported")
       for row in self.op.disks:
         if (not isinstance(row, dict) or
             "size" not in row or
       for row in self.op.disks:
         if (not isinstance(row, dict) or
             "size" not in row or
@@ -5979,7 +6636,7 @@ class LUTestAllocator(NoHooksLU):
             row["mode"] not in ['r', 'w']):
           raise errors.OpPrereqError("Invalid contents of the"
                                      " 'disks' parameter")
             row["mode"] not in ['r', 'w']):
           raise errors.OpPrereqError("Invalid contents of the"
                                      " 'disks' parameter")
-      if self.op.hypervisor is None:
+      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
         self.op.hypervisor = self.cfg.GetHypervisorType()
     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
       if not hasattr(self.op, "name"):
         self.op.hypervisor = self.cfg.GetHypervisorType()
     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
       if not hasattr(self.op, "name"):