Subclasses must follow these rules:
- implement ExpandNames
- - implement CheckPrereq
- - implement Exec
+ - implement CheckPrereq (except when tasklets are used)
+ - implement Exec (except when tasklets are used)
- implement BuildHooksEnv
- redefine HPATH and HTYPE
- optionally redefine their run requirements:
# support for dry-run
self.dry_run_result = None
+ # Tasklets
+ self.tasklets = None
+
for attr_name in self._OP_REQP:
attr_val = getattr(op, attr_name, None)
if attr_val is None:
raise errors.OpPrereqError("Required parameter '%s' missing" %
attr_name)
+
self.CheckArguments()
def __GetSSH(self):
level you can modify self.share_locks, setting a true value (usually 1) for
that level. By default locks are not shared.
+ This function can also define a list of tasklets, which then will be
+ executed in order instead of the usual LU-level CheckPrereq and Exec
+ functions, if those are not defined by the LU.
+
Examples::
# Acquire all nodes and one instance
their canonical form if it hasn't been done by ExpandNames before.
"""
- raise NotImplementedError
+ if self.tasklets is not None:
+ for (idx, tl) in enumerate(self.tasklets):
+ logging.debug("Checking prerequisites for tasklet %s/%s",
+ idx + 1, len(self.tasklets))
+ tl.CheckPrereq()
+ else:
+ raise NotImplementedError
def Exec(self, feedback_fn):
"""Execute the LU.
code, or expected.
"""
- raise NotImplementedError
+ if self.tasklets is not None:
+ for (idx, tl) in enumerate(self.tasklets):
+ logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
+ tl.Exec(feedback_fn)
+ else:
+ raise NotImplementedError
def BuildHooksEnv(self):
"""Build hooks environment for this LU.
HTYPE = None
+class Tasklet:
+ """Tasklet base class.
+
+ Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
+ they can mix legacy code with tasklets. Locking needs to be done in the LU,
+ tasklets know nothing about locks.
+
+ Subclasses must follow these rules:
+ - Implement CheckPrereq
+ - Implement Exec
+
+ """
+ def __init__(self, lu):
+ self.lu = lu
+
+ # Shortcuts
+ self.cfg = lu.cfg
+ self.rpc = lu.rpc
+
+ def CheckPrereq(self):
+ """Check prerequisites for this tasklets.
+
+ This method should check whether the prerequisites for the execution of
+ this tasklet are fulfilled. It can do internode communication, but it
+ should be idempotent - no cluster or system changes are allowed.
+
+ The method should raise errors.OpPrereqError in case something is not
+ fulfilled. Its return value is ignored.
+
+ This method should also update all parameters to their canonical form if it
+ hasn't been done before.
+
+ """
+ raise NotImplementedError
+
+ def Exec(self, feedback_fn):
+ """Execute the tasklet.
+
+ This method should implement the actual work. It should raise
+ errors.OpExecError for failures that are somewhat dealt with in code, or
+ expected.
+
+ """
+ raise NotImplementedError
+
+
def _GetWantedNodes(lu, nodes):
"""Returns list of checked and expanded node names.
_CheckNicsBridgesExist(lu, instance.nics, node)
+def _GetNodePrimaryInstances(cfg, node_name):
+ """Returns primary instances on a node.
+
+ """
+ instances = []
+
+ for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
+ if node_name == inst.primary_node:
+ instances.append(inst)
+
+ return instances
+
+
+def _GetNodeSecondaryInstances(cfg, node_name):
+ """Returns secondary instances on a node.
+
+ """
+ instances = []
+
+ for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
+ if node_name in inst.secondary_nodes:
+ instances.append(inst)
+
+ return instances
+
+
class LUDestroyCluster(NoHooksLU):
"""Logical unit for destroying the cluster.
def ExpandNames(self):
self._ExpandAndLockInstance()
+
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self._migrater = TLMigrateInstance(self, self.op.instance_name,
+ self.op.live, self.op.cleanup)
+ self.tasklets = [self._migrater]
+
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
This runs on master, primary and secondary nodes of the instance.
"""
- env = _BuildInstanceHookEnvByObject(self, self.instance)
+ instance = self._migrater.instance
+ env = _BuildInstanceHookEnvByObject(self, instance)
env["MIGRATE_LIVE"] = self.op.live
env["MIGRATE_CLEANUP"] = self.op.cleanup
- nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+ nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
return env, nl, nl
+
+class LUMigrateNode(LogicalUnit):
+ """Migrate all instances from a node.
+
+ """
+ HPATH = "node-migrate"
+ HTYPE = constants.HTYPE_NODE
+ _OP_REQP = ["node_name", "live"]
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
+ if self.op.node_name is None:
+ raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+
+ self.needed_locks = {
+ locking.LEVEL_NODE: [self.op.node_name],
+ }
+
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ # Create tasklets for migrating instances for all instances on this node
+ names = []
+ tasklets = []
+
+ for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
+ logging.debug("Migrating instance %s", inst.name)
+ names.append(inst.name)
+
+ tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+
+ self.tasklets = tasklets
+
+ # Declare instance locks
+ self.needed_locks[locking.LEVEL_INSTANCE] = names
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master, the primary and all the secondaries.
+
+ """
+ env = {
+ "NODE_NAME": self.op.node_name,
+ }
+
+ nl = [self.cfg.GetMasterNode()]
+
+ return (env, nl, nl)
+
+
+class TLMigrateInstance(Tasklet):
+ def __init__(self, lu, instance_name, live, cleanup):
+ """Initializes this class.
+
+ """
+ Tasklet.__init__(self, lu)
+
+ # Parameters
+ self.instance_name = instance_name
+ self.live = live
+ self.cleanup = cleanup
+
def CheckPrereq(self):
"""Check prerequisites.
"""
instance = self.cfg.GetInstanceInfo(
- self.cfg.ExpandInstanceName(self.op.instance_name))
+ self.cfg.ExpandInstanceName(self.instance_name))
if instance is None:
raise errors.OpPrereqError("Instance '%s' not known" %
- self.op.instance_name)
+ self.instance_name)
if instance.disk_template != constants.DT_DRBD8:
raise errors.OpPrereqError("Instance's disk layout is not"
# check bridge existance
_CheckInstanceBridgesExist(self, instance, node=target_node)
- if not self.op.cleanup:
+ if not self.cleanup:
_CheckNodeNotDrained(self, target_node)
result = self.rpc.call_instance_migratable(instance.primary_node,
instance)
self._GoReconnect(False)
self._WaitUntilSync()
except errors.OpExecError, err:
- self.LogWarning("Migration failed and I can't reconnect the"
- " drives: error '%s'\n"
- "Please look and recover the instance status" %
- str(err))
+ self.lu.LogWarning("Migration failed and I can't reconnect the"
+ " drives: error '%s'\n"
+ "Please look and recover the instance status" %
+ str(err))
def _AbortMigration(self):
"""Call the hypervisor code to abort a started migration.
time.sleep(10)
result = self.rpc.call_instance_migrate(source_node, instance,
self.nodes_ip[target_node],
- self.op.live)
+ self.live)
msg = result.fail_msg
if msg:
logging.error("Instance migration failed, trying to revert"
"""Perform the migration.
"""
+ feedback_fn("Migrating instance %s" % self.instance.name)
+
self.feedback_fn = feedback_fn
self.source_node = self.instance.primary_node
self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
}
- if self.op.cleanup:
+
+ if self.cleanup:
return self._ExecCleanup()
else:
return self._ExecMigration()
if not hasattr(self.op, "iallocator"):
self.op.iallocator = None
- _DiskReplacer.CheckArguments(self.op.mode, self.op.remote_node,
- self.op.iallocator)
+ TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
+ self.op.iallocator)
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
- self.replacer = _DiskReplacer(self, self.op.instance_name, self.op.mode,
- self.op.iallocator, self.op.remote_node,
- self.op.disks)
+ self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+ self.op.iallocator, self.op.remote_node,
+ self.op.disks)
+
+ self.tasklets = [self.replacer]
def DeclareLocks(self, level):
# If we're not already locking all nodes in the set we have to declare the
nl.append(self.op.remote_node)
return env, nl, nl
- def CheckPrereq(self):
- """Check prerequisites.
- This checks that the instance is in the cluster.
+class LUEvacuateNode(LogicalUnit):
+ """Relocate the secondary instances from a node.
- """
- self.replacer.CheckPrereq()
+ """
+ HPATH = "node-evacuate"
+ HTYPE = constants.HTYPE_NODE
+ _OP_REQP = ["node_name"]
+ REQ_BGL = False
- def Exec(self, feedback_fn):
- """Execute disk replacement.
+ def CheckArguments(self):
+ if not hasattr(self.op, "remote_node"):
+ self.op.remote_node = None
+ if not hasattr(self.op, "iallocator"):
+ self.op.iallocator = None
- This dispatches the disk replacement to the appropriate handler.
+ TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
+ self.op.remote_node,
+ self.op.iallocator)
+
+ def ExpandNames(self):
+ self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
+ if self.op.node_name is None:
+ raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+
+ self.needed_locks = {}
+
+ # Declare node locks
+ if self.op.iallocator is not None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+ elif self.op.remote_node is not None:
+ remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
+ if remote_node is None:
+ raise errors.OpPrereqError("Node '%s' not known" %
+ self.op.remote_node)
+
+ self.op.remote_node = remote_node
+
+ # Warning: do not remove the locking of the new secondary here
+ # unless DRBD8.AddChildren is changed to work in parallel;
+ # currently it doesn't since parallel invocations of
+ # FindUnusedMinor will conflict
+ self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ else:
+ raise errors.OpPrereqError("Invalid parameters")
+
+ # Create tasklets for replacing disks for all secondary instances on this
+ # node
+ names = []
+ tasklets = []
+
+ for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
+ logging.debug("Replacing disks for instance %s", inst.name)
+ names.append(inst.name)
+
+ replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
+ self.op.iallocator, self.op.remote_node, [])
+ tasklets.append(replacer)
+
+ self.tasklets = tasklets
+ self.instance_names = names
+
+ # Declare instance locks
+ self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
+
+ def DeclareLocks(self, level):
+ # If we're not already locking all nodes in the set we have to declare the
+ # instance's primary/secondary nodes.
+ if (level == locking.LEVEL_NODE and
+ self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+ self._LockInstancesNodes()
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master, the primary and all the secondaries.
"""
- self.replacer.Exec()
+ env = {
+ "NODE_NAME": self.op.node_name,
+ }
+
+ nl = [self.cfg.GetMasterNode()]
+
+ if self.op.remote_node is not None:
+ env["NEW_SECONDARY"] = self.op.remote_node
+ nl.append(self.op.remote_node)
+ return (env, nl, nl)
-class _DiskReplacer:
+
+class TLReplaceDisks(Tasklet):
"""Replaces disks for an instance.
Note: Locking is not within the scope of this class.
"""Initializes this class.
"""
+ Tasklet.__init__(self, lu)
+
# Parameters
- self.lu = lu
self.instance_name = instance_name
self.mode = mode
self.iallocator_name = iallocator_name
self.remote_node = remote_node
self.disks = disks
- # Shortcuts
- self.cfg = lu.cfg
- self.rpc = lu.rpc
-
# Runtime data
self.instance = None
self.new_node = None
@staticmethod
def CheckArguments(mode, remote_node, iallocator):
+ """Helper function for users of this class.
+
+ """
# check for valid parameter combination
cnt = [remote_node, iallocator].count(None)
if mode == constants.REPLACE_DISK_CHG:
self.node_secondary_ip = node_2nd_ip
- def Exec(self):
+ def Exec(self, feedback_fn):
"""Execute disk replacement.
This dispatches the disk replacement to the appropriate handler.
"""
+ feedback_fn("Replacing disks for %s" % self.instance.name)
+
activate_disks = (not self.instance.admin_up)
# Activate the instance disks if we're replacing them on a down instance