#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
return dict.fromkeys(locking.LEVELS, 1)
+def _MakeLegacyNodeInfo(data):
+ """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
+
+ Converts the data into a single dictionary. This is fine for most use cases,
+ but some require information from more than one volume group or hypervisor.
+
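+  Example (illustrative only): an input of
+  C{("123abc", ({"vg_free": 1024},), ({"memory_free": 2048},))} becomes
+  C{{"vg_free": 1024, "memory_free": 2048, "bootid": "123abc"}}.
+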
+ """
+ (bootid, (vg_info, ), (hv_info, )) = data
+
+ return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
+ "bootid": bootid,
+ })
+
+
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
"""Checks if the owned node groups are still correct for an instance.
return params_copy
+def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
+ """Return the new version of a instance policy.
+
+ @param group_policy: whether this policy applies to a group and thus
+ we should support removal of policy entries
+
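+  Example (illustrative): a new_ipolicy entry such as C{"vcpu-ratio": "4"} is
+  stored as the float 4.0 in the returned copy, while passing
+  C{[constants.VALUE_DEFAULT]} for a non-spec key removes that key when
+  group_policy is True.
+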
+ """
+ use_none = use_default = group_policy
+ ipolicy = copy.deepcopy(old_ipolicy)
+ for key, value in new_ipolicy.items():
+ if key not in constants.IPOLICY_ALL_KEYS:
+ raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
+ errors.ECODE_INVAL)
+ if key in constants.IPOLICY_ISPECS:
+ utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
+ ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
+ use_none=use_none,
+ use_default=use_default)
+ else:
+ if not value or value == [constants.VALUE_DEFAULT]:
+ if group_policy:
+ del ipolicy[key]
+ else:
+ raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
+ " on the cluster'" % key,
+ errors.ECODE_INVAL)
+ else:
+ if key in constants.IPOLICY_PARAMETERS:
+ # FIXME: we assume all such values are float
+ try:
+ ipolicy[key] = float(value)
+ except (TypeError, ValueError), err:
+ raise errors.OpPrereqError("Invalid value for attribute"
+ " '%s': '%s', error: %s" %
+ (key, value, err), errors.ECODE_INVAL)
+ else:
+ # FIXME: we assume all others are lists; this should be redone
+ # in a nicer way
+ ipolicy[key] = list(value)
+ try:
+ objects.InstancePolicy.CheckParameterSyntax(ipolicy)
+ except errors.ConfigurationError, err:
+ raise errors.OpPrereqError("Invalid instance policy: %s" % err,
+ errors.ECODE_INVAL)
+ return ipolicy
+
+
+def _UpdateAndVerifySubDict(base, updates, type_check):
+ """Updates and verifies a dict with sub dicts of the same type.
+
+ @param base: The dict with the old data
+ @param updates: The dict with the new data
+  @param type_check: Dict suitable for ForceDictType to verify correct types
+  @return: A new dict with updated and verified values
+
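+  Example (illustrative, for plain key/value updates): a base of
+  C{{"grp": {"a": 1}}} combined with updates of C{{"grp": {"b": 2}}} yields
+  C{{"grp": {"a": 1, "b": 2}}}, with every sub dict checked via type_check.
+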
+ """
+ def fn(old, value):
+ new = _GetUpdatedParams(old, value)
+ utils.ForceDictType(new, type_check)
+ return new
+
+ ret = copy.deepcopy(base)
+ ret.update(dict((key, fn(base.get(key, {}), value))
+ for key, value in updates.items()))
+ return ret
+
+
+def _MergeAndVerifyHvState(op_input, obj_input):
+ """Combines the hv state from an opcode with the one of the object
+
+ @param op_input: The input dict from the opcode
+ @param obj_input: The input dict from the objects
+ @return: The verified and updated dict
+
+ """
+ if op_input:
+ invalid_hvs = set(op_input) - constants.HYPER_TYPES
+ if invalid_hvs:
+ raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
+ " %s" % utils.CommaJoin(invalid_hvs),
+ errors.ECODE_INVAL)
+ if obj_input is None:
+ obj_input = {}
+ type_check = constants.HVSTS_PARAMETER_TYPES
+ return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
+
+ return None
+
+
+def _MergeAndVerifyDiskState(op_input, obj_input):
+ """Combines the disk state from an opcode with the one of the object
+
+ @param op_input: The input dict from the opcode
+ @param obj_input: The input dict from the objects
+ @return: The verified and updated dict
+ """
+ if op_input:
+ invalid_dst = set(op_input) - constants.DS_VALID_TYPES
+ if invalid_dst:
+ raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
+ utils.CommaJoin(invalid_dst),
+ errors.ECODE_INVAL)
+ type_check = constants.DSS_PARAMETER_TYPES
+ if obj_input is None:
+ obj_input = {}
+ return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
+ type_check))
+ for key, value in op_input.items())
+
+ return None
+
+
def _ReleaseLocks(lu, level, names=None, keep=None):
"""Releases locks owned by an LU.
if msg is None:
msg = "can't use instance from outside %s states" % ", ".join(req_states)
if instance.admin_state not in req_states:
- raise errors.OpPrereqError("Instance %s is marked to be %s, %s" %
- (instance, instance.admin_state, msg),
+ raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
+ (instance.name, instance.admin_state, msg),
errors.ECODE_STATE)
if constants.ADMINST_UP not in req_states:
(instance.name, msg), errors.ECODE_STATE)
+def _ComputeMinMaxSpec(name, ipolicy, value):
+ """Computes if value is in the desired range.
+
+ @param name: name of the parameter for which we perform the check
+ @param ipolicy: dictionary containing min, max and std values
+ @param value: actual value that we want to use
+  @return: None if the value is within range, otherwise an error message
+
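+  Example (illustrative): with a minimum of 128 and a maximum of 1024 for a
+  given spec, a value of 512 is accepted (C{None} is returned) while 2048
+  yields a violation message.
+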
+ """
+ if value in [None, constants.VALUE_AUTO]:
+ return None
+ max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
+ min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
+ if value > max_v or min_v > value:
+ return ("%s value %s is not in range [%s, %s]" %
+ (name, value, min_v, max_v))
+ return None
+
+
+def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
+ nic_count, disk_sizes,
+ _compute_fn=_ComputeMinMaxSpec):
+ """Verifies ipolicy against provided specs.
+
+ @type ipolicy: dict
+ @param ipolicy: The ipolicy
+ @type mem_size: int
+ @param mem_size: The memory size
+ @type cpu_count: int
+ @param cpu_count: Used cpu cores
+ @type disk_count: int
+ @param disk_count: Number of disks used
+ @type nic_count: int
+ @param nic_count: Number of nics used
+ @type disk_sizes: list of ints
+  @param disk_sizes: Disk sizes of used disks (len must match C{disk_count})
+  @param _compute_fn: The compute function (unittest only)
+  @return: A list of violations, or an empty list if no violations are found
+
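+  Example (illustrative): an instance with two disks of 512 and 1024 MiB is
+  checked as one C{ISPEC_DISK_COUNT} entry with value 2 plus one
+  C{ISPEC_DISK_SIZE} entry per disk size.
+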
+ """
+ assert disk_count == len(disk_sizes)
+
+ test_settings = [
+ (constants.ISPEC_MEM_SIZE, mem_size),
+ (constants.ISPEC_CPU_COUNT, cpu_count),
+ (constants.ISPEC_DISK_COUNT, disk_count),
+ (constants.ISPEC_NIC_COUNT, nic_count),
+ ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes)
+
+ return filter(None,
+ (_compute_fn(name, ipolicy, value)
+ for (name, value) in test_settings))
+
+
+def _ComputeIPolicyInstanceViolation(ipolicy, instance,
+ _compute_fn=_ComputeIPolicySpecViolation):
+ """Compute if instance meets the specs of ipolicy.
+
+ @type ipolicy: dict
+ @param ipolicy: The ipolicy to verify against
+ @type instance: L{objects.Instance}
+ @param instance: The instance to verify
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
+ cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
+ disk_count = len(instance.disks)
+ disk_sizes = [disk.size for disk in instance.disks]
+ nic_count = len(instance.nics)
+
+ return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+ disk_sizes)
+
+
+def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
+ _compute_fn=_ComputeIPolicySpecViolation):
+ """Compute if instance specs meets the specs of ipolicy.
+
+ @type ipolicy: dict
+ @param ipolicy: The ipolicy to verify against
+  @type instance_spec: dict
+ @param instance_spec: The instance spec to verify
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
+ cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
+ disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
+ disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
+ nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
+
+ return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+ disk_sizes)
+
+
+def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
+ target_group,
+ _compute_fn=_ComputeIPolicyInstanceViolation):
+ """Compute if instance meets the specs of the new target group.
+
+ @param ipolicy: The ipolicy to verify
+ @param instance: The instance object to verify
+ @param current_group: The current group of the instance
+ @param target_group: The new group of the instance
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ if current_group == target_group:
+ return []
+ else:
+ return _compute_fn(ipolicy, instance)
+
+
+def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False,
+ _compute_fn=_ComputeIPolicyNodeViolation):
+ """Checks that the target node is correct in terms of instance policy.
+
+ @param ipolicy: The ipolicy to verify
+ @param instance: The instance object to verify
+  @param node: The new node to relocate the instance to
+ @param ignore: Ignore violations of the ipolicy
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
+ res = _compute_fn(ipolicy, instance, primary_node.group, node.group)
+
+ if res:
+ msg = ("Instance does not meet target node group's (%s) instance"
+ " policy: %s") % (node.group, utils.CommaJoin(res))
+ if ignore:
+ lu.LogWarning(msg)
+ else:
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+
+def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances):
+ """Computes a set of any instances that would violate the new ipolicy.
+
+ @param old_ipolicy: The current (still in-place) ipolicy
+ @param new_ipolicy: The new (to become) ipolicy
+ @param instances: List of instances to verify
+  @return: A set of instances which violate the new ipolicy but did not before
+
+  """
+  return (_ComputeViolatingInstances(new_ipolicy, instances) -
+          _ComputeViolatingInstances(old_ipolicy, instances))
+
+
def _ExpandItemName(fn, name, kind):
"""Expand an item name.
return mc_now < mc_should
+def _CalculateGroupIPolicy(cluster, group):
+ """Calculate instance policy for group.
+
+ """
+ return cluster.SimpleFillIPolicy(group.ipolicy)
+
+
+def _ComputeViolatingInstances(ipolicy, instances):
+ """Computes a set of instances who violates given ipolicy.
+
+ @param ipolicy: The ipolicy to verify
+ @type instances: object.Instance
+ @param instances: List of instances to verify
+ @return: A frozenset of instance names violating the ipolicy
+
+ """
+ return frozenset([inst.name for inst in instances
+ if _ComputeIPolicyInstanceViolation(ipolicy, inst)])
+
+
def _CheckNicsBridgesExist(lu, target_nics, target_node):
"""Check that the brigdes needed by a list of nics exist.
ems = self.cfg.GetUseExternalMipScript()
result = self.rpc.call_node_deactivate_master_ip(master_params.name,
master_params, ems)
- result.Raise("Could not disable the master role")
+ if result.fail_msg:
+ self.LogWarning("Error disabling the master IP address: %s",
+ result.fail_msg)
return master_params.name
node_vol_should = {}
instanceconfig.MapLVsByNode(node_vol_should)
+ ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
+ err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
+ _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err)
+
for node in node_vol_should:
n_img = node_image[node]
if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
# we already list instances living on such nodes, and that's
# enough warning
continue
- #TODO(dynmem): use MINMEM for checking
#TODO(dynmem): also consider ballooning out other instances
for prinode, instances in n_img.sbp.items():
needed_mem = 0
for instance in instances:
bep = cluster_info.FillBE(instance_cfg[instance])
if bep[constants.BE_AUTO_BALANCE]:
- needed_mem += bep[constants.BE_MAXMEM]
+ needed_mem += bep[constants.BE_MINMEM]
test = n_img.mfree < needed_mem
self._ErrorIf(test, constants.CV_ENODEN1, node,
"not enough memory to accomodate instance failovers"
if self.op.master_netmask is not None:
_ValidateNetmask(self.cfg, self.op.master_netmask)
+ if self.op.diskparams:
+ for dt_params in self.op.diskparams.values():
+ utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_INSTANCE: locking.ALL_SET,
+ locking.LEVEL_NODEGROUP: locking.ALL_SET,
+ }
+ self.share_locks = {
+ locking.LEVEL_NODE: 1,
+ locking.LEVEL_INSTANCE: 1,
+ locking.LEVEL_NODEGROUP: 1,
}
- self.share_locks[locking.LEVEL_NODE] = 1
def BuildHooksEnv(self):
"""Build hooks env.
self.cluster = cluster = self.cfg.GetClusterInfo()
# validate params changes
if self.op.beparams:
+ objects.UpgradeBeParams(self.op.beparams)
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
self.new_ndparams["oob_program"] = \
constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+ if self.op.hv_state:
+ new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+ self.cluster.hv_state_static)
+ self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
+ for hv, values in new_hv_state.items())
+
+ if self.op.disk_state:
+ new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state,
+ self.cluster.disk_state_static)
+ self.new_disk_state = \
+ dict((storage, dict((name, cluster.SimpleFillDiskState(values))
+ for name, values in svalues.items()))
+ for storage, svalues in new_disk_state.items())
+
+ if self.op.ipolicy:
+ self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
+ group_policy=False)
+
+ all_instances = self.cfg.GetAllInstancesInfo().values()
+ violations = set()
+ for group in self.cfg.GetAllNodeGroupsInfo().values():
+ instances = frozenset([inst for inst in all_instances
+ if compat.any(node in group.members
+ for node in inst.all_nodes)])
+ new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
+ new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
+ group),
+ new_ipolicy, instances)
+ if new:
+ violations.update(new)
+
+ if violations:
+ self.LogWarning("After the ipolicy change the following instances"
+ " violate them: %s",
+ utils.CommaJoin(violations))
+
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
else:
self.new_hvparams[hv_name].update(hv_dict)
+ # disk template parameters
+ self.new_diskparams = objects.FillDict(cluster.diskparams, {})
+ if self.op.diskparams:
+ for dt_name, dt_params in self.op.diskparams.items():
+        if dt_name not in self.new_diskparams:
+ self.new_diskparams[dt_name] = dt_params
+ else:
+ self.new_diskparams[dt_name].update(dt_params)
+
# os hypervisor parameters
self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
if self.op.os_hvp:
self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
if self.op.nicparams:
self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
+ if self.op.ipolicy:
+ self.cluster.ipolicy = self.new_ipolicy
if self.op.osparams:
self.cluster.osparams = self.new_osp
if self.op.ndparams:
self.cluster.ndparams = self.new_ndparams
+ if self.op.diskparams:
+ self.cluster.diskparams = self.new_diskparams
+ if self.op.hv_state:
+ self.cluster.hv_state_static = self.new_hv_state
+ if self.op.disk_state:
+ self.cluster.disk_state_static = self.new_disk_state
if self.op.candidate_pool_size is not None:
self.cluster.candidate_pool_size = self.op.candidate_pool_size
raise errors.OpPrereqError("Node is the master node, failover to another"
" node is required", errors.ECODE_INVAL)
- for instance_name, instance in self.cfg.GetAllInstancesInfo():
+ for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
if node.name in instance.all_nodes:
raise errors.OpPrereqError("Instance %s is still running on the node,"
" please remove first" % instance_name,
# filter out non-vm_capable nodes
toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
- node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
- lu.cfg.GetHypervisorType())
- live_data = dict((name, nresult.payload)
+ node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
+ [lu.cfg.GetHypervisorType()])
+ live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload))
for (name, nresult) in node_data.items()
if not nresult.fail_msg and nresult.payload)
else:
if self.op.ndparams:
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+
+ if self.op.disk_state:
+ self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
else:
new_node.ndparams = {}
+ if self.op.hv_state:
+ new_node.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ new_node.disk_state_static = self.new_disk_state
+
# check connectivity
result = self.rpc.call_version([node])[node]
result.Raise("Can't get version information from node %s" % node)
self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
self.op.master_capable, self.op.vm_capable,
- self.op.secondary_ip, self.op.ndparams]
+ self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
+ self.op.disk_state]
if all_mods.count(None) == len(all_mods):
raise errors.OpPrereqError("Please pass at least one modification",
errors.ECODE_INVAL)
utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = new_ndparams
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+ self.node.hv_state_static)
+
+ if self.op.disk_state:
+ self.new_disk_state = \
+ _MergeAndVerifyDiskState(self.op.disk_state,
+ self.node.disk_state_static)
+
def Exec(self, feedback_fn):
"""Modifies a node.
if self.op.powered is not None:
node.powered = self.op.powered
+ if self.op.hv_state:
+ node.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ node.disk_state_static = self.new_disk_state
+
for attr in ["master_capable", "vm_capable"]:
val = getattr(self.op, attr)
if val is not None:
"architecture": (platform.architecture()[0], platform.machine()),
"name": cluster.cluster_name,
"master": cluster.master_node,
- "default_hypervisor": cluster.enabled_hypervisors[0],
+ "default_hypervisor": cluster.primary_hypervisor,
"enabled_hypervisors": cluster.enabled_hypervisors,
"hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
for hypervisor_name in cluster.enabled_hypervisors]),
"os_hvp": os_hvp,
"beparams": cluster.beparams,
"osparams": cluster.osparams,
+ "ipolicy": cluster.ipolicy,
"nicparams": cluster.nicparams,
"ndparams": cluster.ndparams,
"candidate_pool_size": cluster.candidate_pool_size,
@param requested: the amount of memory in MiB to check for
@type hypervisor_name: C{str}
@param hypervisor_name: the hypervisor to ask for memory stats
+ @rtype: integer
+ @return: node current free memory
@raise errors.OpPrereqError: if the node doesn't have enough memory, or
we cannot check the node
"""
- nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name)
+ nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name])
nodeinfo[node].Raise("Can't get data from node %s" % node,
prereq=True, ecode=errors.ECODE_ENVIRON)
- free_mem = nodeinfo[node].payload.get("memory_free", None)
+ (_, _, (hv_info, )) = nodeinfo[node].payload
+
+ free_mem = hv_info.get("memory_free", None)
if not isinstance(free_mem, int):
raise errors.OpPrereqError("Can't compute free memory on node %s, result"
" was '%s'" % (node, free_mem),
" needed %s MiB, available %s MiB" %
(node, reason, requested, free_mem),
errors.ECODE_NORES)
+ return free_mem
def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
or we cannot check the node
"""
- nodeinfo = lu.rpc.call_node_info(nodenames, vg, None)
+ nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None)
for node in nodenames:
info = nodeinfo[node]
info.Raise("Cannot get current information from node %s" % node,
prereq=True, ecode=errors.ECODE_ENVIRON)
- vg_free = info.payload.get("vg_free", None)
+ (_, (vg_info, ), _) = info.payload
+ vg_free = vg_info.get("vg_free", None)
if not isinstance(vg_free, int):
raise errors.OpPrereqError("Can't compute free disk space on node"
" %s for vg %s, result was '%s'" %
or we cannot check the node
"""
- nodeinfo = lu.rpc.call_node_info(nodenames, None, hypervisor_name)
+ nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name])
for node in nodenames:
info = nodeinfo[node]
info.Raise("Cannot get current information from node %s" % node,
prereq=True, ecode=errors.ECODE_ENVIRON)
- num_cpus = info.payload.get("cpu_total", None)
+ (_, _, (hv_info, )) = info.payload
+ num_cpus = hv_info.get("cpu_total", None)
if not isinstance(num_cpus, int):
raise errors.OpPrereqError("Can't compute the number of physical CPUs"
" on node %s, result was '%s'" %
# extra beparams
if self.op.beparams:
# fill the beparams dict
+ objects.UpgradeBeParams(self.op.beparams)
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
def ExpandNames(self):
self._ExpandAndLockInstance()
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE_RES:
+ self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
def BuildHooksEnv(self):
"""Build hooks env.
_CheckNodeOnline(self, instance.primary_node)
bep = self.cfg.GetClusterInfo().FillBE(instance)
+ bep.update(self.op.beparams)
# check bridges existence
_CheckInstanceBridgesExist(self, instance)
if not remote_info.payload: # not running already
_CheckNodeFreeMemory(self, instance.primary_node,
"starting instance %s" % instance.name,
- bep[constants.BE_MAXMEM], instance.hypervisor)
+ bep[constants.BE_MINMEM], instance.hypervisor)
def Exec(self, feedback_fn):
"""Start the instance.
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
+ _MODIFYABLE = frozenset([
+ constants.IDISK_SIZE,
+ constants.IDISK_MODE,
+ ])
+
+ # New or changed disk parameters may have different semantics
+ assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
+ constants.IDISK_ADOPT,
+
+ # TODO: Implement support changing VG while recreating
+ constants.IDISK_VG,
+ constants.IDISK_METAVG,
+ ]))
+
def CheckArguments(self):
- # normalise the disk list
- self.op.disks = sorted(frozenset(self.op.disks))
+ if self.op.disks and ht.TPositiveInt(self.op.disks[0]):
+ # Normalize and convert deprecated list of disk indices
+ self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
+
+ duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
+ if duplicates:
+ raise errors.OpPrereqError("Some disks have been specified more than"
+ " once: %s" % utils.CommaJoin(duplicates),
+ errors.ECODE_INVAL)
+
+ for (idx, params) in self.op.disks:
+ utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
+ unsupported = frozenset(params.keys()) - self._MODIFYABLE
+ if unsupported:
+ raise errors.OpPrereqError("Parameters for disk %s try to change"
+ " unmodifyable parameter(s): %s" %
+ (idx, utils.CommaJoin(unsupported)),
+ errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
else:
self.needed_locks[locking.LEVEL_NODE] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
+
# if we replace nodes *and* the old primary is offline, we don't
# check
assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
_CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
msg="cannot recreate disks")
- if not self.op.disks:
- self.op.disks = range(len(instance.disks))
+ if self.op.disks:
+ self.disks = dict(self.op.disks)
else:
- for idx in self.op.disks:
- if idx >= len(instance.disks):
- raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
- errors.ECODE_INVAL)
- if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+ self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
+
+ maxidx = max(self.disks.keys())
+ if maxidx >= len(instance.disks):
+ raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
+ errors.ECODE_INVAL)
+
+ if (self.op.nodes and
+ sorted(self.disks.keys()) != range(len(instance.disks))):
raise errors.OpPrereqError("Can't recreate disks partially and"
" change the nodes at the same time",
errors.ECODE_INVAL)
+
self.instance = instance
def Exec(self, feedback_fn):
self.owned_locks(locking.LEVEL_NODE_RES))
to_skip = []
- mods = [] # keeps track of needed logical_id changes
+ mods = [] # keeps track of needed changes
for idx, disk in enumerate(instance.disks):
- if idx not in self.op.disks: # disk idx has not been passed in
+ try:
+ changes = self.disks[idx]
+ except KeyError:
+ # Disk should not be recreated
to_skip.append(idx)
continue
+
# update secondaries for disks, if needed
- if self.op.nodes:
- if disk.dev_type == constants.LD_DRBD8:
- # need to update the nodes and minors
- assert len(self.op.nodes) == 2
- assert len(disk.logical_id) == 6 # otherwise disk internals
- # have changed
- (_, _, old_port, _, _, old_secret) = disk.logical_id
- new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
- new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
- new_minors[0], new_minors[1], old_secret)
- assert len(disk.logical_id) == len(new_id)
- mods.append((idx, new_id))
+ if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
+ # need to update the nodes and minors
+ assert len(self.op.nodes) == 2
+ assert len(disk.logical_id) == 6 # otherwise disk internals
+ # have changed
+ (_, _, old_port, _, _, old_secret) = disk.logical_id
+ new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
+ new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
+ new_minors[0], new_minors[1], old_secret)
+ assert len(disk.logical_id) == len(new_id)
+ else:
+ new_id = None
+
+ mods.append((idx, new_id, changes))
# now that we have passed all asserts above, we can apply the mods
# in a single run (to avoid partial changes)
- for idx, new_id in mods:
- instance.disks[idx].logical_id = new_id
+ for idx, new_id, changes in mods:
+ disk = instance.disks[idx]
+ if new_id is not None:
+ assert disk.dev_type == constants.LD_DRBD8
+ disk.logical_id = new_id
+ if changes:
+ disk.Update(size=changes.get(constants.IDISK_SIZE, None),
+ mode=changes.get(constants.IDISK_MODE, None))
# change primary node, if needed
if self.op.nodes:
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
ignore_consistency = self.op.ignore_consistency
shutdown_timeout = self.op.shutdown_timeout
self._migrater = TLMigrateInstance(self, self.op.instance_name,
cleanup=False,
failover=True,
ignore_consistency=ignore_consistency,
- shutdown_timeout=shutdown_timeout)
+ shutdown_timeout=shutdown_timeout,
+ ignore_ipolicy=self.op.ignore_ipolicy)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
del self.recalculate_locks[locking.LEVEL_NODE]
else:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
- self._migrater = TLMigrateInstance(self, self.op.instance_name,
- cleanup=self.op.cleanup,
- failover=False,
- fallback=self.op.allow_failover)
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
+    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+ self._migrater = \
+ TLMigrateInstance(self, self.op.instance_name,
+ cleanup=self.op.cleanup,
+ failover=False,
+ fallback=self.op.allow_failover,
+ allow_runtime_changes=self.op.allow_runtime_changes,
+ ignore_ipolicy=self.op.ignore_ipolicy)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
del self.recalculate_locks[locking.LEVEL_NODE]
else:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
"MIGRATE_CLEANUP": self.op.cleanup,
"OLD_PRIMARY": source_node,
"NEW_PRIMARY": target_node,
+ "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
})
if instance.disk_template in constants.DTS_INT_MIRROR:
_CheckNodeOnline(self, target_node)
_CheckNodeNotDrained(self, target_node)
_CheckNodeVmCapable(self, target_node)
+ ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
+ self.cfg.GetNodeGroup(node.group))
+ _CheckTargetNodeIPolicy(self, ipolicy, instance, node,
+ ignore=self.op.ignore_ipolicy)
if instance.admin_state == constants.ADMINST_UP:
# check memory requirements on the secondary node
"""
return {
"NODE_NAME": self.op.node_name,
+ "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
}
def BuildHooksNodes(self):
def Exec(self, feedback_fn):
# Prepare jobs for migration instances
+ allow_runtime_changes = self.op.allow_runtime_changes
jobs = [
[opcodes.OpInstanceMigrate(instance_name=inst.name,
mode=self.op.mode,
live=self.op.live,
iallocator=self.op.iallocator,
- target_node=self.op.target_node)]
+ target_node=self.op.target_node,
+ allow_runtime_changes=allow_runtime_changes,
+ ignore_ipolicy=self.op.ignore_ipolicy)]
for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)
]
and target node
@type shutdown_timeout: int
@ivar shutdown_timeout: In case of failover timeout of the shutdown
+ @type ignore_ipolicy: bool
+ @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
"""
def __init__(self, lu, instance_name, cleanup=False,
failover=False, fallback=False,
ignore_consistency=False,
- shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
+ allow_runtime_changes=True,
+ shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT,
+ ignore_ipolicy=False):
"""Initializes this class.
"""
self.fallback = fallback
self.ignore_consistency = ignore_consistency
self.shutdown_timeout = shutdown_timeout
+ self.ignore_ipolicy = ignore_ipolicy
+ self.allow_runtime_changes = allow_runtime_changes
def CheckPrereq(self):
"""Check prerequisites.
instance = self.cfg.GetInstanceInfo(instance_name)
assert instance is not None
self.instance = instance
+ cluster = self.cfg.GetClusterInfo()
if (not self.cleanup and
not instance.admin_state == constants.ADMINST_UP and
# BuildHooksEnv
self.target_node = self.lu.op.target_node
+ # Check that the target node is correct in terms of instance policy
+ nodeinfo = self.cfg.GetNodeInfo(self.target_node)
+ group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
+ ignore=self.ignore_ipolicy)
+
# self.target_node is already populated, either directly or by the
# iallocator run
target_node = self.target_node
" node can be passed)" %
(instance.disk_template, text),
errors.ECODE_INVAL)
+ nodeinfo = self.cfg.GetNodeInfo(target_node)
+ group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
+ ignore=self.ignore_ipolicy)
- i_be = self.cfg.GetClusterInfo().FillBE(instance)
+ i_be = cluster.FillBE(instance)
# check memory requirements on the secondary node
- if not self.failover or instance.admin_state == constants.ADMINST_UP:
- _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
- instance.name, i_be[constants.BE_MAXMEM],
- instance.hypervisor)
+ if (not self.cleanup and
+ (not self.failover or instance.admin_state == constants.ADMINST_UP)):
+ self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
+ "migrating instance %s" %
+ instance.name,
+ i_be[constants.BE_MINMEM],
+ instance.hypervisor)
else:
self.lu.LogInfo("Not checking memory on the secondary node as"
" instance will not be started")
+ # check if failover must be forced instead of migration
+ if (not self.cleanup and not self.failover and
+ i_be[constants.BE_ALWAYS_FAILOVER]):
+ if self.fallback:
+ self.lu.LogInfo("Instance configured to always failover; fallback"
+ " to failover")
+ self.failover = True
+ else:
+ raise errors.OpPrereqError("This instance has been configured to"
+ " always failover, please allow failover",
+ errors.ECODE_STATE)
+
# check bridge existance
_CheckInstanceBridgesExist(self.lu, instance, node=target_node)
self.lu.op.live = None
elif self.lu.op.mode is None:
# read the default value from the hypervisor
- i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
- skip_globals=False)
+ i_hv = cluster.FillHV(self.instance, skip_globals=False)
self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
# Failover is never live
self.live = False
+ if not (self.failover or self.cleanup):
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking instance on node %s" %
+ instance.primary_node)
+ instance_running = bool(remote_info.payload)
+ if instance_running:
+ self.current_mem = int(remote_info.payload["memory"])
+
def _RunAllocator(self):
"""Run the allocator based on input opcode.
"""
+ # FIXME: add a self.ignore_ipolicy option
ial = IAllocator(self.cfg, self.rpc,
mode=constants.IALLOCATOR_MODE_RELOC,
name=self.instance_name,
# Check for hypervisor version mismatch and warn the user.
nodeinfo = self.rpc.call_node_info([source_node, target_node],
- None, self.instance.hypervisor)
- src_info = nodeinfo[source_node]
- dst_info = nodeinfo[target_node]
-
- if ((constants.HV_NODEINFO_KEY_VERSION in src_info.payload) and
- (constants.HV_NODEINFO_KEY_VERSION in dst_info.payload)):
- src_version = src_info.payload[constants.HV_NODEINFO_KEY_VERSION]
- dst_version = dst_info.payload[constants.HV_NODEINFO_KEY_VERSION]
+ None, [self.instance.hypervisor])
+ for ninfo in nodeinfo.values():
+ ninfo.Raise("Unable to retrieve node information from node '%s'" %
+ ninfo.node)
+ (_, _, (src_info, )) = nodeinfo[source_node].payload
+ (_, _, (dst_info, )) = nodeinfo[target_node].payload
+
+ if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
+ (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
+ src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
+ dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
if src_version != dst_version:
self.feedback_fn("* warning: hypervisor version mismatch between"
" source (%s) and target (%s) node" %
" synchronized on target node,"
" aborting migration" % dev.iv_name)
+ if self.current_mem > self.tgt_free_mem:
+ if not self.allow_runtime_changes:
+ raise errors.OpExecError("Memory ballooning not allowed and not enough"
+ " free memory to fit instance %s on target"
+ " node %s (have %dMB, need %dMB)" %
+ (instance.name, target_node,
+ self.tgt_free_mem, self.current_mem))
+ self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
+ rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
+ instance,
+ self.tgt_free_mem)
+ rpcres.Raise("Cannot modify instance runtime memory")
+
# First get the migration information from the remote node
result = self.rpc.call_migration_info(source_node, instance)
msg = result.fail_msg
self._GoReconnect(False)
self._WaitUntilSync()
+ # If the instance's disk template is `rbd' and there was a successful
+ # migration, unmap the device from the source node.
+ if self.instance.disk_template == constants.DT_RBD:
+ disks = _ExpandCheckDisks(instance, instance.disks)
+ self.feedback_fn("* unmapping instance's disks from %s" % source_node)
+ for disk in disks:
+ result = self.rpc.call_blockdev_shutdown(source_node, disk)
+ msg = result.fail_msg
+ if msg:
+ logging.error("Migration was successful, but couldn't unmap the"
+ " block device %s on source node %s: %s",
+ disk.iv_name, source_node, msg)
+ logging.error("You need to unmap the device %s manually on %s",
+ disk.iv_name, source_node)
+
self.feedback_fn("* done")
def _ExecFailover(self):
return results
+def _ComputeLDParams(disk_template, disk_params):
+ """Computes Logical Disk parameters from Disk Template parameters.
+
+ @type disk_template: string
+ @param disk_template: disk template, one of L{constants.DISK_TEMPLATES}
+ @type disk_params: dict
+  @param disk_params: disk template parameters;
+      dict(template_name -> parameters)
+ @rtype: list(dict)
+ @return: a list of dicts, one for each node of the disk hierarchy. Each dict
+ contains the LD parameters of the node. The tree is flattened in-order.
+
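+  Example (illustrative): for C{constants.DT_DRBD8} the result contains three
+  dicts, in order: the DRBD device parameters, the data LV parameters and the
+  metadata LV parameters.
+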
+ """
+ if disk_template not in constants.DISK_TEMPLATES:
+ raise errors.ProgrammerError("Unknown disk template %s" % disk_template)
+
+ result = list()
+ dt_params = disk_params[disk_template]
+ if disk_template == constants.DT_DRBD8:
+ drbd_params = {
+ constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE],
+ constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS],
+ constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS],
+ constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG],
+ constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM],
+ constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM],
+ constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC],
+ constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD],
+ constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET],
+ constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET],
+ constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE],
+ constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE],
+ }
+
+ drbd_params = \
+ objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8],
+ drbd_params)
+
+ result.append(drbd_params)
+
+ # data LV
+ data_params = {
+ constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES],
+ }
+ data_params = \
+ objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
+ data_params)
+ result.append(data_params)
+
+ # metadata LV
+ meta_params = {
+ constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES],
+ }
+ meta_params = \
+ objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
+ meta_params)
+ result.append(meta_params)
+
+ elif (disk_template == constants.DT_FILE or
+ disk_template == constants.DT_SHARED_FILE):
+ result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE])
+
+ elif disk_template == constants.DT_PLAIN:
+ params = {
+ constants.LDP_STRIPES: dt_params[constants.LV_STRIPES],
+ }
+ params = \
+ objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
+ params)
+ result.append(params)
+
+ elif disk_template == constants.DT_BLOCK:
+ result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV])
+
+ elif disk_template == constants.DT_RBD:
+ params = {
+ constants.LDP_POOL: dt_params[constants.RBD_POOL]
+ }
+ params = \
+ objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD],
+ params)
+ result.append(params)
+
+ return result
+
+
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
- iv_name, p_minor, s_minor):
+ iv_name, p_minor, s_minor, drbd_params, data_params,
+ meta_params):
"""Generate a drbd8 device complete with its children.
"""
assert len(vgnames) == len(names) == 2
port = lu.cfg.AllocatePort()
shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
+
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
- logical_id=(vgnames[0], names[0]))
+ logical_id=(vgnames[0], names[0]),
+ params=data_params)
dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
- logical_id=(vgnames[1], names[1]))
+ logical_id=(vgnames[1], names[1]),
+ params=meta_params)
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
p_minor, s_minor,
shared_secret),
children=[dev_data, dev_meta],
- iv_name=iv_name)
+ iv_name=iv_name, params=drbd_params)
return drbd_dev
instance_name, primary_node,
secondary_nodes, disk_info,
file_storage_dir, file_driver,
- base_index, feedback_fn):
+ base_index, feedback_fn, disk_params):
"""Generate the entire disk layout for a given template type.
"""
vgname = lu.cfg.GetVGName()
disk_count = len(disk_info)
disks = []
+ ld_params = _ComputeLDParams(template_name, disk_params)
if template_name == constants.DT_DISKLESS:
pass
elif template_name == constants.DT_PLAIN:
- if len(secondary_nodes) != 0:
+ if secondary_nodes:
raise errors.ProgrammerError("Wrong template configuration")
names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
size=disk[constants.IDISK_SIZE],
logical_id=(vg, names[idx]),
iv_name="disk/%d" % disk_index,
- mode=disk[constants.IDISK_MODE])
+ mode=disk[constants.IDISK_MODE],
+ params=ld_params[0])
disks.append(disk_dev)
elif template_name == constants.DT_DRBD8:
+ drbd_params, data_params, meta_params = ld_params
if len(secondary_nodes) != 1:
raise errors.ProgrammerError("Wrong template configuration")
remote_node = secondary_nodes[0]
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
+ drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
data_vg = disk.get(constants.IDISK_VG, vgname)
- meta_vg = disk.get(constants.IDISK_METAVG, data_vg)
+ meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
disk[constants.IDISK_SIZE],
[data_vg, meta_vg],
names[idx * 2:idx * 2 + 2],
"disk/%d" % disk_index,
- minors[idx * 2], minors[idx * 2 + 1])
+ minors[idx * 2], minors[idx * 2 + 1],
+ drbd_params, data_params, meta_params)
disk_dev.mode = disk[constants.IDISK_MODE]
disks.append(disk_dev)
elif template_name == constants.DT_FILE:
- if len(secondary_nodes) != 0:
+ if secondary_nodes:
raise errors.ProgrammerError("Wrong template configuration")
opcodes.RequireFileStorage()
logical_id=(file_driver,
"%s/disk%d" % (file_storage_dir,
disk_index)),
- mode=disk[constants.IDISK_MODE])
+ mode=disk[constants.IDISK_MODE],
+ params=ld_params[0])
disks.append(disk_dev)
elif template_name == constants.DT_SHARED_FILE:
- if len(secondary_nodes) != 0:
+ if secondary_nodes:
raise errors.ProgrammerError("Wrong template configuration")
opcodes.RequireSharedFileStorage()
logical_id=(file_driver,
"%s/disk%d" % (file_storage_dir,
disk_index)),
- mode=disk[constants.IDISK_MODE])
+ mode=disk[constants.IDISK_MODE],
+ params=ld_params[0])
disks.append(disk_dev)
elif template_name == constants.DT_BLOCK:
- if len(secondary_nodes) != 0:
+ if secondary_nodes:
raise errors.ProgrammerError("Wrong template configuration")
for idx, disk in enumerate(disk_info):
logical_id=(constants.BLOCKDEV_DRIVER_MANUAL,
disk[constants.IDISK_ADOPT]),
iv_name="disk/%d" % disk_index,
- mode=disk[constants.IDISK_MODE])
+ mode=disk[constants.IDISK_MODE],
+ params=ld_params[0])
+ disks.append(disk_dev)
+ elif template_name == constants.DT_RBD:
+ if secondary_nodes:
+ raise errors.ProgrammerError("Wrong template configuration")
+
+ names = _GenerateUniqueNames(lu, [".rbd.disk%d" % (base_index + i)
+ for i in range(disk_count)])
+
+ for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
+ disk_dev = objects.Disk(dev_type=constants.LD_RBD,
+ size=disk[constants.IDISK_SIZE],
+ logical_id=("rbd", names[idx]),
+ iv_name="disk/%d" % disk_index,
+ mode=disk[constants.IDISK_MODE],
+ params=ld_params[0])
disks.append(disk_dev)
else:
constants.DT_FILE: None,
constants.DT_SHARED_FILE: 0,
constants.DT_BLOCK: 0,
+ constants.DT_RBD: 0,
}
if disk_template not in req_size_dict:
# pylint: disable=W0142
self.instance_file_storage_dir = utils.PathJoin(*joinargs)
- def CheckPrereq(self):
+ def CheckPrereq(self): # pylint: disable=R0914
"""Check prerequisites.
"""
for param, value in self.op.beparams.iteritems():
if value == constants.VALUE_AUTO:
self.op.beparams[param] = default_beparams[param]
+ objects.UpgradeBeParams(self.op.beparams)
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.be_full = cluster.SimpleFillBE(self.op.beparams)
constants.IDISK_SIZE: size,
constants.IDISK_MODE: mode,
constants.IDISK_VG: data_vg,
- constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg),
}
+ if constants.IDISK_METAVG in disk:
+ new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
if constants.IDISK_ADOPT in disk:
new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
self.disks.append(new_disk)
_ReleaseLocks(self, locking.LEVEL_NODE,
keep=filter(None, [self.op.pnode, self.op.snode,
self.op.src_node]))
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES,
+ keep=filter(None, [self.op.pnode, self.op.snode,
+ self.op.src_node]))
#### node related checks
_CheckNodeVmCapable(self, self.op.snode)
self.secondaries.append(self.op.snode)
+ snode = self.cfg.GetNodeInfo(self.op.snode)
+ if pnode.group != snode.group:
+ self.LogWarning("The primary and secondary nodes are in two"
+ " different node groups; the disk parameters"
+ " from the first disk's node group will be"
+ " used")
+
nodenames = [pnode.name] + self.secondaries
+ # Verify instance specs
+ ispec = {
+ constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
+ constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
+ constants.ISPEC_DISK_COUNT: len(self.disks),
+ constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
+ constants.ISPEC_NIC_COUNT: len(self.nics),
+ }
+
+ group_info = self.cfg.GetNodeGroup(pnode.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
+ if not self.op.ignore_ipolicy and res:
+ raise errors.OpPrereqError(("Instance allocation to group %s violates"
+ " policy: %s") % (pnode.group,
+ utils.CommaJoin(res)),
+ errors.ECODE_INVAL)
+
+ # disk parameters (not customizable at instance or node level)
+ # just use the primary node parameters, ignoring the secondary.
+ self.diskparams = group_info.diskparams
+
if not self.adopt_disks:
- # Check lv size requirements, if not adopting
- req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
- _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
+ if self.op.disk_template == constants.DT_RBD:
+ # _CheckRADOSFreeSpace() is just a placeholder.
+ # Any function that checks prerequisites can be placed here.
+ # Check if there is enough space on the RADOS cluster.
+ _CheckRADOSFreeSpace()
+ else:
+ # Check lv size requirements, if not adopting
+ req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
+ _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
self.instance_file_storage_dir,
self.op.file_driver,
0,
- feedback_fn)
+ feedback_fn,
+ self.diskparams)
iobj = objects.Instance(name=instance, os=self.op.os_type,
primary_node=pnode_name,
return list(iobj.all_nodes)
+def _CheckRADOSFreeSpace():
+ """Compute disk size requirements inside the RADOS cluster.
+
+ """
+ # For the RADOS cluster we assume there is always enough space.
+ pass
+
+
class LUInstanceConsole(NoHooksLU):
"""Connect to an instance's console.
self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
self.op.iallocator, self.op.remote_node,
- self.op.disks, False, self.op.early_release)
+ self.op.disks, False, self.op.early_release,
+ self.op.ignore_ipolicy)
self.tasklets = [self.replacer]
"""
def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
- disks, delay_iallocator, early_release):
+ disks, delay_iallocator, early_release, ignore_ipolicy):
"""Initializes this class.
"""
self.disks = disks
self.delay_iallocator = delay_iallocator
self.early_release = early_release
+ self.ignore_ipolicy = ignore_ipolicy
# Runtime data
self.instance = None
if not self.disks:
self.disks = range(len(self.instance.disks))
+    # TODO: This is ugly, but right now we can't distinguish between an
+    # internally submitted opcode and an external one. We should fix that.
+ if self.remote_node_info:
+ # We change the node, lets verify it still meets instance policy
+ new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
+ ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
+ new_group_info)
+ _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
+ ignore=self.ignore_ipolicy)
+
+ # TODO: compute disk parameters
+ primary_node_info = self.cfg.GetNodeInfo(instance.primary_node)
+ secondary_node_info = self.cfg.GetNodeInfo(secondary_node)
+ if primary_node_info.group != secondary_node_info.group:
+ self.lu.LogInfo("The instance primary and secondary nodes are in two"
+ " different node groups; the disk parameters of the"
+ " primary node's group will be applied.")
+
+ self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams
+
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
+ _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
+
vg_data = dev.children[0].logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vg_data, names[0]))
+ logical_id=(vg_data, names[0]), params=data_p)
vg_meta = dev.children[1].logical_id[0]
lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
- logical_id=(vg_meta, names[1]))
+ logical_id=(vg_meta, names[1]), params=meta_p)
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
iv_names[idx] = (dev, dev.children, new_net_id)
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
new_net_id)
+ drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
logical_id=new_alone_id,
children=dev.children,
- size=dev.size)
+ size=dev.size,
+ params=drbd_params)
try:
_CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
_GetInstanceInfoText(self.instance), False)
"""
REQ_BGL = False
+ _MODE2IALLOCATOR = {
+ constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
+ constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
+ constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
+ }
+ assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
+ assert (frozenset(_MODE2IALLOCATOR.values()) ==
+ constants.IALLOCATOR_NEVAC_MODES)
+
def CheckArguments(self):
_CheckIAllocatorOrNode(self, "iallocator", "remote_node")
raise errors.OpPrereqError("Can not use evacuated node as a new"
" secondary node", errors.ECODE_INVAL)
- if self.op.mode != constants.IALLOCATOR_NEVAC_SEC:
+ if self.op.mode != constants.NODE_EVAC_SEC:
raise errors.OpPrereqError("Without the use of an iallocator only"
" secondary instances can be evacuated",
errors.ECODE_INVAL)
locking.LEVEL_NODE: [],
}
+ # Determine nodes (via group) optimistically, needs verification once locks
+ # have been acquired
+ self.lock_nodes = self._DetermineNodes()
+
+ def _DetermineNodes(self):
+ """Gets the list of nodes to operate on.
+
+ """
if self.op.remote_node is None:
# Iallocator will choose any node(s) in the same group
group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
group_nodes = frozenset([self.op.remote_node])
# Determine nodes to be locked
- self.lock_nodes = set([self.op.node_name]) | group_nodes
+ return set([self.op.node_name]) | group_nodes
def _DetermineInstances(self):
"""Builds list of instances to operate on.
"""
- assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES
+ assert self.op.mode in constants.NODE_EVAC_MODES
- if self.op.mode == constants.IALLOCATOR_NEVAC_PRI:
+ if self.op.mode == constants.NODE_EVAC_PRI:
# Primary instances only
inst_fn = _GetNodePrimaryInstances
assert self.op.remote_node is None, \
"Evacuating primary instances requires iallocator"
- elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC:
+ elif self.op.mode == constants.NODE_EVAC_SEC:
# Secondary instances only
inst_fn = _GetNodeSecondaryInstances
else:
# All instances
- assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL
+ assert self.op.mode == constants.NODE_EVAC_ALL
inst_fn = _GetNodeInstances
+ # TODO: In 2.6, change the iallocator interface to take an evacuation mode
+ # per instance
+ raise errors.OpPrereqError("Due to an issue with the iallocator"
+ " interface it is not possible to evacuate"
+ " all instances at once; specify explicitly"
+ " whether to evacuate primary or secondary"
+ " instances",
+ errors.ECODE_INVAL)
return inst_fn(self.cfg, self.op.node_name)
set(i.name for i in self._DetermineInstances())
elif level == locking.LEVEL_NODEGROUP:
- # Lock node groups optimistically, needs verification once nodes have
- # been acquired
+ # Lock node groups for all potential target nodes optimistically, needs
+ # verification once nodes have been acquired
self.needed_locks[locking.LEVEL_NODEGROUP] = \
self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
owned_nodes = self.owned_locks(locking.LEVEL_NODE)
owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
- assert owned_nodes == self.lock_nodes
+ need_nodes = self._DetermineNodes()
+
+ if not owned_nodes.issuperset(need_nodes):
+ raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
+ " locks were acquired, current nodes are"
+ " are '%s', used to be '%s'; retry the"
+ " operation" %
+ (self.op.node_name,
+ utils.CommaJoin(need_nodes),
+ utils.CommaJoin(owned_nodes)),
+ errors.ECODE_STATE)
wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
if owned_groups != wanted_groups:
raise errors.OpExecError("Node groups changed since locks were acquired,"
- " current groups are '%s', used to be '%s'" %
+ " current groups are '%s', used to be '%s';"
+ " retry the operation" %
(utils.CommaJoin(wanted_groups),
utils.CommaJoin(owned_groups)))
if set(self.instance_names) != owned_instances:
raise errors.OpExecError("Instances on node '%s' changed since locks"
" were acquired, current instances are '%s',"
- " used to be '%s'" %
+ " used to be '%s'; retry the operation" %
(self.op.node_name,
utils.CommaJoin(self.instance_names),
utils.CommaJoin(owned_instances)))
elif self.op.iallocator is not None:
# TODO: Implement relocation to other group
ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
- evac_mode=self.op.mode,
+ evac_mode=self._MODE2IALLOCATOR[self.op.mode],
instances=list(self.instance_names))
ial.Run(self.op.iallocator)
jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
elif self.op.remote_node is not None:
- assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC
+ assert self.op.mode == constants.NODE_EVAC_SEC
jobs = [
[opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
remote_node=self.op.remote_node,
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.needed_locks[locking.LEVEL_NODE_RES] = []
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
self.disk = instance.FindDisk(self.op.disk)
if instance.disk_template not in (constants.DT_FILE,
- constants.DT_SHARED_FILE):
+ constants.DT_SHARED_FILE,
+ constants.DT_RBD):
# TODO: check the free disk space for file, when that feature will be
# supported
_CheckNodesFreeDiskPerVG(self, nodenames,
def CheckArguments(self):
if not (self.op.nics or self.op.disks or self.op.disk_template or
self.op.hvparams or self.op.beparams or self.op.os_name or
- self.op.online_inst or self.op.offline_inst):
+ self.op.online_inst or self.op.offline_inst or
+ self.op.runtime_mem):
raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
if self.op.hvparams:
env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
if self.op.disk_template:
env["NEW_DISK_TEMPLATE"] = self.op.disk_template
+ if self.op.runtime_mem:
+ env["RUNTIME_MEMORY"] = self.op.runtime_mem
return env
"Cannot retrieve locked instance %s" % self.op.instance_name
pnode = instance.primary_node
nodelist = list(instance.all_nodes)
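+ # Disk parameters come from the node group of the instance's primary node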
+ pnode_info = self.cfg.GetNodeInfo(pnode)
+ self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams
# OS change
if self.op.os_name and not self.op.force:
required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
_CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
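+ # Check the requested secondary node against its group's instance policy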
+ snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
+ snode_group = self.cfg.GetNodeGroup(snode_info.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, snode_group)
+ _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info,
+ ignore=self.op.ignore_ipolicy)
+ if pnode_info.group != snode_info.group:
+ self.LogWarning("The primary and secondary nodes are in two"
+ " different node groups; the disk parameters"
+ " from the first disk's node group will be"
+ " used")
+
# hvparams processing
if self.op.hvparams:
hv_type = instance.hypervisor
if self.op.beparams:
i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams,
use_none=True)
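+ # Normalise legacy backend parameters (old-style memory value) before type checking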
+ objects.UpgradeBeParams(i_bedict)
utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
be_new = cluster.SimpleFillBE(i_bedict)
self.be_proposed = self.be_new = be_new # the new actual values
instance_info = self.rpc.call_instance_info(pnode, instance.name,
instance.hypervisor)
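+ # Query node info for the instance's hypervisor only (the RPC now takes a list of hypervisors)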
nodeinfo = self.rpc.call_node_info(mem_check_list, None,
- instance.hypervisor)
+ [instance.hypervisor])
pninfo = nodeinfo[pnode]
msg = pninfo.fail_msg
if msg:
# Assume the primary node is unreachable and go ahead
self.warn.append("Can't get info from primary node %s: %s" %
(pnode, msg))
- elif not isinstance(pninfo.payload.get("memory_free", None), int):
- self.warn.append("Node data from primary node %s doesn't contain"
- " free memory information" % pnode)
- elif instance_info.fail_msg:
- self.warn.append("Can't get instance runtime information: %s" %
- instance_info.fail_msg)
else:
- if instance_info.payload:
- current_mem = int(instance_info.payload["memory"])
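+ # New-style payload: (bootid, [vg_info], [hv_info]); a single hypervisor was requested above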
+ (_, _, (pnhvinfo, )) = pninfo.payload
+ if not isinstance(pnhvinfo.get("memory_free", None), int):
+ self.warn.append("Node data from primary node %s doesn't contain"
+ " free memory information" % pnode)
+ elif instance_info.fail_msg:
+ self.warn.append("Can't get instance runtime information: %s" %
+ instance_info.fail_msg)
else:
- # Assume instance not running
- # (there is a slight race condition here, but it's not very probable,
- # and we have no other way to check)
- current_mem = 0
- #TODO(dynmem): do the appropriate check involving MINMEM
- miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
- pninfo.payload["memory_free"])
- if miss_mem > 0:
- raise errors.OpPrereqError("This change will prevent the instance"
- " from starting, due to %d MB of memory"
- " missing on its primary node" % miss_mem,
- errors.ECODE_NORES)
+ if instance_info.payload:
+ current_mem = int(instance_info.payload["memory"])
+ else:
+ # Assume instance not running
+ # (there is a slight race condition here, but it's not very
+ # probable, and we have no other way to check)
+ # TODO: Describe race condition
+ current_mem = 0
+ #TODO(dynmem): do the appropriate check involving MINMEM
+ miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
+ pnhvinfo["memory_free"])
+ if miss_mem > 0:
+ raise errors.OpPrereqError("This change will prevent the instance"
+ " from starting, due to %d MB of memory"
+ " missing on its primary node" %
+ miss_mem,
+ errors.ECODE_NORES)
if be_new[constants.BE_AUTO_BALANCE]:
for node, nres in nodeinfo.items():
continue
nres.Raise("Can't get info from secondary node %s" % node,
prereq=True, ecode=errors.ECODE_STATE)
- if not isinstance(nres.payload.get("memory_free", None), int):
+ (_, _, (nhvinfo, )) = nres.payload
+ if not isinstance(nhvinfo.get("memory_free", None), int):
raise errors.OpPrereqError("Secondary node %s didn't return free"
" memory information" % node,
errors.ECODE_STATE)
#TODO(dynmem): do the appropriate check involving MINMEM
- elif be_new[constants.BE_MAXMEM] > nres.payload["memory_free"]:
+ elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]:
raise errors.OpPrereqError("This change will prevent the instance"
" from failover to its secondary node"
" %s, due to not enough memory" % node,
errors.ECODE_STATE)
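+ # Changing runtime memory requires a running instance; growing it also needs free memory on the primary node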
+ if self.op.runtime_mem:
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking node %s" % instance.primary_node)
+ if not remote_info.payload: # not running already
+ raise errors.OpPrereqError("Instance %s is not running" % instance.name,
+ errors.ECODE_STATE)
+
+ current_memory = remote_info.payload["memory"]
+ if (not self.op.force and
+ (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or
+ self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
+ raise errors.OpPrereqError("Instance %s must have memory between %d"
+ " and %d MB of memory unless --force is"
+ " given" % (instance.name,
+ self.be_proposed[constants.BE_MINMEM],
+ self.be_proposed[constants.BE_MAXMEM]),
+ errors.ECODE_INVAL)
+
+ if self.op.runtime_mem > current_memory:
+ _CheckNodeFreeMemory(self, instance.primary_node,
+ "ballooning memory for instance %s" %
+ instance.name,
+ self.op.runtime_mem - current_memory,
+ instance.hypervisor)
+
# NIC processing
self.nic_pnew = {}
self.nic_pinst = {}
for d in instance.disks]
new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
instance.name, pnode, [snode],
- disk_info, None, None, 0, feedback_fn)
+ disk_info, None, None, 0, feedback_fn,
+ self.diskparams)
info = _GetInstanceInfoText(instance)
feedback_fn("Creating aditional volumes...")
# first, create the missing data and meta devices
result = []
instance = self.instance
+
+ # runtime memory
+ if self.op.runtime_mem:
+ rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
+ instance,
+ self.op.runtime_mem)
+ rpcres.Raise("Cannot modify instance runtime memory")
+ result.append(("runtime_memory", self.op.runtime_mem))
+
# disk changes
for disk_op, disk_dict in self.op.disks:
if disk_op == constants.DDM_REMOVE:
[disk_dict],
file_path,
file_driver,
- disk_idx_base, feedback_fn)[0]
+ disk_idx_base,
+ feedback_fn,
+ self.diskparams)[0]
instance.disks.append(new_disk)
info = _GetInstanceInfoText(instance)
if self.op.ndparams:
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
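+ # Merge and verify any static hypervisor and disk state given for the new group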
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+ else:
+ self.new_hv_state = None
+
+ if self.op.disk_state:
+ self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+ else:
+ self.new_disk_state = None
+
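+ # Default any disk template not specified to an empty dict so every template is type-checked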
+ if self.op.diskparams:
+ for templ in constants.DISK_TEMPLATES:
+ if templ not in self.op.diskparams:
+ self.op.diskparams[templ] = {}
+ utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES)
+ else:
+ self.op.diskparams = self.cfg.GetClusterInfo().diskparams
+
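+ # Validate the new group ipolicy on top of the cluster-level defaults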
+ if self.op.ipolicy:
+ cluster = self.cfg.GetClusterInfo()
+ full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
+ try:
+ objects.InstancePolicy.CheckParameterSyntax(full_ipolicy)
+ except errors.ConfigurationError, err:
+ raise errors.OpPrereqError("Invalid instance policy: %s" % err,
+ errors.ECODE_INVAL)
+
def BuildHooksEnv(self):
"""Build hooks env.
group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
uuid=self.group_uuid,
alloc_policy=self.op.alloc_policy,
- ndparams=self.op.ndparams)
+ ndparams=self.op.ndparams,
+ diskparams=self.op.diskparams,
+ ipolicy=self.op.ipolicy,
+ hv_state_static=self.new_hv_state,
+ disk_state_static=self.new_disk_state)
self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
del self.remove_locks[locking.LEVEL_NODEGROUP]
"""Assign nodes to a new group.
"""
- for node in self.op.nodes:
- self.node_data[node].group = self.group_uuid
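+ # Update group membership via the config writer rather than mutating the nodes returned by GetAllNodesInfo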
+ mods = [(node_name, self.group_uuid) for node_name in self.op.nodes]
- # FIXME: Depends on side-effects of modifying the result of
- # C{cfg.GetAllNodesInfo}
-
- self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
+ self.cfg.AssignGroupNodes(mods)
@staticmethod
def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
lu.needed_locks = {}
self._all_groups = lu.cfg.GetAllNodeGroupsInfo()
+ self._cluster = lu.cfg.GetClusterInfo()
name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values())
if not self.names:
# Do not pass on node information if it was not requested.
group_to_nodes = None
- return query.GroupQueryData([self._all_groups[uuid]
+ return query.GroupQueryData(self._cluster,
+ [self._all_groups[uuid]
for uuid in self.wanted],
group_to_nodes, group_to_instances)
def CheckArguments(self):
all_changes = [
self.op.ndparams,
+ self.op.diskparams,
self.op.alloc_policy,
+ self.op.hv_state,
+ self.op.disk_state,
+ self.op.ipolicy,
]
if all_changes.count(None) == len(all_changes):
self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
locking.LEVEL_NODEGROUP: [self.group_uuid],
}
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
def CheckPrereq(self):
"""Check prerequisites.
"""
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+
+ # Check if locked instances are still correct
+ _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
+
self.group = self.cfg.GetNodeGroup(self.group_uuid)
+ cluster = self.cfg.GetClusterInfo()
if self.group is None:
raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = new_ndparams
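+ # Merge the new per-template disk parameters over the group's existing ones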
+ if self.op.diskparams:
+ self.new_diskparams = dict()
+ for templ in constants.DISK_TEMPLATES:
+ if templ not in self.op.diskparams:
+ self.op.diskparams[templ] = {}
+ new_templ_params = _GetUpdatedParams(self.group.diskparams[templ],
+ self.op.diskparams[templ])
+ utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES)
+ self.new_diskparams[templ] = new_templ_params
+
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+ self.group.hv_state_static)
+
+ if self.op.disk_state:
+ self.new_disk_state = \
+ _MergeAndVerifyDiskState(self.op.disk_state,
+ self.group.disk_state_static)
+
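+ # A new ipolicy only warns about existing instances that would violate it; it does not abort the operation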
+ if self.op.ipolicy:
+ self.new_ipolicy = _GetUpdatedIPolicy(self.group.ipolicy,
+ self.op.ipolicy,
+ group_policy=True)
+
+ new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
+ inst_filter = lambda inst: inst.name in owned_instances
+ instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values()
+ violations = \
+ _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
+ self.group),
+ new_ipolicy, instances)
+
+ if violations:
+ self.LogWarning("After the ipolicy change the following instances"
+ " violate them: %s",
+ utils.CommaJoin(violations))
+
def BuildHooksEnv(self):
"""Build hooks env.
self.group.ndparams = self.new_ndparams
result.append(("ndparams", str(self.group.ndparams)))
+ if self.op.diskparams:
+ self.group.diskparams = self.new_diskparams
+ result.append(("diskparams", str(self.group.diskparams)))
+
if self.op.alloc_policy:
self.group.alloc_policy = self.op.alloc_policy
+ if self.op.hv_state:
+ self.group.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ self.group.disk_state_static = self.new_disk_state
+
+ if self.op.ipolicy:
+ self.group.ipolicy = self.new_ipolicy
+
self.cfg.Update(self.group, feedback_fn)
return result
"cluster_name": cfg.GetClusterName(),
"cluster_tags": list(cluster_info.GetTags()),
"enabled_hypervisors": list(cluster_info.enabled_hypervisors),
- # we don't have job IDs
+ "ipolicy": cluster_info.ipolicy,
}
ninfo = cfg.GetAllNodesInfo()
iinfo = cfg.GetAllInstancesInfo().values()
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
else:
- hypervisor_name = cluster_info.enabled_hypervisors[0]
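+ # primary_hypervisor is the first of the cluster's enabled hypervisors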
+ hypervisor_name = cluster_info.primary_hypervisor
- node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
- hypervisor_name)
+ node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
+ [hypervisor_name])
node_iinfo = \
self.rpc.call_all_instances_info(node_list,
cluster_info.enabled_hypervisors)
"""Compute node groups data.
"""
+ cluster = cfg.GetClusterInfo()
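+ # Each group exports its effective ipolicy (cluster defaults overlaid with group-specific values)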
ng = dict((guuid, {
"name": gdata.name,
"alloc_policy": gdata.alloc_policy,
+ "ipolicy": _CalculateGroupIPolicy(cluster, gdata),
})
for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
nresult.Raise("Can't get data for node %s" % nname)
node_iinfo[nname].Raise("Can't get node instance info from node %s" %
nname)
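+ # Convert the new-style node info payload into the legacy single-dict format used below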
- remote_info = nresult.payload
+ remote_info = _MakeLegacyNodeInfo(nresult.payload)
for attr in ["memory_total", "memory_free", "memory_dom0",
"vg_size", "vg_free", "cpu_total"]: