X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/73e6cf8d614d26a9d9bc1fbc12031f07b758b7b0..784b6603e36bc7cd86c2712623cea2359f0ace91:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 973f978..81f21ae 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -721,6 +721,53 @@ def _GetUpdatedParams(old_params, update_dict, return params_copy +def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False): + """Return the new version of a instance policy. + + @param group_policy: whether this policy applies to a group and thus + we should support removal of policy entries + + """ + use_none = use_default = group_policy + ipolicy = copy.deepcopy(old_ipolicy) + for key, value in new_ipolicy.items(): + if key not in constants.IPOLICY_ALL_KEYS: + raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key, + errors.ECODE_INVAL) + if key in constants.IPOLICY_ISPECS: + utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES) + ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value, + use_none=use_none, + use_default=use_default) + else: + if not value or value == [constants.VALUE_DEFAULT]: + if group_policy: + del ipolicy[key] + else: + raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'" + " on the cluster'" % key, + errors.ECODE_INVAL) + else: + if key in constants.IPOLICY_PARAMETERS: + # FIXME: we assume all such values are float + try: + ipolicy[key] = float(value) + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid value for attribute" + " '%s': '%s', error: %s" % + (key, value, err), errors.ECODE_INVAL) + else: + # FIXME: we assume all others are lists; this should be redone + # in a nicer way 
+ ipolicy[key] = list(value) + try: + objects.InstancePolicy.CheckParameterSyntax(ipolicy) + except errors.ConfigurationError, err: + raise errors.OpPrereqError("Invalid instance policy: %s" % err, + errors.ECODE_INVAL) + return ipolicy + + def _UpdateAndVerifySubDict(base, updates, type_check): """Updates and verifies a dict with sub dicts of the same type. @@ -1001,8 +1048,8 @@ def _CheckInstanceState(lu, instance, req_states, msg=None): if msg is None: msg = "can't use instance from outside %s states" % ", ".join(req_states) if instance.admin_state not in req_states: - raise errors.OpPrereqError("Instance %s is marked to be %s, %s" % - (instance, instance.admin_state, msg), + raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % + (instance.name, instance.admin_state, msg), errors.ECODE_STATE) if constants.ADMINST_UP not in req_states: @@ -1016,8 +1063,8 @@ def _CheckInstanceState(lu, instance, req_states, msg=None): (instance.name, msg), errors.ECODE_STATE) -def _CheckMinMaxSpecs(name, ipolicy, value): - """Checks if value is in the desired range. +def _ComputeMinMaxSpec(name, ipolicy, value): + """Computes if value is in the desired range. @param name: name of the parameter for which we perform the check @param ipolicy: dictionary containing min, max and std values @@ -1036,6 +1083,141 @@ def _CheckMinMaxSpecs(name, ipolicy, value): return None +def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count, + nic_count, disk_sizes, + _compute_fn=_ComputeMinMaxSpec): + """Verifies ipolicy against provided specs. 
+ + @type ipolicy: dict + @param ipolicy: The ipolicy + @type mem_size: int + @param mem_size: The memory size + @type cpu_count: int + @param cpu_count: Used cpu cores + @type disk_count: int + @param disk_count: Number of disks used + @type nic_count: int + @param nic_count: Number of nics used + @type disk_sizes: list of ints + @param disk_sizes: Disk sizes of used disk (len must match C{disk_count}) + @param _compute_fn: The compute function (unittest only) + @return: A list of violations, or an empty list of no violations are found + + """ + assert disk_count == len(disk_sizes) + + test_settings = [ + (constants.ISPEC_MEM_SIZE, mem_size), + (constants.ISPEC_CPU_COUNT, cpu_count), + (constants.ISPEC_DISK_COUNT, disk_count), + (constants.ISPEC_NIC_COUNT, nic_count), + ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes) + + return filter(None, + (_compute_fn(name, ipolicy, value) + for (name, value) in test_settings)) + + +def _ComputeIPolicyInstanceViolation(ipolicy, instance, + _compute_fn=_ComputeIPolicySpecViolation): + """Compute if instance meets the specs of ipolicy. + + @type ipolicy: dict + @param ipolicy: The ipolicy to verify against + @type instance: L{objects.Instance} + @param instance: The instance to verify + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + mem_size = instance.beparams.get(constants.BE_MAXMEM, None) + cpu_count = instance.beparams.get(constants.BE_VCPUS, None) + disk_count = len(instance.disks) + disk_sizes = [disk.size for disk in instance.disks] + nic_count = len(instance.nics) + + return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count, + disk_sizes) + + +def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec, + _compute_fn=_ComputeIPolicySpecViolation): + """Compute if instance specs meets the specs of ipolicy. 
+ + @type ipolicy: dict + @param ipolicy: The ipolicy to verify against + @param instance_spec: dict + @param instance_spec: The instance spec to verify + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None) + cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None) + disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0) + disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, []) + nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0) + + return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count, + disk_sizes) + + +def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group, + target_group, + _compute_fn=_ComputeIPolicyInstanceViolation): + """Compute if instance meets the specs of the new target group. + + @param ipolicy: The ipolicy to verify + @param instance: The instance object to verify + @param current_group: The current group of the instance + @param target_group: The new group of the instance + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + if current_group == target_group: + return [] + else: + return _compute_fn(ipolicy, instance) + + +def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False, + _compute_fn=_ComputeIPolicyNodeViolation): + """Checks that the target node is correct in terms of instance policy. 
+ + @param ipolicy: The ipolicy to verify + @param instance: The instance object to verify + @param node: The new node to relocate + @param ignore: Ignore violations of the ipolicy + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + primary_node = lu.cfg.GetNodeInfo(instance.primary_node) + res = _compute_fn(ipolicy, instance, primary_node.group, node.group) + + if res: + msg = ("Instance does not meet target node group's (%s) instance" + " policy: %s") % (node.group, utils.CommaJoin(res)) + if ignore: + lu.LogWarning(msg) + else: + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + + +def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances): + """Computes a set of any instances that would violate the new ipolicy. + + @param old_ipolicy: The current (still in-place) ipolicy + @param new_ipolicy: The new (to become) ipolicy + @param instances: List of instances to verify + @return: A list of instances which violates the new ipolicy but did not before + + """ + return (_ComputeViolatingInstances(old_ipolicy, instances) - + _ComputeViolatingInstances(new_ipolicy, instances)) + + def _ExpandItemName(fn, name, kind): """Expand an item name. @@ -1249,14 +1431,26 @@ def _DecideSelfPromotion(lu, exceptions=None): return mc_now < mc_should -def _CalculateGroupIPolicy(cfg, group): +def _CalculateGroupIPolicy(cluster, group): """Calculate instance policy for group. """ - cluster = cfg.GetClusterInfo() return cluster.SimpleFillIPolicy(group.ipolicy) +def _ComputeViolatingInstances(ipolicy, instances): + """Computes a set of instances who violates given ipolicy. 
+ + @param ipolicy: The ipolicy to verify + @type instances: object.Instance + @param instances: List of instances to verify + @return: A frozenset of instance names violating the ipolicy + + """ + return frozenset([inst.name for inst in instances + if _ComputeIPolicyInstanceViolation(ipolicy, inst)]) + + def _CheckNicsBridgesExist(lu, target_nics, target_node): """Check that the brigdes needed by a list of nics exist. @@ -2137,34 +2331,6 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): msg = "cannot reach the master IP" _ErrorIf(True, constants.CV_ENODENET, node, msg) - def _VerifyInstancePolicy(self, instance): - """Verify instance specs against instance policy set on node group level. - - - """ - cluster = self.cfg.GetClusterInfo() - full_beparams = cluster.FillBE(instance) - ipolicy = cluster.SimpleFillIPolicy(self.group_info.ipolicy) - - mem_size = full_beparams.get(constants.BE_MAXMEM, None) - cpu_count = full_beparams.get(constants.BE_VCPUS, None) - disk_count = len(instance.disks) - disk_sizes = [disk.size for disk in instance.disks] - nic_count = len(instance.nics) - - test_settings = [ - (constants.ISPEC_MEM_SIZE, mem_size), - (constants.ISPEC_CPU_COUNT, cpu_count), - (constants.ISPEC_DISK_COUNT, disk_count), - (constants.ISPEC_NIC_COUNT, nic_count), - ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes) - - for (name, value) in test_settings: - test_result = _CheckMinMaxSpecs(name, ipolicy, value) - self._ErrorIf(test_result is not None, - constants.CV_EINSTANCEPOLICY, instance.name, - test_result) - def _VerifyInstance(self, instance, instanceconfig, node_image, diskstatus): """Verify an instance. 
@@ -2179,7 +2345,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_vol_should = {} instanceconfig.MapLVsByNode(node_vol_should) - self._VerifyInstancePolicy(instanceconfig) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info) + err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig) + _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err) for node in node_vol_should: n_img = node_image[node] @@ -2261,14 +2429,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # we already list instances living on such nodes, and that's # enough warning continue - #TODO(dynmem): use MINMEM for checking #TODO(dynmem): also consider ballooning out other instances for prinode, instances in n_img.sbp.items(): needed_mem = 0 for instance in instances: bep = cluster_info.FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: - needed_mem += bep[constants.BE_MAXMEM] + needed_mem += bep[constants.BE_MINMEM] test = n_img.mfree < needed_mem self._ErrorIf(test, constants.CV_ENODEN1, node, "not enough memory to accomodate instance failovers" @@ -3630,8 +3797,14 @@ class LUClusterSetParams(LogicalUnit): # all nodes to be modified. self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + locking.LEVEL_NODEGROUP: locking.ALL_SET, + } + self.share_locks = { + locking.LEVEL_NODE: 1, + locking.LEVEL_INSTANCE: 1, + locking.LEVEL_NODEGROUP: 1, } - self.share_locks[locking.LEVEL_NODE] = 1 def BuildHooksEnv(self): """Build hooks env. 
@@ -3735,13 +3908,26 @@ class LUClusterSetParams(LogicalUnit): for storage, svalues in new_disk_state.items()) if self.op.ipolicy: - ipolicy = {} - for key, value in self.op.ipolicy.items(): - utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES) - ipolicy[key] = _GetUpdatedParams(cluster.ipolicy.get(key, {}), - value) - objects.InstancePolicy.CheckParameterSyntax(ipolicy) - self.new_ipolicy = ipolicy + self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy, + group_policy=False) + + all_instances = self.cfg.GetAllInstancesInfo().values() + violations = set() + for group in self.cfg.GetAllNodeGroupsInfo().values(): + instances = frozenset([inst for inst in all_instances + if compat.any(node in group.members + for node in inst.all_nodes)]) + new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy) + new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, + group), + new_ipolicy, instances) + if new: + violations.update(new) + + if violations: + self.LogWarning("After the ipolicy change the following instances" + " violate them: %s", + utils.CommaJoin(violations)) if self.op.nicparams: utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) @@ -6220,6 +6406,8 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): @param requested: the amount of memory in MiB to check for @type hypervisor_name: C{str} @param hypervisor_name: the hypervisor to ask for memory stats + @rtype: integer + @return: node current free memory @raise errors.OpPrereqError: if the node doesn't have enough memory, or we cannot check the node @@ -6239,6 +6427,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): " needed %s MiB, available %s MiB" % (node, reason, requested, free_mem), errors.ECODE_NORES) + return free_mem def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): @@ -6354,6 +6543,11 @@ class LUInstanceStartup(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + 
self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE_RES: + self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES) def BuildHooksEnv(self): """Build hooks env. @@ -6410,6 +6604,7 @@ class LUInstanceStartup(LogicalUnit): _CheckNodeOnline(self, instance.primary_node) bep = self.cfg.GetClusterInfo().FillBE(instance) + bep.update(self.op.beparams) # check bridges existence _CheckInstanceBridgesExist(self, instance) @@ -6422,7 +6617,7 @@ class LUInstanceStartup(LogicalUnit): if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, - bep[constants.BE_MAXMEM], instance.hypervisor) + bep[constants.BE_MINMEM], instance.hypervisor) def Exec(self, feedback_fn): """Start the instance. @@ -6717,9 +6912,39 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + _MODIFYABLE = frozenset([ + constants.IDISK_SIZE, + constants.IDISK_MODE, + ]) + + # New or changed disk parameters may have different semantics + assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([ + constants.IDISK_ADOPT, + + # TODO: Implement support changing VG while recreating + constants.IDISK_VG, + constants.IDISK_METAVG, + ])) + def CheckArguments(self): - # normalise the disk list - self.op.disks = sorted(frozenset(self.op.disks)) + if self.op.disks and ht.TPositiveInt(self.op.disks[0]): + # Normalize and convert deprecated list of disk indices + self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] + + duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks)) + if duplicates: + raise errors.OpPrereqError("Some disks have been specified more than" + " once: %s" % utils.CommaJoin(duplicates), + errors.ECODE_INVAL) + + for (idx, params) in self.op.disks: + utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) + unsupported = frozenset(params.keys()) 
- self._MODIFYABLE + if unsupported: + raise errors.OpPrereqError("Parameters for disk %s try to change" + " unmodifyable parameter(s): %s" % + (idx, utils.CommaJoin(unsupported)), + errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -6729,6 +6954,7 @@ class LUInstanceRecreateDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) else: self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -6784,6 +7010,7 @@ class LUInstanceRecreateDisks(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) + # if we replace nodes *and* the old primary is offline, we don't # check assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE) @@ -6793,17 +7020,22 @@ class LUInstanceRecreateDisks(LogicalUnit): _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, msg="cannot recreate disks") - if not self.op.disks: - self.op.disks = range(len(instance.disks)) + if self.op.disks: + self.disks = dict(self.op.disks) else: - for idx in self.op.disks: - if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index '%s'" % idx, - errors.ECODE_INVAL) - if self.op.disks != range(len(instance.disks)) and self.op.nodes: + self.disks = dict((idx, {}) for idx in range(len(instance.disks))) + + maxidx = max(self.disks.keys()) + if maxidx >= len(instance.disks): + raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx, + errors.ECODE_INVAL) + + if (self.op.nodes and + sorted(self.disks.keys()) != range(len(instance.disks))): raise errors.OpPrereqError("Can't recreate disks partially and" " change the nodes at the same time", errors.ECODE_INVAL) + self.instance = instance def Exec(self, feedback_fn): @@ -6816,30 +7048,42 @@ class LUInstanceRecreateDisks(LogicalUnit): 
self.owned_locks(locking.LEVEL_NODE_RES)) to_skip = [] - mods = [] # keeps track of needed logical_id changes + mods = [] # keeps track of needed changes for idx, disk in enumerate(instance.disks): - if idx not in self.op.disks: # disk idx has not been passed in + try: + changes = self.disks[idx] + except KeyError: + # Disk should not be recreated to_skip.append(idx) continue + # update secondaries for disks, if needed - if self.op.nodes: - if disk.dev_type == constants.LD_DRBD8: - # need to update the nodes and minors - assert len(self.op.nodes) == 2 - assert len(disk.logical_id) == 6 # otherwise disk internals - # have changed - (_, _, old_port, _, _, old_secret) = disk.logical_id - new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) - new_id = (self.op.nodes[0], self.op.nodes[1], old_port, - new_minors[0], new_minors[1], old_secret) - assert len(disk.logical_id) == len(new_id) - mods.append((idx, new_id)) + if self.op.nodes and disk.dev_type == constants.LD_DRBD8: + # need to update the nodes and minors + assert len(self.op.nodes) == 2 + assert len(disk.logical_id) == 6 # otherwise disk internals + # have changed + (_, _, old_port, _, _, old_secret) = disk.logical_id + new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) + new_id = (self.op.nodes[0], self.op.nodes[1], old_port, + new_minors[0], new_minors[1], old_secret) + assert len(disk.logical_id) == len(new_id) + else: + new_id = None + + mods.append((idx, new_id, changes)) # now that we have passed all asserts above, we can apply the mods # in a single run (to avoid partial changes) - for idx, new_id in mods: - instance.disks[idx].logical_id = new_id + for idx, new_id, changes in mods: + disk = instance.disks[idx] + if new_id is not None: + assert disk.dev_type == constants.LD_DRBD8 + disk.logical_id = new_id + if changes: + disk.Update(size=changes.get(constants.IDISK_SIZE, None), + mode=changes.get(constants.IDISK_MODE, None)) # change primary node, if needed if 
self.op.nodes: @@ -7118,13 +7362,17 @@ class LUInstanceFailover(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + ignore_consistency = self.op.ignore_consistency shutdown_timeout = self.op.shutdown_timeout self._migrater = TLMigrateInstance(self, self.op.instance_name, cleanup=False, failover=True, ignore_consistency=ignore_consistency, - shutdown_timeout=shutdown_timeout) + shutdown_timeout=shutdown_timeout, + ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -7139,6 +7387,10 @@ class LUInstanceFailover(LogicalUnit): del self.recalculate_locks[locking.LEVEL_NODE] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
@@ -7195,10 +7447,16 @@ class LUInstanceMigrate(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - self._migrater = TLMigrateInstance(self, self.op.instance_name, - cleanup=self.op.cleanup, - failover=False, - fallback=self.op.allow_failover) + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + self._migrater = \ + TLMigrateInstance(self, self.op.instance_name, + cleanup=self.op.cleanup, + failover=False, + fallback=self.op.allow_failover, + allow_runtime_changes=self.op.allow_runtime_changes, + ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -7213,6 +7471,10 @@ class LUInstanceMigrate(LogicalUnit): del self.recalculate_locks[locking.LEVEL_NODE] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
@@ -7229,6 +7491,7 @@ class LUInstanceMigrate(LogicalUnit): "MIGRATE_CLEANUP": self.op.cleanup, "OLD_PRIMARY": source_node, "NEW_PRIMARY": target_node, + "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, }) if instance.disk_template in constants.DTS_INT_MIRROR: @@ -7327,6 +7590,10 @@ class LUInstanceMove(LogicalUnit): _CheckNodeOnline(self, target_node) _CheckNodeNotDrained(self, target_node) _CheckNodeVmCapable(self, target_node) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), + self.cfg.GetNodeGroup(node.group)) + _CheckTargetNodeIPolicy(self, ipolicy, instance, node, + ignore=self.op.ignore_ipolicy) if instance.admin_state == constants.ADMINST_UP: # check memory requirements on the secondary node @@ -7468,6 +7735,7 @@ class LUNodeMigrate(LogicalUnit): """ return { "NODE_NAME": self.op.node_name, + "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, } def BuildHooksNodes(self): @@ -7482,12 +7750,15 @@ class LUNodeMigrate(LogicalUnit): def Exec(self, feedback_fn): # Prepare jobs for migration instances + allow_runtime_changes = self.op.allow_runtime_changes jobs = [ [opcodes.OpInstanceMigrate(instance_name=inst.name, mode=self.op.mode, live=self.op.live, iallocator=self.op.iallocator, - target_node=self.op.target_node)] + target_node=self.op.target_node, + allow_runtime_changes=allow_runtime_changes, + ignore_ipolicy=self.op.ignore_ipolicy)] for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name) ] @@ -7524,6 +7795,8 @@ class TLMigrateInstance(Tasklet): and target node @type shutdown_timeout: int @ivar shutdown_timeout: In case of failover timeout of the shutdown + @type ignore_ipolicy: bool + @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating """ @@ -7534,7 +7807,9 @@ class TLMigrateInstance(Tasklet): def __init__(self, lu, instance_name, cleanup=False, failover=False, fallback=False, ignore_consistency=False, - shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT): + allow_runtime_changes=True, + 
shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT, + ignore_ipolicy=False): """Initializes this class. """ @@ -7548,6 +7823,8 @@ class TLMigrateInstance(Tasklet): self.fallback = fallback self.ignore_consistency = ignore_consistency self.shutdown_timeout = shutdown_timeout + self.ignore_ipolicy = ignore_ipolicy + self.allow_runtime_changes = allow_runtime_changes def CheckPrereq(self): """Check prerequisites. @@ -7559,6 +7836,7 @@ class TLMigrateInstance(Tasklet): instance = self.cfg.GetInstanceInfo(instance_name) assert instance is not None self.instance = instance + cluster = self.cfg.GetClusterInfo() if (not self.cleanup and not instance.admin_state == constants.ADMINST_UP and @@ -7586,6 +7864,13 @@ class TLMigrateInstance(Tasklet): # BuildHooksEnv self.target_node = self.lu.op.target_node + # Check that the target node is correct in terms of instance policy + nodeinfo = self.cfg.GetNodeInfo(self.target_node) + group_info = self.cfg.GetNodeGroup(nodeinfo.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, + ignore=self.ignore_ipolicy) + # self.target_node is already populated, either directly or by the # iallocator run target_node = self.target_node @@ -7619,14 +7904,22 @@ class TLMigrateInstance(Tasklet): " node can be passed)" % (instance.disk_template, text), errors.ECODE_INVAL) + nodeinfo = self.cfg.GetNodeInfo(target_node) + group_info = self.cfg.GetNodeGroup(nodeinfo.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, + ignore=self.ignore_ipolicy) - i_be = self.cfg.GetClusterInfo().FillBE(instance) + i_be = cluster.FillBE(instance) # check memory requirements on the secondary node - if not self.failover or instance.admin_state == constants.ADMINST_UP: - _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % - instance.name, i_be[constants.BE_MAXMEM], - instance.hypervisor) + if (not 
self.cleanup and + (not self.failover or instance.admin_state == constants.ADMINST_UP)): + self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node, + "migrating instance %s" % + instance.name, + i_be[constants.BE_MINMEM], + instance.hypervisor) else: self.lu.LogInfo("Not checking memory on the secondary node as" " instance will not be started") @@ -7676,8 +7969,7 @@ class TLMigrateInstance(Tasklet): self.lu.op.live = None elif self.lu.op.mode is None: # read the default value from the hypervisor - i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, - skip_globals=False) + i_hv = cluster.FillHV(self.instance, skip_globals=False) self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE @@ -7685,10 +7977,21 @@ class TLMigrateInstance(Tasklet): # Failover is never live self.live = False + if not (self.failover or self.cleanup): + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking instance on node %s" % + instance.primary_node) + instance_running = bool(remote_info.payload) + if instance_running: + self.current_mem = int(remote_info.payload["memory"]) + def _RunAllocator(self): """Run the allocator based on input opcode. 
""" + # FIXME: add a self.ignore_ipolicy option ial = IAllocator(self.cfg, self.rpc, mode=constants.IALLOCATOR_MODE_RELOC, name=self.instance_name, @@ -7929,6 +8232,19 @@ class TLMigrateInstance(Tasklet): " synchronized on target node," " aborting migration" % dev.iv_name) + if self.current_mem > self.tgt_free_mem: + if not self.allow_runtime_changes: + raise errors.OpExecError("Memory ballooning not allowed and not enough" + " free memory to fit instance %s on target" + " node %s (have %dMB, need %dMB)" % + (instance.name, target_node, + self.tgt_free_mem, self.current_mem)) + self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem) + rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node, + instance, + self.tgt_free_mem) + rpcres.Raise("Cannot modify instance runtime memory") + # First get the migration information from the remote node result = self.rpc.call_migration_info(source_node, instance) msg = result.fail_msg @@ -8040,6 +8356,21 @@ class TLMigrateInstance(Tasklet): self._GoReconnect(False) self._WaitUntilSync() + # If the instance's disk template is `rbd' and there was a successful + # migration, unmap the device from the source node. 
+ if self.instance.disk_template == constants.DT_RBD: + disks = _ExpandCheckDisks(instance, instance.disks) + self.feedback_fn("* unmapping instance's disks from %s" % source_node) + for disk in disks: + result = self.rpc.call_blockdev_shutdown(source_node, disk) + msg = result.fail_msg + if msg: + logging.error("Migration was successful, but couldn't unmap the" + " block device %s on source node %s: %s", + disk.iv_name, source_node, msg) + logging.error("You need to unmap the device %s manually on %s", + disk.iv_name, source_node) + self.feedback_fn("* done") def _ExecFailover(self): @@ -8307,6 +8638,15 @@ def _ComputeLDParams(disk_template, disk_params): elif disk_template == constants.DT_BLOCK: result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV]) + elif disk_template == constants.DT_RBD: + params = { + constants.LDP_POOL: dt_params[constants.RBD_POOL] + } + params = \ + objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD], + params) + result.append(params) + return result @@ -8352,7 +8692,7 @@ def _GenerateDiskTemplate(lu, template_name, if template_name == constants.DT_DISKLESS: pass elif template_name == constants.DT_PLAIN: - if len(secondary_nodes) != 0: + if secondary_nodes: raise errors.ProgrammerError("Wrong template configuration") names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) @@ -8396,7 +8736,7 @@ def _GenerateDiskTemplate(lu, template_name, disk_dev.mode = disk[constants.IDISK_MODE] disks.append(disk_dev) elif template_name == constants.DT_FILE: - if len(secondary_nodes) != 0: + if secondary_nodes: raise errors.ProgrammerError("Wrong template configuration") opcodes.RequireFileStorage() @@ -8413,7 +8753,7 @@ def _GenerateDiskTemplate(lu, template_name, params=ld_params[0]) disks.append(disk_dev) elif template_name == constants.DT_SHARED_FILE: - if len(secondary_nodes) != 0: + if secondary_nodes: raise errors.ProgrammerError("Wrong template configuration") opcodes.RequireSharedFileStorage() @@ -8430,7 +8770,7 @@ def 
_GenerateDiskTemplate(lu, template_name, params=ld_params[0]) disks.append(disk_dev) elif template_name == constants.DT_BLOCK: - if len(secondary_nodes) != 0: + if secondary_nodes: raise errors.ProgrammerError("Wrong template configuration") for idx, disk in enumerate(disk_info): @@ -8443,6 +8783,22 @@ def _GenerateDiskTemplate(lu, template_name, mode=disk[constants.IDISK_MODE], params=ld_params[0]) disks.append(disk_dev) + elif template_name == constants.DT_RBD: + if secondary_nodes: + raise errors.ProgrammerError("Wrong template configuration") + + names = _GenerateUniqueNames(lu, [".rbd.disk%d" % (base_index + i) + for i in range(disk_count)]) + + for idx, disk in enumerate(disk_info): + disk_index = idx + base_index + disk_dev = objects.Disk(dev_type=constants.LD_RBD, + size=disk[constants.IDISK_SIZE], + logical_id=("rbd", names[idx]), + iv_name="disk/%d" % disk_index, + mode=disk[constants.IDISK_MODE], + params=ld_params[0]) + disks.append(disk_dev) else: raise errors.ProgrammerError("Invalid disk template '%s'" % template_name) @@ -8683,6 +9039,7 @@ def _ComputeDiskSize(disk_template, disks): constants.DT_FILE: None, constants.DT_SHARED_FILE: 0, constants.DT_BLOCK: 0, + constants.DT_RBD: 0, } if disk_template not in req_size_dict: @@ -9254,7 +9611,7 @@ class LUInstanceCreate(LogicalUnit): # pylint: disable=W0142 self.instance_file_storage_dir = utils.PathJoin(*joinargs) - def CheckPrereq(self): + def CheckPrereq(self): # pylint: disable=R0914 """Check prerequisites. 
""" @@ -9453,6 +9810,9 @@ class LUInstanceCreate(LogicalUnit): _ReleaseLocks(self, locking.LEVEL_NODE, keep=filter(None, [self.op.pnode, self.op.snode, self.op.src_node])) + _ReleaseLocks(self, locking.LEVEL_NODE_RES, + keep=filter(None, [self.op.pnode, self.op.snode, + self.op.src_node])) #### node related checks @@ -9491,14 +9851,38 @@ class LUInstanceCreate(LogicalUnit): nodenames = [pnode.name] + self.secondaries + # Verify instance specs + ispec = { + constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), + constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), + constants.ISPEC_DISK_COUNT: len(self.disks), + constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks], + constants.ISPEC_NIC_COUNT: len(self.nics), + } + + group_info = self.cfg.GetNodeGroup(pnode.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec) + if not self.op.ignore_ipolicy and res: + raise errors.OpPrereqError(("Instance allocation to group %s violates" + " policy: %s") % (pnode.group, + utils.CommaJoin(res)), + errors.ECODE_INVAL) + # disk parameters (not customizable at instance or node level) # just use the primary node parameters, ignoring the secondary. - self.diskparams = self.cfg.GetNodeGroup(pnode.group).diskparams + self.diskparams = group_info.diskparams if not self.adopt_disks: - # Check lv size requirements, if not adopting - req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) - _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) + if self.op.disk_template == constants.DT_RBD: + # _CheckRADOSFreeSpace() is just a placeholder. + # Any function that checks prerequisites can be placed here. + # Check if there is enough space on the RADOS cluster. 
+ _CheckRADOSFreeSpace() + else: + # Check lv size requirements, if not adopting + req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) + _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], @@ -9811,6 +10195,14 @@ class LUInstanceCreate(LogicalUnit): return list(iobj.all_nodes) +def _CheckRADOSFreeSpace(): + """Compute disk size requirements inside the RADOS cluster. + + """ + # For the RADOS cluster we assume there is always enough space. + pass + + class LUInstanceConsole(NoHooksLU): """Connect to an instance's console. @@ -9926,7 +10318,8 @@ class LUInstanceReplaceDisks(LogicalUnit): self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, - self.op.disks, False, self.op.early_release) + self.op.disks, False, self.op.early_release, + self.op.ignore_ipolicy) self.tasklets = [self.replacer] @@ -10008,7 +10401,7 @@ class TLReplaceDisks(Tasklet): """ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks, delay_iallocator, early_release): + disks, delay_iallocator, early_release, ignore_ipolicy): """Initializes this class. """ @@ -10022,6 +10415,7 @@ class TLReplaceDisks(Tasklet): self.disks = disks self.delay_iallocator = delay_iallocator self.early_release = early_release + self.ignore_ipolicy = ignore_ipolicy # Runtime data self.instance = None @@ -10244,6 +10638,16 @@ class TLReplaceDisks(Tasklet): if not self.disks: self.disks = range(len(self.instance.disks)) + # TODO: This is ugly, but right now we can't distinguish between internal + # submitted opcode and external one. We should fix that. 
+ if self.remote_node_info: + # We change the node, lets verify it still meets instance policy + new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), + new_group_info) + _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info, + ignore=self.ignore_ipolicy) + # TODO: compute disk parameters primary_node_info = self.cfg.GetNodeInfo(instance.primary_node) secondary_node_info = self.cfg.GetNodeInfo(secondary_node) @@ -11101,6 +11505,7 @@ class LUInstanceGrowDisk(LogicalUnit): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE def DeclareLocks(self, level): @@ -11153,7 +11558,8 @@ class LUInstanceGrowDisk(LogicalUnit): self.disk = instance.FindDisk(self.op.disk) if instance.disk_template not in (constants.DT_FILE, - constants.DT_SHARED_FILE): + constants.DT_SHARED_FILE, + constants.DT_RBD): # TODO: check the free disk space for file, when that feature will be # supported _CheckNodesFreeDiskPerVG(self, nodenames, @@ -11400,7 +11806,8 @@ class LUInstanceSetParams(LogicalUnit): def CheckArguments(self): if not (self.op.nics or self.op.disks or self.op.disk_template or self.op.hvparams or self.op.beparams or self.op.os_name or - self.op.online_inst or self.op.offline_inst): + self.op.online_inst or self.op.offline_inst or + self.op.runtime_mem): raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) if self.op.hvparams: @@ -11584,6 +11991,8 @@ class LUInstanceSetParams(LogicalUnit): env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) if self.op.disk_template: env["NEW_DISK_TEMPLATE"] = self.op.disk_template + if self.op.runtime_mem: + env["RUNTIME_MEMORY"] = self.op.runtime_mem return env @@ -11648,6 +12057,10 @@ class 
LUInstanceSetParams(LogicalUnit): _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required) snode_info = self.cfg.GetNodeInfo(self.op.remote_node) + snode_group = self.cfg.GetNodeGroup(snode_info.group) + ipolicy = _CalculateGroupIPolicy(cluster, snode_group) + _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, + ignore=self.op.ignore_ipolicy) if pnode_info.group != snode_info.group: self.LogWarning("The primary and secondary nodes are in two" " different node groups; the disk parameters" @@ -11784,6 +12197,33 @@ class LUInstanceSetParams(LogicalUnit): " %s, due to not enough memory" % node, errors.ECODE_STATE) + if self.op.runtime_mem: + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node) + if not remote_info.payload: # not running already + raise errors.OpPrereqError("Instance %s is not running" % instance.name, + errors.ECODE_STATE) + + current_memory = remote_info.payload["memory"] + if (not self.op.force and + (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or + self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])): + raise errors.OpPrereqError("Instance %s must have memory between %d" + " and %d MB of memory unless --force is" + " given" % (instance.name, + self.be_proposed[constants.BE_MINMEM], + self.be_proposed[constants.BE_MAXMEM]), + errors.ECODE_INVAL) + + if self.op.runtime_mem > current_memory: + _CheckNodeFreeMemory(self, instance.primary_node, + "ballooning memory for instance %s" % + instance.name, + self.op.runtime_mem - current_memory, + instance.hypervisor) + # NIC processing self.nic_pnew = {} self.nic_pinst = {} @@ -12027,6 +12467,15 @@ class LUInstanceSetParams(LogicalUnit): result = [] instance = self.instance + + # runtime memory + if self.op.runtime_mem: + rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node, + instance, + self.op.runtime_mem) + rpcres.Raise("Cannot 
modify instance runtime memory") + result.append(("runtime_memory", self.op.runtime_mem)) + # disk changes for disk_op, disk_dict in self.op.disks: if disk_op == constants.DDM_REMOVE: @@ -12821,7 +13270,11 @@ class LUGroupAdd(LogicalUnit): if self.op.ipolicy: cluster = self.cfg.GetClusterInfo() full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy) - objects.InstancePolicy.CheckParameterSyntax(full_ipolicy) + try: + objects.InstancePolicy.CheckParameterSyntax(full_ipolicy) + except errors.ConfigurationError, err: + raise errors.OpPrereqError("Invalid instance policy: %s" % err, + errors.ECODE_INVAL) def BuildHooksEnv(self): """Build hooks env. @@ -13114,14 +13567,32 @@ class LUGroupSetParams(LogicalUnit): self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) self.needed_locks = { + locking.LEVEL_INSTANCE: [], locking.LEVEL_NODEGROUP: [self.group_uuid], } + self.share_locks[locking.LEVEL_INSTANCE] = 1 + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once group lock has + # been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + def CheckPrereq(self): """Check prerequisites. 
""" + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + + # Check if locked instances are still correct + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + self.group = self.cfg.GetNodeGroup(self.group_uuid) + cluster = self.cfg.GetClusterInfo() if self.group is None: raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" % @@ -13152,14 +13623,22 @@ class LUGroupSetParams(LogicalUnit): self.group.disk_state_static) if self.op.ipolicy: - g_ipolicy = {} - for key, value in self.op.ipolicy.iteritems(): - g_ipolicy[key] = _GetUpdatedParams(self.group.ipolicy.get(key, {}), - value, - use_none=True) - utils.ForceDictType(g_ipolicy[key], constants.ISPECS_PARAMETER_TYPES) - self.new_ipolicy = g_ipolicy - objects.InstancePolicy.CheckParameterSyntax(self.new_ipolicy) + self.new_ipolicy = _GetUpdatedIPolicy(self.group.ipolicy, + self.op.ipolicy, + group_policy=True) + + new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy) + inst_filter = lambda inst: inst.name in owned_instances + instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values() + violations = \ + _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, + self.group), + new_ipolicy, instances) + + if violations: + self.LogWarning("After the ipolicy change the following instances" + " violate them: %s", + utils.CommaJoin(violations)) def BuildHooksEnv(self): """Build hooks env. @@ -13913,7 +14392,7 @@ class IAllocator(object): "cluster_name": cfg.GetClusterName(), "cluster_tags": list(cluster_info.GetTags()), "enabled_hypervisors": list(cluster_info.enabled_hypervisors), - # we don't have job IDs + "ipolicy": cluster_info.ipolicy, } ninfo = cfg.GetAllNodesInfo() iinfo = cfg.GetAllInstancesInfo().values() @@ -13952,9 +14431,11 @@ class IAllocator(object): """Compute node groups data. 
""" + cluster = cfg.GetClusterInfo() ng = dict((guuid, { "name": gdata.name, "alloc_policy": gdata.alloc_policy, + "ipolicy": _CalculateGroupIPolicy(cluster, gdata), }) for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())