Convert AddOSToInstance to (status, data)
[ganeti-local] / lib / cmdlib.py
index 4f7c68e..ce5edc6 100644 (file)
@@ -881,6 +881,7 @@ class LUVerifyCluster(LogicalUnit):
 
     file_names = ssconf.SimpleStore().GetFileList()
     file_names.append(constants.SSL_CERT_FILE)
+    file_names.append(constants.RAPI_CERT_FILE)
     file_names.extend(master_files)
 
     local_checksums = utils.FingerprintFiles(file_names)
@@ -888,10 +889,12 @@ class LUVerifyCluster(LogicalUnit):
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     node_verify_param = {
       constants.NV_FILELIST: file_names,
-      constants.NV_NODELIST: nodelist,
+      constants.NV_NODELIST: [node.name for node in nodeinfo
+                              if not node.offline],
       constants.NV_HYPERVISOR: hypervisors,
       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
-                                  node.secondary_ip) for node in nodeinfo],
+                                  node.secondary_ip) for node in nodeinfo
+                                 if not node.offline],
       constants.NV_LVLIST: vg_name,
       constants.NV_INSTANCELIST: hypervisors,
       constants.NV_VGLIST: None,
@@ -1270,7 +1273,8 @@ class LURenameCluster(LogicalUnit):
                                          constants.SSH_KNOWN_HOSTS_FILE)
       for to_node, to_result in result.iteritems():
         if to_result.failed or not to_result.data:
-          logging.error("Copy of file %s to node %s failed", fname, to_node)
+          logging.error("Copy of file %s to node %s failed",
+                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
 
     finally:
       result = self.rpc.call_node_start_master(master, False)
@@ -1433,6 +1437,33 @@ class LUSetClusterParams(LogicalUnit):
       _AdjustCandidatePool(self)
 
 
+class LURedistributeConfig(NoHooksLU):
+  """Force the redistribution of cluster configuration.
+
+  This is a very simple LU.
+
+  """
+  _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+    }
+    self.share_locks[locking.LEVEL_NODE] = 1
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+
+  def Exec(self, feedback_fn):
+    """Redistribute the configuration.
+
+    """
+    self.cfg.Update(self.cfg.GetClusterInfo())
+
+
 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
   """Sleep and poll for an instance's disk to sync.
 
@@ -1583,10 +1614,12 @@ class LUDiagnoseOS(NoHooksLU):
 
     """
     node_list = self.acquired_locks[locking.LEVEL_NODE]
-    node_data = self.rpc.call_os_diagnose(node_list)
+    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
+                   if node in node_list]
+    node_data = self.rpc.call_os_diagnose(valid_nodes)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
-    pol = self._DiagnoseByOS(node_list, node_data)
+    pol = self._DiagnoseByOS(valid_nodes, node_data)
     output = []
     for os_name, os_data in pol.iteritems():
       row = []
@@ -1654,11 +1687,8 @@ class LURemoveNode(LogicalUnit):
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
-      if node.name == instance.primary_node:
-        raise errors.OpPrereqError("Instance %s still running on the node,"
-                                   " please remove first." % instance_name)
-      if node.name in instance.secondary_nodes:
-        raise errors.OpPrereqError("Instance %s has node as a secondary,"
+      if node.name in instance.all_nodes:
+        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                    " please remove first." % instance_name)
     self.op.node_name = node.name
     self.node = node
@@ -2010,7 +2040,6 @@ class LUAddNode(LogicalUnit):
                                    " based ping to noded port")
 
     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
-    node_info = self.cfg.GetAllNodesInfo().values()
     mc_now, _ = self.cfg.GetMasterCandidateStats()
     master_candidate = mc_now < cp_size
 
@@ -2177,7 +2206,6 @@ class LUSetNodeParams(LogicalUnit):
         raise errors.OpPrereqError("The master node has to be a"
                                    " master candidate and online")
       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
-      node_info = self.cfg.GetAllNodesInfo().values()
       num_candidates, _ = self.cfg.GetMasterCandidateStats()
       if num_candidates <= cp_size:
         msg = ("Not enough master candidates (desired"
@@ -2334,7 +2362,7 @@ class LUActivateInstanceDisks(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Activate the disks.
@@ -2402,7 +2430,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
                            " (is_primary=True, pass=2)",
                            inst_disk.iv_name, node)
         disks_ok = False
-    device_info.append((instance.primary_node, inst_disk.iv_name, result))
+    device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
 
   # leave the disks configured for the primary node
   # this is a workaround that would be fixed better by
@@ -2505,7 +2533,7 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
   return result
 
 
-def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
+def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
   """Checks if a node has enough free memory.
 
   This function check if a given node has the needed amount of free
@@ -2521,13 +2549,13 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
   @param reason: string to use in the error message
   @type requested: C{int}
   @param requested: the amount of memory in MiB to check for
-  @type hypervisor: C{str}
-  @param hypervisor: the hypervisor to ask for memory stats
+  @type hypervisor_name: C{str}
+  @param hypervisor_name: the hypervisor to ask for memory stats
   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
       we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
+  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
   nodeinfo[node].Raise()
   free_mem = nodeinfo[node].data.get('memory_free')
   if not isinstance(free_mem, int):
@@ -2561,8 +2589,7 @@ class LUStartupInstance(LogicalUnit):
       "FORCE": self.op.force,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2600,9 +2627,10 @@ class LUStartupInstance(LogicalUnit):
     _StartInstanceDisks(self, instance, force)
 
     result = self.rpc.call_instance_start(node_current, instance, extra_args)
-    if result.failed or not result.data:
+    msg = result.RemoteFailMsg()
+    if msg:
       _ShutdownInstanceDisks(self, instance)
-      raise errors.OpExecError("Could not start instance")
+      raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LURebootInstance(LogicalUnit):
@@ -2634,8 +2662,7 @@ class LURebootInstance(LogicalUnit):
       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2676,9 +2703,11 @@ class LURebootInstance(LogicalUnit):
       _ShutdownInstanceDisks(self, instance)
       _StartInstanceDisks(self, instance, ignore_secondaries)
       result = self.rpc.call_instance_start(node_current, instance, extra_args)
-      if result.failed or not result.data:
+      msg = result.RemoteFailMsg()
+      if msg:
         _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance for full reboot")
+        raise errors.OpExecError("Could not start instance for"
+                                 " full reboot: %s" % msg)
 
     self.cfg.MarkInstanceUp(instance.name)
 
@@ -2702,8 +2731,7 @@ class LUShutdownInstance(LogicalUnit):
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2715,7 +2743,7 @@ class LUShutdownInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -2750,8 +2778,7 @@ class LUReinstallInstance(LogicalUnit):
 
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -2810,11 +2837,11 @@ class LUReinstallInstance(LogicalUnit):
     try:
       feedback_fn("Running the instance OS create scripts...")
       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
-      result.Raise()
-      if not result.data:
+      msg = result.RemoteFailMsg()
+      if msg:
         raise errors.OpExecError("Could not install OS for instance %s"
-                                 " on node %s" %
-                                 (inst.name, inst.primary_node))
+                                 " on node %s: %s" %
+                                 (inst.name, inst.primary_node, msg))
     finally:
       _ShutdownInstanceDisks(self, inst)
 
@@ -2835,8 +2862,7 @@ class LURenameInstance(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["INSTANCE_NEW_NAME"] = self.op.new_name
-    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
-          list(self.instance.secondary_nodes))
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -3350,60 +3376,390 @@ class LUFailoverInstance(LogicalUnit):
 
       feedback_fn("* starting the instance on the target node")
       result = self.rpc.call_instance_start(target_node, instance, None)
-      if result.failed or not result.data:
+      msg = result.RemoteFailMsg()
+      if msg:
         _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance %s on node %s." %
-                                 (instance.name, target_node))
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
 
 
-def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
-  """Create a tree of block devices on the primary node.
+class LUMigrateInstance(LogicalUnit):
+  """Migrate an instance.
 
-  This always creates all devices.
+  This is migration without shutting down, compared to the failover,
+  which is done with shutdown.
 
   """
-  if device.children:
-    for child in device.children:
-      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
-        return False
+  HPATH = "instance-migrate"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "live", "cleanup"]
 
-  lu.cfg.SetDiskID(device, node)
-  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, True, info)
-  if new_id.failed or not new_id.data:
-    return False
-  if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
 
+    This runs on master, primary and secondary nodes of the instance.
 
-def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
-  """Create a tree of block devices on a secondary node.
+    """
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    """
+    instance = self.cfg.GetInstanceInfo(
+      self.cfg.ExpandInstanceName(self.op.instance_name))
+    if instance is None:
+      raise errors.OpPrereqError("Instance '%s' not known" %
+                                 self.op.instance_name)
+
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Instance's disk layout is not"
+                                 " drbd8, cannot migrate.")
+
+    secondary_nodes = instance.secondary_nodes
+    if not secondary_nodes:
+      raise errors.ProgrammerError("no secondary node but using "
+                                   "drbd8 disk template")
+
+    i_be = self.cfg.GetClusterInfo().FillBE(instance)
+
+    target_node = secondary_nodes[0]
+    # check memory requirements on the secondary node
+    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
+                         instance.name, i_be[constants.BE_MEMORY],
+                         instance.hypervisor)
+
+    # check bridge existance
+    brlist = [nic.bridge for nic in instance.nics]
+    result = self.rpc.call_bridges_exist(target_node, brlist)
+    if result.failed or not result.data:
+      raise errors.OpPrereqError("One or more target bridges %s does not"
+                                 " exist on destination node '%s'" %
+                                 (brlist, target_node))
+
+    if not self.op.cleanup:
+      result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                 instance)
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
+                                   msg)
+
+    self.instance = instance
+
+  def _WaitUntilSync(self):
+    """Poll with custom rpc for disk sync.
+
+    This uses our own step-based rpc call.
+
+    """
+    self.feedback_fn("* wait until resync is done")
+    all_done = False
+    while not all_done:
+      all_done = True
+      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
+                                            self.nodes_ip,
+                                            self.instance.disks)
+      min_percent = 100
+      for node, nres in result.items():
+        msg = nres.RemoteFailMsg()
+        if msg:
+          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
+                                   (node, msg))
+        node_done, node_percent = nres.data[1]
+        all_done = all_done and node_done
+        if node_percent is not None:
+          min_percent = min(min_percent, node_percent)
+      if not all_done:
+        if min_percent < 100:
+          self.feedback_fn("   - progress: %.1f%%" % min_percent)
+        time.sleep(2)
+
+  def _EnsureSecondary(self, node):
+    """Demote a node to secondary.
+
+    """
+    self.feedback_fn("* switching node %s to secondary mode" % node)
+
+    for dev in self.instance.disks:
+      self.cfg.SetDiskID(dev, node)
+
+    result = self.rpc.call_blockdev_close(node, self.instance.name,
+                                          self.instance.disks)
+    msg = result.RemoteFailMsg()
+    if msg:
+      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
+                               " error %s" % (node, msg))
+
+  def _GoStandalone(self):
+    """Disconnect from the network.
+
+    """
+    self.feedback_fn("* changing into standalone mode")
+    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
+                                               self.instance.disks)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot disconnect disks node %s,"
+                                 " error %s" % (node, msg))
+
+  def _GoReconnect(self, multimaster):
+    """Reconnect to the network.
+
+    """
+    if multimaster:
+      msg = "dual-master"
+    else:
+      msg = "single-master"
+    self.feedback_fn("* changing disks into %s mode" % msg)
+    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
+                                           self.instance.disks,
+                                           self.instance.name, multimaster)
+    for node, nres in result.items():
+      msg = nres.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Cannot change disks config on node %s,"
+                                 " error: %s" % (node, msg))
+
+  def _ExecCleanup(self):
+    """Try to cleanup after a failed migration.
+
+    The cleanup is done by:
+      - check that the instance is running only on one node
+        (and update the config if needed)
+      - change disks on its secondary node to secondary
+      - wait until disks are fully synchronized
+      - disconnect from the network
+      - change disks into single-master mode
+      - wait again until disks are fully synchronized
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    # check running on only one node
+    self.feedback_fn("* checking where the instance actually runs"
+                     " (if this hangs, the hypervisor might be in"
+                     " a bad state)")
+    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
+    for node, result in ins_l.items():
+      result.Raise()
+      if not isinstance(result.data, list):
+        raise errors.OpExecError("Can't contact node '%s'" % node)
+
+    runningon_source = instance.name in ins_l[source_node].data
+    runningon_target = instance.name in ins_l[target_node].data
+
+    if runningon_source and runningon_target:
+      raise errors.OpExecError("Instance seems to be running on two nodes,"
+                               " or the hypervisor is confused. You will have"
+                               " to ensure manually that it runs only on one"
+                               " and restart this operation.")
+
+    if not (runningon_source or runningon_target):
+      raise errors.OpExecError("Instance does not seem to be running at all."
+                               " In this case, it's safer to repair by"
+                               " running 'gnt-instance stop' to ensure disk"
+                               " shutdown, and then restarting it.")
+
+    if runningon_target:
+      # the migration has actually succeeded, we need to update the config
+      self.feedback_fn("* instance running on secondary node (%s),"
+                       " updating config" % target_node)
+      instance.primary_node = target_node
+      self.cfg.Update(instance)
+      demoted_node = source_node
+    else:
+      self.feedback_fn("* instance confirmed to be running on its"
+                       " primary node (%s)" % source_node)
+      demoted_node = target_node
+
+    self._EnsureSecondary(demoted_node)
+    try:
+      self._WaitUntilSync()
+    except errors.OpExecError:
+      # we ignore here errors, since if the device is standalone, it
+      # won't be able to sync
+      pass
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def _ExecMigration(self):
+    """Migrate an instance.
+
+    The migrate is done by:
+      - change the disks into dual-master mode
+      - wait until disks are fully synchronized again
+      - migrate the instance
+      - change disks on the new secondary node (the old primary) to secondary
+      - wait until disks are fully synchronized
+      - change disks into single-master mode
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    self.feedback_fn("* checking disk consistency between source and target")
+    for dev in instance.disks:
+      if not _CheckDiskConsistency(self, dev, target_node, False):
+        raise errors.OpExecError("Disk %s is degraded or not fully"
+                                 " synchronized on target node,"
+                                 " aborting migrate." % dev.iv_name)
+
+    self._EnsureSecondary(target_node)
+    self._GoStandalone()
+    self._GoReconnect(True)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* migrating instance to %s" % target_node)
+    time.sleep(10)
+    result = self.rpc.call_instance_migrate(source_node, instance,
+                                            self.nodes_ip[target_node],
+                                            self.op.live)
+    msg = result.RemoteFailMsg()
+    if msg:
+      logging.error("Instance migration failed, trying to revert"
+                    " disk status: %s", msg)
+      try:
+        self._EnsureSecondary(target_node)
+        self._GoStandalone()
+        self._GoReconnect(False)
+        self._WaitUntilSync()
+      except errors.OpExecError, err:
+        self.LogWarning("Migration failed and I can't reconnect the"
+                        " drives: error '%s'\n"
+                        "Please look and recover the instance status" %
+                        str(err))
+
+      raise errors.OpExecError("Could not migrate instance %s: %s" %
+                               (instance.name, msg))
+    time.sleep(10)
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance)
+
+    self._EnsureSecondary(source_node)
+    self._WaitUntilSync()
+    self._GoStandalone()
+    self._GoReconnect(False)
+    self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def Exec(self, feedback_fn):
+    """Perform the migration.
+
+    """
+    self.feedback_fn = feedback_fn
+
+    self.source_node = self.instance.primary_node
+    self.target_node = self.instance.secondary_nodes[0]
+    self.all_nodes = [self.source_node, self.target_node]
+    self.nodes_ip = {
+      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
+      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
+      }
+    if self.op.cleanup:
+      return self._ExecCleanup()
+    else:
+      return self._ExecMigration()
+
+
+def _CreateBlockDev(lu, node, instance, device, force_create,
+                    info, force_open):
+  """Create a tree of block devices on a given node.
 
   If this device type has to be created on secondaries, create it and
   all its children.
 
   If not, just recurse to children keeping the same 'force' value.
 
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @type force_create: boolean
+  @param force_create: whether to force creation of this device; this
+      will be change to True whenever we find a device which has
+      CreateOnSecondary() attribute
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passes to the
+      L{backend.CreateBlockDevice} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device own Open() execution
+
   """
   if device.CreateOnSecondary():
-    force = True
+    force_create = True
+
   if device.children:
     for child in device.children:
-      if not _CreateBlockDevOnSecondary(lu, node, instance,
-                                        child, force, info):
-        return False
+      _CreateBlockDev(lu, node, instance, child, force_create,
+                      info, force_open)
 
-  if not force:
-    return True
+  if not force_create:
+    return
+
+  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
+
+
+def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
+  """Create a single block device on a given node.
+
+  This will not recurse over children of the device, so they must be
+  created in advance.
+
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passes to the
+      L{backend.CreateBlockDevice} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device own Open() execution
+
+  """
   lu.cfg.SetDiskID(device, node)
-  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, False, info)
-  if new_id.failed or not new_id.data:
-    return False
+  result = lu.rpc.call_blockdev_create(node, device, device.size,
+                                       instance.name, force_open, info)
+  msg = result.RemoteFailMsg()
+  if msg:
+    raise errors.OpExecError("Can't create block device %s on"
+                             " node %s for instance %s: %s" %
+                             (device, node, instance.name, msg))
   if device.physical_id is None:
-    device.physical_id = new_id
-  return True
+    device.physical_id = result.data[1]
 
 
 def _GenerateUniqueNames(lu, exts):
@@ -3474,11 +3830,11 @@ def _GenerateDiskTemplate(lu, template_name,
     minors = lu.cfg.AllocateDRBDMinor(
       [primary_node, remote_node] * len(disk_info), instance_name)
 
-    names = _GenerateUniqueNames(lu,
-                                 [".disk%d_%s" % (i, s)
-                                  for i in range(disk_count)
-                                  for s in ("data", "meta")
-                                  ])
+    names = []
+    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
+                                               for i in range(disk_count)]):
+      names.append(lv_prefix + "_data")
+      names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
@@ -3524,19 +3880,18 @@ def _CreateDisks(lu, instance):
 
   """
   info = _GetInstanceInfoText(instance)
+  pnode = instance.primary_node
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
-                                                 file_storage_dir)
+    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
 
     if result.failed or not result.data:
-      logging.error("Could not connect to node '%s'", instance.primary_node)
-      return False
+      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
 
     if not result.data[0]:
-      logging.error("Failed to create directory '%s'", file_storage_dir)
-      return False
+      raise errors.OpExecError("Failed to create directory '%s'" %
+                               file_storage_dir)
 
   # Note: this needs to be kept in sync with adding of disks in
   # LUSetInstanceParams
@@ -3544,19 +3899,9 @@ def _CreateDisks(lu, instance):
     logging.info("Creating volume %s for instance %s",
                  device.iv_name, instance.name)
     #HARDCODE
-    for secondary_node in instance.secondary_nodes:
-      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
-                                        device, False, info):
-        logging.error("Failed to create volume %s (%s) on secondary node %s!",
-                      device.iv_name, device, secondary_node)
-        return False
-    #HARDCODE
-    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
-                                    instance, device, info):
-      logging.error("Failed to create volume %s on primary!", device.iv_name)
-      return False
-
-  return True
+    for node in instance.all_nodes:
+      f_create = node == pnode
+      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
 def _RemoveDisks(lu, instance):
@@ -4113,10 +4458,15 @@ class LUCreateInstance(LogicalUnit):
                             )
 
     feedback_fn("* creating instance disks...")
-    if not _CreateDisks(self, iobj):
-      _RemoveDisks(self, iobj)
-      self.cfg.ReleaseDRBDMinors(instance)
-      raise errors.OpExecError("Device creation failed, reverting...")
+    try:
+      _CreateDisks(self, iobj)
+    except errors.OpExecError:
+      self.LogWarning("Device creation failed, reverting...")
+      try:
+        _RemoveDisks(self, iobj)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance)
+        raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
@@ -4162,11 +4512,11 @@ class LUCreateInstance(LogicalUnit):
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
         result = self.rpc.call_instance_os_add(pnode_name, iobj)
-        result.Raise()
-        if not result.data:
+        msg = result.RemoteFailMsg()
+        if msg:
           raise errors.OpExecError("Could not add os for instance %s"
-                                   " on node %s" %
-                                   (instance, pnode_name))
+                                   " on node %s: %s" %
+                                   (instance, pnode_name, msg))
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
@@ -4191,9 +4541,9 @@ class LUCreateInstance(LogicalUnit):
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
       result = self.rpc.call_instance_start(pnode_name, iobj, None)
-      result.Raise()
-      if not result.data:
-        raise errors.OpExecError("Could not start instance")
+      msg = result.RemoteFailMsg()
+      if msg:
+        raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LUConnectConsole(NoHooksLU):
@@ -4219,7 +4569,7 @@ class LUConnectConsole(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, self.op.primary_node)
+    _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Connect to the console of an instance
@@ -4253,17 +4603,32 @@ class LUReplaceDisks(LogicalUnit):
   _OP_REQP = ["instance_name", "mode", "disks"]
   REQ_BGL = False
 
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-
+  def CheckArguments(self):
     if not hasattr(self.op, "remote_node"):
       self.op.remote_node = None
-
-    ia_name = getattr(self.op, "iallocator", None)
-    if ia_name is not None:
-      if self.op.remote_node is not None:
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
+
+    # check for valid parameter combination
+    cnt = [self.op.remote_node, self.op.iallocator].count(None)
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      if cnt == 2:
+        raise errors.OpPrereqError("When changing the secondary either an"
+                                   " iallocator script must be used or the"
+                                   " new node given")
+      elif cnt == 0:
         raise errors.OpPrereqError("Give either the iallocator or the new"
                                    " secondary, not both")
+    else: # not replacing the secondary
+      if cnt != 2:
+        raise errors.OpPrereqError("The iallocator and new node options can"
+                                   " be used only when changing the"
+                                   " secondary node")
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+    if self.op.iallocator is not None:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     elif self.op.remote_node is not None:
       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
@@ -4338,9 +4703,9 @@ class LUReplaceDisks(LogicalUnit):
       "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = instance
 
-    if instance.disk_template not in constants.DTS_NET_MIRROR:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " network mirrored.")
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
+                                 " instances")
 
     if len(instance.secondary_nodes) != 1:
       raise errors.OpPrereqError("The instance has a strange layout,"
@@ -4349,8 +4714,7 @@ class LUReplaceDisks(LogicalUnit):
 
     self.sec_node = instance.secondary_nodes[0]
 
-    ia_name = getattr(self.op, "iallocator", None)
-    if ia_name is not None:
+    if self.op.iallocator is not None:
       self._RunAllocator()
 
     remote_node = self.op.remote_node
@@ -4364,42 +4728,24 @@ class LUReplaceDisks(LogicalUnit):
       raise errors.OpPrereqError("The specified node is the primary node of"
                                  " the instance.")
     elif remote_node == self.sec_node:
-      if self.op.mode == constants.REPLACE_DISK_SEC:
-        # this is for DRBD8, where we can't execute the same mode of
-        # replacement as for drbd7 (no different port allocated)
-        raise errors.OpPrereqError("Same secondary given, cannot execute"
-                                   " replacement")
-    if instance.disk_template == constants.DT_DRBD8:
-      if (self.op.mode == constants.REPLACE_DISK_ALL and
-          remote_node is not None):
-        # switch to replace secondary mode
-        self.op.mode = constants.REPLACE_DISK_SEC
-
-      if self.op.mode == constants.REPLACE_DISK_ALL:
-        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
-                                   " secondary disk replacement, not"
-                                   " both at once")
-      elif self.op.mode == constants.REPLACE_DISK_PRI:
-        if remote_node is not None:
-          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
-                                     " the secondary while doing a primary"
-                                     " node disk replacement")
-        self.tgt_node = instance.primary_node
-        self.oth_node = instance.secondary_nodes[0]
-        _CheckNodeOnline(self, self.tgt_node)
-        _CheckNodeOnline(self, self.oth_node)
-      elif self.op.mode == constants.REPLACE_DISK_SEC:
-        self.new_node = remote_node # this can be None, in which case
-                                    # we don't change the secondary
-        self.tgt_node = instance.secondary_nodes[0]
-        self.oth_node = instance.primary_node
-        _CheckNodeOnline(self, self.oth_node)
-        if self.new_node is not None:
-          _CheckNodeOnline(self, self.new_node)
-        else:
-          _CheckNodeOnline(self, self.tgt_node)
-      else:
-        raise errors.ProgrammerError("Unhandled disk replace mode")
+      raise errors.OpPrereqError("The specified node is already the"
+                                 " secondary node of the instance.")
+
+    if self.op.mode == constants.REPLACE_DISK_PRI:
+      n1 = self.tgt_node = instance.primary_node
+      n2 = self.oth_node = self.sec_node
+    elif self.op.mode == constants.REPLACE_DISK_SEC:
+      n1 = self.tgt_node = self.sec_node
+      n2 = self.oth_node = instance.primary_node
+    elif self.op.mode == constants.REPLACE_DISK_CHG:
+      n1 = self.new_node = remote_node
+      n2 = self.oth_node = instance.primary_node
+      self.tgt_node = self.sec_node
+    else:
+      raise errors.ProgrammerError("Unhandled disk replace mode")
+
+    _CheckNodeOnline(self, n1)
+    _CheckNodeOnline(self, n2)
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -4492,15 +4838,10 @@ class LUReplaceDisks(LogicalUnit):
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
       info("creating new local storage on %s for %s" %
            (tgt_node, dev.iv_name))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+      # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
-        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], tgt_node))
+        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
 
     # Step: for each lv, detach+rename*2+attach
     self.proc.LogStep(4, steps_total, "change drbd configuration")
@@ -4554,7 +4895,7 @@ class LUReplaceDisks(LogicalUnit):
 
       # now that the new lvs have the old name, we can add them to the device
       info("adding new mirror component on %s" % tgt_node)
-      result =self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
       if result.failed or not result.data:
         for new_lv in new_lvs:
           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
@@ -4615,12 +4956,16 @@ class LUReplaceDisks(LogicalUnit):
     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
     instance = self.instance
     iv_names = {}
-    vgname = self.cfg.GetVGName()
     # start of work
     cfg = self.cfg
     old_node = self.tgt_node
     new_node = self.new_node
     pri_node = instance.primary_node
+    nodes_ip = {
+      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
+      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
+      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
+      }
 
     # Step: check device activation
     self.proc.LogStep(1, steps_total, "check device existence")
@@ -4657,18 +5002,12 @@ class LUReplaceDisks(LogicalUnit):
     # Step: create new storage
     self.proc.LogStep(3, steps_total, "allocate new storage")
     for idx, dev in enumerate(instance.disks):
-      size = dev.size
       info("adding new local storage on %s for disk/%d" %
            (new_node, idx))
-      # since we *always* want to create this LV, we use the
-      # _Create...OnPrimary (which forces the creation), even if we
-      # are talking about the secondary node
+      # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
-        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
-                                        _GetInstanceInfoText(instance)):
-          raise errors.OpExecError("Failed to create new LV named '%s' on"
-                                   " node '%s'" %
-                                   (new_lv.logical_id[1], new_node))
+        _CreateBlockDev(self, new_node, instance, new_lv, True,
+                        _GetInstanceInfoText(instance), False)
 
     # Step 4: dbrd minors and drbd setups changes
     # after this, we must manually remove the drbd minors on both the
@@ -4680,27 +5019,31 @@ class LUReplaceDisks(LogicalUnit):
     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
       size = dev.size
       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
-      # create new devices on new_node
-      if pri_node == dev.logical_id[0]:
-        new_logical_id = (pri_node, new_node,
-                          dev.logical_id[2], dev.logical_id[3], new_minor,
-                          dev.logical_id[5])
+      # create new devices on new_node; note that we create two IDs:
+      # one without port, so the drbd will be activated without
+      # networking information on the new node at this stage, and one
+      # with network, for the latter activation in step 4
+      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
+      if pri_node == o_node1:
+        p_minor = o_minor1
       else:
-        new_logical_id = (new_node, pri_node,
-                          dev.logical_id[2], new_minor, dev.logical_id[4],
-                          dev.logical_id[5])
-      iv_names[idx] = (dev, dev.children, new_logical_id)
+        p_minor = o_minor2
+
+      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
+      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
+
+      iv_names[idx] = (dev, dev.children, new_net_id)
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
-                    new_logical_id)
+                    new_net_id)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
-                              logical_id=new_logical_id,
+                              logical_id=new_alone_id,
                               children=dev.children)
-      if not _CreateBlockDevOnSecondary(self, new_node, instance,
-                                        new_drbd, False,
-                                        _GetInstanceInfoText(instance)):
+      try:
+        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
+                              _GetInstanceInfoText(instance), False)
+      except error.BlockDeviceError:
         self.cfg.ReleaseDRBDMinors(instance.name)
-        raise errors.OpExecError("Failed to create new DRBD on"
-                                 " node '%s'" % new_node)
+        raise
 
     for idx, dev in enumerate(instance.disks):
       # we have new devices, shutdown the drbd on the old secondary
@@ -4712,25 +5055,15 @@ class LUReplaceDisks(LogicalUnit):
                 hint="Please cleanup this device manually as soon as possible")
 
     info("detaching primary drbds from the network (=> standalone)")
-    done = 0
-    for idx, dev in enumerate(instance.disks):
-      cfg.SetDiskID(dev, pri_node)
-      # set the network part of the physical (unique in bdev terms) id
-      # to None, meaning detach from network
-      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
-      # and 'find' the device, which will 'fix' it to match the
-      # standalone state
-      result = self.rpc.call_blockdev_find(pri_node, dev)
-      if not result.failed and result.data:
-        done += 1
-      else:
-        warning("Failed to detach drbd disk/%d from network, unusual case" %
-                idx)
+    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
+                                               instance.disks)[pri_node]
 
-    if not done:
-      # no detaches succeeded (very unlikely)
+    msg = result.RemoteFailMsg()
+    if msg:
+      # detaches didn't succeed (unlikely)
       self.cfg.ReleaseDRBDMinors(instance.name)
-      raise errors.OpExecError("Can't detach at least one DRBD from old node")
+      raise errors.OpExecError("Can't detach the disks from the network on"
+                               " old node: %s" % (msg,))
 
     # if we managed to detach at least one, we update all the disks of
     # the instance to point to the new secondary
@@ -4745,18 +5078,15 @@ class LUReplaceDisks(LogicalUnit):
 
     # and now perform the drbd attach
     info("attaching primary drbds to new secondary (standalone => connected)")
-    failures = []
-    for idx, dev in enumerate(instance.disks):
-      info("attaching primary drbd for disk/%d to new secondary node" % idx)
-      # since the attach is smart, it's enough to 'find' the device,
-      # it will automatically activate the network, if the physical_id
-      # is correct
-      cfg.SetDiskID(dev, pri_node)
-      logging.debug("Disk to attach: %s", dev)
-      result = self.rpc.call_blockdev_find(pri_node, dev)
-      if result.failed or not result.data:
-        warning("can't attach drbd disk/%d to new secondary!" % idx,
-                "please do a gnt-instance info to see the status of disks")
+    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
+                                           instance.disks, instance.name,
+                                           False)
+    for to_node, to_result in result.items():
+      msg = to_result.RemoteFailMsg()
+      if msg:
+        warning("can't attach drbd disks on node %s: %s", to_node, msg,
+                hint="please do a gnt-instance info to see the"
+                " status of disks")
 
     # this can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its
@@ -4794,13 +5124,10 @@ class LUReplaceDisks(LogicalUnit):
     if instance.status == "down":
       _StartInstanceDisks(self, instance, True)
 
-    if instance.disk_template == constants.DT_DRBD8:
-      if self.op.remote_node is None:
-        fn = self._ExecD8DiskOnly
-      else:
-        fn = self._ExecD8Secondary
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      fn = self._ExecD8Secondary
     else:
-      raise errors.ProgrammerError("Unhandled disk replacement case")
+      fn = self._ExecD8DiskOnly
 
     ret = fn(feedback_fn)
 
@@ -4855,8 +5182,8 @@ class LUGrowDisk(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
-    for node in instance.secondary_nodes:
+    nodenames = list(instance.all_nodes)
+    for node in nodenames:
       _CheckNodeOnline(self, node)
 
 
@@ -4868,7 +5195,6 @@ class LUGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                        instance.hypervisor)
     for node in nodenames:
@@ -4891,7 +5217,7 @@ class LUGrowDisk(LogicalUnit):
     """
     instance = self.instance
     disk = self.disk
-    for node in (instance.secondary_nodes + (instance.primary_node,)):
+    for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
       result.Raise()
@@ -4929,8 +5255,7 @@ class LUQueryInstanceData(NoHooksLU):
       for name in self.op.instances:
         full_name = self.cfg.ExpandInstanceName(name)
         if full_name is None:
-          raise errors.OpPrereqError("Instance '%s' not known" %
-                                     self.op.instance_name)
+          raise errors.OpPrereqError("Instance '%s' not known" % name)
         self.wanted_names.append(full_name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
     else:
@@ -5171,8 +5496,7 @@ class LUSetInstanceParams(LogicalUnit):
       args['vcpus'] = self.be_new[constants.BE_VCPUS]
     # FIXME: readd disk/nic changes
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
-    nl = [self.cfg.GetMasterNode(),
-          self.instance.primary_node] + list(self.instance.secondary_nodes)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -5188,9 +5512,8 @@ class LUSetInstanceParams(LogicalUnit):
     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    pnode = self.instance.primary_node
-    nodelist = [pnode]
-    nodelist.extend(instance.secondary_nodes)
+    pnode = instance.primary_node
+    nodelist = list(instance.all_nodes)
 
     # hvparams processing
     if self.op.hvparams:
@@ -5266,7 +5589,9 @@ class LUSetInstanceParams(LogicalUnit):
                                      " missing on its primary node" % miss_mem)
 
       if be_new[constants.BE_AUTO_BALANCE]:
-        for node, nres in instance.secondary_nodes.iteritems():
+        for node, nres in nodeinfo.iteritems():
+          if node not in instance.secondary_nodes:
+            continue
           if nres.failed or not isinstance(nres.data, dict):
             self.warn.append("Can't get info from secondary node %s" % node)
           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
@@ -5306,9 +5631,9 @@ class LUSetInstanceParams(LogicalUnit):
                                      " an instance")
         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
         ins_l = ins_l[pnode]
-        if not type(ins_l) is list:
+        if ins_l.failed or not isinstance(ins_l.data, list):
           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
-        if instance.name in ins_l:
+        if instance.name in ins_l.data:
           raise errors.OpPrereqError("Instance is running, can't remove"
                                      " disks.")
 
@@ -5346,8 +5671,8 @@ class LUSetInstanceParams(LogicalUnit):
         device_idx = len(instance.disks)
         for node, disk in device.ComputeNodeTree(instance.primary_node):
           self.cfg.SetDiskID(disk, node)
-          result = self.rpc.call_blockdev_remove(node, disk)
-          if result.failed or not result.data:
+          rpc_result = self.rpc.call_blockdev_remove(node, disk)
+          if rpc_result.failed or not rpc_result.data:
             self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                  " continuing anyway", device_idx, node)
         result.append(("disk/%d" % device_idx, "remove"))
@@ -5375,17 +5700,15 @@ class LUSetInstanceParams(LogicalUnit):
                      new_disk.iv_name, instance.name)
         # Note: this needs to be kept in sync with _CreateDisks
         #HARDCODE
-        for secondary_node in instance.secondary_nodes:
-          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
-                                            new_disk, False, info):
+        for node in instance.all_nodes:
+          f_create = node == instance.primary_node
+          try:
+            _CreateBlockDev(self, node, instance, new_disk,
+                            f_create, info, f_create)
+          except error.OpExecError, err:
             self.LogWarning("Failed to create volume %s (%s) on"
-                            " secondary node %s!",
-                            new_disk.iv_name, new_disk, secondary_node)
-        #HARDCODE
-        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
-                                        instance, new_disk, info):
-          self.LogWarning("Failed to create volume %s on primary!",
-                          new_disk.iv_name)
+                            " node %s: %s",
+                            new_disk.iv_name, new_disk, node, err)
         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                        (new_disk.size, new_disk.mode)))
       else:
@@ -5528,7 +5851,7 @@ class LUExportInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(instance_name)
     assert self.instance is not None, \
           "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    _CheckNodeOnline(self, self.instance.primary_node)
 
     self.dst_node = self.cfg.GetNodeInfo(
       self.cfg.ExpandNodeName(self.op.target_node))
@@ -5536,7 +5859,7 @@ class LUExportInstance(LogicalUnit):
     if self.dst_node is None:
       # This is wrong node name, not a non-locked node
       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
-    _CheckNodeOnline(self, self.op.target_node)
+    _CheckNodeOnline(self, self.dst_node.name)
 
     # instance disk type verification
     for disk in self.instance.disks:
@@ -5563,6 +5886,11 @@ class LUExportInstance(LogicalUnit):
 
     snap_disks = []
 
+    # set the disks ID correctly since call_instance_start needs the
+    # correct drbd minor to create the symlinks
+    for disk in instance.disks:
+      self.cfg.SetDiskID(disk, src_node)
+
     try:
       for disk in instance.disks:
         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
@@ -5581,9 +5909,10 @@ class LUExportInstance(LogicalUnit):
     finally:
       if self.op.shutdown and instance.status == "up":
         result = self.rpc.call_instance_start(src_node, instance, None)
-        if result.failed or not result.data:
+        msg = result.RemoteFailMsg()
+        if msg:
           _ShutdownInstanceDisks(self, instance)
-          raise errors.OpExecError("Could not start instance")
+          raise errors.OpExecError("Could not start instance: %s" % msg)
 
     # TODO: check for size
 
@@ -5917,6 +6246,7 @@ class IAllocator(object):
     self.name = name
     self.mem_size = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
+    self.hypervisor = None
     self.relocate_from = None
     # computed fields
     self.required_nodes = None
@@ -5964,12 +6294,12 @@ class IAllocator(object):
     node_list = cfg.GetNodeList()
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      hypervisor = self.hypervisor
+      hypervisor_name = self.hypervisor
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
-      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
+      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
 
     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
-                                           hypervisor)
+                                           hypervisor_name)
     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                        cluster_info.enabled_hypervisors)
     for nname in node_list:
@@ -6032,7 +6362,7 @@ class IAllocator(object):
         "vcpus": beinfo[constants.BE_VCPUS],
         "memory": beinfo[constants.BE_MEMORY],
         "os": iinfo.os,
-        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
+        "nodes": list(iinfo.all_nodes),
         "nics": nic_data,
         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,