Add new opcode to list physical volumes
[ganeti-local] / lib / cmdlib.py
index 88d600d..c6101c5 100644 (file)
@@ -47,8 +47,8 @@ class LogicalUnit(object):
 
   Subclasses must follow these rules:
     - implement ExpandNames
-    - implement CheckPrereq
-    - implement Exec
+    - implement CheckPrereq (except when tasklets are used)
+    - implement Exec (except when tasklets are used)
     - implement BuildHooksEnv
     - redefine HPATH and HTYPE
     - optionally redefine their run requirements:
@@ -89,14 +89,19 @@ class LogicalUnit(object):
     # logging
     self.LogWarning = processor.LogWarning
     self.LogInfo = processor.LogInfo
+    self.LogStep = processor.LogStep
     # support for dry-run
     self.dry_run_result = None
 
+    # Tasklets
+    self.tasklets = None
+
     for attr_name in self._OP_REQP:
       attr_val = getattr(op, attr_name, None)
       if attr_val is None:
         raise errors.OpPrereqError("Required parameter '%s' missing" %
                                    attr_name)
+
     self.CheckArguments()
 
   def __GetSSH(self):
@@ -148,6 +153,10 @@ class LogicalUnit(object):
     level you can modify self.share_locks, setting a true value (usually 1) for
     that level. By default locks are not shared.
 
+    This function can also define a list of tasklets, which then will be
+    executed in order instead of the usual LU-level CheckPrereq and Exec
+    functions, if those are not defined by the LU.
+
     Examples::
 
       # Acquire all nodes and one instance
@@ -204,7 +213,13 @@ class LogicalUnit(object):
     their canonical form if it hasn't been done by ExpandNames before.
 
     """
-    raise NotImplementedError
+    if self.tasklets is not None:
+      for (idx, tl) in enumerate(self.tasklets):
+        logging.debug("Checking prerequisites for tasklet %s/%s",
+                      idx + 1, len(self.tasklets))
+        tl.CheckPrereq()
+    else:
+      raise NotImplementedError
 
   def Exec(self, feedback_fn):
     """Execute the LU.
@@ -214,7 +229,12 @@ class LogicalUnit(object):
     code, or expected.
 
     """
-    raise NotImplementedError
+    if self.tasklets is not None:
+      for (idx, tl) in enumerate(self.tasklets):
+        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
+        tl.Exec(feedback_fn)
+    else:
+      raise NotImplementedError
 
   def BuildHooksEnv(self):
     """Build hooks environment for this LU.
@@ -338,6 +358,52 @@ class NoHooksLU(LogicalUnit):
   HTYPE = None
 
 
+class Tasklet:
+  """Tasklet base class.
+
+  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
+  they can mix legacy code with tasklets. Locking needs to be done in the LU,
+  tasklets know nothing about locks.
+
+  Subclasses must follow these rules:
+    - Implement CheckPrereq
+    - Implement Exec
+
+  """
+  def __init__(self, lu):
+    self.lu = lu
+
+    # Shortcuts to the LU's configuration and RPC objects
+    self.cfg = lu.cfg
+    self.rpc = lu.rpc
+
+  def CheckPrereq(self):
+    """Check prerequisites for this tasklet.
+
+    This method should check whether the prerequisites for the execution of
+    this tasklet are fulfilled. It can do internode communication, but it
+    should be idempotent - no cluster or system changes are allowed.
+
+    The method should raise errors.OpPrereqError in case something is not
+    fulfilled. Its return value is ignored.
+
+    This method should also update all parameters to their canonical form if it
+    hasn't been done before.
+
+    """
+    raise NotImplementedError
+
+  def Exec(self, feedback_fn):
+    """Execute the tasklet.
+
+    This method should implement the actual work. It should raise
+    errors.OpExecError for failures that are somewhat dealt with in code, or
+    expected.
+
+    """
+    raise NotImplementedError
+
+
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.
 
@@ -595,7 +661,7 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
     'disks': [(disk.size, disk.mode) for disk in instance.disks],
     'bep': bep,
     'hvp': hvp,
-    'hypervisor': instance.hypervisor,
+    'hypervisor_name': instance.hypervisor,
   }
   if override:
     args.update(override)
@@ -643,6 +709,32 @@ def _CheckInstanceBridgesExist(lu, instance, node=None):
   _CheckNicsBridgesExist(lu, instance.nics, node)
 
 
+def _GetNodePrimaryInstances(cfg, node_name):
+  """Collect the instances whose primary node is the given node.
+
+  @param cfg: configuration object providing GetAllInstancesInfo()
+  @param node_name: node name compared against each primary_node
+  @return: list of matching instance objects
+
+  """
+  all_instances = cfg.GetAllInstancesInfo()
+  return [inst for (_, inst) in all_instances.iteritems()
+          if node_name == inst.primary_node]
+
+
+def _GetNodeSecondaryInstances(cfg, node_name):
+  """Collect the instances having the given node as a secondary node.
+
+  @param cfg: configuration object providing GetAllInstancesInfo()
+  @param node_name: node name looked up in each secondary_nodes
+  @return: list of matching instance objects
+
+  """
+  all_instances = cfg.GetAllInstancesInfo()
+  return [inst for (_, inst) in all_instances.iteritems()
+          if node_name in inst.secondary_nodes]
+
+
 class LUDestroyCluster(NoHooksLU):
   """Logical unit for destroying the cluster.
 
@@ -1566,6 +1658,13 @@ class LUSetClusterParams(LogicalUnit):
 
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
+      if not self.hv_list:
+        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+                                   " least one member")
+      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
+      if invalid_hvs:
+        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+                                   " entries: %s" % invalid_hvs)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -1634,6 +1733,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.SSH_KNOWN_HOSTS_FILE,
                     constants.RAPI_CERT_FILE,
                     constants.RAPI_USERS_FILE,
+                    constants.HMAC_CLUSTER_KEY,
                    ])
 
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
@@ -2206,6 +2306,106 @@ class LUQueryNodeVolumes(NoHooksLU):
     return output
 
 
+class LUQueryNodeStorage(NoHooksLU):
+  """Logical unit for getting information on storage units on node(s).
+
+  """
+  _OP_REQP = ["nodes", "storage_type", "output_fields"]
+  REQ_BGL = False
+  _FIELDS_STATIC = utils.FieldSet("node")
+
+  def ExpandNames(self):
+    storage_type = self.op.storage_type
+
+    if storage_type not in constants.VALID_STORAGE_FIELDS:
+      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
+
+    dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
+
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=utils.FieldSet(*dynamic_fields),
+                       selected=self.op.output_fields)
+
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+
+    if self.op.nodes:
+      self.needed_locks[locking.LEVEL_NODE] = \
+        _GetWantedNodes(self, self.op.nodes)
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This defaults the optional "name" attribute and stores the locked nodes.
+
+    """
+    self.op.name = getattr(self.op, "name", None)
+
+    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+
+  def Exec(self, feedback_fn):
+    """Queries storage units on the locked nodes; returns one row per unit.
+
+    """
+    # File storage gets the file storage directory as an extra argument
+    if self.op.storage_type == constants.ST_FILE:
+      st_args = [self.cfg.GetFileStorageDir()]
+    else:
+      st_args = []
+
+    # Always get name to sort by
+    if constants.SF_NAME in self.op.output_fields:
+      fields = self.op.output_fields[:]
+    else:
+      fields = [constants.SF_NAME] + self.op.output_fields
+
+    # Never ask for node as it's only known to the LU
+    while "node" in fields:
+      fields.remove("node")
+
+    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
+    name_idx = field_idx[constants.SF_NAME]
+
+    data = self.rpc.call_storage_list(self.nodes,
+                                      self.op.storage_type, st_args,
+                                      self.op.name, fields)
+
+    result = []
+
+    for node in utils.NiceSort(self.nodes):
+      nresult = data[node]
+      if nresult.offline:
+        continue
+
+      msg = nresult.fail_msg
+      if msg:
+        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
+        continue
+
+      rows = dict([(row[name_idx], row) for row in nresult.payload])
+
+      for name in utils.NiceSort(rows.keys()):
+        row = rows[name]
+
+        out = []
+
+        for field in self.op.output_fields:
+          if field == "node":
+            val = node
+          elif field in field_idx:
+            val = row[field_idx[field]]
+          else:
+            raise errors.ParameterError(field)
+
+          out.append(val)
+
+        result.append(out)
+
+    return result
+
+
 class LUAddNode(LogicalUnit):
   """Logical unit for adding node to the cluster.
 
@@ -3828,9 +4028,14 @@ class LUMigrateInstance(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     self.needed_locks[locking.LEVEL_NODE] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+    self._migrater = TLMigrateInstance(self, self.op.instance_name,
+                                       self.op.live, self.op.cleanup)
+    self.tasklets = [self._migrater]
+
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
@@ -3841,12 +4046,80 @@ class LUMigrateInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    instance = self._migrater.instance
+    env = _BuildInstanceHookEnvByObject(self, instance)
     env["MIGRATE_LIVE"] = self.op.live
     env["MIGRATE_CLEANUP"] = self.op.cleanup
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
     return env, nl, nl
 
+
+class LUMigrateNode(LogicalUnit):
+  """Migrate all instances from a node.
+
+  One TLMigrateInstance tasklet is created per primary instance of the node.
+
+  """
+  HPATH = "node-migrate"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name", "live"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Expand the node name, create the tasklets and declare the locks.
+
+    """
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      # Report the name as given; the expanded value would just print None
+      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+    self.op.node_name = node_name
+
+    self.needed_locks = {locking.LEVEL_NODE: [node_name]}
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+    # Create tasklets for migrating instances for all instances on this node
+    names = []
+    tasklets = []
+    for inst in _GetNodePrimaryInstances(self.cfg, node_name):
+      logging.debug("Migrating instance %s", inst.name)
+      names.append(inst.name)
+      tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+    self.tasklets = tasklets
+
+    # Declare instance locks
+    self.needed_locks[locking.LEVEL_INSTANCE] = names
+
+  def DeclareLocks(self, level):
+    """Delegate to _LockInstancesNodes once the node level is reached.
+
+    """
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    Returns (env, nl, nl) where the node list holds only the master node.
+
+    """
+    env = {"NODE_NAME": self.op.node_name}
+    nl = [self.cfg.GetMasterNode()]
+    return (env, nl, nl)
+
+
+class TLMigrateInstance(Tasklet):
+  def __init__(self, lu, instance_name, live, cleanup):
+    """Store the migration parameters on top of the Tasklet shortcuts.
+
+    """
+    Tasklet.__init__(self, lu)
+
+    # Parameters
+    self.instance_name = instance_name
+    self.live = live
+    self.cleanup = cleanup
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3854,10 +4127,10 @@ class LUMigrateInstance(LogicalUnit):
 
     """
     instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
+      self.cfg.ExpandInstanceName(self.instance_name))
     if instance is None:
       raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+                                 self.instance_name)
 
     if instance.disk_template != constants.DT_DRBD8:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -3879,7 +4152,7 @@ class LUMigrateInstance(LogicalUnit):
     # check bridge existance
     _CheckInstanceBridgesExist(self, instance, node=target_node)
 
-    if not self.op.cleanup:
+    if not self.cleanup:
       _CheckNodeNotDrained(self, target_node)
       result = self.rpc.call_instance_migratable(instance.primary_node,
                                                  instance)
@@ -4026,10 +4299,10 @@ class LUMigrateInstance(LogicalUnit):
       self._GoReconnect(False)
       self._WaitUntilSync()
     except errors.OpExecError, err:
-      self.LogWarning("Migration failed and I can't reconnect the"
-                      " drives: error '%s'\n"
-                      "Please look and recover the instance status" %
-                      str(err))
+      self.lu.LogWarning("Migration failed and I can't reconnect the"
+                         " drives: error '%s'\n"
+                         "Please look and recover the instance status" %
+                         str(err))
 
   def _AbortMigration(self):
     """Call the hypervisor code to abort a started migration.
@@ -4109,7 +4382,7 @@ class LUMigrateInstance(LogicalUnit):
     time.sleep(10)
     result = self.rpc.call_instance_migrate(source_node, instance,
                                             self.nodes_ip[target_node],
-                                            self.op.live)
+                                            self.live)
     msg = result.fail_msg
     if msg:
       logging.error("Instance migration failed, trying to revert"
@@ -4147,6 +4420,8 @@ class LUMigrateInstance(LogicalUnit):
     """Perform the migration.
 
     """
+    feedback_fn("Migrating instance %s" % self.instance.name)
+
     self.feedback_fn = feedback_fn
 
     self.source_node = self.instance.primary_node
@@ -4156,7 +4431,8 @@ class LUMigrateInstance(LogicalUnit):
       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
       }
-    if self.op.cleanup:
+
+    if self.cleanup:
       return self._ExecCleanup()
     else:
       return self._ExecMigration()
@@ -4678,7 +4954,7 @@ class LUCreateInstance(LogicalUnit):
 
     """
     nics = [n.ToDict() for n in self.nics]
-    ial = IAllocator(self,
+    ial = IAllocator(self.cfg, self.rpc,
                      mode=constants.IALLOCATOR_MODE_ALLOC,
                      name=self.op.instance_name,
                      disk_template=self.op.disk_template,
@@ -4736,7 +5012,7 @@ class LUCreateInstance(LogicalUnit):
       disks=[(d["size"], d["mode"]) for d in self.disks],
       bep=self.be_full,
       hvp=self.hv_full,
-      hypervisor=self.op.hypervisor,
+      hypervisor_name=self.op.hypervisor,
     ))
 
     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
@@ -5113,43 +5389,40 @@ class LUReplaceDisks(LogicalUnit):
     if not hasattr(self.op, "iallocator"):
       self.op.iallocator = None
 
-    # check for valid parameter combination
-    cnt = [self.op.remote_node, self.op.iallocator].count(None)
-    if self.op.mode == constants.REPLACE_DISK_CHG:
-      if cnt == 2:
-        raise errors.OpPrereqError("When changing the secondary either an"
-                                   " iallocator script must be used or the"
-                                   " new node given")
-      elif cnt == 0:
-        raise errors.OpPrereqError("Give either the iallocator or the new"
-                                   " secondary, not both")
-    else: # not replacing the secondary
-      if cnt != 2:
-        raise errors.OpPrereqError("The iallocator and new node options can"
-                                   " be used only when changing the"
-                                   " secondary node")
+    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
+                                  self.op.iallocator)
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
     if self.op.iallocator is not None:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
     elif self.op.remote_node is not None:
       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
       if remote_node is None:
         raise errors.OpPrereqError("Node '%s' not known" %
                                    self.op.remote_node)
+
       self.op.remote_node = remote_node
+
       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
       # currently it doesn't since parallel invocations of
       # FindUnusedMinor will conflict
       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+                                   self.op.iallocator, self.op.remote_node,
+                                   self.op.disks)
+
+    self.tasklets = [self.replacer]
+
   def DeclareLocks(self, level):
     # If we're not already locking all nodes in the set we have to declare the
     # instance's primary/secondary nodes.
@@ -5157,213 +5430,463 @@ class LUReplaceDisks(LogicalUnit):
         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
       self._LockInstancesNodes()
 
-  def _RunAllocator(self):
-    """Compute a new secondary node using an IAllocator.
-
-    """
-    ial = IAllocator(self,
-                     mode=constants.IALLOCATOR_MODE_RELOC,
-                     name=self.op.instance_name,
-                     relocate_from=[self.sec_node])
-
-    ial.Run(self.op.iallocator)
-
-    if not ial.success:
-      raise errors.OpPrereqError("Can't compute nodes using"
-                                 " iallocator '%s': %s" % (self.op.iallocator,
-                                                           ial.info))
-    if len(ial.nodes) != ial.required_nodes:
-      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
-                                 " of nodes (%s), required %s" %
-                                 (len(ial.nodes), ial.required_nodes))
-    self.op.remote_node = ial.nodes[0]
-    self.LogInfo("Selected new secondary for the instance: %s",
-                 self.op.remote_node)
-
   def BuildHooksEnv(self):
     """Build hooks env.
 
     This runs on the master, the primary and all the secondaries.
 
     """
+    instance = self.replacer.instance
     env = {
       "MODE": self.op.mode,
       "NEW_SECONDARY": self.op.remote_node,
-      "OLD_SECONDARY": self.instance.secondary_nodes[0],
+      "OLD_SECONDARY": instance.secondary_nodes[0],
       }
-    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    env.update(_BuildInstanceHookEnvByObject(self, instance))
     nl = [
       self.cfg.GetMasterNode(),
-      self.instance.primary_node,
+      instance.primary_node,
       ]
     if self.op.remote_node is not None:
       nl.append(self.op.remote_node)
     return env, nl, nl
 
+
+class LUEvacuateNode(LogicalUnit):
+  """Relocate the secondary instances from a node.
+
+  One TLReplaceDisks tasklet is created per secondary instance of the node.
+
+  """
+  HPATH = "node-evacuate"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Default the optional attributes and validate their combination.
+
+    """
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
+
+    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
+                                  self.op.remote_node,
+                                  self.op.iallocator)
+
+  def ExpandNames(self):
+    """Expand the node names, create the tasklets and declare the locks.
+
+    """
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      # Report the name as given; the expanded value would just print None
+      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+    self.op.node_name = node_name
+
+    self.needed_locks = {}
+
+    # Declare node locks
+    if self.op.iallocator is not None:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    elif self.op.remote_node is not None:
+      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
+      if remote_node is None:
+        raise errors.OpPrereqError("Node '%s' not known" %
+                                   self.op.remote_node)
+      self.op.remote_node = remote_node
+
+      # Warning: do not remove the locking of the new secondary here
+      # unless DRBD8.AddChildren is changed to work in parallel;
+      # currently it doesn't since parallel invocations of
+      # FindUnusedMinor will conflict
+      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    else:
+      raise errors.OpPrereqError("Invalid parameters")
+
+    # Create tasklets for replacing disks for all secondary instances on this
+    # node
+    names = []
+    tasklets = []
+    for inst in _GetNodeSecondaryInstances(self.cfg, node_name):
+      logging.debug("Replacing disks for instance %s", inst.name)
+      names.append(inst.name)
+      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
+                                self.op.iallocator, self.op.remote_node, [])
+      tasklets.append(replacer)
+
+    self.tasklets = tasklets
+    self.instance_names = names
+
+    # Declare instance locks
+    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
+
+  def DeclareLocks(self, level):
+    # If we're not already locking all nodes in the set we have to declare the
+    # instance's primary/secondary nodes.
+    if (level == locking.LEVEL_NODE and
+        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    The node list holds the master and, if given, the new secondary node.
+
+    """
+    env = {"NODE_NAME": self.op.node_name}
+    nl = [self.cfg.GetMasterNode()]
+    if self.op.remote_node is not None:
+      env["NEW_SECONDARY"] = self.op.remote_node
+      nl.append(self.op.remote_node)
+    return (env, nl, nl)
+
+
+class TLReplaceDisks(Tasklet):
+  """Replaces disks for an instance.
+
+  Note: Locking is not within the scope of this class.
+
+  """
+  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
+               disks):
+    """Store the replacement parameters and reset the runtime data.
+
+    """
+    Tasklet.__init__(self, lu)
+
+    # Parameters
+    self.instance_name = instance_name
+    self.mode = mode
+    self.iallocator_name = iallocator_name
+    self.remote_node = remote_node
+    self.disks = disks
+
+    # Runtime data, filled in by CheckPrereq
+    self.instance = None
+    self.new_node = None
+    self.target_node = None
+    self.other_node = None
+    self.remote_node_info = None
+    self.node_secondary_ip = None
+
+  @staticmethod
+  def CheckArguments(mode, remote_node, iallocator):
+    """Validate the mode, remote node and iallocator combination.
+
+    Raises errors.OpPrereqError if the combination is not allowed.
+
+    """
+    unset = [remote_node, iallocator].count(None)
+    if mode != constants.REPLACE_DISK_CHG:
+      if unset != 2:
+        raise errors.OpPrereqError("The iallocator and new node options can"
+                                   " be used only when changing the"
+                                   " secondary node")
+    elif unset == 2:
+      raise errors.OpPrereqError("When changing the secondary either an"
+                                 " iallocator script must be used or the"
+                                 " new node given")
+    elif unset == 0:
+      raise errors.OpPrereqError("Give either the iallocator or the new"
+                                 " secondary, not both")
+
+  @staticmethod
+  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
+    """Compute a new secondary node using an IAllocator.
+
+    Raises errors.OpPrereqError on allocator failure; returns the node name.
+
+    """
+    ial = IAllocator(lu.cfg, lu.rpc,
+                     mode=constants.IALLOCATOR_MODE_RELOC,
+                     name=instance_name,
+                     relocate_from=relocate_from)
+    ial.Run(iallocator_name)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+                                 " %s" % (iallocator_name, ial.info))
+
+    if len(ial.nodes) != ial.required_nodes:
+      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+                                 " of nodes (%s), required %s" %
+                                 (iallocator_name, len(ial.nodes),
+                                  ial.required_nodes))
+
+    remote_node_name = ial.nodes[0]
+    lu.LogInfo("Selected new secondary for instance '%s': %s",
+               instance_name, remote_node_name)
+    return remote_node_name
+
   def CheckPrereq(self):
     """Check prerequisites.
 
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.instance_name
 
-    if instance.disk_template != constants.DT_DRBD8:
+    if self.instance.disk_template != constants.DT_DRBD8:
       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                  " instances")
 
-    if len(instance.secondary_nodes) != 1:
+    if len(self.instance.secondary_nodes) != 1:
       raise errors.OpPrereqError("The instance has a strange layout,"
                                  " expected one secondary but found %d" %
-                                 len(instance.secondary_nodes))
+                                 len(self.instance.secondary_nodes))
 
-    self.sec_node = instance.secondary_nodes[0]
+    secondary_node = self.instance.secondary_nodes[0]
 
-    if self.op.iallocator is not None:
-      self._RunAllocator()
+    if self.iallocator_name is None:
+      remote_node = self.remote_node
+    else:
+      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
+                                       self.instance.name, secondary_node)
 
-    remote_node = self.op.remote_node
     if remote_node is not None:
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
       assert self.remote_node_info is not None, \
         "Cannot retrieve locked node %s" % remote_node
     else:
       self.remote_node_info = None
-    if remote_node == instance.primary_node:
+
+    if remote_node == self.instance.primary_node:
       raise errors.OpPrereqError("The specified node is the primary node of"
                                  " the instance.")
-    elif remote_node == self.sec_node:
+
+    if remote_node == secondary_node:
       raise errors.OpPrereqError("The specified node is already the"
                                  " secondary node of the instance.")
 
-    if self.op.mode == constants.REPLACE_DISK_PRI:
-      n1 = self.tgt_node = instance.primary_node
-      n2 = self.oth_node = self.sec_node
-    elif self.op.mode == constants.REPLACE_DISK_SEC:
-      n1 = self.tgt_node = self.sec_node
-      n2 = self.oth_node = instance.primary_node
-    elif self.op.mode == constants.REPLACE_DISK_CHG:
-      n1 = self.new_node = remote_node
-      n2 = self.oth_node = instance.primary_node
-      self.tgt_node = self.sec_node
-      _CheckNodeNotDrained(self, remote_node)
-    else:
-      raise errors.ProgrammerError("Unhandled disk replace mode")
+    if self.mode == constants.REPLACE_DISK_PRI:
+      self.target_node = self.instance.primary_node
+      self.other_node = secondary_node
+      check_nodes = [self.target_node, self.other_node]
+
+    elif self.mode == constants.REPLACE_DISK_SEC:
+      self.target_node = secondary_node
+      self.other_node = self.instance.primary_node
+      check_nodes = [self.target_node, self.other_node]
 
-    _CheckNodeOnline(self, n1)
-    _CheckNodeOnline(self, n2)
+    elif self.mode == constants.REPLACE_DISK_CHG:
+      self.new_node = remote_node
+      self.other_node = self.instance.primary_node
+      self.target_node = secondary_node
+      check_nodes = [self.new_node, self.other_node]
 
-    if not self.op.disks:
-      self.op.disks = range(len(instance.disks))
+      _CheckNodeNotDrained(self.lu, remote_node)
 
-    for disk_idx in self.op.disks:
-      instance.FindDisk(disk_idx)
+    else:
+      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
+                                   self.mode)
 
-  def _ExecD8DiskOnly(self, feedback_fn):
-    """Replace a disk on the primary or secondary for dbrd8.
+    for node in check_nodes:
+      _CheckNodeOnline(self.lu, node)
 
-    The algorithm for replace is quite complicated:
+    # If not specified all disks should be replaced
+    if not self.disks:
+      self.disks = range(len(self.instance.disks))
 
-      1. for each disk to be replaced:
+    # Check whether disks are valid
+    for disk_idx in self.disks:
+      self.instance.FindDisk(disk_idx)
 
-        1. create new LVs on the target node with unique names
-        1. detach old LVs from the drbd device
-        1. rename old LVs to name_replaced.<time_t>
-        1. rename new LVs to old LVs
-        1. attach the new LVs (with the old names now) to the drbd device
+    # Get secondary node IP addresses
+    node_2nd_ip = {}
 
-      1. wait for sync across all devices
+    for node_name in [self.target_node, self.other_node, self.new_node]:
+      if node_name is not None:
+        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
 
-      1. for each modified disk:
+    self.node_secondary_ip = node_2nd_ip
 
-        1. remove old LVs (which have the name name_replaces.<time_t>)
+  def Exec(self, feedback_fn):
+    """Execute disk replacement.
 
-    Failures are not very well handled.
+    This dispatches the disk replacement to the appropriate handler.
 
     """
-    steps_total = 6
-    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
-    instance = self.instance
-    iv_names = {}
+    feedback_fn("Replacing disks for %s" % self.instance.name)
+
+    activate_disks = (not self.instance.admin_up)
+
+    # Activate the instance disks if we're replacing them on a down instance
+    if activate_disks:
+      _StartInstanceDisks(self.lu, self.instance, True)
+
+    try:
+      if self.mode == constants.REPLACE_DISK_CHG:
+        return self._ExecDrbd8Secondary()
+      else:
+        return self._ExecDrbd8DiskOnly()
+
+    finally:
+      # Deactivate the instance disks if we're replacing them on a down instance
+      if activate_disks:
+        _SafeShutdownInstanceDisks(self.lu, self.instance)
+
+  def _CheckVolumeGroup(self, nodes):
+    self.lu.LogInfo("Checking volume groups")
+
     vgname = self.cfg.GetVGName()
-    # start of work
-    cfg = self.cfg
-    tgt_node = self.tgt_node
-    oth_node = self.oth_node
 
-    # Step: check device activation
-    self.proc.LogStep(1, steps_total, "check device existence")
-    info("checking volume groups")
-    my_vg = cfg.GetVGName()
-    results = self.rpc.call_vg_list([oth_node, tgt_node])
+    # Make sure volume group exists on all involved nodes
+    results = self.rpc.call_vg_list(nodes)
     if not results:
       raise errors.OpExecError("Can't list volume groups on the nodes")
-    for node in oth_node, tgt_node:
+
+    for node in nodes:
       res = results[node]
       res.Raise("Error checking node %s" % node)
-      if my_vg not in res.payload:
-        raise errors.OpExecError("Volume group '%s' not found on %s" %
-                                 (my_vg, node))
-    for idx, dev in enumerate(instance.disks):
-      if idx not in self.op.disks:
+      if vgname not in res.payload:
+        raise errors.OpExecError("Volume group '%s' not found on node %s" %
+                                 (vgname, node))
+
+  def _CheckDisksExistence(self, nodes):
+    # Check disk existence
+    for idx, dev in enumerate(self.instance.disks):
+      if idx not in self.disks:
         continue
-      for node in tgt_node, oth_node:
-        info("checking disk/%d on %s" % (idx, node))
-        cfg.SetDiskID(dev, node)
+
+      for node in nodes:
+        self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
+        self.cfg.SetDiskID(dev, node)
+
         result = self.rpc.call_blockdev_find(node, dev)
+
         msg = result.fail_msg
-        if not msg and not result.payload:
-          msg = "disk not found"
-        if msg:
+        if msg or not result.payload:
+          if not msg:
+            msg = "disk not found"
           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                    (idx, node, msg))
 
-    # Step: check other node consistency
-    self.proc.LogStep(2, steps_total, "check peer consistency")
-    for idx, dev in enumerate(instance.disks):
-      if idx not in self.op.disks:
+  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
+    for idx, dev in enumerate(self.instance.disks):
+      if idx not in self.disks:
         continue
-      info("checking disk/%d consistency on %s" % (idx, oth_node))
-      if not _CheckDiskConsistency(self, dev, oth_node,
-                                   oth_node==instance.primary_node):
-        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
-                                 " to replace disks on this node (%s)" %
-                                 (oth_node, tgt_node))
 
-    # Step: create new storage
-    self.proc.LogStep(3, steps_total, "allocate new storage")
-    for idx, dev in enumerate(instance.disks):
-      if idx not in self.op.disks:
+      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
+                      (idx, node_name))
+
+      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
+                                   ldisk=ldisk):
+        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
+                                 " replace disks for instance %s" %
+                                 (node_name, self.instance.name))
+
+  def _CreateNewStorage(self, node_name):
+    vgname = self.cfg.GetVGName()
+    iv_names = {}
+
+    for idx, dev in enumerate(self.instance.disks):
+      if idx not in self.disks:
         continue
-      size = dev.size
-      cfg.SetDiskID(dev, tgt_node)
-      lv_names = [".disk%d_%s" % (idx, suf)
-                  for suf in ["data", "meta"]]
-      names = _GenerateUniqueNames(self, lv_names)
-      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
+
+      self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
+
+      self.cfg.SetDiskID(dev, node_name)
+
+      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
+      names = _GenerateUniqueNames(self.lu, lv_names)
+
+      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                              logical_id=(vgname, names[0]))
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                              logical_id=(vgname, names[1]))
+
       new_lvs = [lv_data, lv_meta]
       old_lvs = dev.children
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
-      info("creating new local storage on %s for %s" %
-           (tgt_node, dev.iv_name))
+
       # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
-        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
-                        _GetInstanceInfoText(instance), False)
+        _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
+                        _GetInstanceInfoText(self.instance), False)
+
+    return iv_names
+
+  def _CheckDevices(self, node_name, iv_names):
+    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
+      self.cfg.SetDiskID(dev, node_name)
+
+      result = self.rpc.call_blockdev_find(node_name, dev)
+
+      msg = result.fail_msg
+      if msg or not result.payload:
+        if not msg:
+          msg = "disk not found"
+        raise errors.OpExecError("Can't find DRBD device %s: %s" %
+                                 (name, msg))
+
+      if result.payload[5]:
+        raise errors.OpExecError("DRBD device %s is degraded!" % name)
+
+  def _RemoveOldStorage(self, node_name, iv_names):
+    for name, (dev, old_lvs, _) in iv_names.iteritems():
+      self.lu.LogInfo("Remove logical volumes for %s" % name)
+
+      for lv in old_lvs:
+        self.cfg.SetDiskID(lv, node_name)
+
+        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
+        if msg:
+          self.lu.LogWarning("Can't remove old LV: %s" % msg,
+                             hint="remove unused LVs manually")
+
+  def _ExecDrbd8DiskOnly(self):
+    """Replace a disk on the primary or secondary for DRBD 8.
+
+    The algorithm for replace is quite complicated:
+
+      1. for each disk to be replaced:
+
+        1. create new LVs on the target node with unique names
+        1. detach old LVs from the drbd device
+        1. rename old LVs to name_replaced.<time_t>
+        1. rename new LVs to old LVs
+        1. attach the new LVs (with the old names now) to the drbd device
+
+      1. wait for sync across all devices
+
+      1. for each modified disk:
+
+        1. remove old LVs (which have the name name_replaced.<time_t>)
+
+    Failures are not very well handled.
+
+    """
+    steps_total = 6
+
+    # Step: check device activation
+    self.lu.LogStep(1, steps_total, "Check device existence")
+    self._CheckDisksExistence([self.other_node, self.target_node])
+    self._CheckVolumeGroup([self.target_node, self.other_node])
+
+    # Step: check other node consistency
+    self.lu.LogStep(2, steps_total, "Check peer consistency")
+    self._CheckDisksConsistency(self.other_node,
+                                self.other_node == self.instance.primary_node,
+                                False)
+
+    # Step: create new storage
+    self.lu.LogStep(3, steps_total, "Allocate new storage")
+    iv_names = self._CreateNewStorage(self.target_node)
 
     # Step: for each lv, detach+rename*2+attach
-    self.proc.LogStep(4, steps_total, "change drbd configuration")
+    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
     for dev, old_lvs, new_lvs in iv_names.itervalues():
-      info("detaching %s drbd from local storage" % dev.iv_name)
-      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
+      self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
+
+      result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
       result.Raise("Can't detach drbd from local storage on node"
-                   " %s for device %s" % (tgt_node, dev.iv_name))
+                   " %s for device %s" % (self.target_node, dev.iv_name))
       #dev.children = []
       #cfg.Update(instance)
 
@@ -5377,81 +5900,66 @@ class LUReplaceDisks(LogicalUnit):
       temp_suffix = int(time.time())
       ren_fn = lambda d, suff: (d.physical_id[0],
                                 d.physical_id[1] + "_replaced-%s" % suff)
-      # build the rename list based on what LVs exist on the node
-      rlist = []
+
+      # Build the rename list based on what LVs exist on the node
+      rename_old_to_new = []
       for to_ren in old_lvs:
-        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
+        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
         if not result.fail_msg and result.payload:
           # device exists
-          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
+          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
 
-      info("renaming the old LVs on the target node")
-      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
-      result.Raise("Can't rename old LVs on node %s" % tgt_node)
-      # now we rename the new LVs to the old LVs
-      info("renaming the new LVs on the target node")
-      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
-      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
-      result.Raise("Can't rename new LVs on node %s" % tgt_node)
+      self.lu.LogInfo("Renaming the old LVs on the target node")
+      result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
+      result.Raise("Can't rename old LVs on node %s" % self.target_node)
+
+      # Now we rename the new LVs to the old LVs
+      self.lu.LogInfo("Renaming the new LVs on the target node")
+      rename_new_to_old = [(new, old.physical_id)
+                           for old, new in zip(old_lvs, new_lvs)]
+      result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
+      result.Raise("Can't rename new LVs on node %s" % self.target_node)
 
       for old, new in zip(old_lvs, new_lvs):
         new.logical_id = old.logical_id
-        cfg.SetDiskID(new, tgt_node)
+        self.cfg.SetDiskID(new, self.target_node)
 
       for disk in old_lvs:
         disk.logical_id = ren_fn(disk, temp_suffix)
-        cfg.SetDiskID(disk, tgt_node)
+        self.cfg.SetDiskID(disk, self.target_node)
 
-      # now that the new lvs have the old name, we can add them to the device
-      info("adding new mirror component on %s" % tgt_node)
-      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
+      # Now that the new lvs have the old name, we can add them to the device
+      self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
+      result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
       msg = result.fail_msg
       if msg:
         for new_lv in new_lvs:
-          msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
+          msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
           if msg2:
-            warning("Can't rollback device %s: %s", dev, msg2,
-                    hint="cleanup manually the unused logical volumes")
+            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
+                               hint=("cleanup manually the unused logical"
+                                     " volumes"))
         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
 
       dev.children = new_lvs
-      cfg.Update(instance)
 
-    # Step: wait for sync
+      self.cfg.Update(self.instance)
 
-    # this can fail as the old devices are degraded and _WaitForSync
-    # does a combined result over all disks, so we don't check its
-    # return value
-    self.proc.LogStep(5, steps_total, "sync devices")
-    _WaitForSync(self, instance, unlock=True)
+    # Wait for sync
+    # This can fail as the old devices are degraded and _WaitForSync
+    # does a combined result over all disks, so we don't check its return value
+    self.lu.LogStep(5, steps_total, "Sync devices")
+    _WaitForSync(self.lu, self.instance, unlock=True)
 
-    # so check manually all the devices
-    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
-      cfg.SetDiskID(dev, instance.primary_node)
-      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
-      msg = result.fail_msg
-      if not msg and not result.payload:
-        msg = "disk not found"
-      if msg:
-        raise errors.OpExecError("Can't find DRBD device %s: %s" %
-                                 (name, msg))
-      if result.payload[5]:
-        raise errors.OpExecError("DRBD device %s is degraded!" % name)
+    # Check all devices manually
+    self._CheckDevices(self.instance.primary_node, iv_names)
 
     # Step: remove old storage
-    self.proc.LogStep(6, steps_total, "removing old storage")
-    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
-      info("remove logical volumes for %s" % name)
-      for lv in old_lvs:
-        cfg.SetDiskID(lv, tgt_node)
-        msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
-        if msg:
-          warning("Can't remove old LV: %s" % msg,
-                  hint="manually remove unused LVs")
-          continue
+    self.lu.LogStep(6, steps_total, "Removing old storage")
+    self._RemoveOldStorage(self.target_node, iv_names)
 
-  def _ExecD8Secondary(self, feedback_fn):
-    """Replace the secondary node for drbd8.
+  def _ExecDrbd8Secondary(self):
+    """Replace the secondary node for DRBD 8.
 
     The algorithm for replace is quite complicated:
       - for all disks of the instance:
@@ -5470,86 +5978,49 @@ class LUReplaceDisks(LogicalUnit):
 
     """
     steps_total = 6
-    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
-    instance = self.instance
-    iv_names = {}
-    # start of work
-    cfg = self.cfg
-    old_node = self.tgt_node
-    new_node = self.new_node
-    pri_node = instance.primary_node
-    nodes_ip = {
-      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
-      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
-      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
-      }
 
     # Step: check device activation
-    self.proc.LogStep(1, steps_total, "check device existence")
-    info("checking volume groups")
-    my_vg = cfg.GetVGName()
-    results = self.rpc.call_vg_list([pri_node, new_node])
-    for node in pri_node, new_node:
-      res = results[node]
-      res.Raise("Error checking node %s" % node)
-      if my_vg not in res.payload:
-        raise errors.OpExecError("Volume group '%s' not found on %s" %
-                                 (my_vg, node))
-    for idx, dev in enumerate(instance.disks):
-      if idx not in self.op.disks:
-        continue
-      info("checking disk/%d on %s" % (idx, pri_node))
-      cfg.SetDiskID(dev, pri_node)
-      result = self.rpc.call_blockdev_find(pri_node, dev)
-      msg = result.fail_msg
-      if not msg and not result.payload:
-        msg = "disk not found"
-      if msg:
-        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
-                                 (idx, pri_node, msg))
+    self.lu.LogStep(1, steps_total, "Check device existence")
+    self._CheckDisksExistence([self.instance.primary_node])
+    self._CheckVolumeGroup([self.instance.primary_node])
 
     # Step: check other node consistency
-    self.proc.LogStep(2, steps_total, "check peer consistency")
-    for idx, dev in enumerate(instance.disks):
-      if idx not in self.op.disks:
-        continue
-      info("checking disk/%d consistency on %s" % (idx, pri_node))
-      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
-        raise errors.OpExecError("Primary node (%s) has degraded storage,"
-                                 " unsafe to replace the secondary" %
-                                 pri_node)
+    self.lu.LogStep(2, steps_total, "Check peer consistency")
+    self._CheckDisksConsistency(self.instance.primary_node, True, True)
 
     # Step: create new storage
-    self.proc.LogStep(3, steps_total, "allocate new storage")
-    for idx, dev in enumerate(instance.disks):
-      info("adding new local storage on %s for disk/%d" %
-           (new_node, idx))
+    self.lu.LogStep(3, steps_total, "Allocate new storage")
+    for idx, dev in enumerate(self.instance.disks):
+      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
+                      (self.new_node, idx))
       # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
-        _CreateBlockDev(self, new_node, instance, new_lv, True,
-                        _GetInstanceInfoText(instance), False)
+        _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
+                        _GetInstanceInfoText(self.instance), False)
 
     # Step 4: dbrd minors and drbd setups changes
     # after this, we must manually remove the drbd minors on both the
     # error and the success paths
-    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
-                                   instance.name)
-    logging.debug("Allocated minors %s" % (minors,))
-    self.proc.LogStep(4, steps_total, "changing drbd configuration")
-    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
-      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
+    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
+    minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
+                                        self.instance.name)
+    logging.debug("Allocated minors %r" % (minors,))
+
+    iv_names = {}
+    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
+      self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
       # create new devices on new_node; note that we create two IDs:
       # one without port, so the drbd will be activated without
       # networking information on the new node at this stage, and one
       # with network, for the latter activation in step 4
       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
-      if pri_node == o_node1:
+      if self.instance.primary_node == o_node1:
         p_minor = o_minor1
       else:
         p_minor = o_minor2
 
-      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
-      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
+      new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
+      new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
 
       iv_names[idx] = (dev, dev.children, new_net_id)
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
@@ -5559,106 +6030,68 @@ class LUReplaceDisks(LogicalUnit):
                               children=dev.children,
                               size=dev.size)
       try:
-        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
-                              _GetInstanceInfoText(instance), False)
+        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+                              _GetInstanceInfoText(self.instance), False)
       except errors.GenericError:
-        self.cfg.ReleaseDRBDMinors(instance.name)
+        self.cfg.ReleaseDRBDMinors(self.instance.name)
         raise
 
-    for idx, dev in enumerate(instance.disks):
-      # we have new devices, shutdown the drbd on the old secondary
-      info("shutting down drbd for disk/%d on old node" % idx)
-      cfg.SetDiskID(dev, old_node)
-      msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
+    # We have new devices, shutdown the drbd on the old secondary
+    for idx, dev in enumerate(self.instance.disks):
+      self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
+      self.cfg.SetDiskID(dev, self.target_node)
+      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
       if msg:
-        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
-                (idx, msg),
-                hint="Please cleanup this device manually as soon as possible")
+        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
+                           " node: %s" % (idx, msg),
+                           hint=("Please cleanup this device manually as"
+                                 " soon as possible"))
 
-    info("detaching primary drbds from the network (=> standalone)")
-    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
-                                               instance.disks)[pri_node]
+    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
+    result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
+                                               self.instance.disks)[self.instance.primary_node]
 
     msg = result.fail_msg
     if msg:
       # detaches didn't succeed (unlikely)
-      self.cfg.ReleaseDRBDMinors(instance.name)
+      self.cfg.ReleaseDRBDMinors(self.instance.name)
       raise errors.OpExecError("Can't detach the disks from the network on"
                                " old node: %s" % (msg,))
 
     # if we managed to detach at least one, we update all the disks of
     # the instance to point to the new secondary
-    info("updating instance configuration")
+    self.lu.LogInfo("Updating instance configuration")
     for dev, _, new_logical_id in iv_names.itervalues():
       dev.logical_id = new_logical_id
-      cfg.SetDiskID(dev, pri_node)
-    cfg.Update(instance)
+      self.cfg.SetDiskID(dev, self.instance.primary_node)
+
+    self.cfg.Update(self.instance)
 
     # and now perform the drbd attach
-    info("attaching primary drbds to new secondary (standalone => connected)")
-    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
-                                           instance.disks, instance.name,
+    self.lu.LogInfo("Attaching primary drbds to new secondary"
+                    " (standalone => connected)")
+    result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
+                                           self.instance.disks, self.instance.name,
                                            False)
     for to_node, to_result in result.items():
       msg = to_result.fail_msg
       if msg:
-        warning("can't attach drbd disks on node %s: %s", to_node, msg,
-                hint="please do a gnt-instance info to see the"
-                " status of disks")
-
-    # this can fail as the old devices are degraded and _WaitForSync
-    # does a combined result over all disks, so we don't check its
-    # return value
-    self.proc.LogStep(5, steps_total, "sync devices")
-    _WaitForSync(self, instance, unlock=True)
-
-    # so check manually all the devices
-    for idx, (dev, old_lvs, _) in iv_names.iteritems():
-      cfg.SetDiskID(dev, pri_node)
-      result = self.rpc.call_blockdev_find(pri_node, dev)
-      msg = result.fail_msg
-      if not msg and not result.payload:
-        msg = "disk not found"
-      if msg:
-        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
-                                 (idx, msg))
-      if result.payload[5]:
-        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
+        self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
+                           hint=("please do a gnt-instance info to see the"
+                                 " status of disks"))
 
-    self.proc.LogStep(6, steps_total, "removing old storage")
-    for idx, (dev, old_lvs, _) in iv_names.iteritems():
-      info("remove logical volumes for disk/%d" % idx)
-      for lv in old_lvs:
-        cfg.SetDiskID(lv, old_node)
-        msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
-        if msg:
-          warning("Can't remove LV on old secondary: %s", msg,
-                  hint="Cleanup stale volumes by hand")
+    # Wait for sync
+    # This can fail as the old devices are degraded and _WaitForSync
+    # does a combined result over all disks, so we don't check its return value
+    self.lu.LogStep(5, steps_total, "Sync devices")
+    _WaitForSync(self.lu, self.instance, unlock=True)
 
-  def Exec(self, feedback_fn):
-    """Execute disk replacement.
+    # Check all devices manually
+    self._CheckDevices(self.instance.primary_node, iv_names)
 
-    This dispatches the disk replacement to the appropriate handler.
-
-    """
-    instance = self.instance
-
-    # Activate the instance disks if we're replacing them on a down instance
-    if not instance.admin_up:
-      _StartInstanceDisks(self, instance, True)
-
-    if self.op.mode == constants.REPLACE_DISK_CHG:
-      fn = self._ExecD8Secondary
-    else:
-      fn = self._ExecD8DiskOnly
-
-    ret = fn(feedback_fn)
-
-    # Deactivate the instance disks if we're replacing them on a down instance
-    if not instance.admin_up:
-      _SafeShutdownInstanceDisks(self, instance)
-
-    return ret
+    # Step: remove old storage
+    self.lu.LogStep(6, steps_total, "Removing old storage")
+    self._RemoveOldStorage(self.target_node, iv_names)
 
 
 class LUGrowDisk(LogicalUnit):
@@ -6888,8 +7321,9 @@ class IAllocator(object):
     "relocate_from",
     ]
 
-  def __init__(self, lu, mode, name, **kwargs):
-    self.lu = lu
+  def __init__(self, cfg, rpc, mode, name, **kwargs):
+    self.cfg = cfg
+    self.rpc = rpc
     # init buffer variables
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy
@@ -6927,7 +7361,7 @@ class IAllocator(object):
     This is the data that is independent of the actual operation.
 
     """
-    cfg = self.lu.cfg
+    cfg = self.cfg
     cluster_info = cfg.GetClusterInfo()
     # cluster data
     data = {
@@ -6949,10 +7383,11 @@ class IAllocator(object):
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
 
-    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
-                                           hypervisor_name)
-    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
-                       cluster_info.enabled_hypervisors)
+    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
+                                        hypervisor_name)
+    node_iinfo = \
+      self.rpc.call_all_instances_info(node_list,
+                                       cluster_info.enabled_hypervisors)
     for nname, nresult in node_data.items():
       # first fill in static (config-based) values
       ninfo = cfg.GetNodeInfo(nname)
@@ -7089,7 +7524,7 @@ class IAllocator(object):
     done.
 
     """
-    instance = self.lu.cfg.GetInstanceInfo(self.name)
+    instance = self.cfg.GetInstanceInfo(self.name)
     if instance is None:
       raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                    " IAllocator" % self.name)
@@ -7131,9 +7566,9 @@ class IAllocator(object):
 
     """
     if call_fn is None:
-      call_fn = self.lu.rpc.call_iallocator_runner
+      call_fn = self.rpc.call_iallocator_runner
 
-    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
+    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
     result.Raise("Failure while running the iallocator script")
 
     self.out_text = result.payload
@@ -7237,7 +7672,7 @@ class LUTestAllocator(NoHooksLU):
 
     """
     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
-      ial = IAllocator(self,
+      ial = IAllocator(self.cfg, self.rpc,
                        mode=self.op.mode,
                        name=self.op.name,
                        mem_size=self.op.mem_size,
@@ -7250,7 +7685,7 @@ class LUTestAllocator(NoHooksLU):
                        hypervisor=self.op.hypervisor,
                        )
     else:
-      ial = IAllocator(self,
+      ial = IAllocator(self.cfg, self.rpc,
                        mode=self.op.mode,
                        name=self.op.name,
                        relocate_from=list(self.relocate_from),