- Implement Exec
"""
+ def __init__(self, lu):
+ self.lu = lu
+
+ # Shortcuts
+ self.cfg = lu.cfg
+ self.rpc = lu.rpc
+
def CheckPrereq(self):
"""Check prerequisites for this tasklets.
_CheckNicsBridgesExist(lu, instance.nics, node)
+def _GetNodeSecondaryInstances(cfg, node_name):
+ """Returns secondary instances on a node.
+
+ """
+ instances = []
+
+ for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
+ if node_name in inst.secondary_nodes:
+ instances.append(inst)
+
+ return instances
+
+
class LUDestroyCluster(NoHooksLU):
"""Logical unit for destroying the cluster.
  def ExpandNames(self):
    """Expand names and declare locks.

    The actual migration work is delegated to the TLMigrateInstance
    tasklet registered here.

    """
    self._ExpandAndLockInstance()

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    # Build and register the tasklet which does the real work in
    # CheckPrereq/Exec; parameters come straight from the opcode
    self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                       self.op.live, self.op.cleanup)
    self.tasklets.append(self._migrater)
  def DeclareLocks(self, level):
    """Declare locks at the given level.

    At the node level, locks the nodes of the already-locked instance.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
This runs on master, primary and secondary nodes of the instance.
"""
- env = _BuildInstanceHookEnvByObject(self, self.instance)
+ instance = self._migrater.instance
+ env = _BuildInstanceHookEnvByObject(self, instance)
env["MIGRATE_LIVE"] = self.op.live
env["MIGRATE_CLEANUP"] = self.op.cleanup
- nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+ nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
return env, nl, nl
+
class TLMigrateInstance(Tasklet):
  """Tasklet for migrating an instance.

  """
  def __init__(self, lu, instance_name, live, cleanup):
    """Initializes this class.

    @param lu: the LogicalUnit this tasklet belongs to
    @param instance_name: name of the instance to migrate
    @param live: whether the migration should be live (passed through to
        the migration RPC)
    @param cleanup: if True, Exec runs the cleanup path for a previous
        migration instead of starting a new one

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.live = live
    self.cleanup = cleanup
+
def CheckPrereq(self):
"""Check prerequisites.
"""
instance = self.cfg.GetInstanceInfo(
- self.cfg.ExpandInstanceName(self.op.instance_name))
+ self.cfg.ExpandInstanceName(self.instance_name))
if instance is None:
raise errors.OpPrereqError("Instance '%s' not known" %
- self.op.instance_name)
+ self.instance_name)
if instance.disk_template != constants.DT_DRBD8:
raise errors.OpPrereqError("Instance's disk layout is not"
# check bridge existance
_CheckInstanceBridgesExist(self, instance, node=target_node)
- if not self.op.cleanup:
+ if not self.cleanup:
_CheckNodeNotDrained(self, target_node)
result = self.rpc.call_instance_migratable(instance.primary_node,
instance)
self._GoReconnect(False)
self._WaitUntilSync()
except errors.OpExecError, err:
- self.LogWarning("Migration failed and I can't reconnect the"
- " drives: error '%s'\n"
- "Please look and recover the instance status" %
- str(err))
+ self.lu.LogWarning("Migration failed and I can't reconnect the"
+ " drives: error '%s'\n"
+ "Please look and recover the instance status" %
+ str(err))
def _AbortMigration(self):
"""Call the hypervisor code to abort a started migration.
time.sleep(10)
result = self.rpc.call_instance_migrate(source_node, instance,
self.nodes_ip[target_node],
- self.op.live)
+ self.live)
msg = result.fail_msg
if msg:
logging.error("Instance migration failed, trying to revert"
self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
}
- if self.op.cleanup:
+
+ if self.cleanup:
return self._ExecCleanup()
else:
return self._ExecMigration()
if not hasattr(self.op, "iallocator"):
self.op.iallocator = None
- _DiskReplacer.CheckArguments(self.op.mode, self.op.remote_node,
- self.op.iallocator)
+ TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
+ self.op.iallocator)
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
- self.replacer = _DiskReplacer(self, self.op.instance_name, self.op.mode,
- self.op.iallocator, self.op.remote_node,
- self.op.disks)
+ self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+ self.op.iallocator, self.op.remote_node,
+ self.op.disks)
+
+ self.tasklets.append(self.replacer)
def DeclareLocks(self, level):
# If we're not already locking all nodes in the set we have to declare the
nl.append(self.op.remote_node)
return env, nl, nl
- def CheckPrereq(self):
- """Check prerequisites.
- This checks that the instance is in the cluster.
class LUEvacuateNode(LogicalUnit):
  """Relocate the secondary instances from a node.

  """
  HPATH = "node-evacuate"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check and default the opcode arguments.

    Missing optional parameters default to None; the parameter
    combination itself is validated by TLReplaceDisks.CheckArguments.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                  self.op.remote_node,
                                  self.op.iallocator)

  def ExpandNames(self):
    """Expand names, declare locks and build the per-instance tasklets.

    One TLReplaceDisks tasklet is registered for every instance having
    this node as secondary.

    """
    # Expand into a local first: ExpandNodeName returns None for unknown
    # nodes and overwriting self.op.node_name before the check would make
    # the error message print "None" instead of the user-supplied name
    # (the remote_node branch below already does it this way)
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
    self.op.node_name = node_name

    self.needed_locks = {}

    # Declare node locks
    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)

      self.op.remote_node = remote_node

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    else:
      raise errors.OpPrereqError("Invalid parameters")

    # Create tasklets for replacing disks for all secondary instances on this
    # node
    names = []

    for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
      logging.debug("Replacing disks for instance %s", inst.name)
      names.append(inst.name)

      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
                                self.op.iallocator, self.op.remote_node, [])
      self.tasklets.append(replacer)

    self.instance_names = names

    # Declare instance locks
    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names

  def DeclareLocks(self, level):
    """Declare locks at the given level.

    """
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }

    nl = [self.cfg.GetMasterNode()]

    if self.op.remote_node is not None:
      env["NEW_SECONDARY"] = self.op.remote_node
      nl.append(self.op.remote_node)

    return (env, nl, nl)
-class _DiskReplacer:
+
+class TLReplaceDisks(Tasklet):
"""Replaces disks for an instance.
Note: Locking is not within the scope of this class.
"""Initializes this class.
"""
+ Tasklet.__init__(self, lu)
+
# Parameters
- self.lu = lu
self.instance_name = instance_name
self.mode = mode
self.iallocator_name = iallocator_name
self.remote_node = remote_node
self.disks = disks
- # Shortcuts
- self.cfg = lu.cfg
- self.rpc = lu.rpc
-
# Runtime data
self.instance = None
self.new_node = None
@staticmethod
def CheckArguments(mode, remote_node, iallocator):
+ """Helper function for users of this class.
+
+ """
# check for valid parameter combination
cnt = [remote_node, iallocator].count(None)
if mode == constants.REPLACE_DISK_CHG:
self.node_secondary_ip = node_2nd_ip
- def Exec(self):
+ def Exec(self, feedback_fn):
"""Execute disk replacement.
This dispatches the disk replacement to the appropriate handler.
"""
+ feedback_fn("Replacing disks for %s" % self.instance.name)
+
activate_disks = (not self.instance.admin_up)
# Activate the instance disks if we're replacing them on a down instance