self.proc = processor
self.op = op
self.cfg = context.cfg
+ self.glm = context.glm
self.context = context
self.rpc = rpc
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
- self.acquired_locks = {}
self.share_locks = dict.fromkeys(locking.LEVELS, 0)
self.add_locks = {}
self.remove_locks = {}
# future we might want to have different behaviors depending on the value
# of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
- for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+ for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
instance = self.context.cfg.GetInstanceInfo(instance_name)
wanted_nodes.append(instance.primary_node)
if not primary_only:
"""
if self.do_locking:
- names = lu.acquired_locks[lock_level]
+ names = lu.glm.list_owned(lock_level)
else:
names = all_names
# caller specified names and we must keep the same order
assert self.names
- assert not self.do_locking or lu.acquired_locks[lock_level]
+ assert not self.do_locking or lu.glm.is_owned(lock_level)
missing = set(self.wanted).difference(names)
if missing:
return params_copy
def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  At most one of C{names} and C{keep} may be given. With C{names}, only the
  owned locks listed there are released; with a non-empty C{keep}, every
  owned lock not listed there is released; otherwise every lock owned at
  C{level} is released. If the LU owns no lock at C{level}, nothing is done
  and the lock manager is not called.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit whose C{glm} attribute is used to list and
    release the locks
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
    "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  # Take a single snapshot of the owned locks and work from it
  owned = lu.glm.list_owned(level)

  if not owned:
    # Nothing owned at this level, hence nothing to release.
    # NOTE(review): the lock manager's release() is presumed to reject a
    # level at which nothing is owned (callers guard with is_owned() before
    # calling this function) -- confirm against the locking module.
    return

  if should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in owned:
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(owned) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"
+
+
def _RunPostHook(lu, node_name):
"""Runs the post-hook for an opcode on a single node.
def ExpandNames(self):
if self.op.instances:
- self.wanted_names = []
- for name in self.op.instances:
- full_name = _ExpandInstanceName(self.cfg, name)
- self.wanted_names.append(full_name)
+ self.wanted_names = _GetWantedInstances(self, self.op.instances)
self.needed_locks = {
locking.LEVEL_NODE: [],
locking.LEVEL_INSTANCE: self.wanted_names,
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE and self.wanted_names is not None:
"""
if self.wanted_names is None:
- self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+ self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
in self.wanted_names]
" drbd-based instances exist",
errors.ECODE_INVAL)
- node_list = self.acquired_locks[locking.LEVEL_NODE]
+ node_list = self.glm.list_owned(locking.LEVEL_NODE)
# if vg_name not None, checks given volume group on all nodes
if self.op.vg_name:
REG_BGL = False
_SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
+ def ExpandNames(self):
+ """Gather locks we need.
+
+ """
+ if self.op.node_names:
+ self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+ lock_names = self.op.node_names
+ else:
+ lock_names = locking.ALL_SET
+
+ self.needed_locks = {
+ locking.LEVEL_NODE: lock_names,
+ }
+
def CheckPrereq(self):
"""Check prerequisites.
assert self.op.power_delay >= 0.0
if self.op.node_names:
- if self.op.command in self._SKIP_MASTER:
- if self.master_node in self.op.node_names:
- master_node_obj = self.cfg.GetNodeInfo(self.master_node)
- master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
-
- if master_oob_handler:
- additional_text = ("Run '%s %s %s' if you want to operate on the"
- " master regardless") % (master_oob_handler,
- self.op.command,
- self.master_node)
- else:
- additional_text = "The master node does not support out-of-band"
+ if (self.op.command in self._SKIP_MASTER and
+ self.master_node in self.op.node_names):
+ master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+ master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
+
+ if master_oob_handler:
+ additional_text = ("run '%s %s %s' if you want to operate on the"
+ " master regardless") % (master_oob_handler,
+ self.op.command,
+ self.master_node)
+ else:
+ additional_text = "it does not support out-of-band operations"
- raise errors.OpPrereqError(("Operating on the master node %s is not"
- " allowed for %s\n%s") %
- (self.master_node, self.op.command,
- additional_text), errors.ECODE_INVAL)
+ raise errors.OpPrereqError(("Operating on the master node %s is not"
+ " allowed for %s; %s") %
+ (self.master_node, self.op.command,
+ additional_text), errors.ECODE_INVAL)
else:
self.op.node_names = self.cfg.GetNodeList()
if self.op.command in self._SKIP_MASTER:
" not marked offline") % node_name,
errors.ECODE_STATE)
- def ExpandNames(self):
- """Gather locks we need.
-
- """
- if self.op.node_names:
- self.op.node_names = [_ExpandNodeName(self.cfg, name)
- for name in self.op.node_names]
- lock_names = self.op.node_names
- else:
- lock_names = locking.ALL_SET
-
- self.needed_locks = {
- locking.LEVEL_NODE: lock_names,
- }
-
def Exec(self, feedback_fn):
"""Execute OOB and return result if we expect any.
master_node = self.master_node
ret = []
- for idx, node in enumerate(self.nodes):
+ for idx, node in enumerate(utils.NiceSort(self.nodes,
+ key=lambda node: node.name)):
node_entry = [(constants.RS_NORMAL, node.name)]
ret.append(node_entry)
self.op.timeout)
if result.fail_msg:
- self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+ self.LogWarning("Out-of-band RPC failed on node '%s': %s",
node.name, result.fail_msg)
node_entry.append((constants.RS_NODATA, None))
else:
try:
self._CheckPayload(result)
except errors.OpExecError, err:
- self.LogWarning("The payload returned by '%s' is not valid: %s",
+ self.LogWarning("Payload returned by node '%s' is not valid: %s",
node.name, err)
node_entry.append((constants.RS_NODATA, None))
else:
for item, status in result.payload:
if status in [constants.OOB_STATUS_WARNING,
constants.OOB_STATUS_CRITICAL]:
- self.LogWarning("On node '%s' item '%s' has status '%s'",
- node.name, item, status)
+ self.LogWarning("Item '%s' on node '%s' has status '%s'",
+ item, node.name, status)
if self.op.command == constants.OOB_POWER_ON:
node.powered = True
"""
# Locking is not used
- assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
valid_nodes = [node.name
for node in lu.cfg.GetAllNodesInfo().values()
masternode = self.cfg.GetMasterNode()
if node.name == masternode:
- raise errors.OpPrereqError("Node is the master node,"
- " you need to failover first.",
- errors.ECODE_INVAL)
+ raise errors.OpPrereqError("Node is the master node, failover to another"
+ " node is required", errors.ECODE_INVAL)
for instance_name in instance_list:
instance = self.cfg.GetInstanceInfo(instance_name)
if node.name in instance.all_nodes:
raise errors.OpPrereqError("Instance %s is still running on the node,"
- " please remove first." % instance_name,
+ " please remove first" % instance_name,
errors.ECODE_INVAL)
self.op.node_name = node.name
self.node = node
"""Computes the list of nodes and their attributes.
"""
- nodenames = self.acquired_locks[locking.LEVEL_NODE]
+ nodenames = self.glm.list_owned(locking.LEVEL_NODE)
volumes = self.rpc.call_node_volumes(nodenames)
ilist = [self.cfg.GetInstanceInfo(iname) for iname
"""Computes the list of nodes and their attributes.
"""
- self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+ self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
# Always get name to sort by
if constants.SF_NAME in self.op.output_fields:
# If we have locked all instances, before waiting to lock nodes, release
# all the ones living on nodes unrelated to the current operation.
if level == locking.LEVEL_NODE and self.lock_instances:
- instances_release = []
- instances_keep = []
self.affected_instances = []
if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+ instances_keep = []
+
+ # Build list of instances to release
+ for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
instance = self.context.cfg.GetInstanceInfo(instance_name)
- i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR
- if i_mirrored and self.op.node_name in instance.all_nodes:
+ if (instance.disk_template in constants.DTS_INT_MIRROR and
+ self.op.node_name in instance.all_nodes):
instances_keep.append(instance_name)
self.affected_instances.append(instance)
- else:
- instances_release.append(instance_name)
- if instances_release:
- self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
- self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
+ _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
+
+ assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
+ set(instances_keep))
def BuildHooksEnv(self):
"""Build hooks env.
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
- assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
+ assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
self.old_role = old_role = self._F2R[old_flags]
# Check for ineffective changes
if _SupportsOob(self.cfg, node):
if self.op.offline is False and not (node.powered or
self.op.powered == True):
- raise errors.OpPrereqError(("Please power on node %s first before you"
- " can reset offline state") %
+ raise errors.OpPrereqError(("Node %s needs to be turned on before its"
+ " offline status can be reset") %
self.op.node_name)
elif self.op.powered is not None:
raise errors.OpPrereqError(("Unable to change powered state for node %s"
- " which does not support out-of-band"
+ " as it does not support out-of-band"
" handling") % self.op.node_name)
# If we're being deofflined/drained, we'll MC ourself if needed
else:
for idx in self.op.disks:
if idx >= len(instance.disks):
- raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
+ raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
errors.ECODE_INVAL)
self.instance = instance
"""
if self.op.ip_check and not self.op.name_check:
# TODO: make the ip check more flexible and not depend on the name check
- raise errors.OpPrereqError("Cannot do ip check without a name check",
+ raise errors.OpPrereqError("IP address check requires a name check",
errors.ECODE_INVAL)
def BuildHooksEnv(self):
# Change the instance lock. This is definitely safe while we hold the BGL.
# Otherwise the new lock would have to be added in acquired mode.
assert self.REQ_BGL
- self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
- self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+ self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+ self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
# re-read the instance from the configuration after rename
inst = self.cfg.GetInstanceInfo(self.op.new_name)
shutdown_timeout = self.op.shutdown_timeout
self._migrater = TLMigrateInstance(self, self.op.instance_name,
cleanup=False,
- iallocator=self.op.iallocator,
- target_node=self.op.target_node,
failover=True,
ignore_consistency=ignore_consistency,
shutdown_timeout=shutdown_timeout)
"""
instance = self._migrater.instance
source_node = instance.primary_node
- target_node = self._migrater.target_node
+ target_node = self.op.target_node
env = {
"IGNORE_CONSISTENCY": self.op.ignore_consistency,
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
self._migrater = TLMigrateInstance(self, self.op.instance_name,
cleanup=self.op.cleanup,
- iallocator=self.op.iallocator,
- target_node=self.op.target_node,
failover=False,
fallback=self.op.allow_failover)
self.tasklets = [self._migrater]
"""
instance = self._migrater.instance
source_node = instance.primary_node
- target_node = self._migrater.target_node
+ target_node = self.op.target_node
env = _BuildInstanceHookEnvByObject(self, instance)
env.update({
"MIGRATE_LIVE": self._migrater.live,
logging.debug("Migrating instance %s", inst.name)
names.append(inst.name)
- tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False,
- iallocator=self.op.iallocator,
- taget_node=None))
+ tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
if inst.disk_template in constants.DTS_EXT_MIRROR:
# We need to lock all nodes, as the iallocator will choose the
@ivar shutdown_timeout: In case of failover timeout of the shutdown
"""
- def __init__(self, lu, instance_name, cleanup=False, iallocator=None,
- target_node=None, failover=False, fallback=False,
+ def __init__(self, lu, instance_name, cleanup=False,
+ failover=False, fallback=False,
ignore_consistency=False,
shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
"""Initializes this class.
self.instance_name = instance_name
self.cleanup = cleanup
self.live = False # will be overridden later
- self.iallocator = iallocator
- self.target_node = target_node
self.failover = failover
self.fallback = fallback
self.ignore_consistency = ignore_consistency
if instance.disk_template in constants.DTS_EXT_MIRROR:
_CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
- if self.iallocator:
+ if self.lu.op.iallocator:
self._RunAllocator()
+ else:
+ # We set self.target_node as it is required by
+ # BuildHooksEnv
+ self.target_node = self.lu.op.target_node
# self.target_node is already populated, either directly or by the
# iallocator run
target_node = self.target_node
if len(self.lu.tasklets) == 1:
- # It is safe to remove locks only when we're the only tasklet in the LU
- nodes_keep = [instance.primary_node, self.target_node]
- nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE]
- if node not in nodes_keep]
- self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel)
- self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+ # It is safe to release locks only when we're the only tasklet
+ # in the LU
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+ keep=[instance.primary_node, self.target_node])
else:
secondary_nodes = instance.secondary_nodes
" %s disk template" %
instance.disk_template)
target_node = secondary_nodes[0]
- if self.iallocator or (self.target_node and
- self.target_node != target_node):
+ if self.lu.op.iallocator or (self.lu.op.target_node and
+ self.lu.op.target_node != target_node):
if self.failover:
text = "failed over"
else:
assert not (self.failover and self.cleanup)
+ if not self.failover:
+ if self.lu.op.live is not None and self.lu.op.mode is not None:
+ raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+ " parameters are accepted",
+ errors.ECODE_INVAL)
+ if self.lu.op.live is not None:
+ if self.lu.op.live:
+ self.lu.op.mode = constants.HT_MIGRATION_LIVE
+ else:
+ self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+ # reset the 'live' parameter to None so that repeated
+ # invocations of CheckPrereq do not raise an exception
+ self.lu.op.live = None
+ elif self.lu.op.mode is None:
+ # read the default value from the hypervisor
+ i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+ skip_globals=False)
+ self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+ self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+ else:
+ # Failover is never live
+ self.live = False
+
def _RunAllocator(self):
"""Run the allocator based on input opcode.
self.instance.primary_node],
)
- ial.Run(self.iallocator)
+ ial.Run(self.lu.op.iallocator)
if not ial.success:
raise errors.OpPrereqError("Can't compute nodes using"
" iallocator '%s': %s" %
- (self.iallocator, ial.info),
+ (self.lu.op.iallocator, ial.info),
errors.ECODE_NORES)
if len(ial.result) != ial.required_nodes:
raise errors.OpPrereqError("iallocator '%s' returned invalid number"
" of nodes (%s), required %s" %
- (self.iallocator, len(ial.result),
+ (self.lu.op.iallocator, len(ial.result),
ial.required_nodes), errors.ECODE_FAULT)
self.target_node = ial.result[0]
self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
- self.instance_name, self.iallocator,
+ self.instance_name, self.lu.op.iallocator,
utils.CommaJoin(ial.result))
- if not self.failover:
- if self.lu.op.live is not None and self.lu.op.mode is not None:
- raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
- " parameters are accepted",
- errors.ECODE_INVAL)
- if self.lu.op.live is not None:
- if self.lu.op.live:
- self.lu.op.mode = constants.HT_MIGRATION_LIVE
- else:
- self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
- # reset the 'live' parameter to None so that repeated
- # invocations of CheckPrereq do not raise an exception
- self.lu.op.live = None
- elif self.lu.op.mode is None:
- # read the default value from the hypervisor
- i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
- skip_globals=False)
- self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
-
- self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
- else:
- # Failover is never live
- self.live = False
-
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
if runningon_source and runningon_target:
raise errors.OpExecError("Instance seems to be running on two nodes,"
- " or the hypervisor is confused. You will have"
+ " or the hypervisor is confused; you will have"
" to ensure manually that it runs only on one"
- " and restart this operation.")
+ " and restart this operation")
if not (runningon_source or runningon_target):
- raise errors.OpExecError("Instance does not seem to be running at all."
- " In this case, it's safer to repair by"
+ raise errors.OpExecError("Instance does not seem to be running at all;"
+ " in this case it's safer to repair by"
" running 'gnt-instance stop' to ensure disk"
- " shutdown, and then restarting it.")
+ " shutdown, and then restarting it")
if runningon_target:
# the migration has actually succeeded, we need to update the config
self._GoReconnect(False)
self._WaitUntilSync()
except errors.OpExecError, err:
- self.lu.LogWarning("Migration failed and I can't reconnect the"
- " drives: error '%s'\n"
- "Please look and recover the instance status" %
- str(err))
+ self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
+ " please try to recover the instance manually;"
+ " error '%s'" % str(err))
def _AbortMigration(self):
"""Call the hypervisor code to abort a started migration.
if not _CheckDiskConsistency(self.lu, dev, target_node, False):
raise errors.OpExecError("Disk %s is degraded or not fully"
" synchronized on target node,"
- " aborting migrate." % dev.iv_name)
+ " aborting migration" % dev.iv_name)
# First get the migration information from the remote node
result = self.rpc.call_migration_info(source_node, instance)
if not _CheckDiskConsistency(self, dev, target_node, False):
if not self.ignore_consistency:
raise errors.OpExecError("Disk %s is degraded on target node,"
- " aborting failover." % dev.iv_name)
+ " aborting failover" % dev.iv_name)
else:
self.feedback_fn("* not checking disk consistency as instance is not"
" running")
msg = result.fail_msg
if msg:
if self.ignore_consistency or primary_node.offline:
- self.lu.LogWarning("Could not shutdown instance %s on node %s."
- " Proceeding anyway. Please make sure node"
- " %s is down. Error details: %s",
+ self.lu.LogWarning("Could not shutdown instance %s on node %s,"
+ " proceeding anyway; please make sure node"
+ " %s is down; error details: %s",
instance.name, source_node, source_node, msg)
else:
raise errors.OpExecError("Could not shutdown instance %s on"
for idx, success in enumerate(result.payload):
if not success:
- lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
- " look at the status and troubleshoot the issue.", idx)
+ lu.LogWarning("Resume sync of disk %d failed, please have a"
+ " look at the status and troubleshoot the issue", idx)
logging.warn("resume-sync of instance %s for disks %d failed",
instance.name, idx)
if self.op.ip_check and not self.op.name_check:
# TODO: make the ip check more flexible and not depend on the name check
- raise errors.OpPrereqError("Cannot do ip check without a name check",
- errors.ECODE_INVAL)
+ raise errors.OpPrereqError("Cannot do IP address check without a name"
+ " check", errors.ECODE_INVAL)
# check nics' parameter names
for nic in self.op.nics:
self.op.src_node = None
if os.path.isabs(src_path):
raise errors.OpPrereqError("Importing an instance from an absolute"
- " path requires a source node option.",
+ " path requires a source node option",
errors.ECODE_INVAL)
else:
self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
src_path = self.op.src_path
if src_node is None:
- locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
exp_list = self.rpc.call_export_list(locked_nodes)
found = False
for node in exp_list:
if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.snode == pnode.name:
raise errors.OpPrereqError("The secondary node cannot be the"
- " primary node.", errors.ECODE_INVAL)
+ " primary node", errors.ECODE_INVAL)
_CheckNodeOnline(self, self.op.snode)
_CheckNodeNotDrained(self, self.op.snode)
_CheckNodeVmCapable(self, self.op.snode)
# Declare that we don't want to remove the instance lock anymore, as we've
# added the instance to the config
del self.remove_locks[locking.LEVEL_INSTANCE]
- # Unlock all the nodes
+
if self.op.mode == constants.INSTANCE_IMPORT:
- nodes_keep = [self.op.src_node]
- nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
- if node != self.op.src_node]
- self.context.glm.release(locking.LEVEL_NODE, nodes_release)
- self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+ # Release unused nodes
+ _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
else:
- self.context.glm.release(locking.LEVEL_NODE)
- del self.acquired_locks[locking.LEVEL_NODE]
+ # Release all nodes
+ _ReleaseLocks(self, locking.LEVEL_NODE)
disk_abort = False
if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
def ExpandNames(self):
self._ExpandAndLockInstance()
- if self.op.iallocator is not None:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ assert locking.LEVEL_NODE not in self.needed_locks
+ assert locking.LEVEL_NODEGROUP not in self.needed_locks
- elif self.op.remote_node is not None:
- remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
- self.op.remote_node = remote_node
+ assert self.op.iallocator is None or self.op.remote_node is None, \
+ "Conflicting options"
+
+ if self.op.remote_node is not None:
+ self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
# Warning: do not remove the locking of the new secondary here
# unless DRBD8.AddChildren is changed to work in parallel;
# currently it doesn't since parallel invocations of
# FindUnusedMinor will conflict
- self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+ self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
else:
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ if self.op.iallocator is not None:
+ # iallocator will select a new node in the same group
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
+
self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
self.op.iallocator, self.op.remote_node,
self.op.disks, False, self.op.early_release)
self.tasklets = [self.replacer]
def DeclareLocks(self, level):
- # If we're not already locking all nodes in the set we have to declare the
- # instance's primary/secondary nodes.
- if (level == locking.LEVEL_NODE and
- self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
- self._LockInstancesNodes()
+ if level == locking.LEVEL_NODEGROUP:
+ assert self.op.remote_node is None
+ assert self.op.iallocator is not None
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+ elif level == locking.LEVEL_NODE:
+ if self.op.iallocator is not None:
+ assert self.op.remote_node is None
+ assert not self.needed_locks[locking.LEVEL_NODE]
+
+ # Lock member nodes of all locked groups
+ self.needed_locks[locking.LEVEL_NODE] = [node_name
+ for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+ else:
+ self._LockInstancesNodes()
def BuildHooksEnv(self):
"""Build hooks env.
nl.append(self.op.remote_node)
return nl, nl
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
+ self.op.iallocator is None)
+
+ owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ if owned_groups:
+ groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+ if owned_groups != groups:
+ raise errors.OpExecError("Node groups used by instance '%s' changed"
+ " since lock was acquired, current list is %r,"
+ " used to be '%s'" %
+ (self.op.instance_name,
+ utils.CommaJoin(groups),
+ utils.CommaJoin(owned_groups)))
+
+ return LogicalUnit.CheckPrereq(self)
+
class TLReplaceDisks(Tasklet):
"""Replaces disks for an instance.
return True
-
def CheckPrereq(self):
"""Check prerequisites.
remote_node = self._RunAllocator(self.lu, self.iallocator_name,
instance.name, instance.secondary_nodes)
- if remote_node is not None:
+ if remote_node is None:
+ self.remote_node_info = None
+ else:
+ assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
+ "Remote node '%s' is not locked" % remote_node
+
self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
assert self.remote_node_info is not None, \
"Cannot retrieve locked node %s" % remote_node
- else:
- self.remote_node_info = None
if remote_node == self.instance.primary_node:
raise errors.OpPrereqError("The specified node is the primary node of"
- " the instance.", errors.ECODE_INVAL)
+ " the instance", errors.ECODE_INVAL)
if remote_node == secondary_node:
raise errors.OpPrereqError("The specified node is already the"
- " secondary node of the instance.",
+ " secondary node of the instance",
errors.ECODE_INVAL)
if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
+ touched_nodes = frozenset(node_name for node_name in [self.new_node,
+ self.other_node,
+ self.target_node]
+ if node_name is not None)
+
+ # Release unneeded node locks
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+
+ # Release any owned node group
+ if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
+ _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+
# Check whether disks are valid
for disk_idx in self.disks:
instance.FindDisk(disk_idx)
# Get secondary node IP addresses
- node_2nd_ip = {}
-
- for node_name in [self.target_node, self.other_node, self.new_node]:
- if node_name is not None:
- node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
-
- self.node_secondary_ip = node_2nd_ip
+ self.node_secondary_ip = \
+ dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
+ for node_name in touched_nodes)
def Exec(self, feedback_fn):
"""Execute disk replacement.
if self.delay_iallocator:
self._CheckPrereq2()
+ if __debug__:
+ # Verify owned locks before starting operation
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+ assert set(owned_locks) == set(self.node_secondary_ip), \
+ ("Incorrect node locks, owning %s, expected %s" %
+ (owned_locks, self.node_secondary_ip.keys()))
+
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
+ assert list(owned_locks) == [self.instance_name], \
+ "Instance '%s' not locked" % self.instance_name
+
+ assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+ "Should not own any node group lock at this point"
+
if not self.disks:
feedback_fn("No disks need replacement")
return
else:
fn = self._ExecDrbd8DiskOnly
- return fn(feedback_fn)
-
+ result = fn(feedback_fn)
finally:
# Deactivate the instance disks if we're replacing them on a
# down instance
if activate_disks:
_SafeShutdownInstanceDisks(self.lu, self.instance)
+ if __debug__:
+ # Verify owned locks
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+ nodes = frozenset(self.node_secondary_ip)
+ assert ((self.early_release and not owned_locks) or
+ (not self.early_release and not (set(owned_locks) - nodes))), \
+ ("Not owning the correct locks, early_release=%s, owned=%r,"
+ " nodes=%r" % (self.early_release, owned_locks, nodes))
+
+ return result
+
def _CheckVolumeGroup(self, nodes):
self.lu.LogInfo("Checking volume groups")
self.lu.LogWarning("Can't remove old LV: %s" % msg,
hint="remove unused LVs manually")
- def _ReleaseNodeLock(self, node_name):
- """Releases the lock for a given node."""
- self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
-
def _ExecDrbd8DiskOnly(self, feedback_fn):
"""Replace a disk on the primary or secondary for DRBD 8.
self._RemoveOldStorage(self.target_node, iv_names)
# WARNING: we release both node locks here, do not do other RPCs
# than WaitForSync to the primary node
- self._ReleaseNodeLock([self.target_node, self.other_node])
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+ names=[self.target_node, self.other_node])
# Wait for sync
# This can fail as the old devices are degraded and _WaitForSync
self._RemoveOldStorage(self.target_node, iv_names)
# WARNING: we release all node locks here, do not do other RPCs
# than WaitForSync to the primary node
- self._ReleaseNodeLock([self.instance.primary_node,
- self.target_node,
- self.new_node])
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+ names=[self.instance.primary_node,
+ self.target_node,
+ self.new_node])
# Wait for sync
# This can fail as the old devices are degraded and _WaitForSync
if instance.disk_template not in constants.DTS_GROWABLE:
raise errors.OpPrereqError("Instance's disk layout does not support"
- " growing.", errors.ECODE_INVAL)
+ " growing", errors.ECODE_INVAL)
self.disk = instance.FindDisk(self.op.disk)
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
+ # First run all grow ops in dry-run mode
+ for node in instance.all_nodes:
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+ result.Raise("Grow request failed to node %s" % node)
+
+ # We know that (as far as we can test) operations across different
+ # nodes will succeed, time to run it for real
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+ result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
result.Raise("Grow request failed to node %s" % node)
# TODO: Rewrite code to work properly
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, instance, disks=[disk])
if disk_abort:
- self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
- " status.\nPlease check the instance.")
+ self.proc.LogWarning("Disk sync-ing has not returned a good"
+ " status; please check the instance")
if not instance.admin_up:
_SafeShutdownInstanceDisks(self, instance, disks=[disk])
elif not instance.admin_up:
self.proc.LogWarning("Not shutting down the disk even if the instance is"
" not supposed to be running because no wait for"
- " sync mode was requested.")
+ " sync mode was requested")
class LUInstanceQueryData(NoHooksLU):
"""
if self.wanted_names is None:
assert self.op.use_locking, "Locking was not used"
- self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+ self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
self.wanted_instances = [self.cfg.GetInstanceInfo(name)
for name in self.wanted_names]
that node.
"""
- self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+ self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
rpcresult = self.rpc.call_export_list(self.nodes)
result = {}
for node in rpcresult:
fqdn_warn = True
instance_name = self.op.instance_name
- locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
exportlist = self.rpc.call_export_list(locked_nodes)
found = False
for node in exportlist: