diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 61103d0..d475b30 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -68,12 +68,15 @@ import ganeti.masterd.instance # pylint: disable=W0611
 DRBD_META_SIZE = 128
 
 # States of instance
-INSTANCE_UP = [constants.ADMINST_UP]
 INSTANCE_DOWN = [constants.ADMINST_DOWN]
-INSTANCE_OFFLINE = [constants.ADMINST_OFFLINE]
 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
 
+#: Instance status in which an instance can be marked as offline/online
+CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
+  constants.ADMINST_OFFLINE,
+  ]))
+
 
 class ResultWithJobs:
   """Data container for LU results with jobs.
@@ -193,9 +196,15 @@ class LogicalUnit(object):
       as values. Rules:
 
         - use an empty dict if you don't need any lock
-        - if you don't need any lock at a particular level omit that level
+        - if you don't need any lock at a particular level omit that
+          level (note that in this case C{DeclareLocks} won't be called
+          at all for that level)
+        - if you need locks at a level, but you can't calculate it in
+          this function, initialise that level with an empty list and do
+          further processing in L{LogicalUnit.DeclareLocks} (see that
+          function's docstring)
        - don't put anything for the BGL level
-       - if you want all locks at a level use locking.ALL_SET as a value
+       - if you want all locks at a level use L{locking.ALL_SET} as a value
 
     If you need to share locks (rather than acquire them exclusively) at one
     level you can modify self.share_locks, setting a true value (usually 1) for
@@ -242,7 +251,7 @@ class LogicalUnit(object):
     self.needed_locks for the level.
 
     @param level: Locking level which is going to be locked
-    @type level: member of ganeti.locking.LEVELS
+    @type level: member of L{ganeti.locking.LEVELS}
 
     """
 
@@ -721,6 +730,53 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
+def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
+  """Return the new version of an instance policy.
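+
+  A minimal sketch of the removal semantics implemented below (the key is
+  only an example)::
+
+    # On a group, [constants.VALUE_DEFAULT] drops the override so the
+    # cluster-wide value applies again:
+    _GetUpdatedIPolicy(group.ipolicy,
+                       {constants.IPOLICY_VCPU_RATIO:
+                          [constants.VALUE_DEFAULT]},
+                       group_policy=True)
+    # With group_policy=False the same input raises OpPrereqError, as
+    # cluster-level entries cannot be unset.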
+ + @param group_policy: whether this policy applies to a group and thus + we should support removal of policy entries + + """ + use_none = use_default = group_policy + ipolicy = copy.deepcopy(old_ipolicy) + for key, value in new_ipolicy.items(): + if key not in constants.IPOLICY_ALL_KEYS: + raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key, + errors.ECODE_INVAL) + if key in constants.IPOLICY_ISPECS: + utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES) + ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value, + use_none=use_none, + use_default=use_default) + else: + if not value or value == [constants.VALUE_DEFAULT]: + if group_policy: + del ipolicy[key] + else: + raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'" + " on the cluster'" % key, + errors.ECODE_INVAL) + else: + if key in constants.IPOLICY_PARAMETERS: + # FIXME: we assume all such values are float + try: + ipolicy[key] = float(value) + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid value for attribute" + " '%s': '%s', error: %s" % + (key, value, err), errors.ECODE_INVAL) + else: + # FIXME: we assume all others are lists; this should be redone + # in a nicer way + ipolicy[key] = list(value) + try: + objects.InstancePolicy.CheckParameterSyntax(ipolicy) + except errors.ConfigurationError, err: + raise errors.OpPrereqError("Invalid instance policy: %s" % err, + errors.ECODE_INVAL) + return ipolicy + + def _UpdateAndVerifySubDict(base, updates, type_check): """Updates and verifies a dict with sub dicts of the same type. @@ -1001,8 +1057,8 @@ def _CheckInstanceState(lu, instance, req_states, msg=None): if msg is None: msg = "can't use instance from outside %s states" % ", ".join(req_states) if instance.admin_state not in req_states: - raise errors.OpPrereqError("Instance %s is marked to be %s, %s" % - (instance, instance.admin_state, msg), + raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % + (instance.name, instance.admin_state, msg), errors.ECODE_STATE) if constants.ADMINST_UP not in req_states: @@ -1016,8 +1072,8 @@ def _CheckInstanceState(lu, instance, req_states, msg=None): (instance.name, msg), errors.ECODE_STATE) -def _CheckMinMaxSpecs(name, ipolicy, value): - """Checks if value is in the desired range. +def _ComputeMinMaxSpec(name, ipolicy, value): + """Computes if value is in the desired range. @param name: name of the parameter for which we perform the check @param ipolicy: dictionary containing min, max and std values @@ -1036,6 +1092,141 @@ def _CheckMinMaxSpecs(name, ipolicy, value): return None +def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count, + nic_count, disk_sizes, + _compute_fn=_ComputeMinMaxSpec): + """Verifies ipolicy against provided specs. 
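+
+  The ipolicy is expected in the "filled" form used elsewhere in this
+  file, roughly (numbers made up)::
+
+    {constants.ISPECS_MIN: {constants.ISPEC_MEM_SIZE: 128, ...},
+     constants.ISPECS_MAX: {constants.ISPEC_MEM_SIZE: 32768, ...},
+     constants.ISPECS_STD: {constants.ISPEC_MEM_SIZE: 256, ...}}
+
+  so that e.g. C{_ComputeIPolicySpecViolation(ipolicy, 512, 1, 1, 1,
+  [1024])} returns C{[]} when every value is within range.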
+
+  @type ipolicy: dict
+  @param ipolicy: The ipolicy
+  @type mem_size: int
+  @param mem_size: The memory size
+  @type cpu_count: int
+  @param cpu_count: Used cpu cores
+  @type disk_count: int
+  @param disk_count: Number of disks used
+  @type nic_count: int
+  @param nic_count: Number of nics used
+  @type disk_sizes: list of ints
+  @param disk_sizes: Disk sizes of used disks (len must match C{disk_count})
+  @param _compute_fn: The compute function (unittest only)
+  @return: A list of violations, or an empty list if no violations are found
+
+  """
+  assert disk_count == len(disk_sizes)
+
+  test_settings = [
+    (constants.ISPEC_MEM_SIZE, mem_size),
+    (constants.ISPEC_CPU_COUNT, cpu_count),
+    (constants.ISPEC_DISK_COUNT, disk_count),
+    (constants.ISPEC_NIC_COUNT, nic_count),
+    ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes)
+
+  return filter(None,
+                (_compute_fn(name, ipolicy, value)
+                 for (name, value) in test_settings))
+
+
+def _ComputeIPolicyInstanceViolation(ipolicy, instance,
+                                     _compute_fn=_ComputeIPolicySpecViolation):
+  """Compute if instance meets the specs of ipolicy.
+
+  @type ipolicy: dict
+  @param ipolicy: The ipolicy to verify against
+  @type instance: L{objects.Instance}
+  @param instance: The instance to verify
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
+  cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
+  disk_count = len(instance.disks)
+  disk_sizes = [disk.size for disk in instance.disks]
+  nic_count = len(instance.nics)
+
+  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+                     disk_sizes)
+
+
+def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
+                                         _compute_fn=_ComputeIPolicySpecViolation):
+  """Compute if instance specs meet the specs of ipolicy.
+
+  @type ipolicy: dict
+  @param ipolicy: The ipolicy to verify against
+  @type instance_spec: dict
+  @param instance_spec: The instance spec to verify
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
+  cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
+  disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
+  disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
+  nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
+
+  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+                     disk_sizes)
+
+
+def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
+                                 target_group,
+                                 _compute_fn=_ComputeIPolicyInstanceViolation):
+  """Compute if instance meets the specs of the new target group.
+
+  @param ipolicy: The ipolicy to verify
+  @param instance: The instance object to verify
+  @param current_group: The current group of the instance
+  @param target_group: The new group of the instance
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  if current_group == target_group:
+    return []
+  else:
+    return _compute_fn(ipolicy, instance)
+
+
+def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False,
+                            _compute_fn=_ComputeIPolicyNodeViolation):
+  """Checks that the target node is correct in terms of instance policy.
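+
+  Typical usage, as a sketch modelled on the call sites added elsewhere in
+  this patch::
+
+    ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), group_info)
+    _CheckTargetNodeIPolicy(self, ipolicy, instance, target_node_info,
+                            ignore=self.op.ignore_ipolicy)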
+
+  @param ipolicy: The ipolicy to verify
+  @param instance: The instance object to verify
+  @param node: The new node to relocate
+  @param ignore: Ignore violations of the ipolicy
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{_ComputeIPolicySpecViolation}
+
+  """
+  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
+  res = _compute_fn(ipolicy, instance, primary_node.group, node.group)
+
+  if res:
+    msg = ("Instance does not meet target node group's (%s) instance"
+           " policy: %s") % (node.group, utils.CommaJoin(res))
+    if ignore:
+      lu.LogWarning(msg)
+    else:
+      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+
+def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances):
+  """Computes the set of instances that would violate the new ipolicy.
+
+  @param old_ipolicy: The current (still in-place) ipolicy
+  @param new_ipolicy: The new (to become) ipolicy
+  @param instances: List of instances to verify
+  @return: A set of instances that violate the new ipolicy but did
+    not before
+
+  """
+  return (_ComputeViolatingInstances(new_ipolicy, instances) -
+          _ComputeViolatingInstances(old_ipolicy, instances))
+
+
 def _ExpandItemName(fn, name, kind):
   """Expand an item name.
 
@@ -1249,14 +1440,26 @@ def _DecideSelfPromotion(lu, exceptions=None):
   return mc_now < mc_should
 
 
-def _CalculateGroupIPolicy(cfg, group):
+def _CalculateGroupIPolicy(cluster, group):
   """Calculate instance policy for group.
 
   """
-  cluster = cfg.GetClusterInfo()
   return cluster.SimpleFillIPolicy(group.ipolicy)
 
 
+def _ComputeViolatingInstances(ipolicy, instances):
+  """Computes the set of instances that violate the given ipolicy.
+
+  @param ipolicy: The ipolicy to verify
+  @type instances: list of L{objects.Instance}
+  @param instances: List of instances to verify
+  @return: A frozenset of instance names violating the ipolicy
+
+  """
+  return frozenset([inst.name for inst in instances
+                    if _ComputeIPolicyInstanceViolation(ipolicy, inst)])
+
+
 def _CheckNicsBridgesExist(lu, target_nics, target_node):
   """Check that the bridges needed by a list of nics exist.
 
@@ -1498,7 +1701,9 @@ class LUClusterDestroy(LogicalUnit):
     ems = self.cfg.GetUseExternalMipScript()
     result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                      master_params, ems)
-    result.Raise("Could not disable the master role")
+    if result.fail_msg:
+      self.LogWarning("Error disabling the master IP address: %s",
+                      result.fail_msg)
     return master_params.name
 
@@ -2135,34 +2340,6 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       msg = "cannot reach the master IP"
       _ErrorIf(True, constants.CV_ENODENET, node, msg)
 
-  def _VerifyInstancePolicy(self, instance):
-    """Verify instance specs against instance policy set on node group level.
- - - """ - cluster = self.cfg.GetClusterInfo() - full_beparams = cluster.FillBE(instance) - ipolicy = cluster.SimpleFillIPolicy(self.group_info.ipolicy) - - mem_size = full_beparams.get(constants.BE_MAXMEM, None) - cpu_count = full_beparams.get(constants.BE_VCPUS, None) - disk_count = len(instance.disks) - disk_sizes = [disk.size for disk in instance.disks] - nic_count = len(instance.nics) - - test_settings = [ - (constants.ISPEC_MEM_SIZE, mem_size), - (constants.ISPEC_CPU_COUNT, cpu_count), - (constants.ISPEC_DISK_COUNT, disk_count), - (constants.ISPEC_NIC_COUNT, nic_count), - ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes) - - for (name, value) in test_settings: - test_result = _CheckMinMaxSpecs(name, ipolicy, value) - self._ErrorIf(test_result is not None, - constants.CV_EINSTANCEPOLICY, instance.name, - test_result) - def _VerifyInstance(self, instance, instanceconfig, node_image, diskstatus): """Verify an instance. @@ -2177,7 +2354,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_vol_should = {} instanceconfig.MapLVsByNode(node_vol_should) - self._VerifyInstancePolicy(instanceconfig) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info) + err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig) + _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err) for node in node_vol_should: n_img = node_image[node] @@ -2259,14 +2438,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # we already list instances living on such nodes, and that's # enough warning continue - #TODO(dynmem): use MINMEM for checking #TODO(dynmem): also consider ballooning out other instances for prinode, instances in n_img.sbp.items(): needed_mem = 0 for instance in instances: bep = cluster_info.FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: - needed_mem += bep[constants.BE_MAXMEM] + needed_mem += bep[constants.BE_MINMEM] test = n_img.mfree < needed_mem self._ErrorIf(test, constants.CV_ENODEN1, node, "not enough memory to accomodate instance failovers" @@ -3628,8 +3806,14 @@ class LUClusterSetParams(LogicalUnit): # all nodes to be modified. self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + locking.LEVEL_NODEGROUP: locking.ALL_SET, + } + self.share_locks = { + locking.LEVEL_NODE: 1, + locking.LEVEL_INSTANCE: 1, + locking.LEVEL_NODEGROUP: 1, } - self.share_locks[locking.LEVEL_NODE] = 1 def BuildHooksEnv(self): """Build hooks env. 
@@ -3733,13 +3917,26 @@ class LUClusterSetParams(LogicalUnit): for storage, svalues in new_disk_state.items()) if self.op.ipolicy: - ipolicy = {} - for key, value in self.op.ipolicy.items(): - utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES) - ipolicy[key] = _GetUpdatedParams(cluster.ipolicy.get(key, {}), - value) - objects.InstancePolicy.CheckParameterSyntax(ipolicy) - self.new_ipolicy = ipolicy + self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy, + group_policy=False) + + all_instances = self.cfg.GetAllInstancesInfo().values() + violations = set() + for group in self.cfg.GetAllNodeGroupsInfo().values(): + instances = frozenset([inst for inst in all_instances + if compat.any(node in group.members + for node in inst.all_nodes)]) + new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy) + new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, + group), + new_ipolicy, instances) + if new: + violations.update(new) + + if violations: + self.LogWarning("After the ipolicy change the following instances" + " violate them: %s", + utils.CommaJoin(violations)) if self.op.nicparams: utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) @@ -5377,6 +5574,12 @@ class LUNodeAdd(LogicalUnit): if self.op.ndparams: utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None) + + if self.op.disk_state: + self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None) + def Exec(self, feedback_fn): """Adds the new node to the cluster. @@ -5415,6 +5618,12 @@ class LUNodeAdd(LogicalUnit): else: new_node.ndparams = {} + if self.op.hv_state: + new_node.hv_state_static = self.new_hv_state + + if self.op.disk_state: + new_node.disk_state_static = self.new_disk_state + # check connectivity result = self.rpc.call_version([node])[node] result.Raise("Can't get version information from node %s" % node) @@ -6206,6 +6415,8 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): @param requested: the amount of memory in MiB to check for @type hypervisor_name: C{str} @param hypervisor_name: the hypervisor to ask for memory stats + @rtype: integer + @return: node current free memory @raise errors.OpPrereqError: if the node doesn't have enough memory, or we cannot check the node @@ -6225,6 +6436,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): " needed %s MiB, available %s MiB" % (node, reason, requested, free_mem), errors.ECODE_NORES) + return free_mem def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): @@ -6340,6 +6552,11 @@ class LUInstanceStartup(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE_RES: + self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES) def BuildHooksEnv(self): """Build hooks env. 
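The _CheckNodeFreeMemory hunk above makes the helper return the node's
current free memory on success; a sketch of the caller pattern this enables
(mirrored by the TLMigrateInstance hunk later in this patch):

    free_mem = _CheckNodeFreeMemory(lu, target_node,
                                    "migrating instance %s" % instance.name,
                                    bep[constants.BE_MINMEM],
                                    instance.hypervisor)
    # free_mem can then be compared with the instance's current runtime
    # memory to decide whether it must be ballooned down before migration.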
@@ -6396,6 +6613,7 @@ class LUInstanceStartup(LogicalUnit): _CheckNodeOnline(self, instance.primary_node) bep = self.cfg.GetClusterInfo().FillBE(instance) + bep.update(self.op.beparams) # check bridges existence _CheckInstanceBridgesExist(self, instance) @@ -6408,7 +6626,7 @@ class LUInstanceStartup(LogicalUnit): if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, - bep[constants.BE_MAXMEM], instance.hypervisor) + bep[constants.BE_MINMEM], instance.hypervisor) def Exec(self, feedback_fn): """Start the instance. @@ -6703,9 +6921,39 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + _MODIFYABLE = frozenset([ + constants.IDISK_SIZE, + constants.IDISK_MODE, + ]) + + # New or changed disk parameters may have different semantics + assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([ + constants.IDISK_ADOPT, + + # TODO: Implement support changing VG while recreating + constants.IDISK_VG, + constants.IDISK_METAVG, + ])) + def CheckArguments(self): - # normalise the disk list - self.op.disks = sorted(frozenset(self.op.disks)) + if self.op.disks and ht.TPositiveInt(self.op.disks[0]): + # Normalize and convert deprecated list of disk indices + self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] + + duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks)) + if duplicates: + raise errors.OpPrereqError("Some disks have been specified more than" + " once: %s" % utils.CommaJoin(duplicates), + errors.ECODE_INVAL) + + for (idx, params) in self.op.disks: + utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) + unsupported = frozenset(params.keys()) - self._MODIFYABLE + if unsupported: + raise errors.OpPrereqError("Parameters for disk %s try to change" + " unmodifyable parameter(s): %s" % + (idx, utils.CommaJoin(unsupported)), + errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -6715,6 +6963,7 @@ class LUInstanceRecreateDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) else: self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -6770,6 +7019,7 @@ class LUInstanceRecreateDisks(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) + # if we replace nodes *and* the old primary is offline, we don't # check assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE) @@ -6779,17 +7029,22 @@ class LUInstanceRecreateDisks(LogicalUnit): _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, msg="cannot recreate disks") - if not self.op.disks: - self.op.disks = range(len(instance.disks)) + if self.op.disks: + self.disks = dict(self.op.disks) else: - for idx in self.op.disks: - if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index '%s'" % idx, - errors.ECODE_INVAL) - if self.op.disks != range(len(instance.disks)) and self.op.nodes: + self.disks = dict((idx, {}) for idx in range(len(instance.disks))) + + maxidx = max(self.disks.keys()) + if maxidx >= len(instance.disks): + raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx, + errors.ECODE_INVAL) + + if (self.op.nodes and + sorted(self.disks.keys()) != range(len(instance.disks))): raise errors.OpPrereqError("Can't recreate disks partially and" " change the nodes at the same time", 
errors.ECODE_INVAL) + self.instance = instance def Exec(self, feedback_fn): @@ -6802,30 +7057,42 @@ class LUInstanceRecreateDisks(LogicalUnit): self.owned_locks(locking.LEVEL_NODE_RES)) to_skip = [] - mods = [] # keeps track of needed logical_id changes + mods = [] # keeps track of needed changes for idx, disk in enumerate(instance.disks): - if idx not in self.op.disks: # disk idx has not been passed in + try: + changes = self.disks[idx] + except KeyError: + # Disk should not be recreated to_skip.append(idx) continue + # update secondaries for disks, if needed - if self.op.nodes: - if disk.dev_type == constants.LD_DRBD8: - # need to update the nodes and minors - assert len(self.op.nodes) == 2 - assert len(disk.logical_id) == 6 # otherwise disk internals - # have changed - (_, _, old_port, _, _, old_secret) = disk.logical_id - new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) - new_id = (self.op.nodes[0], self.op.nodes[1], old_port, - new_minors[0], new_minors[1], old_secret) - assert len(disk.logical_id) == len(new_id) - mods.append((idx, new_id)) + if self.op.nodes and disk.dev_type == constants.LD_DRBD8: + # need to update the nodes and minors + assert len(self.op.nodes) == 2 + assert len(disk.logical_id) == 6 # otherwise disk internals + # have changed + (_, _, old_port, _, _, old_secret) = disk.logical_id + new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) + new_id = (self.op.nodes[0], self.op.nodes[1], old_port, + new_minors[0], new_minors[1], old_secret) + assert len(disk.logical_id) == len(new_id) + else: + new_id = None + + mods.append((idx, new_id, changes)) # now that we have passed all asserts above, we can apply the mods # in a single run (to avoid partial changes) - for idx, new_id in mods: - instance.disks[idx].logical_id = new_id + for idx, new_id, changes in mods: + disk = instance.disks[idx] + if new_id is not None: + assert disk.dev_type == constants.LD_DRBD8 + disk.logical_id = new_id + if changes: + disk.Update(size=changes.get(constants.IDISK_SIZE, None), + mode=changes.get(constants.IDISK_MODE, None)) # change primary node, if needed if self.op.nodes: @@ -7104,13 +7371,17 @@ class LUInstanceFailover(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + ignore_consistency = self.op.ignore_consistency shutdown_timeout = self.op.shutdown_timeout self._migrater = TLMigrateInstance(self, self.op.instance_name, cleanup=False, failover=True, ignore_consistency=ignore_consistency, - shutdown_timeout=shutdown_timeout) + shutdown_timeout=shutdown_timeout, + ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -7125,6 +7396,10 @@ class LUInstanceFailover(LogicalUnit): del self.recalculate_locks[locking.LEVEL_NODE] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
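The LUInstanceRecreateDisks hunks above extend the opcode to carry per-disk
parameters while still accepting the deprecated plain list of indices; a
sketch of the accepted forms (sizes invented):

    disks=[0, 2]                  # deprecated: indices only
    disks=[(0, {}), (2, {})]      # normalised internal equivalent
    disks=[(0, {constants.IDISK_SIZE: 2048,
                constants.IDISK_MODE: constants.DISK_RDWR})]

Only constants.IDISK_SIZE and constants.IDISK_MODE (the _MODIFYABLE set)
may be changed this way; any other disk parameter is rejected in
CheckArguments.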
@@ -7181,10 +7456,16 @@ class LUInstanceMigrate(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - self._migrater = TLMigrateInstance(self, self.op.instance_name, - cleanup=self.op.cleanup, - failover=False, - fallback=self.op.allow_failover) + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + self._migrater = \ + TLMigrateInstance(self, self.op.instance_name, + cleanup=self.op.cleanup, + failover=False, + fallback=self.op.allow_failover, + allow_runtime_changes=self.op.allow_runtime_changes, + ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -7199,6 +7480,10 @@ class LUInstanceMigrate(LogicalUnit): del self.recalculate_locks[locking.LEVEL_NODE] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -7215,6 +7500,7 @@ class LUInstanceMigrate(LogicalUnit): "MIGRATE_CLEANUP": self.op.cleanup, "OLD_PRIMARY": source_node, "NEW_PRIMARY": target_node, + "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, }) if instance.disk_template in constants.DTS_INT_MIRROR: @@ -7313,6 +7599,10 @@ class LUInstanceMove(LogicalUnit): _CheckNodeOnline(self, target_node) _CheckNodeNotDrained(self, target_node) _CheckNodeVmCapable(self, target_node) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), + self.cfg.GetNodeGroup(node.group)) + _CheckTargetNodeIPolicy(self, ipolicy, instance, node, + ignore=self.op.ignore_ipolicy) if instance.admin_state == constants.ADMINST_UP: # check memory requirements on the secondary node @@ -7454,6 +7744,7 @@ class LUNodeMigrate(LogicalUnit): """ return { "NODE_NAME": self.op.node_name, + "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, } def BuildHooksNodes(self): @@ -7468,12 +7759,15 @@ class LUNodeMigrate(LogicalUnit): def Exec(self, feedback_fn): # Prepare jobs for migration instances + allow_runtime_changes = self.op.allow_runtime_changes jobs = [ [opcodes.OpInstanceMigrate(instance_name=inst.name, mode=self.op.mode, live=self.op.live, iallocator=self.op.iallocator, - target_node=self.op.target_node)] + target_node=self.op.target_node, + allow_runtime_changes=allow_runtime_changes, + ignore_ipolicy=self.op.ignore_ipolicy)] for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name) ] @@ -7510,6 +7804,8 @@ class TLMigrateInstance(Tasklet): and target node @type shutdown_timeout: int @ivar shutdown_timeout: In case of failover timeout of the shutdown + @type ignore_ipolicy: bool + @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating """ @@ -7520,7 +7816,9 @@ class TLMigrateInstance(Tasklet): def __init__(self, lu, instance_name, cleanup=False, failover=False, fallback=False, ignore_consistency=False, - shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT): + allow_runtime_changes=True, + shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT, + ignore_ipolicy=False): """Initializes this class. """ @@ -7534,6 +7832,8 @@ class TLMigrateInstance(Tasklet): self.fallback = fallback self.ignore_consistency = ignore_consistency self.shutdown_timeout = shutdown_timeout + self.ignore_ipolicy = ignore_ipolicy + self.allow_runtime_changes = allow_runtime_changes def CheckPrereq(self): """Check prerequisites. 
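Both migration paths above thread two new opcode parameters down to
TLMigrateInstance; a sketch of an opcode using them (instance name
invented):

    op = opcodes.OpInstanceMigrate(instance_name="inst1.example.com",
                                   allow_runtime_changes=False,
                                   ignore_ipolicy=True)

With allow_runtime_changes set to False a migration fails rather than
ballooning the instance's memory down to fit the target node, while
ignore_ipolicy turns instance-policy violations on the target group into
warnings.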
@@ -7545,6 +7845,7 @@ class TLMigrateInstance(Tasklet): instance = self.cfg.GetInstanceInfo(instance_name) assert instance is not None self.instance = instance + cluster = self.cfg.GetClusterInfo() if (not self.cleanup and not instance.admin_state == constants.ADMINST_UP and @@ -7572,6 +7873,13 @@ class TLMigrateInstance(Tasklet): # BuildHooksEnv self.target_node = self.lu.op.target_node + # Check that the target node is correct in terms of instance policy + nodeinfo = self.cfg.GetNodeInfo(self.target_node) + group_info = self.cfg.GetNodeGroup(nodeinfo.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, + ignore=self.ignore_ipolicy) + # self.target_node is already populated, either directly or by the # iallocator run target_node = self.target_node @@ -7605,14 +7913,22 @@ class TLMigrateInstance(Tasklet): " node can be passed)" % (instance.disk_template, text), errors.ECODE_INVAL) + nodeinfo = self.cfg.GetNodeInfo(target_node) + group_info = self.cfg.GetNodeGroup(nodeinfo.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, + ignore=self.ignore_ipolicy) - i_be = self.cfg.GetClusterInfo().FillBE(instance) + i_be = cluster.FillBE(instance) # check memory requirements on the secondary node - if not self.failover or instance.admin_state == constants.ADMINST_UP: - _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % - instance.name, i_be[constants.BE_MAXMEM], - instance.hypervisor) + if (not self.cleanup and + (not self.failover or instance.admin_state == constants.ADMINST_UP)): + self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node, + "migrating instance %s" % + instance.name, + i_be[constants.BE_MINMEM], + instance.hypervisor) else: self.lu.LogInfo("Not checking memory on the secondary node as" " instance will not be started") @@ -7662,8 +7978,7 @@ class TLMigrateInstance(Tasklet): self.lu.op.live = None elif self.lu.op.mode is None: # read the default value from the hypervisor - i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, - skip_globals=False) + i_hv = cluster.FillHV(self.instance, skip_globals=False) self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE @@ -7671,10 +7986,21 @@ class TLMigrateInstance(Tasklet): # Failover is never live self.live = False + if not (self.failover or self.cleanup): + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking instance on node %s" % + instance.primary_node) + instance_running = bool(remote_info.payload) + if instance_running: + self.current_mem = int(remote_info.payload["memory"]) + def _RunAllocator(self): """Run the allocator based on input opcode. 
""" + # FIXME: add a self.ignore_ipolicy option ial = IAllocator(self.cfg, self.rpc, mode=constants.IALLOCATOR_MODE_RELOC, name=self.instance_name, @@ -7909,11 +8235,24 @@ class TLMigrateInstance(Tasklet): (src_version, dst_version)) self.feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: + for (idx, dev) in enumerate(instance.disks): if not _CheckDiskConsistency(self.lu, dev, target_node, False): raise errors.OpExecError("Disk %s is degraded or not fully" " synchronized on target node," - " aborting migration" % dev.iv_name) + " aborting migration" % idx) + + if self.current_mem > self.tgt_free_mem: + if not self.allow_runtime_changes: + raise errors.OpExecError("Memory ballooning not allowed and not enough" + " free memory to fit instance %s on target" + " node %s (have %dMB, need %dMB)" % + (instance.name, target_node, + self.tgt_free_mem, self.current_mem)) + self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem) + rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node, + instance, + self.tgt_free_mem) + rpcres.Raise("Cannot modify instance runtime memory") # First get the migration information from the remote node result = self.rpc.call_migration_info(source_node, instance) @@ -8026,6 +8365,21 @@ class TLMigrateInstance(Tasklet): self._GoReconnect(False) self._WaitUntilSync() + # If the instance's disk template is `rbd' and there was a successful + # migration, unmap the device from the source node. + if self.instance.disk_template == constants.DT_RBD: + disks = _ExpandCheckDisks(instance, instance.disks) + self.feedback_fn("* unmapping instance's disks from %s" % source_node) + for disk in disks: + result = self.rpc.call_blockdev_shutdown(source_node, disk) + msg = result.fail_msg + if msg: + logging.error("Migration was successful, but couldn't unmap the" + " block device %s on source node %s: %s", + disk.iv_name, source_node, msg) + logging.error("You need to unmap the device %s manually on %s", + disk.iv_name, source_node) + self.feedback_fn("* done") def _ExecFailover(self): @@ -8043,16 +8397,16 @@ class TLMigrateInstance(Tasklet): if instance.admin_state == constants.ADMINST_UP: self.feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: + for (idx, dev) in enumerate(instance.disks): # for drbd, these are drbd over lvm if not _CheckDiskConsistency(self.lu, dev, target_node, False): if primary_node.offline: self.feedback_fn("Node %s is offline, ignoring degraded disk %s on" " target node %s" % - (primary_node.name, dev.iv_name, target_node)) + (primary_node.name, idx, target_node)) elif not self.ignore_consistency: raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover" % dev.iv_name) + " aborting failover" % idx) else: self.feedback_fn("* not checking disk consistency as instance is not" " running") @@ -8245,6 +8599,12 @@ def _ComputeLDParams(disk_template, disk_params): constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG], constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM], constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM], + constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC], + constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD], + constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET], + constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET], + constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE], + constants.LDP_MIN_RATE: 
dt_params[constants.DRBD_MIN_RATE], } drbd_params = \ @@ -8287,6 +8647,15 @@ def _ComputeLDParams(disk_template, disk_params): elif disk_template == constants.DT_BLOCK: result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV]) + elif disk_template == constants.DT_RBD: + params = { + constants.LDP_POOL: dt_params[constants.RBD_POOL] + } + params = \ + objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD], + params) + result.append(params) + return result @@ -8315,11 +8684,26 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, return drbd_dev -def _GenerateDiskTemplate(lu, template_name, - instance_name, primary_node, - secondary_nodes, disk_info, - file_storage_dir, file_driver, - base_index, feedback_fn, disk_params): +_DISK_TEMPLATE_NAME_PREFIX = { + constants.DT_PLAIN: "", + constants.DT_RBD: ".rbd", + } + + +_DISK_TEMPLATE_DEVICE_TYPE = { + constants.DT_PLAIN: constants.LD_LV, + constants.DT_FILE: constants.LD_FILE, + constants.DT_SHARED_FILE: constants.LD_FILE, + constants.DT_BLOCK: constants.LD_BLOCKDEV, + constants.DT_RBD: constants.LD_RBD, + } + + +def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, + secondary_nodes, disk_info, file_storage_dir, file_driver, base_index, + feedback_fn, disk_params, + _req_file_storage=opcodes.RequireFileStorage, + _req_shr_file_storage=opcodes.RequireSharedFileStorage): """Generate the entire disk layout for a given template type. """ @@ -8329,25 +8713,9 @@ def _GenerateDiskTemplate(lu, template_name, disk_count = len(disk_info) disks = [] ld_params = _ComputeLDParams(template_name, disk_params) + if template_name == constants.DT_DISKLESS: pass - elif template_name == constants.DT_PLAIN: - if len(secondary_nodes) != 0: - raise errors.ProgrammerError("Wrong template configuration") - - names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) - for i in range(disk_count)]) - for idx, disk in enumerate(disk_info): - disk_index = idx + base_index - vg = disk.get(constants.IDISK_VG, vgname) - feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx])) - disk_dev = objects.Disk(dev_type=constants.LD_LV, - size=disk[constants.IDISK_SIZE], - logical_id=(vg, names[idx]), - iv_name="disk/%d" % disk_index, - mode=disk[constants.IDISK_MODE], - params=ld_params[0]) - disks.append(disk_dev) elif template_name == constants.DT_DRBD8: drbd_params, data_params, meta_params = ld_params if len(secondary_nodes) != 1: @@ -8375,57 +8743,54 @@ def _GenerateDiskTemplate(lu, template_name, drbd_params, data_params, meta_params) disk_dev.mode = disk[constants.IDISK_MODE] disks.append(disk_dev) - elif template_name == constants.DT_FILE: - if len(secondary_nodes) != 0: - raise errors.ProgrammerError("Wrong template configuration") - - opcodes.RequireFileStorage() - - for idx, disk in enumerate(disk_info): - disk_index = idx + base_index - disk_dev = objects.Disk(dev_type=constants.LD_FILE, - size=disk[constants.IDISK_SIZE], - iv_name="disk/%d" % disk_index, - logical_id=(file_driver, - "%s/disk%d" % (file_storage_dir, - disk_index)), - mode=disk[constants.IDISK_MODE], - params=ld_params[0]) - disks.append(disk_dev) - elif template_name == constants.DT_SHARED_FILE: - if len(secondary_nodes) != 0: + else: + if secondary_nodes: raise errors.ProgrammerError("Wrong template configuration") - opcodes.RequireSharedFileStorage() + if template_name == constants.DT_FILE: + _req_file_storage() + elif template_name == constants.DT_SHARED_FILE: + _req_shr_file_storage() - for idx, disk in enumerate(disk_info): - 
disk_index = idx + base_index - disk_dev = objects.Disk(dev_type=constants.LD_FILE, - size=disk[constants.IDISK_SIZE], - iv_name="disk/%d" % disk_index, - logical_id=(file_driver, - "%s/disk%d" % (file_storage_dir, - disk_index)), - mode=disk[constants.IDISK_MODE], - params=ld_params[0]) - disks.append(disk_dev) - elif template_name == constants.DT_BLOCK: - if len(secondary_nodes) != 0: - raise errors.ProgrammerError("Wrong template configuration") + name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None) + if name_prefix is None: + names = None + else: + names = _GenerateUniqueNames(lu, ["%s.disk%s" % + (name_prefix, base_index + i) + for i in range(disk_count)]) + + dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name] + + if template_name == constants.DT_PLAIN: + def logical_id_fn(idx, _, disk): + vg = disk.get(constants.IDISK_VG, vgname) + return (vg, names[idx]) + elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE): + logical_id_fn = \ + lambda _, disk_index, disk: (file_driver, + "%s/disk%d" % (file_storage_dir, + disk_index)) + elif template_name == constants.DT_BLOCK: + logical_id_fn = \ + lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL, + disk[constants.IDISK_ADOPT]) + elif template_name == constants.DT_RBD: + logical_id_fn = lambda idx, _, disk: ("rbd", names[idx]) + else: + raise errors.ProgrammerError("Unknown disk template '%s'" % template_name) for idx, disk in enumerate(disk_info): disk_index = idx + base_index - disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV, - size=disk[constants.IDISK_SIZE], - logical_id=(constants.BLOCKDEV_DRIVER_MANUAL, - disk[constants.IDISK_ADOPT]), - iv_name="disk/%d" % disk_index, - mode=disk[constants.IDISK_MODE], - params=ld_params[0]) - disks.append(disk_dev) + size = disk[constants.IDISK_SIZE] + feedback_fn("* disk %s, size %s" % + (disk_index, utils.FormatUnit(size, "h"))) + disks.append(objects.Disk(dev_type=dev_type, size=size, + logical_id=logical_id_fn(idx, disk_index, disk), + iv_name="disk/%d" % disk_index, + mode=disk[constants.IDISK_MODE], + params=ld_params[0])) - else: - raise errors.ProgrammerError("Invalid disk template '%s'" % template_name) return disks @@ -8555,8 +8920,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None): for idx, device in enumerate(instance.disks): if to_skip and idx in to_skip: continue - logging.info("Creating volume %s for instance %s", - device.iv_name, instance.name) + logging.info("Creating disk %s for instance '%s'", idx, instance.name) #HARDCODE for node in all_nodes: f_create = node == pnode @@ -8584,7 +8948,7 @@ def _RemoveDisks(lu, instance, target_node=None): logging.info("Removing block devices for instance %s", instance.name) all_result = True - for device in instance.disks: + for (idx, device) in enumerate(instance.disks): if target_node: edata = [(target_node, device)] else: @@ -8593,8 +8957,8 @@ def _RemoveDisks(lu, instance, target_node=None): lu.cfg.SetDiskID(disk, node) msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg if msg: - lu.LogWarning("Could not remove block device %s on node %s," - " continuing anyway: %s", device.iv_name, node, msg) + lu.LogWarning("Could not remove disk %s on node %s," + " continuing anyway: %s", idx, node, msg) all_result = False # if this is a DRBD disk, return its port to the pool @@ -8663,6 +9027,7 @@ def _ComputeDiskSize(disk_template, disks): constants.DT_FILE: None, constants.DT_SHARED_FILE: 0, constants.DT_BLOCK: 0, + constants.DT_RBD: 0, } if disk_template not in req_size_dict: @@ 
-9234,7 +9599,7 @@ class LUInstanceCreate(LogicalUnit): # pylint: disable=W0142 self.instance_file_storage_dir = utils.PathJoin(*joinargs) - def CheckPrereq(self): + def CheckPrereq(self): # pylint: disable=R0914 """Check prerequisites. """ @@ -9433,6 +9798,9 @@ class LUInstanceCreate(LogicalUnit): _ReleaseLocks(self, locking.LEVEL_NODE, keep=filter(None, [self.op.pnode, self.op.snode, self.op.src_node])) + _ReleaseLocks(self, locking.LEVEL_NODE_RES, + keep=filter(None, [self.op.pnode, self.op.snode, + self.op.src_node])) #### node related checks @@ -9471,14 +9839,38 @@ class LUInstanceCreate(LogicalUnit): nodenames = [pnode.name] + self.secondaries + # Verify instance specs + ispec = { + constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), + constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), + constants.ISPEC_DISK_COUNT: len(self.disks), + constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks], + constants.ISPEC_NIC_COUNT: len(self.nics), + } + + group_info = self.cfg.GetNodeGroup(pnode.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec) + if not self.op.ignore_ipolicy and res: + raise errors.OpPrereqError(("Instance allocation to group %s violates" + " policy: %s") % (pnode.group, + utils.CommaJoin(res)), + errors.ECODE_INVAL) + # disk parameters (not customizable at instance or node level) # just use the primary node parameters, ignoring the secondary. - self.diskparams = self.cfg.GetNodeGroup(pnode.group).diskparams + self.diskparams = group_info.diskparams if not self.adopt_disks: - # Check lv size requirements, if not adopting - req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) - _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) + if self.op.disk_template == constants.DT_RBD: + # _CheckRADOSFreeSpace() is just a placeholder. + # Any function that checks prerequisites can be placed here. + # Check if there is enough space on the RADOS cluster. + _CheckRADOSFreeSpace() + else: + # Check lv size requirements, if not adopting + req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) + _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], @@ -9791,6 +10183,14 @@ class LUInstanceCreate(LogicalUnit): return list(iobj.all_nodes) +def _CheckRADOSFreeSpace(): + """Compute disk size requirements inside the RADOS cluster. + + """ + # For the RADOS cluster we assume there is always enough space. + pass + + class LUInstanceConsole(NoHooksLU): """Connect to an instance's console. @@ -9906,7 +10306,8 @@ class LUInstanceReplaceDisks(LogicalUnit): self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, - self.op.disks, False, self.op.early_release) + self.op.disks, False, self.op.early_release, + self.op.ignore_ipolicy) self.tasklets = [self.replacer] @@ -9988,7 +10389,7 @@ class TLReplaceDisks(Tasklet): """ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks, delay_iallocator, early_release): + disks, delay_iallocator, early_release, ignore_ipolicy): """Initializes this class. 
""" @@ -10002,6 +10403,7 @@ class TLReplaceDisks(Tasklet): self.disks = disks self.delay_iallocator = delay_iallocator self.early_release = early_release + self.ignore_ipolicy = ignore_ipolicy # Runtime data self.instance = None @@ -10224,6 +10626,16 @@ class TLReplaceDisks(Tasklet): if not self.disks: self.disks = range(len(self.instance.disks)) + # TODO: This is ugly, but right now we can't distinguish between internal + # submitted opcode and external one. We should fix that. + if self.remote_node_info: + # We change the node, lets verify it still meets instance policy + new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), + new_group_info) + _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info, + ignore=self.ignore_ipolicy) + # TODO: compute disk parameters primary_node_info = self.cfg.GetNodeInfo(instance.primary_node) secondary_node_info = self.cfg.GetNodeInfo(secondary_node) @@ -11081,6 +11493,7 @@ class LUInstanceGrowDisk(LogicalUnit): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE def DeclareLocks(self, level): @@ -11133,7 +11546,8 @@ class LUInstanceGrowDisk(LogicalUnit): self.disk = instance.FindDisk(self.op.disk) if instance.disk_template not in (constants.DT_FILE, - constants.DT_SHARED_FILE): + constants.DT_SHARED_FILE, + constants.DT_RBD): # TODO: check the free disk space for file, when that feature will be # supported _CheckNodesFreeDiskPerVG(self, nodenames, @@ -11369,6 +11783,144 @@ class LUInstanceQueryData(NoHooksLU): return result +def PrepareContainerMods(mods, private_fn): + """Prepares a list of container modifications by adding a private data field. + + @type mods: list of tuples; (operation, index, parameters) + @param mods: List of modifications + @type private_fn: callable or None + @param private_fn: Callable for constructing a private data field for a + modification + @rtype: list + + """ + if private_fn is None: + fn = lambda: None + else: + fn = private_fn + + return [(op, idx, params, fn()) for (op, idx, params) in mods] + + +#: Type description for changes as returned by L{ApplyContainerMods}'s +#: callbacks +_TApplyContModsCbChanges = \ + ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([ + ht.TNonEmptyString, + ht.TAny, + ]))) + + +def ApplyContainerMods(kind, container, chgdesc, mods, + create_fn, modify_fn, remove_fn): + """Applies descriptions in C{mods} to C{container}. 
+ + @type kind: string + @param kind: One-word item description + @type container: list + @param container: Container to modify + @type chgdesc: None or list + @param chgdesc: List of applied changes + @type mods: list + @param mods: Modifications as returned by L{PrepareContainerMods} + @type create_fn: callable + @param create_fn: Callback for creating a new item (L{constants.DDM_ADD}); + receives absolute item index, parameters and private data object as added + by L{PrepareContainerMods}, returns tuple containing new item and changes + as list + @type modify_fn: callable + @param modify_fn: Callback for modifying an existing item + (L{constants.DDM_MODIFY}); receives absolute item index, item, parameters + and private data object as added by L{PrepareContainerMods}, returns + changes as list + @type remove_fn: callable + @param remove_fn: Callback on removing item; receives absolute item index, + item and private data object as added by L{PrepareContainerMods} + + """ + for (op, idx, params, private) in mods: + if idx == -1: + # Append + absidx = len(container) - 1 + elif idx < 0: + raise IndexError("Not accepting negative indices other than -1") + elif idx > len(container): + raise IndexError("Got %s index %s, but there are only %s" % + (kind, idx, len(container))) + else: + absidx = idx + + changes = None + + if op == constants.DDM_ADD: + # Calculate where item will be added + if idx == -1: + addidx = len(container) + else: + addidx = idx + + if create_fn is None: + item = params + else: + (item, changes) = create_fn(addidx, params, private) + + if idx == -1: + container.append(item) + else: + assert idx >= 0 + assert idx <= len(container) + # list.insert does so before the specified index + container.insert(idx, item) + else: + # Retrieve existing item + try: + item = container[absidx] + except IndexError: + raise IndexError("Invalid %s index %s" % (kind, idx)) + + if op == constants.DDM_REMOVE: + assert not params + + if remove_fn is not None: + remove_fn(absidx, item, private) + + changes = [("%s/%s" % (kind, absidx), "remove")] + + assert container[absidx] == item + del container[absidx] + elif op == constants.DDM_MODIFY: + if modify_fn is not None: + changes = modify_fn(absidx, item, params, private) + else: + raise errors.ProgrammerError("Unhandled operation '%s'" % op) + + assert _TApplyContModsCbChanges(changes) + + if not (chgdesc is None or changes is None): + chgdesc.extend(changes) + + +def _UpdateIvNames(base_index, disks): + """Updates the C{iv_name} attribute of disks. + + @type disks: list of L{objects.Disk} + + """ + for (idx, disk) in enumerate(disks): + disk.iv_name = "disk/%s" % (base_index + idx, ) + + +class _InstNicModPrivate: + """Data structure for network interface modifications. + + Used by L{LUInstanceSetParams}. + + """ + def __init__(self): + self.params = None + self.filled = None + + class LUInstanceSetParams(LogicalUnit): """Modifies an instances's parameters. 
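The LUInstanceSetParams hunk below replaces the old ad-hoc disk/NIC
validation with the container-modification helpers above; a sketch of the
two opcode syntaxes it accepts (values invented):

    nics=[(constants.DDM_ADD, {constants.INIC_MAC: constants.VALUE_AUTO})]
    # legacy two-element form; _UpgradeDiskNicMods turns it into:
    nics=[(constants.DDM_ADD, -1, {constants.INIC_MAC: constants.VALUE_AUTO})]
    # a bare index in the first position becomes a modify operation:
    nics=[(2, {constants.INIC_LINK: "br1"})]
    #   -> (constants.DDM_MODIFY, 2, {constants.INIC_LINK: "br1"})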
@@ -11377,55 +11929,140 @@ class LUInstanceSetParams(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + @staticmethod + def _UpgradeDiskNicMods(kind, mods, verify_fn): + assert ht.TList(mods) + assert not mods or len(mods[0]) in (2, 3) + + if mods and len(mods[0]) == 2: + result = [] + + addremove = 0 + for op, params in mods: + if op in (constants.DDM_ADD, constants.DDM_REMOVE): + result.append((op, -1, params)) + addremove += 1 + + if addremove > 1: + raise errors.OpPrereqError("Only one %s add or remove operation is" + " supported at a time" % kind, + errors.ECODE_INVAL) + else: + result.append((constants.DDM_MODIFY, op, params)) + + assert verify_fn(result) + else: + result = mods + + return result + + @staticmethod + def _CheckMods(kind, mods, key_types, item_fn): + """Ensures requested disk/NIC modifications are valid. + + """ + for (op, _, params) in mods: + assert ht.TDict(params) + + utils.ForceDictType(params, key_types) + + if op == constants.DDM_REMOVE: + if params: + raise errors.OpPrereqError("No settings should be passed when" + " removing a %s" % kind, + errors.ECODE_INVAL) + elif op in (constants.DDM_ADD, constants.DDM_MODIFY): + item_fn(op, params) + else: + raise errors.ProgrammerError("Unhandled operation '%s'" % op) + + @staticmethod + def _VerifyDiskModification(op, params): + """Verifies a disk modification. + + """ + if op == constants.DDM_ADD: + mode = params.setdefault(constants.IDISK_MODE, constants.DISK_RDWR) + if mode not in constants.DISK_ACCESS_SET: + raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, + errors.ECODE_INVAL) + + size = params.get(constants.IDISK_SIZE, None) + if size is None: + raise errors.OpPrereqError("Required disk parameter '%s' missing" % + constants.IDISK_SIZE, errors.ECODE_INVAL) + + try: + size = int(size) + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid disk size parameter: %s" % err, + errors.ECODE_INVAL) + + params[constants.IDISK_SIZE] = size + + elif op == constants.DDM_MODIFY and constants.IDISK_SIZE in params: + raise errors.OpPrereqError("Disk size change not possible, use" + " grow-disk", errors.ECODE_INVAL) + + @staticmethod + def _VerifyNicModification(op, params): + """Verifies a network interface modification. 
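+
+    For example, passing C{{constants.INIC_IP: "none"}} clears the IP
+    address, while passing both C{"bridge"} and C{constants.INIC_LINK} in
+    the same modification is rejected.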
+ + """ + if op in (constants.DDM_ADD, constants.DDM_MODIFY): + ip = params.get(constants.INIC_IP, None) + if ip is None: + pass + elif ip.lower() == constants.VALUE_NONE: + params[constants.INIC_IP] = None + elif not netutils.IPAddress.IsValid(ip): + raise errors.OpPrereqError("Invalid IP address '%s'" % ip, + errors.ECODE_INVAL) + + bridge = params.get("bridge", None) + link = params.get(constants.INIC_LINK, None) + if bridge and link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time", errors.ECODE_INVAL) + elif bridge and bridge.lower() == constants.VALUE_NONE: + params["bridge"] = None + elif link and link.lower() == constants.VALUE_NONE: + params[constants.INIC_LINK] = None + + if op == constants.DDM_ADD: + macaddr = params.get(constants.INIC_MAC, None) + if macaddr is None: + params[constants.INIC_MAC] = constants.VALUE_AUTO + + if constants.INIC_MAC in params: + macaddr = params[constants.INIC_MAC] + if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + macaddr = utils.NormalizeAndValidateMac(macaddr) + + if op == constants.DDM_MODIFY and macaddr == constants.VALUE_AUTO: + raise errors.OpPrereqError("'auto' is not a valid MAC address when" + " modifying an existing NIC", + errors.ECODE_INVAL) + def CheckArguments(self): if not (self.op.nics or self.op.disks or self.op.disk_template or self.op.hvparams or self.op.beparams or self.op.os_name or - self.op.online_inst or self.op.offline_inst): + self.op.offline is not None or self.op.runtime_mem): raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) if self.op.hvparams: _CheckGlobalHvParams(self.op.hvparams) - # Disk validation - disk_addremove = 0 - for disk_op, disk_dict in self.op.disks: - utils.ForceDictType(disk_dict, constants.IDISK_PARAMS_TYPES) - if disk_op == constants.DDM_REMOVE: - disk_addremove += 1 - continue - elif disk_op == constants.DDM_ADD: - disk_addremove += 1 - else: - if not isinstance(disk_op, int): - raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL) - if not isinstance(disk_dict, dict): - msg = "Invalid disk value: expected dict, got '%s'" % disk_dict - raise errors.OpPrereqError(msg, errors.ECODE_INVAL) - - if disk_op == constants.DDM_ADD: - mode = disk_dict.setdefault(constants.IDISK_MODE, constants.DISK_RDWR) - if mode not in constants.DISK_ACCESS_SET: - raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, - errors.ECODE_INVAL) - size = disk_dict.get(constants.IDISK_SIZE, None) - if size is None: - raise errors.OpPrereqError("Required disk parameter size missing", - errors.ECODE_INVAL) - try: - size = int(size) - except (TypeError, ValueError), err: - raise errors.OpPrereqError("Invalid disk size parameter: %s" % - str(err), errors.ECODE_INVAL) - disk_dict[constants.IDISK_SIZE] = size - else: - # modification of disk - if constants.IDISK_SIZE in disk_dict: - raise errors.OpPrereqError("Disk size change not possible, use" - " grow-disk", errors.ECODE_INVAL) + self.op.disks = \ + self._UpgradeDiskNicMods("disk", self.op.disks, + opcodes.OpInstanceSetParams.TestDiskModifications) + self.op.nics = \ + self._UpgradeDiskNicMods("NIC", self.op.nics, + opcodes.OpInstanceSetParams.TestNicModifications) - if disk_addremove > 1: - raise errors.OpPrereqError("Only one disk add or remove operation" - " supported at a time", errors.ECODE_INVAL) + # Check disk modifications + self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES, + self._VerifyDiskModification) if self.op.disks and self.op.disk_template is not 
None: raise errors.OpPrereqError("Disk template conversion and other disk" @@ -11439,60 +12076,9 @@ class LUInstanceSetParams(LogicalUnit): " one requires specifying a secondary node", errors.ECODE_INVAL) - # NIC validation - nic_addremove = 0 - for nic_op, nic_dict in self.op.nics: - utils.ForceDictType(nic_dict, constants.INIC_PARAMS_TYPES) - if nic_op == constants.DDM_REMOVE: - nic_addremove += 1 - continue - elif nic_op == constants.DDM_ADD: - nic_addremove += 1 - else: - if not isinstance(nic_op, int): - raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL) - if not isinstance(nic_dict, dict): - msg = "Invalid nic value: expected dict, got '%s'" % nic_dict - raise errors.OpPrereqError(msg, errors.ECODE_INVAL) - - # nic_dict should be a dict - nic_ip = nic_dict.get(constants.INIC_IP, None) - if nic_ip is not None: - if nic_ip.lower() == constants.VALUE_NONE: - nic_dict[constants.INIC_IP] = None - else: - if not netutils.IPAddress.IsValid(nic_ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, - errors.ECODE_INVAL) - - nic_bridge = nic_dict.get("bridge", None) - nic_link = nic_dict.get(constants.INIC_LINK, None) - if nic_bridge and nic_link: - raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time", errors.ECODE_INVAL) - elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: - nic_dict["bridge"] = None - elif nic_link and nic_link.lower() == constants.VALUE_NONE: - nic_dict[constants.INIC_LINK] = None - - if nic_op == constants.DDM_ADD: - nic_mac = nic_dict.get(constants.INIC_MAC, None) - if nic_mac is None: - nic_dict[constants.INIC_MAC] = constants.VALUE_AUTO - - if constants.INIC_MAC in nic_dict: - nic_mac = nic_dict[constants.INIC_MAC] - if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic_mac = utils.NormalizeAndValidateMac(nic_mac) - - if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO: - raise errors.OpPrereqError("'auto' is not a valid MAC address when" - " modifying an existing nic", - errors.ECODE_INVAL) - - if nic_addremove > 1: - raise errors.OpPrereqError("Only one NIC add or remove operation" - " supported at a time", errors.ECODE_INVAL) + # Check NIC modifications + self._CheckMods("NIC", self.op.nics, constants.INIC_PARAMS_TYPES, + self._VerifyNicModification) def ExpandNames(self): self._ExpandAndLockInstance() @@ -11503,6 +12089,7 @@ class LUInstanceSetParams(LogicalUnit): self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): + # TODO: Acquire group lock in shared mode (disk parameters) if level == locking.LEVEL_NODE: self._LockInstancesNodes() if self.op.disk_template and self.op.remote_node: @@ -11528,42 +12115,23 @@ class LUInstanceSetParams(LogicalUnit): args["vcpus"] = self.be_new[constants.BE_VCPUS] # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk # information at all. 
- if self.op.nics: - args["nics"] = [] - nic_override = dict(self.op.nics) - for idx, nic in enumerate(self.instance.nics): - if idx in nic_override: - this_nic_override = nic_override[idx] - else: - this_nic_override = {} - if constants.INIC_IP in this_nic_override: - ip = this_nic_override[constants.INIC_IP] - else: - ip = nic.ip - if constants.INIC_MAC in this_nic_override: - mac = this_nic_override[constants.INIC_MAC] - else: - mac = nic.mac - if idx in self.nic_pnew: - nicparams = self.nic_pnew[idx] - else: - nicparams = self.cluster.SimpleFillNIC(nic.nicparams) - mode = nicparams[constants.NIC_MODE] - link = nicparams[constants.NIC_LINK] - args["nics"].append((ip, mac, mode, link)) - if constants.DDM_ADD in nic_override: - ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None) - mac = nic_override[constants.DDM_ADD][constants.INIC_MAC] - nicparams = self.nic_pnew[constants.DDM_ADD] + + if self._new_nics is not None: + nics = [] + + for nic in self._new_nics: + nicparams = self.cluster.SimpleFillNIC(nic.nicparams) mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args["nics"].append((ip, mac, mode, link)) - elif constants.DDM_REMOVE in nic_override: - del args["nics"][-1] + nics.append((nic.ip, nic.mac, mode, link)) + + args["nics"] = nics env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) if self.op.disk_template: env["NEW_DISK_TEMPLATE"] = self.op.disk_template + if self.op.runtime_mem: + env["RUNTIME_MEMORY"] = self.op.runtime_mem return env @@ -11574,6 +12142,61 @@ class LUInstanceSetParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return (nl, nl) + def _PrepareNicModification(self, params, private, old_ip, old_params, + cluster, pnode): + update_params_dict = dict([(key, params[key]) + for key in constants.NICS_PARAMETERS + if key in params]) + + if "bridge" in params: + update_params_dict[constants.NIC_LINK] = params["bridge"] + + new_params = _GetUpdatedParams(old_params, update_params_dict) + utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES) + + new_filled_params = cluster.SimpleFillNIC(new_params) + objects.NIC.CheckParameterSyntax(new_filled_params) + + new_mode = new_filled_params[constants.NIC_MODE] + if new_mode == constants.NIC_MODE_BRIDGED: + bridge = new_filled_params[constants.NIC_LINK] + msg = self.rpc.call_bridges_exist(pnode, [bridge]).fail_msg + if msg: + msg = "Error checking bridges on node '%s': %s" % (pnode, msg) + if self.op.force: + self.warn.append(msg) + else: + raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) + + elif new_mode == constants.NIC_MODE_ROUTED: + ip = params.get(constants.INIC_IP, old_ip) + if ip is None: + raise errors.OpPrereqError("Cannot set the NIC IP address to None" + " on a routed NIC", errors.ECODE_INVAL) + + if constants.INIC_MAC in params: + mac = params[constants.INIC_MAC] + if mac is None: + raise errors.OpPrereqError("Cannot unset the NIC MAC address", + errors.ECODE_INVAL) + elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + # otherwise generate the MAC address + params[constants.INIC_MAC] = \ + self.cfg.GenerateMAC(self.proc.GetECId()) + else: + # or validate/reserve the current one + try: + self.cfg.ReserveMAC(mac, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address '%s' already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) + + private.params = new_params + private.filled = new_filled_params + + return (None, None) + def CheckPrereq(self): """Check 
prerequisites. @@ -11591,6 +12214,10 @@ class LUInstanceSetParams(LogicalUnit): pnode_info = self.cfg.GetNodeInfo(pnode) self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams + # Prepare disk/NIC modifications + self.diskmod = PrepareContainerMods(self.op.disks, None) + self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate) + # OS change if self.op.os_name and not self.op.force: _CheckNodeHasOS(self, instance.primary_node, self.op.os_name, @@ -11599,6 +12226,9 @@ class LUInstanceSetParams(LogicalUnit): else: instance_os = instance.os + assert not (self.op.disk_template and self.op.disks), \ + "Can't modify disk template and apply disk changes at the same time" + if self.op.disk_template: if instance.disk_template == self.op.disk_template: raise errors.OpPrereqError("Instance already has disk template %s" % @@ -11628,6 +12258,10 @@ class LUInstanceSetParams(LogicalUnit): _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required) snode_info = self.cfg.GetNodeInfo(self.op.remote_node) + snode_group = self.cfg.GetNodeGroup(snode_info.group) + ipolicy = _CalculateGroupIPolicy(cluster, snode_group) + _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, + ignore=self.op.ignore_ipolicy) if pnode_info.group != snode_info.group: self.LogWarning("The primary and secondary nodes are in two" " different node groups; the disk parameters" @@ -11764,118 +12398,80 @@ class LUInstanceSetParams(LogicalUnit): " %s, due to not enough memory" % node, errors.ECODE_STATE) - # NIC processing - self.nic_pnew = {} - self.nic_pinst = {} - for nic_op, nic_dict in self.op.nics: - if nic_op == constants.DDM_REMOVE: - if not instance.nics: - raise errors.OpPrereqError("Instance has no NICs, cannot remove", - errors.ECODE_INVAL) - continue - if nic_op != constants.DDM_ADD: - # an existing nic - if not instance.nics: - raise errors.OpPrereqError("Invalid NIC index %s, instance has" - " no NICs" % nic_op, - errors.ECODE_INVAL) - if nic_op < 0 or nic_op >= len(instance.nics): - raise errors.OpPrereqError("Invalid NIC index %s, valid values" - " are 0 to %d" % - (nic_op, len(instance.nics) - 1), - errors.ECODE_INVAL) - old_nic_params = instance.nics[nic_op].nicparams - old_nic_ip = instance.nics[nic_op].ip - else: - old_nic_params = {} - old_nic_ip = None - - update_params_dict = dict([(key, nic_dict[key]) - for key in constants.NICS_PARAMETERS - if key in nic_dict]) - - if "bridge" in nic_dict: - update_params_dict[constants.NIC_LINK] = nic_dict["bridge"] - - new_nic_params = _GetUpdatedParams(old_nic_params, - update_params_dict) - utils.ForceDictType(new_nic_params, constants.NICS_PARAMETER_TYPES) - new_filled_nic_params = cluster.SimpleFillNIC(new_nic_params) - objects.NIC.CheckParameterSyntax(new_filled_nic_params) - self.nic_pinst[nic_op] = new_nic_params - self.nic_pnew[nic_op] = new_filled_nic_params - new_nic_mode = new_filled_nic_params[constants.NIC_MODE] - - if new_nic_mode == constants.NIC_MODE_BRIDGED: - nic_bridge = new_filled_nic_params[constants.NIC_LINK] - msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg - if msg: - msg = "Error checking bridges on node %s: %s" % (pnode, msg) - if self.op.force: - self.warn.append(msg) - else: - raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) - if new_nic_mode == constants.NIC_MODE_ROUTED: - if constants.INIC_IP in nic_dict: - nic_ip = nic_dict[constants.INIC_IP] - else: - nic_ip = old_nic_ip - if nic_ip is None: - raise errors.OpPrereqError("Cannot set the nic ip to None" - " on a routed nic", 
errors.ECODE_INVAL) - if constants.INIC_MAC in nic_dict: - nic_mac = nic_dict[constants.INIC_MAC] - if nic_mac is None: - raise errors.OpPrereqError("Cannot set the nic mac to None", - errors.ECODE_INVAL) - elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - # otherwise generate the mac - nic_dict[constants.INIC_MAC] = \ - self.cfg.GenerateMAC(self.proc.GetECId()) - else: - # or validate/reserve the current one - try: - self.cfg.ReserveMAC(nic_mac, self.proc.GetECId()) - except errors.ReservationError: - raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % nic_mac, - errors.ECODE_NOTUNIQUE) + if self.op.runtime_mem: + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node) + if not remote_info.payload: # not running already + raise errors.OpPrereqError("Instance %s is not running" % instance.name, + errors.ECODE_STATE) + + current_memory = remote_info.payload["memory"] + if (not self.op.force and + (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or + self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])): + raise errors.OpPrereqError("Instance %s must have between %d" + " and %d MB of memory unless --force is" + " given" % (instance.name, + self.be_proposed[constants.BE_MINMEM], + self.be_proposed[constants.BE_MAXMEM]), + errors.ECODE_INVAL) + + if self.op.runtime_mem > current_memory: + _CheckNodeFreeMemory(self, instance.primary_node, + "ballooning memory for instance %s" % + instance.name, + self.op.runtime_mem - current_memory, + instance.hypervisor) - # DISK processing if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for" " diskless instances", errors.ECODE_INVAL) - for disk_op, _ in self.op.disks: - if disk_op == constants.DDM_REMOVE: - if len(instance.disks) == 1: - raise errors.OpPrereqError("Cannot remove the last disk of" - " an instance", errors.ECODE_INVAL) - _CheckInstanceState(self, instance, INSTANCE_DOWN, - msg="cannot remove disks") - - if (disk_op == constants.DDM_ADD and - len(instance.disks) >= constants.MAX_DISKS): - raise errors.OpPrereqError("Instance has too many disks (%d), cannot" - " add more" % constants.MAX_DISKS, - errors.ECODE_STATE) - if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE): - # an existing disk - if disk_op < 0 or disk_op >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index %s, valid values" - " are 0 to %d" % - (disk_op, len(instance.disks)), - errors.ECODE_INVAL) - # disabling the instance - if self.op.offline_inst: - _CheckInstanceState(self, instance, INSTANCE_DOWN, - msg="cannot change instance state to offline") + def _PrepareNicCreate(_, params, private): + return self._PrepareNicModification(params, private, None, {}, + cluster, pnode) + + def _PrepareNicMod(_, nic, params, private): + return self._PrepareNicModification(params, private, nic.ip, + nic.nicparams, cluster, pnode) + + # Verify NIC changes (operating on copy) + nics = instance.nics[:] + ApplyContainerMods("NIC", nics, None, self.nicmod, + _PrepareNicCreate, _PrepareNicMod, None) + if len(nics) > constants.MAX_NICS: + raise errors.OpPrereqError("Instance has too many network interfaces" + " (%d), cannot add more" % constants.MAX_NICS, + errors.ECODE_STATE) - # enabling the instance - if self.op.online_inst: - _CheckInstanceState(self, instance, INSTANCE_OFFLINE, - msg="cannot make instance go 
online") + # Verify disk changes (operating on a copy) + disks = instance.disks[:] + ApplyContainerMods("disk", disks, None, self.diskmod, None, None, None) + if len(disks) > constants.MAX_DISKS: + raise errors.OpPrereqError("Instance has too many disks (%d), cannot add" + " more" % constants.MAX_DISKS, + errors.ECODE_STATE) + + if self.op.offline is not None: + if self.op.offline: + msg = "can't change to offline" + else: + msg = "can't change to online" + _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE, msg=msg) + + # Pre-compute NIC changes (necessary to use result in hooks) + self._nic_chgdesc = [] + if self.nicmod: + # Operate on copies as this is still in prereq + nics = [nic.Copy() for nic in instance.nics] + ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod, + self._CreateNewNic, self._ApplyNicMods, None) + self._new_nics = nics + else: + self._new_nics = None def _ConvertPlainToDrbd(self, feedback_fn): """Converts an instance from plain to drbd. @@ -11990,6 +12586,105 @@ class LUInstanceSetParams(LogicalUnit): # Node resource locks will be released by caller + def _CreateNewDisk(self, idx, params, _): + """Creates a new disk. + + """ + instance = self.instance + + # add a new disk + if instance.disk_template in constants.DTS_FILEBASED: + (file_driver, file_path) = instance.disks[0].logical_id + file_path = os.path.dirname(file_path) + else: + file_driver = file_path = None + + disk = \ + _GenerateDiskTemplate(self, instance.disk_template, instance.name, + instance.primary_node, instance.secondary_nodes, + [params], file_path, file_driver, idx, + self.Log, self.diskparams)[0] + + info = _GetInstanceInfoText(instance) + + logging.info("Creating volume %s for instance %s", + disk.iv_name, instance.name) + # Note: this needs to be kept in sync with _CreateDisks + #HARDCODE + for node in instance.all_nodes: + f_create = (node == instance.primary_node) + try: + _CreateBlockDev(self, node, instance, disk, f_create, info, f_create) + except errors.OpExecError, err: + self.LogWarning("Failed to create volume %s (%s) on node '%s': %s", + disk.iv_name, disk, node, err) + + return (disk, [ + ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)), + ]) + + @staticmethod + def _ModifyDisk(idx, disk, params, _): + """Modifies a disk. + + """ + disk.mode = params[constants.IDISK_MODE] + + return [ + ("disk.mode/%d" % idx, disk.mode), + ] + + def _RemoveDisk(self, idx, root, _): + """Removes a disk. + + """ + for node, disk in root.ComputeNodeTree(self.instance.primary_node): + self.cfg.SetDiskID(disk, node) + msg = self.rpc.call_blockdev_remove(node, disk).fail_msg + if msg: + self.LogWarning("Could not remove disk/%d on node '%s': %s," + " continuing anyway", idx, node, msg) + + # if this is a DRBD disk, return its port to the pool + if root.dev_type in constants.LDS_DRBD: + self.cfg.AddTcpUdpPort(root.logical_id[2]) + + @staticmethod + def _CreateNewNic(idx, params, private): + """Creates data structure for a new network interface. + + """ + mac = params[constants.INIC_MAC] + ip = params.get(constants.INIC_IP, None) + nicparams = private.params + + return (objects.NIC(mac=mac, ip=ip, nicparams=nicparams), [ + ("nic.%d" % idx, + "add:mac=%s,ip=%s,mode=%s,link=%s" % + (mac, ip, private.filled[constants.NIC_MODE], + private.filled[constants.NIC_LINK])), + ]) + + @staticmethod + def _ApplyNicMods(idx, nic, params, private): + """Modifies a network interface. 
+ + """ + changes = [] + + for key in [constants.INIC_MAC, constants.INIC_IP]: + if key in params: + changes.append(("nic.%s/%d" % (key, idx), params[key])) + setattr(nic, key, params[key]) + + if private.params: + nic.nicparams = private.params + + for (key, val) in params.items(): + changes.append(("nic.%s/%d" % (key, idx), val)) + + return changes + def Exec(self, feedback_fn): """Modifies an instance. @@ -11998,6 +12693,7 @@ class LUInstanceSetParams(LogicalUnit): """ # Process here the warnings from CheckPrereq, as we don't have a # feedback_fn there. + # TODO: Replace with self.LogWarning for warn in self.warn: feedback_fn("WARNING: %s" % warn) @@ -12007,66 +12703,19 @@ class LUInstanceSetParams(LogicalUnit): result = [] instance = self.instance - # disk changes - for disk_op, disk_dict in self.op.disks: - if disk_op == constants.DDM_REMOVE: - # remove the last disk - device = instance.disks.pop() - device_idx = len(instance.disks) - for node, disk in device.ComputeNodeTree(instance.primary_node): - self.cfg.SetDiskID(disk, node) - msg = self.rpc.call_blockdev_remove(node, disk).fail_msg - if msg: - self.LogWarning("Could not remove disk/%d on node %s: %s," - " continuing anyway", device_idx, node, msg) - result.append(("disk/%d" % device_idx, "remove")) - - # if this is a DRBD disk, return its port to the pool - if device.dev_type in constants.LDS_DRBD: - tcp_port = device.logical_id[2] - self.cfg.AddTcpUdpPort(tcp_port) - elif disk_op == constants.DDM_ADD: - # add a new disk - if instance.disk_template in (constants.DT_FILE, - constants.DT_SHARED_FILE): - file_driver, file_path = instance.disks[0].logical_id - file_path = os.path.dirname(file_path) - else: - file_driver = file_path = None - disk_idx_base = len(instance.disks) - new_disk = _GenerateDiskTemplate(self, - instance.disk_template, - instance.name, instance.primary_node, - instance.secondary_nodes, - [disk_dict], - file_path, - file_driver, - disk_idx_base, - feedback_fn, - self.diskparams)[0] - instance.disks.append(new_disk) - info = _GetInstanceInfoText(instance) - - logging.info("Creating volume %s for instance %s", - new_disk.iv_name, instance.name) - # Note: this needs to be kept in sync with _CreateDisks - #HARDCODE - for node in instance.all_nodes: - f_create = node == instance.primary_node - try: - _CreateBlockDev(self, node, instance, new_disk, - f_create, info, f_create) - except errors.OpExecError, err: - self.LogWarning("Failed to create volume %s (%s) on" - " node %s: %s", - new_disk.iv_name, new_disk, node, err) - result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" % - (new_disk.size, new_disk.mode))) - else: - # change a given disk - instance.disks[disk_op].mode = disk_dict[constants.IDISK_MODE] - result.append(("disk.mode/%d" % disk_op, - disk_dict[constants.IDISK_MODE])) + + # runtime memory + if self.op.runtime_mem: + rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node, + instance, + self.op.runtime_mem) + rpcres.Raise("Cannot modify instance runtime memory") + result.append(("runtime_memory", self.op.runtime_mem)) + + # Apply disk changes + ApplyContainerMods("disk", instance.disks, result, self.diskmod, + self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk) + _UpdateIvNames(0, instance.disks) if self.op.disk_template: if __debug__: @@ -12100,33 +12749,10 @@ class LUInstanceSetParams(LogicalUnit): _ReleaseLocks(self, locking.LEVEL_NODE) _ReleaseLocks(self, locking.LEVEL_NODE_RES) - # NIC changes - for nic_op, nic_dict in self.op.nics: - if nic_op == 
constants.DDM_REMOVE: - # remove the last nic - del instance.nics[-1] - result.append(("nic.%d" % len(instance.nics), "remove")) - elif nic_op == constants.DDM_ADD: - # mac and bridge should be set, by now - mac = nic_dict[constants.INIC_MAC] - ip = nic_dict.get(constants.INIC_IP, None) - nicparams = self.nic_pinst[constants.DDM_ADD] - new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams) - instance.nics.append(new_nic) - result.append(("nic.%d" % (len(instance.nics) - 1), - "add:mac=%s,ip=%s,mode=%s,link=%s" % - (new_nic.mac, new_nic.ip, - self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE], - self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK] - ))) - else: - for key in (constants.INIC_MAC, constants.INIC_IP): - if key in nic_dict: - setattr(instance.nics[nic_op], key, nic_dict[key]) - if nic_op in self.nic_pinst: - instance.nics[nic_op].nicparams = self.nic_pinst[nic_op] - for key, val in nic_dict.iteritems(): - result.append(("nic.%s/%d" % (key, nic_op), val)) + # Apply NIC changes + if self._new_nics is not None: + instance.nics = self._new_nics + result.extend(self._nic_chgdesc) # hvparams changes if self.op.hvparams: @@ -12150,13 +12776,17 @@ class LUInstanceSetParams(LogicalUnit): for key, val in self.op.osparams.iteritems(): result.append(("os/%s" % key, val)) - # online/offline instance - if self.op.online_inst: - self.cfg.MarkInstanceDown(instance.name) - result.append(("admin_state", constants.ADMINST_DOWN)) - if self.op.offline_inst: + if self.op.offline is None: + # Ignore + pass + elif self.op.offline: + # Mark instance as offline self.cfg.MarkInstanceOffline(instance.name) result.append(("admin_state", constants.ADMINST_OFFLINE)) + else: + # Mark instance as online, but stopped + self.cfg.MarkInstanceDown(instance.name) + result.append(("admin_state", constants.ADMINST_DOWN)) self.cfg.Update(instance, feedback_fn) @@ -12780,6 +13410,16 @@ class LUGroupAdd(LogicalUnit): if self.op.ndparams: utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None) + else: + self.new_hv_state = None + + if self.op.disk_state: + self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None) + else: + self.new_disk_state = None + if self.op.diskparams: for templ in constants.DISK_TEMPLATES: if templ not in self.op.diskparams: @@ -12791,7 +13431,11 @@ class LUGroupAdd(LogicalUnit): if self.op.ipolicy: cluster = self.cfg.GetClusterInfo() full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy) - objects.InstancePolicy.CheckParameterSyntax(full_ipolicy) + try: + objects.InstancePolicy.CheckParameterSyntax(full_ipolicy) + except errors.ConfigurationError, err: + raise errors.OpPrereqError("Invalid instance policy: %s" % err, + errors.ECODE_INVAL) def BuildHooksEnv(self): """Build hooks env. 
@@ -12817,7 +13461,9 @@ class LUGroupAdd(LogicalUnit): alloc_policy=self.op.alloc_policy, ndparams=self.op.ndparams, diskparams=self.op.diskparams, - ipolicy=self.op.ipolicy) + ipolicy=self.op.ipolicy, + hv_state_static=self.new_hv_state, + disk_state_static=self.new_disk_state) self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False) del self.remove_locks[locking.LEVEL_NODEGROUP] @@ -13082,14 +13728,32 @@ class LUGroupSetParams(LogicalUnit): self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) self.needed_locks = { + locking.LEVEL_INSTANCE: [], locking.LEVEL_NODEGROUP: [self.group_uuid], } + self.share_locks[locking.LEVEL_INSTANCE] = 1 + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically; this needs verification once the group + # lock has been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + def CheckPrereq(self): """Check prerequisites. """ + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + + # Check if locked instances are still correct + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + self.group = self.cfg.GetNodeGroup(self.group_uuid) + cluster = self.cfg.GetClusterInfo() if self.group is None: raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" % @@ -13120,14 +13784,22 @@ class LUGroupSetParams(LogicalUnit): self.group.disk_state_static) if self.op.ipolicy: - g_ipolicy = {} - for key, value in self.op.ipolicy.iteritems(): - g_ipolicy[key] = _GetUpdatedParams(self.group.ipolicy.get(key, {}), - value, - use_none=True) - utils.ForceDictType(g_ipolicy[key], constants.ISPECS_PARAMETER_TYPES) - self.new_ipolicy = g_ipolicy - objects.InstancePolicy.CheckParameterSyntax(self.new_ipolicy) + self.new_ipolicy = _GetUpdatedIPolicy(self.group.ipolicy, + self.op.ipolicy, + group_policy=True) + + new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy) + inst_filter = lambda inst: inst.name in owned_instances + instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values() + violations = \ + _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, + self.group), + new_ipolicy, instances) + + if violations: + self.LogWarning("After the ipolicy change the following instances" + " violate the new policy: %s", + utils.CommaJoin(violations)) def BuildHooksEnv(self): """Build hooks env. 
""" + cluster = cfg.GetClusterInfo() ng = dict((guuid, { "name": gdata.name, "alloc_policy": gdata.alloc_policy, + "ipolicy": _CalculateGroupIPolicy(cluster, gdata), }) for guuid, gdata in cfg.GetAllNodeGroupsInfo().items()) return ng @staticmethod - def _ComputeBasicNodeData(node_cfg): + def _ComputeBasicNodeData(cfg, node_cfg): """Compute global node data. @rtype: dict @@ -13947,6 +14621,7 @@ class IAllocator(object): "group": ninfo.group, "master_capable": ninfo.master_capable, "vm_capable": ninfo.vm_capable, + "ndparams": cfg.GetNdParams(ninfo), }) for ninfo in node_cfg.values())