self.proc = processor
self.op = op
self.cfg = context.cfg
+ self.glm = context.glm
self.context = context
self.rpc = rpc
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
- self.acquired_locks = {}
self.share_locks = dict.fromkeys(locking.LEVELS, 0)
self.add_locks = {}
self.remove_locks = {}
# future we might want to have different behaviors depending on the value
# of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
- for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+ for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
instance = self.context.cfg.GetInstanceInfo(instance_name)
wanted_nodes.append(instance.primary_node)
if not primary_only:
"""
if self.do_locking:
- names = lu.acquired_locks[lock_level]
+ names = lu.glm.list_owned(lock_level)
else:
names = all_names
# caller specified names and we must keep the same order
assert self.names
- assert not self.do_locking or lu.acquired_locks[lock_level]
+ assert not self.do_locking or lu.glm.is_owned(lock_level)
missing = set(self.wanted).difference(names)
if missing:
release = []
# Determine which locks to release
- for name in lu.acquired_locks[level]:
+ for name in lu.glm.list_owned(level):
if should_release(name):
release.append(name)
else:
retain.append(name)
- assert len(lu.acquired_locks[level]) == (len(retain) + len(release))
+ assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
# Release just some locks
- lu.context.glm.release(level, names=release)
- lu.acquired_locks[level] = retain
+ lu.glm.release(level, names=release)
- assert frozenset(lu.context.glm.list_owned(level)) == frozenset(retain)
+ assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
else:
# Release everything
- lu.context.glm.release(level)
- del lu.acquired_locks[level]
+ lu.glm.release(level)
- assert not lu.context.glm.list_owned(level), "No locks should be owned"
+ assert not lu.glm.is_owned(level), "No locks should be owned"
def _RunPostHook(lu, node_name):
ntime_diff)
def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
- """Check the node time.
+ """Check the node LVM results.
@type ninfo: L{objects.Node}
@param ninfo: the node to check
_ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
" '%s' of VG '%s'", pvname, owner_vg)
+ def _VerifyNodeBridges(self, ninfo, nresult, bridges):
+ """Check the node bridges.
+
+ @type ninfo: L{objects.Node}
+ @param ninfo: the node to check
+ @param nresult: the remote results for the node
+ @param bridges: the expected list of bridges
+
+ """
+ if not bridges:
+ return
+
+ node = ninfo.name
+ _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
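+ # nodes return the list of bridges they could not find; anything
+ # other than a list means the result is malformed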
+ missing = nresult.get(constants.NV_BRIDGES, None)
+ test = not isinstance(missing, list)
+ _ErrorIf(test, self.ENODENET, node,
+ "did not return valid bridge information")
+ if not test:
+ _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
+ utils.CommaJoin(sorted(missing)))
+
def _VerifyNodeNetwork(self, ninfo, nresult):
- """Check the node time.
+ """Check the node network connectivity results.
@type ninfo: L{objects.Node}
@param ninfo: the node to check
drbd_helper = self.cfg.GetDRBDHelper()
hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
cluster = self.cfg.GetClusterInfo()
- nodelist = utils.NiceSort(self.cfg.GetNodeList())
- nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
- nodeinfo_byname = dict(zip(nodelist, nodeinfo))
- instancelist = utils.NiceSort(self.cfg.GetInstanceList())
- instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
- for iname in instancelist)
+ nodeinfo_byname = self.cfg.GetAllNodesInfo()
+ nodelist = utils.NiceSort(nodeinfo_byname.keys())
+ nodeinfo = [nodeinfo_byname[nname] for nname in nodelist]
+ instanceinfo = self.cfg.GetAllInstancesInfo()
+ instancelist = utils.NiceSort(instanceinfo.keys())
groupinfo = self.cfg.GetAllNodeGroupsInfo()
i_non_redundant = [] # Non redundant instances
i_non_a_balanced = [] # Non auto-balanced instances
if drbd_helper:
node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
+ # bridge checks
+ # FIXME: this needs to be changed per node-group, not cluster-wide
+ bridges = set()
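+ # collect the default NIC link plus the link of every bridged NIC
+ # of every instance; these are the bridges the nodes are asked to
+ # verify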
+ default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+ if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ bridges.add(default_nicpp[constants.NIC_LINK])
+ for instance in instanceinfo.values():
+ for nic in instance.nics:
+ full_nic = cluster.SimpleFillNIC(nic.nicparams)
+ if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+ bridges.add(full_nic[constants.NIC_LINK])
+
+ if bridges:
+ node_verify_param[constants.NV_BRIDGES] = list(bridges)
+
# Build our expected cluster state
node_image = dict((node.name, self.NodeImage(offline=node.offline,
name=node.name,
if refos_img is None:
refos_img = nimg
self._VerifyNodeOS(node_i, nimg, refos_img)
+ self._VerifyNodeBridges(node_i, nresult, bridges)
feedback_fn("* Verifying instance status")
for instance in instancelist:
def ExpandNames(self):
if self.op.instances:
- self.wanted_names = []
- for name in self.op.instances:
- full_name = _ExpandInstanceName(self.cfg, name)
- self.wanted_names.append(full_name)
+ self.wanted_names = _GetWantedInstances(self, self.op.instances)
self.needed_locks = {
locking.LEVEL_NODE: [],
locking.LEVEL_INSTANCE: self.wanted_names,
locking.LEVEL_NODE: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
- self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE and self.wanted_names is not None:
"""
if self.wanted_names is None:
- self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+ self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
in self.wanted_names]
" drbd-based instances exist",
errors.ECODE_INVAL)
- node_list = self.acquired_locks[locking.LEVEL_NODE]
+ node_list = self.glm.list_owned(locking.LEVEL_NODE)
# if vg_name not None, checks given volume group on all nodes
if self.op.vg_name:
# if we're moving instances to routed, check that they have an ip
target_mode = params_filled[constants.NIC_MODE]
if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
- nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
- (instance.name, nic_idx))
+ nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
+ " address" % (instance.name, nic_idx))
if nic_errors:
raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
"\n".join(nic_errors))
REG_BGL = False
_SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
+ def ExpandNames(self):
+ """Gather locks we need.
+
+ """
+ if self.op.node_names:
+ self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+ lock_names = self.op.node_names
+ else:
+ lock_names = locking.ALL_SET
+
+ self.needed_locks = {
+ locking.LEVEL_NODE: lock_names,
+ }
+
def CheckPrereq(self):
"""Check prerequisites.
" not marked offline") % node_name,
errors.ECODE_STATE)
- def ExpandNames(self):
- """Gather locks we need.
-
- """
- if self.op.node_names:
- self.op.node_names = [_ExpandNodeName(self.cfg, name)
- for name in self.op.node_names]
- lock_names = self.op.node_names
- else:
- lock_names = locking.ALL_SET
-
- self.needed_locks = {
- locking.LEVEL_NODE: lock_names,
- }
-
def Exec(self, feedback_fn):
"""Execute OOB and return result if we expect any.
master_node = self.master_node
ret = []
- for idx, node in enumerate(self.nodes):
+ for idx, node in enumerate(utils.NiceSort(self.nodes,
+ key=lambda node: node.name)):
node_entry = [(constants.RS_NORMAL, node.name)]
ret.append(node_entry)
"""
# Locking is not used
- assert not (lu.acquired_locks or self.do_locking or self.use_locking)
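+ # the cluster-wide lock (LEVEL_CLUSTER, the BGL) is always held,
+ # so it is excluded from this ownership check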
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
valid_nodes = [node.name
for node in lu.cfg.GetAllNodesInfo().values()
"""Computes the list of nodes and their attributes.
"""
- nodenames = self.acquired_locks[locking.LEVEL_NODE]
+ nodenames = self.glm.list_owned(locking.LEVEL_NODE)
volumes = self.rpc.call_node_volumes(nodenames)
ilist = [self.cfg.GetInstanceInfo(iname) for iname
"""Computes the list of nodes and their attributes.
"""
- self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+ self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
# Always get name to sort by
if constants.SF_NAME in self.op.output_fields:
instances_keep = []
# Build list of instances to release
- for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+ for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
instance = self.context.cfg.GetInstanceInfo(instance_name)
if (instance.disk_template in constants.DTS_INT_MIRROR and
self.op.node_name in instance.all_nodes):
_ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
- assert (set(self.acquired_locks.get(locking.LEVEL_INSTANCE, [])) ==
+ assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
set(instances_keep))
def BuildHooksEnv(self):
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
+ def CheckArguments(self):
+ # normalise the disk list
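+ # (frozenset removes duplicate indices, sorted gives a stable order)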
+ self.op.disks = sorted(frozenset(self.op.disks))
+
def ExpandNames(self):
self._ExpandAndLockInstance()
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
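+ # LOCKS_APPEND makes _LockInstancesNodes add the instance's own
+ # nodes to the list set up below instead of replacing it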
+ if self.op.nodes:
+ self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+ self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = []
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ # if we replace the nodes, we only need to lock the old primary,
+ # otherwise we need to lock all nodes for disk re-creation
+ primary_only = bool(self.op.nodes)
+ self._LockInstancesNodes(primary_only=primary_only)
def BuildHooksEnv(self):
"""Build hooks env.
instance = self.cfg.GetInstanceInfo(self.op.instance_name)
assert instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
- _CheckNodeOnline(self, instance.primary_node)
+ if self.op.nodes:
+ if len(self.op.nodes) != len(instance.all_nodes):
+ raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+ " %d replacement nodes were specified" %
+ (instance.name, len(instance.all_nodes),
+ len(self.op.nodes)),
+ errors.ECODE_INVAL)
+ assert instance.disk_template != constants.DT_DRBD8 or \
+ len(self.op.nodes) == 2
+ assert instance.disk_template != constants.DT_PLAIN or \
+ len(self.op.nodes) == 1
+ primary_node = self.op.nodes[0]
+ else:
+ primary_node = instance.primary_node
+ _CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
- _CheckInstanceDown(self, instance, "cannot recreate disks")
+ # if we replace nodes *and* the old primary is offline, we don't
+ # check
+ assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+ old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+ if not (self.op.nodes and old_pnode.offline):
+ _CheckInstanceDown(self, instance, "cannot recreate disks")
if not self.op.disks:
self.op.disks = range(len(instance.disks))
if idx >= len(instance.disks):
raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
errors.ECODE_INVAL)
-
+ if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+ raise errors.OpPrereqError("Can't recreate disks partially and"
+ " change the nodes at the same time",
+ errors.ECODE_INVAL)
self.instance = instance
def Exec(self, feedback_fn):
"""Recreate the disks.
"""
+ # change primary node, if needed
+ if self.op.nodes:
+ self.instance.primary_node = self.op.nodes[0]
+ self.LogWarning("Changing the instance's nodes, you will have to"
+ " remove any disks left on the older nodes manually")
+
to_skip = []
- for idx, _ in enumerate(self.instance.disks):
+ for idx, disk in enumerate(self.instance.disks):
if idx not in self.op.disks: # disk idx has not been passed in
to_skip.append(idx)
continue
+ # update secondaries for disks, if needed
+ if self.op.nodes:
+ if disk.dev_type == constants.LD_DRBD8:
+ # need to update the nodes
+ assert len(self.op.nodes) == 2
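+ # only the first two components of the DRBD logical_id (the
+ # primary and secondary node names) change; port, minors and
+ # secret are kept as they are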
+ logical_id = list(disk.logical_id)
+ logical_id[0] = self.op.nodes[0]
+ logical_id[1] = self.op.nodes[1]
+ disk.logical_id = tuple(logical_id)
+
+ if self.op.nodes:
+ self.cfg.Update(self.instance, feedback_fn)
_CreateDisks(self, self.instance, to_skip=to_skip)
new_name = self.op.new_name
if self.op.name_check:
hostname = netutils.GetHostname(name=new_name)
- self.LogInfo("Resolved given name '%s' to '%s'", new_name,
- hostname.name)
+ if hostname.name != new_name:
+ self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+ hostname.name)
if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
" same as given hostname '%s'") %
# Change the instance lock. This is definitely safe while we hold the BGL.
# Otherwise the new lock would have to be added in acquired mode.
assert self.REQ_BGL
- self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
- self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+ self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+ self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
# re-read the instance from the configuration after rename
inst = self.cfg.GetInstanceInfo(self.op.new_name)
shutdown_timeout = self.op.shutdown_timeout
self._migrater = TLMigrateInstance(self, self.op.instance_name,
cleanup=False,
- iallocator=self.op.iallocator,
- target_node=self.op.target_node,
failover=True,
ignore_consistency=ignore_consistency,
shutdown_timeout=shutdown_timeout)
"""
instance = self._migrater.instance
source_node = instance.primary_node
- target_node = self._migrater.target_node
+ target_node = self.op.target_node
env = {
"IGNORE_CONSISTENCY": self.op.ignore_consistency,
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
self._migrater = TLMigrateInstance(self, self.op.instance_name,
cleanup=self.op.cleanup,
- iallocator=self.op.iallocator,
- target_node=self.op.target_node,
failover=False,
fallback=self.op.allow_failover)
self.tasklets = [self._migrater]
"""
instance = self._migrater.instance
source_node = instance.primary_node
- target_node = self._migrater.target_node
+ target_node = self.op.target_node
env = _BuildInstanceHookEnvByObject(self, instance)
env.update({
"MIGRATE_LIVE": self._migrater.live,
logging.debug("Migrating instance %s", inst.name)
names.append(inst.name)
- tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False,
- iallocator=self.op.iallocator,
- taget_node=None))
+ tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
if inst.disk_template in constants.DTS_EXT_MIRROR:
# We need to lock all nodes, as the iallocator will choose the
@ivar shutdown_timeout: In case of failover timeout of the shutdown
"""
- def __init__(self, lu, instance_name, cleanup=False, iallocator=None,
- target_node=None, failover=False, fallback=False,
+ def __init__(self, lu, instance_name, cleanup=False,
+ failover=False, fallback=False,
ignore_consistency=False,
shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
"""Initializes this class.
self.instance_name = instance_name
self.cleanup = cleanup
self.live = False # will be overridden later
- self.iallocator = iallocator
- self.target_node = target_node
self.failover = failover
self.fallback = fallback
self.ignore_consistency = ignore_consistency
if instance.disk_template in constants.DTS_EXT_MIRROR:
_CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
- if self.iallocator:
+ if self.lu.op.iallocator:
self._RunAllocator()
+ else:
+ # We set self.target_node as it is required by
+ # BuildHooksEnv
+ self.target_node = self.lu.op.target_node
# self.target_node is already populated, either directly or by the
# iallocator run
target_node = self.target_node
+ if self.target_node == instance.primary_node:
+ raise errors.OpPrereqError("Cannot migrate instance %s"
+ " to its primary (%s)" %
+ (instance.name, instance.primary_node),
+ errors.ECODE_INVAL)
if len(self.lu.tasklets) == 1:
- # It is safe to release locks only when we're the only tasklet in the LU
- _ReleaseLocks(self, locking.LEVEL_NODE,
+ # It is safe to release locks only when we're the only tasklet
+ # in the LU
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE,
keep=[instance.primary_node, self.target_node])
else:
" %s disk template" %
instance.disk_template)
target_node = secondary_nodes[0]
- if self.iallocator or (self.target_node and
- self.target_node != target_node):
+ if self.lu.op.iallocator or (self.lu.op.target_node and
+ self.lu.op.target_node != target_node):
if self.failover:
text = "failed over"
else:
self.instance.primary_node],
)
- ial.Run(self.iallocator)
+ ial.Run(self.lu.op.iallocator)
if not ial.success:
raise errors.OpPrereqError("Can't compute nodes using"
" iallocator '%s': %s" %
- (self.iallocator, ial.info),
+ (self.lu.op.iallocator, ial.info),
errors.ECODE_NORES)
if len(ial.result) != ial.required_nodes:
raise errors.OpPrereqError("iallocator '%s' returned invalid number"
" of nodes (%s), required %s" %
- (self.iallocator, len(ial.result),
+ (self.lu.op.iallocator, len(ial.result),
ial.required_nodes), errors.ECODE_FAULT)
self.target_node = ial.result[0]
self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
- self.instance_name, self.iallocator,
+ self.instance_name, self.lu.op.iallocator,
utils.CommaJoin(ial.result))
def _WaitUntilSync(self):
src_path = self.op.src_path
if src_node is None:
- locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
exp_list = self.rpc.call_export_list(locked_nodes)
found = False
for node in exp_list:
# Lock member nodes of all locked groups
self.needed_locks[locking.LEVEL_NODE] = [node_name
- for group_uuid in self.acquired_locks[locking.LEVEL_NODEGROUP]
+ for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
for node_name in self.cfg.GetNodeGroup(group_uuid).members]
else:
self._LockInstancesNodes()
"""Check prerequisites.
"""
- assert (locking.LEVEL_NODEGROUP in self.acquired_locks or
+ assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
self.op.iallocator is None)
- if locking.LEVEL_NODEGROUP in self.acquired_locks:
+ owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ if owned_groups:
groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
- prevgroups = self.acquired_locks[locking.LEVEL_NODEGROUP]
- if prevgroups != groups:
+ if owned_groups != groups:
raise errors.OpExecError("Node groups used by instance '%s' changed"
" since lock was acquired, current list is %r,"
" used to be '%s'" %
(self.op.instance_name,
utils.CommaJoin(groups),
- utils.CommaJoin(prevgroups)))
+ utils.CommaJoin(owned_groups)))
return LogicalUnit.CheckPrereq(self)
if remote_node is None:
self.remote_node_info = None
else:
- assert remote_node in self.lu.acquired_locks[locking.LEVEL_NODE], \
+ assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
"Remote node '%s' is not locked" % remote_node
self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
_ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
# Release any owned node group
- if self.lu.context.glm.is_owned(locking.LEVEL_NODEGROUP):
+ if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
_ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
# Check whether disks are valid
if __debug__:
# Verify owned locks before starting operation
- owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
assert set(owned_locks) == set(self.node_secondary_ip), \
("Incorrect node locks, owning %s, expected %s" %
(owned_locks, self.node_secondary_ip.keys()))
- owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_INSTANCE)
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
assert list(owned_locks) == [self.instance_name], \
"Instance '%s' not locked" % self.instance_name
- assert not self.lu.context.glm.is_owned(locking.LEVEL_NODEGROUP), \
+ assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
"Should not own any node group lock at this point"
if not self.disks:
if __debug__:
# Verify owned locks
- owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+ owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
nodes = frozenset(self.node_secondary_ip)
assert ((self.early_release and not owned_locks) or
(not self.early_release and not (set(owned_locks) - nodes))), \
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
+ # First run all grow ops in dry-run mode
+ for node in instance.all_nodes:
+ self.cfg.SetDiskID(disk, node)
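+ # the trailing boolean is the dry-run flag of blockdev_grow:
+ # check that the operation is possible, but do not resize yet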
+ result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+ result.Raise("Grow request failed to node %s" % node)
+
+ # We know that (as far as we can test) operations across different
+ # nodes will succeed, time to run it for real
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+ result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
result.Raise("Grow request failed to node %s" % node)
# TODO: Rewrite code to work properly
"""
if self.wanted_names is None:
assert self.op.use_locking, "Locking was not used"
- self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+ self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
self.wanted_instances = [self.cfg.GetInstanceInfo(name)
for name in self.wanted_names]
self.cfg.Update(instance, feedback_fn)
# disks are created, waiting for sync
- disk_abort = not _WaitForSync(self, instance)
+ disk_abort = not _WaitForSync(self, instance,
+ oneshot=not self.op.wait_for_sync)
if disk_abort:
raise errors.OpExecError("There are some degraded disks for"
" this instance, please cleanup manually")
that node.
"""
- self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+ self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
rpcresult = self.rpc.call_export_list(self.nodes)
result = {}
for node in rpcresult:
fqdn_warn = True
instance_name = self.op.instance_name
- locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
exportlist = self.rpc.call_export_list(locked_nodes)
found = False
for node in exportlist:
# We want to lock all the affected nodes and groups. We have readily
# available the list of nodes, and the *destination* group. To gather the
- # list of "source" groups, we need to fetch node information.
- self.node_data = self.cfg.GetAllNodesInfo()
- affected_groups = set(self.node_data[node].group for node in self.op.nodes)
- affected_groups.add(self.group_uuid)
-
+ # list of "source" groups, we need to fetch node information later on.
self.needed_locks = {
- locking.LEVEL_NODEGROUP: list(affected_groups),
+ locking.LEVEL_NODEGROUP: set([self.group_uuid]),
locking.LEVEL_NODE: self.op.nodes,
}
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODEGROUP:
+ assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
+
+ # Try to get all affected nodes' groups without having the group or node
+ # lock yet. Needs verification later in the code flow.
+ groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)
+
+ self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
+
def CheckPrereq(self):
"""Check prerequisites.
"""
+ assert self.needed_locks[locking.LEVEL_NODEGROUP]
+ assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+ frozenset(self.op.nodes))
+
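+ # recompute which groups the nodes belong to now; if this differs
+ # from the groups we locked, nodes were moved between ExpandNames
+ # and here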
+ expected_locks = (set([self.group_uuid]) |
+ self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
+ actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ if actual_locks != expected_locks:
+ raise errors.OpExecError("Nodes changed groups since locks were acquired,"
+ " current groups are '%s', used to be '%s'" %
+ (utils.CommaJoin(expected_locks),
+ utils.CommaJoin(actual_locks)))
+
+ self.node_data = self.cfg.GetAllNodesInfo()
self.group = self.cfg.GetNodeGroup(self.group_uuid)
instance_data = self.cfg.GetAllInstancesInfo()
for node in self.op.nodes:
self.node_data[node].group = self.group_uuid
+ # FIXME: Depends on side-effects of modifying the result of
+ # C{cfg.GetAllNodesInfo}
+
self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
@staticmethod
"""
# pylint: disable-msg=R0902
# lots of instance attributes
- _ALLO_KEYS = [
- "name", "mem_size", "disks", "disk_template",
- "os", "tags", "nics", "vcpus", "hypervisor",
- ]
- _RELO_KEYS = [
- "name", "relocate_from",
- ]
- _EVAC_KEYS = [
- "evac_nodes",
- ]
def __init__(self, cfg, rpc, mode, **kwargs):
self.cfg = cfg
self.required_nodes = None
# init result fields
self.success = self.info = self.result = None
- if self.mode == constants.IALLOCATOR_MODE_ALLOC:
- keyset = self._ALLO_KEYS
- fn = self._AddNewInstance
- elif self.mode == constants.IALLOCATOR_MODE_RELOC:
- keyset = self._RELO_KEYS
- fn = self._AddRelocateInstance
- elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
- keyset = self._EVAC_KEYS
- fn = self._AddEvacuateNodes
- else:
+
+ try:
+ (fn, keyset) = self._MODE_DATA[self.mode]
+ except KeyError:
raise errors.ProgrammerError("Unknown mode '%s' passed to the"
" IAllocator" % self.mode)
+
for key in kwargs:
if key not in keyset:
raise errors.ProgrammerError("Invalid input parameter '%s' to"
if key not in kwargs:
raise errors.ProgrammerError("Missing input parameter '%s' to"
" IAllocator" % key)
- self._BuildInputData(fn)
+ self._BuildInputData(compat.partial(fn, self))
def _ComputeClusterData(self):
"""Compute the generic allocator input data.
self.in_text = serializer.Dump(self.in_data)
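+ # for each mode: the request-building method and the keyword
+ # arguments it requires; the methods are stored unbound, hence the
+ # compat.partial(fn, self) call in __init__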
+ _MODE_DATA = {
+ constants.IALLOCATOR_MODE_ALLOC:
+ (_AddNewInstance,
+ ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics",
+ "vcpus", "hypervisor"]),
+ constants.IALLOCATOR_MODE_RELOC:
+ (_AddRelocateInstance, ["name", "relocate_from"]),
+ constants.IALLOCATOR_MODE_MEVAC:
+ (_AddEvacuateNodes, ["evac_nodes"]),
+ }
+
def Run(self, name, validate=True, call_fn=None):
"""Run an instance allocator and return the results.