X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f9bbf32be3274f4ec04304ee95fba0936b16d35c..cb83e4dba6d30467515b7f86bb5be4051b921ea9:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index d47f4d4..a838787 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -48,7 +48,6 @@ from ganeti import hypervisor from ganeti import locking from ganeti import constants from ganeti import objects -from ganeti import serializer from ganeti import ssconf from ganeti import uidpool from ganeti import compat @@ -60,13 +59,11 @@ from ganeti import opcodes from ganeti import ht from ganeti import rpc from ganeti import runtime +from ganeti.masterd import iallocator import ganeti.masterd.instance # pylint: disable=W0611 -#: Size of DRBD meta block device -DRBD_META_SIZE = 128 - # States of instance INSTANCE_DOWN = [constants.ADMINST_DOWN] INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP] @@ -585,20 +582,6 @@ def _ShareAll(): return dict.fromkeys(locking.LEVELS, 1) -def _MakeLegacyNodeInfo(data): - """Formats the data returned by L{rpc.RpcRunner.call_node_info}. - - Converts the data into a single dictionary. This is fine for most use cases, - but some require information from more than one volume group or hypervisor. - - """ - (bootid, (vg_info, ), (hv_info, )) = data - - return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), { - "bootid": bootid, - }) - - def _AnnotateDiskParams(instance, devs, cfg): """Little helper wrapper to the rpc annotation method. @@ -640,7 +623,8 @@ def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes, "Instance %s has no node in group %s" % (name, cur_group_uuid) -def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups): +def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups, + primary_only=False): """Checks if the owned node groups are still correct for an instance. 
@type cfg: L{config.ConfigWriter} @@ -649,9 +633,11 @@ def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups): @param instance_name: Instance name @type owned_groups: set or frozenset @param owned_groups: List of currently owned node groups + @type primary_only: boolean + @param primary_only: Whether to check node groups for only the primary node """ - inst_groups = cfg.GetInstanceNodeGroups(instance_name) + inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only) if not owned_groups.issuperset(inst_groups): raise errors.OpPrereqError("Instance %s's node groups changed since" @@ -793,7 +779,8 @@ def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False): use_none=use_none, use_default=use_default) else: - if not value or value == [constants.VALUE_DEFAULT]: + if (not value or value == [constants.VALUE_DEFAULT] or + value == constants.VALUE_DEFAULT): if group_policy: del ipolicy[key] else: @@ -814,7 +801,7 @@ def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False): # in a nicer way ipolicy[key] = list(value) try: - objects.InstancePolicy.CheckParameterSyntax(ipolicy) + objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy) except errors.ConfigurationError, err: raise errors.OpPrereqError("Invalid instance policy: %s" % err, errors.ECODE_INVAL) @@ -956,9 +943,8 @@ def _RunPostHook(lu, node_name): hm = lu.proc.BuildHooksManager(lu) try: hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) - except: - # pylint: disable=W0702 - lu.LogWarning("Errors occurred running hooks on %s" % node_name) + except Exception, err: # pylint: disable=W0703 + lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err)) def _CheckOutputFields(static, dynamic, selected): @@ -1107,19 +1093,24 @@ def _CheckInstanceState(lu, instance, req_states, msg=None): if constants.ADMINST_UP not in req_states: pnode = instance.primary_node - ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] - ins_l.Raise("Can't contact node %s for instance information" % pnode, - prereq=True, ecode=errors.ECODE_ENVIRON) - - if instance.name in ins_l.payload: - raise errors.OpPrereqError("Instance %s is running, %s" % - (instance.name, msg), errors.ECODE_STATE) + if not lu.cfg.GetNodeInfo(pnode).offline: + ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] + ins_l.Raise("Can't contact node %s for instance information" % pnode, + prereq=True, ecode=errors.ECODE_ENVIRON) + if instance.name in ins_l.payload: + raise errors.OpPrereqError("Instance %s is running, %s" % + (instance.name, msg), errors.ECODE_STATE) + else: + lu.LogWarning("Primary node offline, ignoring check that instance" + " is down") -def _ComputeMinMaxSpec(name, ipolicy, value): +def _ComputeMinMaxSpec(name, qualifier, ipolicy, value): """Computes if value is in the desired range. @param name: name of the parameter for which we perform the check + @param qualifier: a qualifier used in the error message (e.g. 
'disk/1', + not just 'disk') @param ipolicy: dictionary containing min, max and std values @param value: actual value that we want to use @return: None or element not meeting the criteria @@ -1131,8 +1122,12 @@ def _ComputeMinMaxSpec(name, ipolicy, value): max_v = ipolicy[constants.ISPECS_MAX].get(name, value) min_v = ipolicy[constants.ISPECS_MIN].get(name, value) if value > max_v or min_v > value: + if qualifier: + fqn = "%s/%s" % (name, qualifier) + else: + fqn = name return ("%s value %s is not in range [%s, %s]" % - (name, value, min_v, max_v)) + (fqn, value, min_v, max_v)) return None @@ -1162,16 +1157,17 @@ def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count, assert disk_count == len(disk_sizes) test_settings = [ - (constants.ISPEC_MEM_SIZE, mem_size), - (constants.ISPEC_CPU_COUNT, cpu_count), - (constants.ISPEC_DISK_COUNT, disk_count), - (constants.ISPEC_NIC_COUNT, nic_count), - (constants.ISPEC_SPINDLE_USE, spindle_use), - ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes) + (constants.ISPEC_MEM_SIZE, "", mem_size), + (constants.ISPEC_CPU_COUNT, "", cpu_count), + (constants.ISPEC_DISK_COUNT, "", disk_count), + (constants.ISPEC_NIC_COUNT, "", nic_count), + (constants.ISPEC_SPINDLE_USE, "", spindle_use), + ] + [(constants.ISPEC_DISK_SIZE, str(idx), d) + for idx, d in enumerate(disk_sizes)] return filter(None, - (_compute_fn(name, ipolicy, value) - for (name, value) in test_settings)) + (_compute_fn(name, qualifier, ipolicy, value) + for (name, qualifier, value) in test_settings)) def _ComputeIPolicyInstanceViolation(ipolicy, instance, @@ -1197,8 +1193,8 @@ def _ComputeIPolicyInstanceViolation(ipolicy, instance, disk_sizes, spindle_use) -def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec, - _compute_fn=_ComputeIPolicySpecViolation): +def _ComputeIPolicyInstanceSpecViolation( + ipolicy, instance_spec, _compute_fn=_ComputeIPolicySpecViolation): """Compute if instance specs meets the specs of ipolicy. @type ipolicy: dict @@ -1269,11 +1265,12 @@ def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances): @param old_ipolicy: The current (still in-place) ipolicy @param new_ipolicy: The new (to become) ipolicy @param instances: List of instances to verify - @return: A list of instances which violates the new ipolicy but did not before + @return: A list of instances which violates the new ipolicy but + did not before """ - return (_ComputeViolatingInstances(old_ipolicy, instances) - - _ComputeViolatingInstances(new_ipolicy, instances)) + return (_ComputeViolatingInstances(new_ipolicy, instances) - + _ComputeViolatingInstances(old_ipolicy, instances)) def _ExpandItemName(fn, name, kind): @@ -1489,13 +1486,6 @@ def _DecideSelfPromotion(lu, exceptions=None): return mc_now < mc_should -def _CalculateGroupIPolicy(cluster, group): - """Calculate instance policy for group. - - """ - return cluster.SimpleFillIPolicy(group.ipolicy) - - def _ComputeViolatingInstances(ipolicy, instances): """Computes a set of instances who violates given ipolicy. 
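The _ComputeMinMaxSpec hunks above add a "qualifier" argument so that per-disk policy violations are reported as e.g. "disk-size/1" instead of a bare "disk-size". A minimal standalone sketch of that logic follows; ISPECS_MIN/ISPECS_MAX and the ipolicy layout are simplified stand-ins for Ganeti's constants, and only the range-check and message formatting are mirrored:

    # Simplified stand-ins for constants.ISPECS_MIN / constants.ISPECS_MAX.
    ISPECS_MIN = "min"
    ISPECS_MAX = "max"

    def compute_min_max_spec(name, qualifier, ipolicy, value):
        max_v = ipolicy[ISPECS_MAX].get(name, value)
        min_v = ipolicy[ISPECS_MIN].get(name, value)
        if value > max_v or min_v > value:
            # the qualifier (e.g. the disk index) is only used for reporting
            fqn = "%s/%s" % (name, qualifier) if qualifier else name
            return ("%s value %s is not in range [%s, %s]" %
                    (fqn, value, min_v, max_v))
        return None

    ipolicy = {ISPECS_MIN: {"disk-size": 1024}, ISPECS_MAX: {"disk-size": 10240}}
    print(compute_min_max_spec("disk-size", "1", ipolicy, 512))
    # -> disk-size/1 value 512 is not in range [1024, 10240]

The same region also fixes _ComputeNewInstanceViolations to return the instances that violate the new policy but not the old one (new minus old, rather than the reverse).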
@@ -1602,7 +1592,8 @@ def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq): for dev in instance.disks: cfg.SetDiskID(dev, node_name) - result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks) + result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks, + instance)) result.Raise("Failed to get disk status from node %s" % node_name, prereq=prereq, ecode=errors.ECODE_ENVIRON) @@ -1628,12 +1619,12 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): """ node = getattr(lu.op, node_slot, None) - iallocator = getattr(lu.op, iallocator_slot, None) + ialloc = getattr(lu.op, iallocator_slot, None) - if node is not None and iallocator is not None: + if node is not None and ialloc is not None: raise errors.OpPrereqError("Do not specify both, iallocator and node", errors.ECODE_INVAL) - elif node is None and iallocator is None: + elif node is None and ialloc is None: default_iallocator = lu.cfg.GetDefaultIAllocator() if default_iallocator: setattr(lu.op, iallocator_slot, default_iallocator) @@ -1642,30 +1633,30 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): " cluster-wide default iallocator found;" " please specify either an iallocator or a" " node, or set a cluster-wide default" - " iallocator") + " iallocator", errors.ECODE_INVAL) -def _GetDefaultIAllocator(cfg, iallocator): +def _GetDefaultIAllocator(cfg, ialloc): """Decides on which iallocator to use. @type cfg: L{config.ConfigWriter} @param cfg: Cluster configuration object - @type iallocator: string or None - @param iallocator: Iallocator specified in opcode + @type ialloc: string or None + @param ialloc: Iallocator specified in opcode @rtype: string @return: Iallocator name """ - if not iallocator: + if not ialloc: # Use default iallocator - iallocator = cfg.GetDefaultIAllocator() + ialloc = cfg.GetDefaultIAllocator() - if not iallocator: + if not ialloc: raise errors.OpPrereqError("No iallocator was specified, neither in the" " opcode nor as a cluster-wide default", errors.ECODE_INVAL) - return iallocator + return ialloc class LUClusterPostInit(LogicalUnit): @@ -1907,10 +1898,11 @@ class LUClusterVerify(NoHooksLU): # Always depend on global verification depends_fn = lambda: [(-len(jobs), [])] - jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group, - ignore_errors=self.op.ignore_errors, - depends=depends_fn())] - for group in groups) + jobs.extend( + [opcodes.OpClusterVerifyGroup(group_name=group, + ignore_errors=self.op.ignore_errors, + depends=depends_fn())] + for group in groups) # Fix up all parameters for op in itertools.chain(*jobs): # pylint: disable=W0142 @@ -2411,9 +2403,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_vol_should = {} instanceconfig.MapLVsByNode(node_vol_should) - ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info) + cluster = self.cfg.GetClusterInfo() + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + self.group_info) err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig) - _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err) + _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err)) for node in node_vol_should: n_img = node_image[node] @@ -2632,7 +2626,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if drbd_helper: helper_result = nresult.get(constants.NV_DRBDHELPER, None) - test = (helper_result == None) + test = (helper_result is None) _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node, "no drbd usermode helper 
returned") if helper_result: @@ -2914,12 +2908,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_disks[nname] = disks - # Creating copies as SetDiskID below will modify the objects and that can - # lead to incorrect data returned from nodes - devonly = [dev.Copy() for (_, dev) in disks] - - for dev in devonly: - self.cfg.SetDiskID(dev, nname) + # _AnnotateDiskParams makes already copies of the disks + devonly = [] + for (inst, dev) in disks: + (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg) + self.cfg.SetDiskID(anno_disk, nname) + devonly.append(anno_disk) node_disks_devonly[nname] = devonly @@ -3143,6 +3137,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for instance in self.my_inst_names: inst_config = self.my_inst_info[instance] + if inst_config.admin_state == constants.ADMINST_OFFLINE: + i_offline += 1 for nname in inst_config.all_nodes: if nname not in node_image: @@ -3202,10 +3198,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if master_node not in self.my_node_info: additional_nodes.append(master_node) vf_node_info.append(self.all_node_info[master_node]) - # Add the first vm_capable node we find which is not included + # Add the first vm_capable node we find which is not included, + # excluding the master node (which we already have) for node in absent_nodes: nodeinfo = self.all_node_info[node] - if nodeinfo.vm_capable and not nodeinfo.offline: + if (nodeinfo.vm_capable and not nodeinfo.offline and + node != master_node): additional_nodes.append(node) vf_node_info.append(self.all_node_info[node]) break @@ -3282,12 +3280,6 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): non_primary_inst = set(nimg.instances).difference(nimg.pinst) for inst in non_primary_inst: - # FIXME: investigate best way to handle offline insts - if inst.admin_state == constants.ADMINST_OFFLINE: - if verbose: - feedback_fn("* Skipping offline instance %s" % inst.name) - i_offline += 1 - continue test = inst in self.all_inst_info _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst, "instance should not run on node %s", node_i.name) @@ -3563,9 +3555,9 @@ class LUGroupVerifyDisks(NoHooksLU): res_instances = set() res_missing = {} - nv_dict = _MapInstanceDisksToNodes([inst - for inst in self.instances.values() - if inst.admin_state == constants.ADMINST_UP]) + nv_dict = _MapInstanceDisksToNodes( + [inst for inst in self.instances.values() + if inst.admin_state == constants.ADMINST_UP]) if nv_dict: nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) & @@ -3818,10 +3810,10 @@ def _ValidateNetmask(cfg, netmask): ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family) except errors.ProgrammerError: raise errors.OpPrereqError("Invalid primary ip family: %s." 
% - ip_family) + ip_family, errors.ECODE_INVAL) if not ipcls.ValidateNetmask(netmask): raise errors.OpPrereqError("CIDR netmask (%s) not valid" % - (netmask)) + (netmask), errors.ECODE_INVAL) class LUClusterSetParams(LogicalUnit): @@ -3851,6 +3843,11 @@ class LUClusterSetParams(LogicalUnit): if self.op.diskparams: for dt_params in self.op.diskparams.values(): utils.ForceDictType(dt_params, constants.DISK_DT_TYPES) + try: + utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS) + except errors.OpPrereqError, err: + raise errors.OpPrereqError("While verify diskparams options: %s" % err, + errors.ECODE_INVAL) def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on @@ -3978,8 +3975,8 @@ class LUClusterSetParams(LogicalUnit): if compat.any(node in group.members for node in inst.all_nodes)]) new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy) - new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, - group), + ipol = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group) + new = _ComputeNewInstanceViolations(ipol, new_ipolicy, instances) if new: violations.update(new) @@ -3987,7 +3984,7 @@ class LUClusterSetParams(LogicalUnit): if violations: self.LogWarning("After the ipolicy change the following instances" " violate them: %s", - utils.CommaJoin(violations)) + utils.CommaJoin(utils.NiceSort(violations))) if self.op.nicparams: utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) @@ -4015,7 +4012,7 @@ class LUClusterSetParams(LogicalUnit): " address" % (instance.name, nic_idx)) if nic_errors: raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % - "\n".join(nic_errors)) + "\n".join(nic_errors), errors.ECODE_INVAL) # hypervisor list/parameters self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {}) @@ -4299,6 +4296,9 @@ def _ComputeAncillaryFiles(cluster, redist): if cluster.modify_etc_hosts: files_all.add(constants.ETC_HOSTS) + if cluster.use_external_mip_script: + files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT) + # Files which are optional, these must: # - be present in one other category as well # - either exist or not exist on all nodes of that category (mc, vm all) @@ -4312,16 +4312,14 @@ def _ComputeAncillaryFiles(cluster, redist): if not redist: files_mc.add(constants.CLUSTER_CONF_FILE) - # FIXME: this should also be replicated but Ganeti doesn't support files_mc - # replication - files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT) - # Files which should only be on VM-capable nodes - files_vm = set(filename + files_vm = set( + filename for hv_name in cluster.enabled_hypervisors for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0]) - files_opt |= set(filename + files_opt |= set( + filename for hv_name in cluster.enabled_hypervisors for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1]) @@ -4356,7 +4354,8 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) online_nodes = lu.cfg.GetOnlineNodeList() - vm_nodes = lu.cfg.GetVmCapableNodeList() + online_set = frozenset(online_nodes) + vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList())) if additional_nodes is not None: online_nodes.extend(additional_nodes) @@ -4465,7 +4464,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): max_time = 0 done = True cumul_degraded = False - rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks) + rstats = 
lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance)) msg = rstats.fail_msg if msg: lu.LogWarning("Can't get any data from node %s: %s", node, msg) @@ -4743,10 +4742,10 @@ class LUOobCommand(NoHooksLU): type(result.payload)) if self.op.command in [ - constants.OOB_POWER_ON, - constants.OOB_POWER_OFF, - constants.OOB_POWER_CYCLE, - ]: + constants.OOB_POWER_ON, + constants.OOB_POWER_OFF, + constants.OOB_POWER_CYCLE, + ]: if result.payload is not None: errs.append("%s is expected to not return payload but got '%s'" % (self.op.command, result.payload)) @@ -5048,7 +5047,7 @@ class _NodeQuery(_QueryBase): node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()], [lu.cfg.GetHypervisorType()]) - live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload)) + live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload)) for (name, nresult) in node_data.items() if not nresult.fail_msg and nresult.payload) else: @@ -5356,10 +5355,11 @@ class _InstanceQuery(_QueryBase): live_data = {} if query.IQ_DISKUSAGE in self.requested_data: + gmi = ganeti.masterd.instance disk_usage = dict((inst.name, - _ComputeDiskSize(inst.disk_template, - [{constants.IDISK_SIZE: disk.size} - for disk in inst.disks])) + gmi.ComputeDiskSize(inst.disk_template, + [{constants.IDISK_SIZE: disk.size} + for disk in inst.disks])) for inst in instance_list) else: disk_usage = None @@ -5622,7 +5622,7 @@ class LUNodeAdd(LogicalUnit): if not newbie_singlehomed: # check reachability from my secondary ip to newbie's secondary ip if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, - source=myself.secondary_ip): + source=myself.secondary_ip): raise errors.OpPrereqError("Node secondary ip not reachable by TCP" " based ping to node daemon port", errors.ECODE_ENVIRON) @@ -5800,10 +5800,10 @@ class LUNodeSetParams(LogicalUnit): errors.ECODE_INVAL) # Boolean value that tells us whether we might be demoting from MC - self.might_demote = (self.op.master_candidate == False or - self.op.offline == True or - self.op.drained == True or - self.op.master_capable == False) + self.might_demote = (self.op.master_candidate is False or + self.op.offline is True or + self.op.drained is True or + self.op.master_capable is False) if self.op.secondary_ip: if not netutils.IP4Address.IsValid(self.op.secondary_ip): @@ -5904,7 +5904,7 @@ class LUNodeSetParams(LogicalUnit): " it a master candidate" % node.name, errors.ECODE_STATE) - if self.op.vm_capable == False: + if self.op.vm_capable is False: (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name) if ipri or isec: raise errors.OpPrereqError("Node %s hosts instances, cannot unset" @@ -5920,7 +5920,8 @@ class LUNodeSetParams(LogicalUnit): if mc_remaining < mc_should: raise errors.OpPrereqError("Not enough master candidates, please" " pass auto promote option to allow" - " promotion", errors.ECODE_STATE) + " promotion (--auto-promote or RAPI" + " auto_promote=True)", errors.ECODE_STATE) self.old_flags = old_flags = (node.master_candidate, node.drained, node.offline) @@ -5929,7 +5930,7 @@ class LUNodeSetParams(LogicalUnit): # Check for ineffective changes for attr in self._FLAGS: - if (getattr(self.op, attr) == False and getattr(node, attr) == False): + if (getattr(self.op, attr) is False and getattr(node, attr) is False): self.LogInfo("Ignoring request to unset flag %s, already unset", attr) setattr(self.op, attr, None) @@ -5939,24 +5940,25 @@ class LUNodeSetParams(LogicalUnit): # TODO: We might query the real power state if it supports OOB if _SupportsOob(self.cfg, node): 
if self.op.offline is False and not (node.powered or - self.op.powered == True): + self.op.powered is True): raise errors.OpPrereqError(("Node %s needs to be turned on before its" " offline status can be reset") % - self.op.node_name) + self.op.node_name, errors.ECODE_STATE) elif self.op.powered is not None: raise errors.OpPrereqError(("Unable to change powered state for node %s" " as it does not support out-of-band" - " handling") % self.op.node_name) + " handling") % self.op.node_name, + errors.ECODE_STATE) # If we're being deofflined/drained, we'll MC ourself if needed - if (self.op.drained == False or self.op.offline == False or + if (self.op.drained is False or self.op.offline is False or (self.op.master_capable and not node.master_capable)): if _DecideSelfPromotion(self): self.op.master_candidate = True self.LogInfo("Auto-promoting node to master candidate") # If we're no longer master capable, we'll demote ourselves from MC - if self.op.master_capable == False and node.master_candidate: + if self.op.master_capable is False and node.master_candidate: self.LogInfo("Demoting from master candidate") self.op.master_candidate = False @@ -5990,23 +5992,43 @@ class LUNodeSetParams(LogicalUnit): " without using re-add. Please make sure the node" " is healthy!") + # When changing the secondary ip, verify if this is a single-homed to + # multi-homed transition or vice versa, and apply the relevant + # restrictions. if self.op.secondary_ip: # Ok even without locking, because this can't be changed by any LU master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode()) master_singlehomed = master.secondary_ip == master.primary_ip - if master_singlehomed and self.op.secondary_ip: - raise errors.OpPrereqError("Cannot change the secondary ip on a single" - " homed cluster", errors.ECODE_INVAL) + if master_singlehomed and self.op.secondary_ip != node.primary_ip: + if self.op.force and node.name == master.name: + self.LogWarning("Transitioning from single-homed to multi-homed" + " cluster. All nodes will require a secondary ip.") + else: + raise errors.OpPrereqError("Changing the secondary ip on a" + " single-homed cluster requires the" + " --force option to be passed, and the" + " target node to be the master", + errors.ECODE_INVAL) + elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: + if self.op.force and node.name == master.name: + self.LogWarning("Transitioning from multi-homed to single-homed" + " cluster. Secondary IPs will have to be removed.") + else: + raise errors.OpPrereqError("Cannot set the secondary IP to be the" + " same as the primary IP on a multi-homed" + " cluster, unless the --force option is" + " passed, and the target node is the" + " master", errors.ECODE_INVAL) assert not (frozenset(affected_instances) - self.owned_locks(locking.LEVEL_INSTANCE)) if node.offline: if affected_instances: - raise errors.OpPrereqError("Cannot change secondary IP address:" - " offline node has instances (%s)" - " configured to use it" % - utils.CommaJoin(affected_instances.keys())) + msg = ("Cannot change secondary IP address: offline node has" + " instances (%s) configured to use it" % + utils.CommaJoin(affected_instances.keys())) + raise errors.OpPrereqError(msg, errors.ECODE_STATE) else: # On online nodes, check that no instances are running, and that # the node has the new ip and we can reach it. 
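The secondary-IP hunk above replaces the blanket rejection of such changes on single-homed clusters with explicit transition rules. A small self-contained sketch of those rules, using a hypothetical _Node stand-in rather than Ganeti's objects.Node; the real code raises errors.OpPrereqError, and merely warns (instead of failing) when --force is given and the target node is the master:

    class _Node(object):
        def __init__(self, name, primary_ip, secondary_ip):
            self.name = name
            self.primary_ip = primary_ip
            self.secondary_ip = secondary_ip

    def check_secondary_ip_change(master, node, new_secondary_ip, force):
        master_singlehomed = master.secondary_ip == master.primary_ip
        allowed_anyway = force and node.name == master.name
        if master_singlehomed and new_secondary_ip != node.primary_ip:
            # single-homed -> multi-homed transition
            if not allowed_anyway:
                raise ValueError("requires --force and the master as target")
        elif not master_singlehomed and new_secondary_ip == node.primary_ip:
            # multi-homed -> single-homed transition
            if not allowed_anyway:
                raise ValueError("requires --force and the master as target")

    master = _Node("node1", "192.0.2.1", "192.0.2.1")  # single-homed cluster
    check_secondary_ip_change(master, master, "198.51.100.1", force=True)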
@@ -6310,6 +6332,10 @@ class LUInstanceActivateDisks(NoHooksLU): if not disks_ok: raise errors.OpExecError("Cannot activate block devices") + if self.op.wait_for_sync: + if not _WaitForSync(self, self.instance): + raise errors.OpExecError("Some disks of the instance are degraded!") + return disks_info @@ -6362,10 +6388,12 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, False, idx) msg = result.fail_msg if msg: + is_offline_secondary = (node in instance.secondary_nodes and + result.offline) lu.proc.LogWarning("Could not prepare block device %s on node %s" " (is_primary=False, pass=1): %s", inst_disk.iv_name, node, msg) - if not ignore_secondaries: + if not (ignore_secondaries or is_offline_secondary): disks_ok = False # FIXME: race condition on drbd migration to primary @@ -6498,7 +6526,7 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False): for disk in disks: for node, top_disk in disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(top_disk, node) - result = lu.rpc.call_blockdev_shutdown(node, top_disk) + result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance)) msg = result.fail_msg if msg: lu.LogWarning("Could not shutdown block device %s on node %s: %s", @@ -6971,9 +6999,6 @@ class LUInstanceReinstall(LogicalUnit): "Cannot retrieve locked instance %s" % self.op.instance_name _CheckNodeOnline(self, instance.primary_node, "Instance primary node" " offline, cannot reinstall") - for node in instance.secondary_nodes: - _CheckNodeOnline(self, node, "Instance secondary node offline," - " cannot reinstall") if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % @@ -7047,6 +7072,59 @@ class LUInstanceRecreateDisks(LogicalUnit): constants.IDISK_METAVG, ])) + def _RunAllocator(self): + """Run the allocator based on input opcode. + + """ + be_full = self.cfg.GetClusterInfo().FillBE(self.instance) + + # FIXME + # The allocator should actually run in "relocate" mode, but current + # allocators don't support relocating all the nodes of an instance at + # the same time. As a workaround we use "allocate" mode, but this is + # suboptimal for two reasons: + # - The instance name passed to the allocator is present in the list of + # existing instances, so there could be a conflict within the + # internal structures of the allocator. This doesn't happen with the + # current allocators, but it's a liability. + # - The allocator counts the resources used by the instance twice: once + # because the instance exists already, and once because it tries to + # allocate a new instance. + # The allocator could choose some of the nodes on which the instance is + # running, but that's not a problem. If the instance nodes are broken, + # they should be already be marked as drained or offline, and hence + # skipped by the allocator. If instance disks have been lost for other + # reasons, then recreating the disks on the same nodes should be fine. 
+ disk_template = self.instance.disk_template + spindle_use = be_full[constants.BE_SPINDLE_USE] + req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name, + disk_template=disk_template, + tags=list(self.instance.GetTags()), + os=self.instance.os, + nics=[{}], + vcpus=be_full[constants.BE_VCPUS], + memory=be_full[constants.BE_MAXMEM], + spindle_use=spindle_use, + disks=[{constants.IDISK_SIZE: d.size, + constants.IDISK_MODE: d.mode} + for d in self.instance.disks], + hypervisor=self.instance.hypervisor) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) + + ial.Run(self.op.iallocator) + + assert req.RequiredNodes() == len(self.instance.all_nodes) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" + " %s" % (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + self.op.nodes = ial.result + self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", + self.op.instance_name, self.op.iallocator, + utils.CommaJoin(ial.result)) + def CheckArguments(self): if self.op.disks and ht.TPositiveInt(self.op.disks[0]): # Normalize and convert deprecated list of disk indices @@ -7058,6 +7136,10 @@ class LUInstanceRecreateDisks(LogicalUnit): " once: %s" % utils.CommaJoin(duplicates), errors.ECODE_INVAL) + if self.op.iallocator and self.op.nodes: + raise errors.OpPrereqError("Give either the iallocator or the new" + " nodes, not both", errors.ECODE_INVAL) + for (idx, params) in self.op.disks: utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) unsupported = frozenset(params.keys()) - self._MODIFYABLE @@ -7075,14 +7157,40 @@ class LUInstanceRecreateDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) else: self.needed_locks[locking.LEVEL_NODE] = [] + if self.op.iallocator: + # iallocator will select a new node in the same group + self.needed_locks[locking.LEVEL_NODEGROUP] = [] self.needed_locks[locking.LEVEL_NODE_RES] = [] def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - # if we replace the nodes, we only need to lock the old primary, - # otherwise we need to lock all nodes for disk re-creation - primary_only = bool(self.op.nodes) - self._LockInstancesNodes(primary_only=primary_only) + if level == locking.LEVEL_NODEGROUP: + assert self.op.iallocator is not None + assert not self.op.nodes + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + self.share_locks[locking.LEVEL_NODEGROUP] = 1 + # Lock the primary group used by the instance optimistically; this + # requires going via the node before it's locked, requiring + # verification later on + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True) + + elif level == locking.LEVEL_NODE: + # If an allocator is used, then we lock all the nodes in the current + # instance group, as we don't know yet which ones will be selected; + # if we replace the nodes without using an allocator, locks are + # already declared in ExpandNames; otherwise, we need to lock all the + # instance nodes for disk re-creation + if self.op.iallocator: + assert not self.op.nodes + assert not self.needed_locks[locking.LEVEL_NODE] + assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1 + + # Lock member nodes of the group of the primary node + for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP): + self.needed_locks[locking.LEVEL_NODE].extend( + self.cfg.GetNodeGroup(group_uuid).members) + elif not self.op.nodes: + self._LockInstancesNodes(primary_only=False) elif level == 
locking.LEVEL_NODE_RES: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ @@ -7126,18 +7234,25 @@ class LUInstanceRecreateDisks(LogicalUnit): primary_node = self.op.nodes[0] else: primary_node = instance.primary_node - _CheckNodeOnline(self, primary_node) + if not self.op.iallocator: + _CheckNodeOnline(self, primary_node) if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) + # Verify if node group locks are still correct + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) + if owned_groups: + # Node group locks are acquired only for the primary node (and only + # when the allocator is used) + _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups, + primary_only=True) + # if we replace nodes *and* the old primary is offline, we don't - # check - assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE) - assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES) + # check the instance state old_pnode = self.cfg.GetNodeInfo(instance.primary_node) - if not (self.op.nodes and old_pnode.offline): + if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline): _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, msg="cannot recreate disks") @@ -7151,7 +7266,7 @@ class LUInstanceRecreateDisks(LogicalUnit): raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx, errors.ECODE_INVAL) - if (self.op.nodes and + if ((self.op.nodes or self.op.iallocator) and sorted(self.disks.keys()) != range(len(instance.disks))): raise errors.OpPrereqError("Can't recreate disks partially and" " change the nodes at the same time", @@ -7159,6 +7274,13 @@ class LUInstanceRecreateDisks(LogicalUnit): self.instance = instance + if self.op.iallocator: + self._RunAllocator() + + # Release unneeded node and node resource locks + _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes) + _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes) + def Exec(self, feedback_fn): """Recreate the disks. 
@@ -7711,8 +7833,9 @@ class LUInstanceMove(LogicalUnit): _CheckNodeOnline(self, target_node) _CheckNodeNotDrained(self, target_node) _CheckNodeVmCapable(self, target_node) - ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), - self.cfg.GetNodeGroup(node.group)) + cluster = self.cfg.GetClusterInfo() + group_info = self.cfg.GetNodeGroup(node.group) + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info) _CheckTargetNodeIPolicy(self, ipolicy, instance, node, ignore=self.op.ignore_ipolicy) @@ -7988,7 +8111,8 @@ class TLMigrateInstance(Tasklet): # Check that the target node is correct in terms of instance policy nodeinfo = self.cfg.GetNodeInfo(self.target_node) group_info = self.cfg.GetNodeGroup(nodeinfo.group) - ipolicy = _CalculateGroupIPolicy(cluster, group_info) + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + group_info) _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, ignore=self.ignore_ipolicy) @@ -7998,7 +8122,8 @@ class TLMigrateInstance(Tasklet): if self.target_node == instance.primary_node: raise errors.OpPrereqError("Cannot migrate instance %s" " to its primary (%s)" % - (instance.name, instance.primary_node)) + (instance.name, instance.primary_node), + errors.ECODE_STATE) if len(self.lu.tasklets) == 1: # It is safe to release locks only when we're the only tasklet @@ -8027,7 +8152,8 @@ class TLMigrateInstance(Tasklet): errors.ECODE_INVAL) nodeinfo = self.cfg.GetNodeInfo(target_node) group_info = self.cfg.GetNodeGroup(nodeinfo.group) - ipolicy = _CalculateGroupIPolicy(cluster, group_info) + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + group_info) _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, ignore=self.ignore_ipolicy) @@ -8113,11 +8239,9 @@ class TLMigrateInstance(Tasklet): """ # FIXME: add a self.ignore_ipolicy option - ial = IAllocator(self.cfg, self.rpc, - mode=constants.IALLOCATOR_MODE_RELOC, - name=self.instance_name, - relocate_from=[self.instance.primary_node], - ) + req = iallocator.IAReqRelocate(name=self.instance_name, + relocate_from=[self.instance.primary_node]) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.lu.op.iallocator) @@ -8126,15 +8250,10 @@ class TLMigrateInstance(Tasklet): " iallocator '%s': %s" % (self.lu.op.iallocator, ial.info), errors.ECODE_NORES) - if len(ial.result) != ial.required_nodes: - raise errors.OpPrereqError("iallocator '%s' returned invalid number" - " of nodes (%s), required %s" % - (self.lu.op.iallocator, len(ial.result), - ial.required_nodes), errors.ECODE_FAULT) self.target_node = ial.result[0] self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s", - self.instance_name, self.lu.op.iallocator, - utils.CommaJoin(ial.result)) + self.instance_name, self.lu.op.iallocator, + utils.CommaJoin(ial.result)) def _WaitUntilSync(self): """Poll with custom rpc for disk sync. @@ -8304,8 +8423,8 @@ class TLMigrateInstance(Tasklet): # Don't raise an exception here, as we stil have to try to revert the # disk status, even if this step failed. 
- abort_result = self.rpc.call_instance_finalize_migration_src(source_node, - instance, False, self.live) + abort_result = self.rpc.call_instance_finalize_migration_src( + source_node, instance, False, self.live) abort_msg = abort_result.fail_msg if abort_msg: logging.error("Aborting migration failed on source node %s: %s", @@ -8482,7 +8601,7 @@ class TLMigrateInstance(Tasklet): disks = _ExpandCheckDisks(instance, instance.disks) self.feedback_fn("* unmapping instance's disks from %s" % source_node) for disk in disks: - result = self.rpc.call_blockdev_shutdown(source_node, disk) + result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance)) msg = result.fail_msg if msg: logging.error("Migration was successful, but couldn't unmap the" @@ -8712,7 +8831,8 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, logical_id=(vgnames[0], names[0]), params={}) - dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, + dev_meta = objects.Disk(dev_type=constants.LD_LV, + size=constants.DRBD_META_SIZE, logical_id=(vgnames[1], names[1]), params={}) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, @@ -8739,10 +8859,11 @@ _DISK_TEMPLATE_DEVICE_TYPE = { } -def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, - secondary_nodes, disk_info, file_storage_dir, file_driver, base_index, - feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage, - _req_shr_file_storage=opcodes.RequireSharedFileStorage): +def _GenerateDiskTemplate( + lu, template_name, instance_name, primary_node, secondary_nodes, + disk_info, file_storage_dir, file_driver, base_index, + feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage, + _req_shr_file_storage=opcodes.RequireSharedFileStorage): """Generate the entire disk layout for a given template type. 
""" @@ -8872,6 +8993,7 @@ def _WipeDisks(lu, instance): result = lu.rpc.call_blockdev_pause_resume_sync(node, (instance.disks, instance), True) + result.Raise("Failed RPC to node %s for pausing the disk syncing" % node) for idx, success in enumerate(result.payload): if not success: @@ -8919,12 +9041,17 @@ def _WipeDisks(lu, instance): (instance.disks, instance), False) - for idx, success in enumerate(result.payload): - if not success: - lu.LogWarning("Resume sync of disk %d failed, please have a" - " look at the status and troubleshoot the issue", idx) - logging.warn("resume-sync of instance %s for disks %d failed", - instance.name, idx) + if result.fail_msg: + lu.LogWarning("RPC call to %s for resuming disk syncing failed," + " please have a look at the status and troubleshoot" + " the issue: %s", node, result.fail_msg) + else: + for idx, success in enumerate(result.payload): + if not success: + lu.LogWarning("Resume sync of disk %d failed, please have a" + " look at the status and troubleshoot the issue", idx) + logging.warn("resume-sync of instance %s for disks %d failed", + instance.name, idx) def _CreateDisks(lu, instance, to_skip=None, target_node=None): @@ -8993,18 +9120,20 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False): all_result = True ports_to_release = set() - for (idx, device) in enumerate(instance.disks): + anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg) + for (idx, device) in enumerate(anno_disks): if target_node: edata = [(target_node, device)] else: edata = device.ComputeNodeTree(instance.primary_node) for node, disk in edata: lu.cfg.SetDiskID(disk, node) - msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg - if msg: + result = lu.rpc.call_blockdev_remove(node, disk) + if result.fail_msg: lu.LogWarning("Could not remove disk %s on node %s," - " continuing anyway: %s", idx, node, msg) - all_result = False + " continuing anyway: %s", idx, node, result.fail_msg) + if not (result.offline and node != instance.primary_node): + all_result = False # if this is a DRBD disk, return its port to the pool if device.dev_type in constants.LDS_DRBD: @@ -9049,7 +9178,7 @@ def _ComputeDiskSizePerVG(disk_template, disks): constants.DT_DISKLESS: {}, constants.DT_PLAIN: _compute(disks, 0), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE), + constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE), constants.DT_FILE: {}, constants.DT_SHARED_FILE: {}, } @@ -9061,30 +9190,6 @@ def _ComputeDiskSizePerVG(disk_template, disks): return req_size_dict[disk_template] -def _ComputeDiskSize(disk_template, disks): - """Compute disk size requirements in the volume group - - """ - # Required free disk space as a function of disk and swap space - req_size_dict = { - constants.DT_DISKLESS: None, - constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks), - # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: - sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks), - constants.DT_FILE: None, - constants.DT_SHARED_FILE: 0, - constants.DT_BLOCK: 0, - constants.DT_RBD: 0, - } - - if disk_template not in req_size_dict: - raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % disk_template) - - return req_size_dict[disk_template] - - def _FilterVmNodes(lu, nodenames): """Filters out non-vm_capable nodes from a list. 
@@ -9385,19 +9490,19 @@ class LUInstanceCreate(LogicalUnit): """ nics = [n.ToDict() for n in self.nics] - ial = IAllocator(self.cfg, self.rpc, - mode=constants.IALLOCATOR_MODE_ALLOC, - name=self.op.instance_name, - disk_template=self.op.disk_template, - tags=self.op.tags, - os=self.op.os_type, - vcpus=self.be_full[constants.BE_VCPUS], - memory=self.be_full[constants.BE_MAXMEM], - spindle_use=self.be_full[constants.BE_SPINDLE_USE], - disks=self.disks, - nics=nics, - hypervisor=self.op.hypervisor, - ) + memory = self.be_full[constants.BE_MAXMEM] + spindle_use = self.be_full[constants.BE_SPINDLE_USE] + req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name, + disk_template=self.op.disk_template, + tags=self.op.tags, + os=self.op.os_type, + vcpus=self.be_full[constants.BE_VCPUS], + memory=memory, + spindle_use=spindle_use, + disks=self.disks, + nics=nics, + hypervisor=self.op.hypervisor) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.op.iallocator) @@ -9406,16 +9511,14 @@ class LUInstanceCreate(LogicalUnit): " iallocator '%s': %s" % (self.op.iallocator, ial.info), errors.ECODE_NORES) - if len(ial.result) != ial.required_nodes: - raise errors.OpPrereqError("iallocator '%s' returned invalid number" - " of nodes (%s), required %s" % - (self.op.iallocator, len(ial.result), - ial.required_nodes), errors.ECODE_FAULT) self.op.pnode = ial.result[0] self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", self.op.instance_name, self.op.iallocator, utils.CommaJoin(ial.result)) - if ial.required_nodes == 2: + + assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator" + + if req.RequiredNodes() == 2: self.op.snode = ial.result[1] def BuildHooksEnv(self): @@ -9524,7 +9627,9 @@ class LUInstanceCreate(LogicalUnit): if self.op.disk_template not in constants.DISK_TEMPLATES: raise errors.OpPrereqError("Disk template specified in configuration" " file is not one of the allowed values:" - " %s" % " ".join(constants.DISK_TEMPLATES)) + " %s" % + " ".join(constants.DISK_TEMPLATES), + errors.ECODE_INVAL) else: raise errors.OpPrereqError("No disk template specified and the export" " is missing the disk_template information", @@ -9637,7 +9742,8 @@ class LUInstanceCreate(LogicalUnit): cfg_storagedir = get_fsd_fn() if not cfg_storagedir: - raise errors.OpPrereqError("Cluster file storage dir not defined") + raise errors.OpPrereqError("Cluster file storage dir not defined", + errors.ECODE_STATE) joinargs.append(cfg_storagedir) if self.op.file_storage_dir is not None: @@ -9657,6 +9763,9 @@ class LUInstanceCreate(LogicalUnit): if self.op.mode == constants.INSTANCE_IMPORT: export_info = self._ReadExportInfo() self._ReadExportParams(export_info) + self._old_instance_name = export_info.get(constants.INISECT_INS, "name") + else: + self._old_instance_name = None if (not self.cfg.GetVGName() and self.op.disk_template not in constants.DTS_NOT_LVM): @@ -9671,8 +9780,8 @@ class LUInstanceCreate(LogicalUnit): enabled_hvs = cluster.enabled_hypervisors if self.op.hypervisor not in enabled_hvs: raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" - " cluster (%s)" % (self.op.hypervisor, - ",".join(enabled_hvs)), + " cluster (%s)" % + (self.op.hypervisor, ",".join(enabled_hvs)), errors.ECODE_STATE) # Check tag validity @@ -9810,8 +9919,7 @@ class LUInstanceCreate(LogicalUnit): self.src_images = disk_images - old_name = export_info.get(constants.INISECT_INS, "name") - if self.op.instance_name == old_name: + if self.op.instance_name == self._old_instance_name: for 
idx, nic in enumerate(self.nics): if nic.mac == constants.VALUE_AUTO: nic_mac_ini = "nic%d_mac" % idx @@ -9900,7 +10008,7 @@ class LUInstanceCreate(LogicalUnit): } group_info = self.cfg.GetNodeGroup(pnode.group) - ipolicy = _CalculateGroupIPolicy(cluster, group_info) + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info) res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec) if not self.op.ignore_ipolicy and res: raise errors.OpPrereqError(("Instance allocation to group %s violates" @@ -10130,6 +10238,11 @@ class LUInstanceCreate(LogicalUnit): _ReleaseLocks(self, locking.LEVEL_NODE_RES) if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks: + # we need to set the disks ID to the primary node, since the + # preceding code might or might have not done it, depending on + # disk template and other options + for disk in iobj.disks: + self.cfg.SetDiskID(disk, pnode_name) if self.op.mode == constants.INSTANCE_CREATE: if not self.op.no_install: pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and @@ -10162,67 +10275,73 @@ class LUInstanceCreate(LogicalUnit): os_add_result.Raise("Could not add os for instance %s" " on node %s" % (instance, pnode_name)) - elif self.op.mode == constants.INSTANCE_IMPORT: - feedback_fn("* running the instance OS import scripts...") + else: + if self.op.mode == constants.INSTANCE_IMPORT: + feedback_fn("* running the instance OS import scripts...") + + transfers = [] + + for idx, image in enumerate(self.src_images): + if not image: + continue + + # FIXME: pass debug option from opcode to backend + dt = masterd.instance.DiskTransfer("disk/%s" % idx, + constants.IEIO_FILE, (image, ), + constants.IEIO_SCRIPT, + (iobj.disks[idx], idx), + None) + transfers.append(dt) + + import_result = \ + masterd.instance.TransferInstanceData(self, feedback_fn, + self.op.src_node, pnode_name, + self.pnode.secondary_ip, + iobj, transfers) + if not compat.all(import_result): + self.LogWarning("Some disks for instance %s on node %s were not" + " imported successfully" % (instance, pnode_name)) + + rename_from = self._old_instance_name + + elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: + feedback_fn("* preparing remote import...") + # The source cluster will stop the instance before attempting to make + # a connection. In some cases stopping an instance can take a long + # time, hence the shutdown timeout is added to the connection + # timeout. + connect_timeout = (constants.RIE_CONNECT_TIMEOUT + + self.op.source_shutdown_timeout) + timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) - transfers = [] + assert iobj.primary_node == self.pnode.name + disk_results = \ + masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode, + self.source_x509_ca, + self._cds, timeouts) + if not compat.all(disk_results): + # TODO: Should the instance still be started, even if some disks + # failed to import (valid for local imports, too)? 
+ self.LogWarning("Some disks for instance %s on node %s were not" + " imported successfully" % (instance, pnode_name)) - for idx, image in enumerate(self.src_images): - if not image: - continue + rename_from = self.source_instance_name - # FIXME: pass debug option from opcode to backend - dt = masterd.instance.DiskTransfer("disk/%s" % idx, - constants.IEIO_FILE, (image, ), - constants.IEIO_SCRIPT, - (iobj.disks[idx], idx), - None) - transfers.append(dt) - - import_result = \ - masterd.instance.TransferInstanceData(self, feedback_fn, - self.op.src_node, pnode_name, - self.pnode.secondary_ip, - iobj, transfers) - if not compat.all(import_result): - self.LogWarning("Some disks for instance %s on node %s were not" - " imported successfully" % (instance, pnode_name)) - - elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: - feedback_fn("* preparing remote import...") - # The source cluster will stop the instance before attempting to make a - # connection. In some cases stopping an instance can take a long time, - # hence the shutdown timeout is added to the connection timeout. - connect_timeout = (constants.RIE_CONNECT_TIMEOUT + - self.op.source_shutdown_timeout) - timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) - - assert iobj.primary_node == self.pnode.name - disk_results = \ - masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode, - self.source_x509_ca, - self._cds, timeouts) - if not compat.all(disk_results): - # TODO: Should the instance still be started, even if some disks - # failed to import (valid for local imports, too)? - self.LogWarning("Some disks for instance %s on node %s were not" - " imported successfully" % (instance, pnode_name)) + else: + # also checked in the prereq part + raise errors.ProgrammerError("Unknown OS initialization mode '%s'" + % self.op.mode) # Run rename script on newly imported instance assert iobj.name == instance feedback_fn("Running rename script for %s" % instance) result = self.rpc.call_instance_run_rename(pnode_name, iobj, - self.source_instance_name, + rename_from, self.op.debug_level) if result.fail_msg: self.LogWarning("Failed to run rename script for %s on node" " %s: %s" % (instance, pnode_name, result.fail_msg)) - else: - # also checked in the prereq part - raise errors.ProgrammerError("Unknown OS initialization mode '%s'" - % self.op.mode) - assert not self.owned_locks(locking.LEVEL_NODE_RES) if self.op.start: @@ -10383,9 +10502,10 @@ class LUInstanceReplaceDisks(LogicalUnit): assert not self.needed_locks[locking.LEVEL_NODE] # Lock member nodes of all locked groups - self.needed_locks[locking.LEVEL_NODE] = [node_name - for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) - for node_name in self.cfg.GetNodeGroup(group_uuid).members] + self.needed_locks[locking.LEVEL_NODE] = \ + [node_name + for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) + for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: self._LockInstancesNodes() elif level == locking.LEVEL_NODE_RES: @@ -10468,22 +10588,22 @@ class TLReplaceDisks(Tasklet): self.node_secondary_ip = None @staticmethod - def CheckArguments(mode, remote_node, iallocator): + def CheckArguments(mode, remote_node, ialloc): """Helper function for users of this class. 
""" # check for valid parameter combination if mode == constants.REPLACE_DISK_CHG: - if remote_node is None and iallocator is None: + if remote_node is None and ialloc is None: raise errors.OpPrereqError("When changing the secondary either an" " iallocator script must be used or the" " new node given", errors.ECODE_INVAL) - if remote_node is not None and iallocator is not None: + if remote_node is not None and ialloc is not None: raise errors.OpPrereqError("Give either the iallocator or the new" " secondary, not both", errors.ECODE_INVAL) - elif remote_node is not None or iallocator is not None: + elif remote_node is not None or ialloc is not None: # Not replacing the secondary raise errors.OpPrereqError("The iallocator and new node options can" " only be used when changing the" @@ -10494,10 +10614,9 @@ class TLReplaceDisks(Tasklet): """Compute a new secondary node using an IAllocator. """ - ial = IAllocator(lu.cfg, lu.rpc, - mode=constants.IALLOCATOR_MODE_RELOC, - name=instance_name, - relocate_from=list(relocate_from)) + req = iallocator.IAReqRelocate(name=instance_name, + relocate_from=list(relocate_from)) + ial = iallocator.IAllocator(lu.cfg, lu.rpc, req) ial.Run(iallocator_name) @@ -10506,13 +10625,6 @@ class TLReplaceDisks(Tasklet): " %s" % (iallocator_name, ial.info), errors.ECODE_NORES) - if len(ial.result) != ial.required_nodes: - raise errors.OpPrereqError("iallocator '%s' returned invalid number" - " of nodes (%s), required %s" % - (iallocator_name, - len(ial.result), ial.required_nodes), - errors.ECODE_FAULT) - remote_node_name = ial.result[0] lu.LogInfo("Selected new secondary for instance '%s': %s", @@ -10685,8 +10797,9 @@ class TLReplaceDisks(Tasklet): if self.remote_node_info: # We change the node, lets verify it still meets instance policy new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group) - ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), - new_group_info) + cluster = self.cfg.GetClusterInfo() + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + new_group_info) _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info, ignore=self.ignore_ipolicy) @@ -10856,7 +10969,8 @@ class TLReplaceDisks(Tasklet): logical_id=(vg_data, names[0]), params=data_disk.params) vg_meta = meta_disk.logical_id[0] - lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, + lv_meta = objects.Disk(dev_type=constants.LD_LV, + size=constants.DRBD_META_SIZE, logical_id=(vg_meta, names[1]), params=meta_disk.params) @@ -11135,7 +11249,8 @@ class TLReplaceDisks(Tasklet): for idx, dev in enumerate(self.instance.disks): self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx) self.cfg.SetDiskID(dev, self.target_node) - msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg + msg = self.rpc.call_blockdev_shutdown(self.target_node, + (dev, self.instance)).fail_msg if msg: self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" "node: %s" % (idx, msg), @@ -11442,9 +11557,10 @@ class LUNodeEvacuate(NoHooksLU): elif self.op.iallocator is not None: # TODO: Implement relocation to other group - ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC, - evac_mode=self._MODE2IALLOCATOR[self.op.mode], - instances=list(self.instance_names)) + evac_mode = self._MODE2IALLOCATOR[self.op.mode] + req = iallocator.IAReqNodeEvac(evac_mode=evac_mode, + instances=list(self.instance_names)) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.op.iallocator) @@ -11644,23 +11760,23 @@ class 
LUInstanceGrowDisk(LogicalUnit): for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, - True) + True, True) result.Raise("Grow request failed to node %s" % node) # We know that (as far as we can test) operations across different - # nodes will succeed, time to run it for real + # nodes will succeed, time to run it for real on the backing storage for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, - False) + False, True) result.Raise("Grow request failed to node %s" % node) - # TODO: Rewrite code to work properly - # DRBD goes into sync mode for a short amount of time after executing the - # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby - # calling "resize" in sync mode fails. Sleeping for a short amount of - # time is a work-around. - time.sleep(5) + # And now execute it for logical storage, on the primary node + node = instance.primary_node + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + False, False) + result.Raise("Grow request failed to node %s" % node) disk.RecordGrow(self.delta) self.cfg.Update(instance, feedback_fn) @@ -12172,12 +12288,10 @@ class LUInstanceSetParams(LogicalUnit): if self.op.hvparams: _CheckGlobalHvParams(self.op.hvparams) - self.op.disks = \ - self._UpgradeDiskNicMods("disk", self.op.disks, - opcodes.OpInstanceSetParams.TestDiskModifications) - self.op.nics = \ - self._UpgradeDiskNicMods("NIC", self.op.nics, - opcodes.OpInstanceSetParams.TestNicModifications) + self.op.disks = self._UpgradeDiskNicMods( + "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications) + self.op.nics = self._UpgradeDiskNicMods( + "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications) # Check disk modifications self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES, @@ -12314,8 +12428,6 @@ class LUInstanceSetParams(LogicalUnit): private.params = new_params private.filled = new_filled_params - return (None, None) - def CheckPrereq(self): """Check prerequisites. 
@@ -12378,7 +12490,8 @@ class LUInstanceSetParams(LogicalUnit): snode_info = self.cfg.GetNodeInfo(self.op.remote_node) snode_group = self.cfg.GetNodeGroup(snode_info.group) - ipolicy = _CalculateGroupIPolicy(cluster, snode_group) + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + snode_group) _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, ignore=self.op.ignore_ipolicy) if pnode_info.group != snode_info.group: @@ -12418,7 +12531,7 @@ class LUInstanceSetParams(LogicalUnit): self.be_proposed = cluster.SimpleFillBE(instance.beparams) be_old = cluster.FillBE(instance) - # CPU param validation -- checking every time a paramtere is + # CPU param validation -- checking every time a parameter is # changed to cover all cases where either CPU mask or vcpus have # changed if (constants.BE_VCPUS in self.be_proposed and @@ -12479,7 +12592,7 @@ class LUInstanceSetParams(LogicalUnit): " free memory information" % pnode) elif instance_info.fail_msg: self.warn.append("Can't get instance runtime information: %s" % - instance_info.fail_msg) + instance_info.fail_msg) else: if instance_info.payload: current_mem = int(instance_info.payload["memory"]) @@ -12496,8 +12609,7 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" " missing on its primary node" % - miss_mem, - errors.ECODE_NORES) + miss_mem, errors.ECODE_NORES) if be_new[constants.BE_AUTO_BALANCE]: for node, nres in nodeinfo.items(): @@ -12523,8 +12635,8 @@ class LUInstanceSetParams(LogicalUnit): instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node) if not remote_info.payload: # not running already - raise errors.OpPrereqError("Instance %s is not running" % instance.name, - errors.ECODE_STATE) + raise errors.OpPrereqError("Instance %s is not running" % + instance.name, errors.ECODE_STATE) current_memory = remote_info.payload["memory"] if (not self.op.force and @@ -12532,7 +12644,8 @@ class LUInstanceSetParams(LogicalUnit): self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])): raise errors.OpPrereqError("Instance %s must have memory between %d" " and %d MB of memory unless --force is" - " given" % (instance.name, + " given" % + (instance.name, self.be_proposed[constants.BE_MINMEM], self.be_proposed[constants.BE_MAXMEM]), errors.ECODE_INVAL) @@ -12546,16 +12659,16 @@ class LUInstanceSetParams(LogicalUnit): if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for" - " diskless instances", - errors.ECODE_INVAL) + " diskless instances", errors.ECODE_INVAL) def _PrepareNicCreate(_, params, private): - return self._PrepareNicModification(params, private, None, {}, - cluster, pnode) + self._PrepareNicModification(params, private, None, {}, cluster, pnode) + return (None, None) def _PrepareNicMod(_, nic, params, private): - return self._PrepareNicModification(params, private, nic.ip, - nic.nicparams, cluster, pnode) + self._PrepareNicModification(params, private, nic.ip, + nic.nicparams, cluster, pnode) + return None # Verify NIC changes (operating on copy) nics = instance.nics[:] @@ -12667,8 +12780,8 @@ class LUInstanceSetParams(LogicalUnit): snode = instance.secondary_nodes[0] feedback_fn("Converting template to plain") - old_disks = instance.disks - new_disks = [d.children[0] for d in old_disks] + old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg) + new_disks = [d.children[0] for d in 
instance.disks] # copy over size and mode for parent, child in zip(old_disks, new_disks): @@ -12758,7 +12871,8 @@ class LUInstanceSetParams(LogicalUnit): """Removes a disk. """ - for node, disk in root.ComputeNodeTree(self.instance.primary_node): + (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg) + for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node): self.cfg.SetDiskID(disk, node) msg = self.rpc.call_blockdev_remove(node, disk).fail_msg if msg: @@ -13043,8 +13157,9 @@ class LUInstanceChangeGroup(LogicalUnit): assert instances == [self.op.instance_name], "Instance not locked" - ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP, - instances=instances, target_groups=list(self.target_uuids)) + req = iallocator.IAReqGroupChange(instances=instances, + target_groups=list(self.target_uuids)) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.op.iallocator) @@ -13052,8 +13167,7 @@ class LUInstanceChangeGroup(LogicalUnit): raise errors.OpPrereqError("Can't compute solution for changing group of" " instance '%s' using iallocator '%s': %s" % (self.op.instance_name, self.op.iallocator, - ial.info), - errors.ECODE_NORES) + ial.info), errors.ECODE_NORES) jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False) @@ -13282,7 +13396,7 @@ class LUBackupExport(LogicalUnit): self.instance.admin_state == constants.ADMINST_UP and not self.op.shutdown): raise errors.OpPrereqError("Can not remove instance without shutting it" - " down before") + " down before", errors.ECODE_STATE) if self.op.mode == constants.EXPORT_MODE_LOCAL: self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) @@ -13312,7 +13426,8 @@ class LUBackupExport(LogicalUnit): try: (key_name, hmac_digest, hmac_salt) = self.x509_key_name except (TypeError, ValueError), err: - raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err) + raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err, + errors.ECODE_INVAL) if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt): raise errors.OpPrereqError("HMAC for X509 key name is wrong", @@ -13588,14 +13703,19 @@ class LUGroupAdd(LogicalUnit): utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES) self.new_diskparams = self.op.diskparams + try: + utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS) + except errors.OpPrereqError, err: + raise errors.OpPrereqError("While verify diskparams options: %s" % err, + errors.ECODE_INVAL) else: - self.new_diskparams = None + self.new_diskparams = {} if self.op.ipolicy: cluster = self.cfg.GetClusterInfo() full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy) try: - objects.InstancePolicy.CheckParameterSyntax(full_ipolicy) + objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False) except errors.ConfigurationError, err: raise errors.OpPrereqError("Invalid instance policy: %s" % err, errors.ECODE_INVAL) @@ -13841,7 +13961,8 @@ class _GroupQuery(_QueryBase): return query.GroupQueryData(self._cluster, [self._all_groups[uuid] for uuid in self.wanted], - group_to_nodes, group_to_instances) + group_to_nodes, group_to_instances, + query.GQ_DISKPARAMS in self.requested_data) class LUGroupQuery(NoHooksLU): @@ -13933,17 +14054,26 @@ class LUGroupSetParams(LogicalUnit): if self.op.ndparams: new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams) - utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + utils.ForceDictType(new_ndparams, 
constants.NDS_PARAMETER_TYPES) self.new_ndparams = new_ndparams if self.op.diskparams: diskparams = self.group.diskparams + uavdp = self._UpdateAndVerifyDiskParams + # For each disktemplate subdict update and verify the values new_diskparams = dict((dt, - self._UpdateAndVerifyDiskParams(diskparams[dt], - self.op.diskparams[dt])) + uavdp(diskparams.get(dt, {}), + self.op.diskparams[dt])) for dt in constants.DISK_TEMPLATES if dt in self.op.diskparams) - self.new_diskparams = objects.FillDiskParams(diskparams, new_diskparams) + # As we've all subdicts of diskparams ready, lets merge the actual + # dict with all updated subdicts + self.new_diskparams = objects.FillDict(diskparams, new_diskparams) + try: + utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS) + except errors.OpPrereqError, err: + raise errors.OpPrereqError("While verify diskparams options: %s" % err, + errors.ECODE_INVAL) if self.op.hv_state: self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, @@ -13962,9 +14092,10 @@ class LUGroupSetParams(LogicalUnit): new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy) inst_filter = lambda inst: inst.name in owned_instances instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values() + gmi = ganeti.masterd.instance violations = \ - _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, - self.group), + _ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster, + self.group), new_ipolicy, instances) if violations: @@ -14052,9 +14183,8 @@ class LUGroupRemove(LogicalUnit): # Verify the cluster would not be left group-less. if len(self.cfg.GetNodeGroupList()) == 1: - raise errors.OpPrereqError("Group '%s' is the only group," - " cannot be removed" % - self.op.group_name, + raise errors.OpPrereqError("Group '%s' is the only group, cannot be" + " removed" % self.op.group_name, errors.ECODE_STATE) def BuildHooksEnv(self): @@ -14283,8 +14413,9 @@ class LUGroupEvacuate(LogicalUnit): assert self.group_uuid not in self.target_uuids - ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP, - instances=instances, target_groups=self.target_uuids) + req = iallocator.IAReqGroupChange(instances=instances, + target_groups=self.target_uuids) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.op.iallocator) @@ -14657,519 +14788,6 @@ class LUTestJqueue(NoHooksLU): return True -class IAllocator(object): - """IAllocator framework. 
- - An IAllocator instance has three sets of attributes: - - cfg that is needed to query the cluster - - input data (all members of the _KEYS class attribute are required) - - four buffer attributes (in|out_data|text), that represent the - input (to the external script) in text and data structure format, - and the output from it, again in two formats - - the result variables from the script (success, info, nodes) for - easy usage - - """ - # pylint: disable=R0902 - # lots of instance attributes - - def __init__(self, cfg, rpc_runner, mode, **kwargs): - self.cfg = cfg - self.rpc = rpc_runner - # init buffer variables - self.in_text = self.out_text = self.in_data = self.out_data = None - # init all input fields so that pylint is happy - self.mode = mode - self.memory = self.disks = self.disk_template = self.spindle_use = None - self.os = self.tags = self.nics = self.vcpus = None - self.hypervisor = None - self.relocate_from = None - self.name = None - self.instances = None - self.evac_mode = None - self.target_groups = [] - # computed fields - self.required_nodes = None - # init result fields - self.success = self.info = self.result = None - - try: - (fn, keydata, self._result_check) = self._MODE_DATA[self.mode] - except KeyError: - raise errors.ProgrammerError("Unknown mode '%s' passed to the" - " IAllocator" % self.mode) - - keyset = [n for (n, _) in keydata] - - for key in kwargs: - if key not in keyset: - raise errors.ProgrammerError("Invalid input parameter '%s' to" - " IAllocator" % key) - setattr(self, key, kwargs[key]) - - for key in keyset: - if key not in kwargs: - raise errors.ProgrammerError("Missing input parameter '%s' to" - " IAllocator" % key) - self._BuildInputData(compat.partial(fn, self), keydata) - - def _ComputeClusterData(self): - """Compute the generic allocator input data. - - This is the data that is independent of the actual operation. - - """ - cfg = self.cfg - cluster_info = cfg.GetClusterInfo() - # cluster data - data = { - "version": constants.IALLOCATOR_VERSION, - "cluster_name": cfg.GetClusterName(), - "cluster_tags": list(cluster_info.GetTags()), - "enabled_hypervisors": list(cluster_info.enabled_hypervisors), - "ipolicy": cluster_info.ipolicy, - } - ninfo = cfg.GetAllNodesInfo() - iinfo = cfg.GetAllInstancesInfo().values() - i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo] - - # node data - node_list = [n.name for n in ninfo.values() if n.vm_capable] - - if self.mode == constants.IALLOCATOR_MODE_ALLOC: - hypervisor_name = self.hypervisor - elif self.mode == constants.IALLOCATOR_MODE_RELOC: - hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - else: - hypervisor_name = cluster_info.primary_hypervisor - - node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()], - [hypervisor_name]) - node_iinfo = \ - self.rpc.call_all_instances_info(node_list, - cluster_info.enabled_hypervisors) - - data["nodegroups"] = self._ComputeNodeGroupData(cfg) - - config_ndata = self._ComputeBasicNodeData(cfg, ninfo) - data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo, - i_list, config_ndata) - assert len(data["nodes"]) == len(ninfo), \ - "Incomplete node data computed" - - data["instances"] = self._ComputeInstanceData(cluster_info, i_list) - - self.in_data = data - - @staticmethod - def _ComputeNodeGroupData(cfg): - """Compute node groups data. 
- - """ - cluster = cfg.GetClusterInfo() - ng = dict((guuid, { - "name": gdata.name, - "alloc_policy": gdata.alloc_policy, - "ipolicy": _CalculateGroupIPolicy(cluster, gdata), - }) - for guuid, gdata in cfg.GetAllNodeGroupsInfo().items()) - - return ng - - @staticmethod - def _ComputeBasicNodeData(cfg, node_cfg): - """Compute global node data. - - @rtype: dict - @returns: a dict of name: (node dict, node config) - - """ - # fill in static (config-based) values - node_results = dict((ninfo.name, { - "tags": list(ninfo.GetTags()), - "primary_ip": ninfo.primary_ip, - "secondary_ip": ninfo.secondary_ip, - "offline": ninfo.offline, - "drained": ninfo.drained, - "master_candidate": ninfo.master_candidate, - "group": ninfo.group, - "master_capable": ninfo.master_capable, - "vm_capable": ninfo.vm_capable, - "ndparams": cfg.GetNdParams(ninfo), - }) - for ninfo in node_cfg.values()) - - return node_results - - @staticmethod - def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list, - node_results): - """Compute global node data. - - @param node_results: the basic node structures as filled from the config - - """ - #TODO(dynmem): compute the right data on MAX and MIN memory - # make a copy of the current dict - node_results = dict(node_results) - for nname, nresult in node_data.items(): - assert nname in node_results, "Missing basic data for node %s" % nname - ninfo = node_cfg[nname] - - if not (ninfo.offline or ninfo.drained): - nresult.Raise("Can't get data for node %s" % nname) - node_iinfo[nname].Raise("Can't get node instance info from node %s" % - nname) - remote_info = _MakeLegacyNodeInfo(nresult.payload) - - for attr in ["memory_total", "memory_free", "memory_dom0", - "vg_size", "vg_free", "cpu_total"]: - if attr not in remote_info: - raise errors.OpExecError("Node '%s' didn't return attribute" - " '%s'" % (nname, attr)) - if not isinstance(remote_info[attr], int): - raise errors.OpExecError("Node '%s' returned invalid value" - " for '%s': %s" % - (nname, attr, remote_info[attr])) - # compute memory used by primary instances - i_p_mem = i_p_up_mem = 0 - for iinfo, beinfo in i_list: - if iinfo.primary_node == nname: - i_p_mem += beinfo[constants.BE_MAXMEM] - if iinfo.name not in node_iinfo[nname].payload: - i_used_mem = 0 - else: - i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"]) - i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem - remote_info["memory_free"] -= max(0, i_mem_diff) - - if iinfo.admin_state == constants.ADMINST_UP: - i_p_up_mem += beinfo[constants.BE_MAXMEM] - - # compute memory used by instances - pnr_dyn = { - "total_memory": remote_info["memory_total"], - "reserved_memory": remote_info["memory_dom0"], - "free_memory": remote_info["memory_free"], - "total_disk": remote_info["vg_size"], - "free_disk": remote_info["vg_free"], - "total_cpus": remote_info["cpu_total"], - "i_pri_memory": i_p_mem, - "i_pri_up_memory": i_p_up_mem, - } - pnr_dyn.update(node_results[nname]) - node_results[nname] = pnr_dyn - - return node_results - - @staticmethod - def _ComputeInstanceData(cluster_info, i_list): - """Compute global instance data. 
- - """ - instance_data = {} - for iinfo, beinfo in i_list: - nic_data = [] - for nic in iinfo.nics: - filled_params = cluster_info.SimpleFillNIC(nic.nicparams) - nic_dict = { - "mac": nic.mac, - "ip": nic.ip, - "mode": filled_params[constants.NIC_MODE], - "link": filled_params[constants.NIC_LINK], - } - if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: - nic_dict["bridge"] = filled_params[constants.NIC_LINK] - nic_data.append(nic_dict) - pir = { - "tags": list(iinfo.GetTags()), - "admin_state": iinfo.admin_state, - "vcpus": beinfo[constants.BE_VCPUS], - "memory": beinfo[constants.BE_MAXMEM], - "spindle_use": beinfo[constants.BE_SPINDLE_USE], - "os": iinfo.os, - "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes), - "nics": nic_data, - "disks": [{constants.IDISK_SIZE: dsk.size, - constants.IDISK_MODE: dsk.mode} - for dsk in iinfo.disks], - "disk_template": iinfo.disk_template, - "hypervisor": iinfo.hypervisor, - } - pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template, - pir["disks"]) - instance_data[iinfo.name] = pir - - return instance_data - - def _AddNewInstance(self): - """Add new instance data to allocator structure. - - This in combination with _AllocatorGetClusterData will create the - correct structure needed as input for the allocator. - - The checks for the completeness of the opcode must have already been - done. - - """ - disk_space = _ComputeDiskSize(self.disk_template, self.disks) - - if self.disk_template in constants.DTS_INT_MIRROR: - self.required_nodes = 2 - else: - self.required_nodes = 1 - - request = { - "name": self.name, - "disk_template": self.disk_template, - "tags": self.tags, - "os": self.os, - "vcpus": self.vcpus, - "memory": self.memory, - "spindle_use": self.spindle_use, - "disks": self.disks, - "disk_space_total": disk_space, - "nics": self.nics, - "required_nodes": self.required_nodes, - "hypervisor": self.hypervisor, - } - - return request - - def _AddRelocateInstance(self): - """Add relocate instance data to allocator structure. - - This in combination with _IAllocatorGetClusterData will create the - correct structure needed as input for the allocator. - - The checks for the completeness of the opcode must have already been - done. - - """ - instance = self.cfg.GetInstanceInfo(self.name) - if instance is None: - raise errors.ProgrammerError("Unknown instance '%s' passed to" - " IAllocator" % self.name) - - if instance.disk_template not in constants.DTS_MIRRORED: - raise errors.OpPrereqError("Can't relocate non-mirrored instances", - errors.ECODE_INVAL) - - if instance.disk_template in constants.DTS_INT_MIRROR and \ - len(instance.secondary_nodes) != 1: - raise errors.OpPrereqError("Instance has not exactly one secondary node", - errors.ECODE_STATE) - - self.required_nodes = 1 - disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks] - disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes) - - request = { - "name": self.name, - "disk_space_total": disk_space, - "required_nodes": self.required_nodes, - "relocate_from": self.relocate_from, - } - return request - - def _AddNodeEvacuate(self): - """Get data for node-evacuate requests. - - """ - return { - "instances": self.instances, - "evac_mode": self.evac_mode, - } - - def _AddChangeGroup(self): - """Get data for node-evacuate requests. - - """ - return { - "instances": self.instances, - "target_groups": self.target_groups, - } - - def _BuildInputData(self, fn, keydata): - """Build input data structures. 
- - """ - self._ComputeClusterData() - - request = fn() - request["type"] = self.mode - for keyname, keytype in keydata: - if keyname not in request: - raise errors.ProgrammerError("Request parameter %s is missing" % - keyname) - val = request[keyname] - if not keytype(val): - raise errors.ProgrammerError("Request parameter %s doesn't pass" - " validation, value %s, expected" - " type %s" % (keyname, val, keytype)) - self.in_data["request"] = request - - self.in_text = serializer.Dump(self.in_data) - - _STRING_LIST = ht.TListOf(ht.TString) - _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { - # pylint: disable=E1101 - # Class '...' has no 'OP_ID' member - "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, - opcodes.OpInstanceMigrate.OP_ID, - opcodes.OpInstanceReplaceDisks.OP_ID]) - }))) - - _NEVAC_MOVED = \ - ht.TListOf(ht.TAnd(ht.TIsLength(3), - ht.TItems([ht.TNonEmptyString, - ht.TNonEmptyString, - ht.TListOf(ht.TNonEmptyString), - ]))) - _NEVAC_FAILED = \ - ht.TListOf(ht.TAnd(ht.TIsLength(2), - ht.TItems([ht.TNonEmptyString, - ht.TMaybeString, - ]))) - _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3), - ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST])) - - _MODE_DATA = { - constants.IALLOCATOR_MODE_ALLOC: - (_AddNewInstance, - [ - ("name", ht.TString), - ("memory", ht.TInt), - ("spindle_use", ht.TInt), - ("disks", ht.TListOf(ht.TDict)), - ("disk_template", ht.TString), - ("os", ht.TString), - ("tags", _STRING_LIST), - ("nics", ht.TListOf(ht.TDict)), - ("vcpus", ht.TInt), - ("hypervisor", ht.TString), - ], ht.TList), - constants.IALLOCATOR_MODE_RELOC: - (_AddRelocateInstance, - [("name", ht.TString), ("relocate_from", _STRING_LIST)], - ht.TList), - constants.IALLOCATOR_MODE_NODE_EVAC: - (_AddNodeEvacuate, [ - ("instances", _STRING_LIST), - ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)), - ], _NEVAC_RESULT), - constants.IALLOCATOR_MODE_CHG_GROUP: - (_AddChangeGroup, [ - ("instances", _STRING_LIST), - ("target_groups", _STRING_LIST), - ], _NEVAC_RESULT), - } - - def Run(self, name, validate=True, call_fn=None): - """Run an instance allocator and return the results. - - """ - if call_fn is None: - call_fn = self.rpc.call_iallocator_runner - - result = call_fn(self.cfg.GetMasterNode(), name, self.in_text) - result.Raise("Failure while running the iallocator script") - - self.out_text = result.payload - if validate: - self._ValidateResult() - - def _ValidateResult(self): - """Process the allocator results. - - This will process and if successful save the result in - self.out_data and the other parameters. 
- - """ - try: - rdict = serializer.Load(self.out_text) - except Exception, err: - raise errors.OpExecError("Can't parse iallocator results: %s" % str(err)) - - if not isinstance(rdict, dict): - raise errors.OpExecError("Can't parse iallocator results: not a dict") - - # TODO: remove backwards compatiblity in later versions - if "nodes" in rdict and "result" not in rdict: - rdict["result"] = rdict["nodes"] - del rdict["nodes"] - - for key in "success", "info", "result": - if key not in rdict: - raise errors.OpExecError("Can't parse iallocator results:" - " missing key '%s'" % key) - setattr(self, key, rdict[key]) - - if not self._result_check(self.result): - raise errors.OpExecError("Iallocator returned invalid result," - " expected %s, got %s" % - (self._result_check, self.result), - errors.ECODE_INVAL) - - if self.mode == constants.IALLOCATOR_MODE_RELOC: - assert self.relocate_from is not None - assert self.required_nodes == 1 - - node2group = dict((name, ndata["group"]) - for (name, ndata) in self.in_data["nodes"].items()) - - fn = compat.partial(self._NodesToGroups, node2group, - self.in_data["nodegroups"]) - - instance = self.cfg.GetInstanceInfo(self.name) - request_groups = fn(self.relocate_from + [instance.primary_node]) - result_groups = fn(rdict["result"] + [instance.primary_node]) - - if self.success and not set(result_groups).issubset(request_groups): - raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" - " differ from original groups (%s)" % - (utils.CommaJoin(result_groups), - utils.CommaJoin(request_groups))) - - elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC: - assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES - - self.out_data = rdict - - @staticmethod - def _NodesToGroups(node2group, groups, nodes): - """Returns a list of unique group names for a list of nodes. - - @type node2group: dict - @param node2group: Map from node name to group UUID - @type groups: dict - @param groups: Group information - @type nodes: list - @param nodes: Node names - - """ - result = set() - - for node in nodes: - try: - group_uuid = node2group[node] - except KeyError: - # Ignore unknown node - pass - else: - try: - group = groups[group_uuid] - except KeyError: - # Can't find group, let's use UUID - group_name = group_uuid - else: - group_name = group["name"] - - result.add(group_name) - - return sorted(result) - - class LUTestAllocator(NoHooksLU): """Run allocator tests. @@ -15182,7 +14800,8 @@ class LUTestAllocator(NoHooksLU): This checks the opcode parameters depending on the director and mode test. 
""" - if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: + if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC, + constants.IALLOCATOR_MODE_MULTI_ALLOC): for attr in ["memory", "disks", "disk_template", "os", "tags", "nics", "vcpus"]: if not hasattr(self.op, attr): @@ -15235,38 +14854,44 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - ial = IAllocator(self.cfg, self.rpc, - mode=self.op.mode, - name=self.op.name, - memory=self.op.memory, - disks=self.op.disks, - disk_template=self.op.disk_template, - os=self.op.os, - tags=self.op.tags, - nics=self.op.nics, - vcpus=self.op.vcpus, - hypervisor=self.op.hypervisor, - ) + req = iallocator.IAReqInstanceAlloc(name=self.op.name, + memory=self.op.memory, + disks=self.op.disks, + disk_template=self.op.disk_template, + os=self.op.os, + tags=self.op.tags, + nics=self.op.nics, + vcpus=self.op.vcpus, + spindle_use=self.op.spindle_use, + hypervisor=self.op.hypervisor) elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: - ial = IAllocator(self.cfg, self.rpc, - mode=self.op.mode, - name=self.op.name, - relocate_from=list(self.relocate_from), - ) + req = iallocator.IAReqRelocate(name=self.op.name, + relocate_from=list(self.relocate_from)) elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP: - ial = IAllocator(self.cfg, self.rpc, - mode=self.op.mode, - instances=self.op.instances, - target_groups=self.op.target_groups) + req = iallocator.IAReqGroupChange(instances=self.op.instances, + target_groups=self.op.target_groups) elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC: - ial = IAllocator(self.cfg, self.rpc, - mode=self.op.mode, - instances=self.op.instances, - evac_mode=self.op.evac_mode) + req = iallocator.IAReqNodeEvac(instances=self.op.instances, + evac_mode=self.op.evac_mode) + elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC: + disk_template = self.op.disk_template + insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx), + memory=self.op.memory, + disks=self.op.disks, + disk_template=disk_template, + os=self.op.os, + tags=self.op.tags, + nics=self.op.nics, + vcpus=self.op.vcpus, + spindle_use=self.op.spindle_use, + hypervisor=self.op.hypervisor) + for idx in range(self.op.count)] + req = iallocator.IAReqMultiInstanceAlloc(instances=insts) else: raise errors.ProgrammerError("Uncatched mode %s in" " LUTestAllocator.Exec", self.op.mode) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) if self.op.direction == constants.IALLOCATOR_DIR_IN: result = ial.in_text else: