X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e8e079f391a1e7c92ec0e6ddf721ecfa4340db24..525939d91ad44e90b7a60ba91d1fff71cd01a0ee:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 15a99fb..f13b1c1 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -40,7 +40,6 @@ import tempfile import shutil import itertools import operator -import ipaddr from ganeti import ssh from ganeti import utils @@ -139,13 +138,18 @@ class LogicalUnit(object): self.owned_locks = context.glm.list_owned self.context = context self.rpc = rpc_runner - # Dicts used to declare locking needs to mcpu + + # Dictionaries used to declare locking needs to mcpu self.needed_locks = None self.share_locks = dict.fromkeys(locking.LEVELS, 0) + self.opportunistic_locks = dict.fromkeys(locking.LEVELS, False) + self.add_locks = {} self.remove_locks = {} + # Used to force good behavior when calling helper functions self.recalculate_locks = {} + # logging self.Log = processor.Log # pylint: disable=C0103 self.LogWarning = processor.LogWarning # pylint: disable=C0103 @@ -693,6 +697,39 @@ def _SupportsOob(cfg, node): return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] +def _IsExclusiveStorageEnabledNode(cfg, node): + """Whether exclusive_storage is in effect for the given node. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type node: L{objects.Node} + @param node: The node + @rtype: bool + @return: The effective value of exclusive_storage + + """ + return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE] + + +def _IsExclusiveStorageEnabledNodeName(cfg, nodename): + """Whether exclusive_storage is in effect for the given node. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type nodename: string + @param nodename: The node + @rtype: bool + @return: The effective value of exclusive_storage + @raise errors.OpPrereqError: if no node exists with the given name + + """ + ni = cfg.GetNodeInfo(nodename) + if ni is None: + raise errors.OpPrereqError("Invalid node name %s" % nodename, + errors.ECODE_NOENT) + return _IsExclusiveStorageEnabledNode(cfg, ni) + + def _CopyLockList(names): """Makes a copy of a list of lock names. @@ -960,7 +997,8 @@ def _RunPostHook(lu, node_name): try: hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) except Exception, err: # pylint: disable=W0703 - lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err)) + lu.LogWarning("Errors occurred running hooks on %s: %s", + node_name, err) def _CheckOutputFields(static, dynamic, selected): @@ -1101,7 +1139,8 @@ def _CheckInstanceState(lu, instance, req_states, msg=None): """ if msg is None: - msg = "can't use instance from outside %s states" % ", ".join(req_states) + msg = ("can't use instance from outside %s states" % + utils.CommaJoin(req_states)) if instance.admin_state not in req_states: raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % (instance.name, instance.admin_state, msg), @@ -1316,6 +1355,51 @@ def _ExpandInstanceName(cfg, name): return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance") +def _BuildNetworkHookEnv(name, subnet, gateway, network6, gateway6, + network_type, mac_prefix, tags): + """Builds network related env variables for hooks + + This builds the hook environment from individual variables. 
+ + @type name: string + @param name: the name of the network + @type subnet: string + @param subnet: the ipv4 subnet + @type gateway: string + @param gateway: the ipv4 gateway + @type network6: string + @param network6: the ipv6 subnet + @type gateway6: string + @param gateway6: the ipv6 gateway + @type network_type: string + @param network_type: the type of the network + @type mac_prefix: string + @param mac_prefix: the mac_prefix + @type tags: list + @param tags: the tags of the network + + """ + env = {} + if name: + env["NETWORK_NAME"] = name + if subnet: + env["NETWORK_SUBNET"] = subnet + if gateway: + env["NETWORK_GATEWAY"] = gateway + if network6: + env["NETWORK_SUBNET6"] = network6 + if gateway6: + env["NETWORK_GATEWAY6"] = gateway6 + if mac_prefix: + env["NETWORK_MAC_PREFIX"] = mac_prefix + if network_type: + env["NETWORK_TYPE"] = network_type + if tags: + env["NETWORK_TAGS"] = " ".join(tags) + + return env + + def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, minmem, maxmem, vcpus, nics, disk_template, disks, bep, hvp, hypervisor_name, tags): @@ -1375,14 +1459,31 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, } if nics: nic_count = len(nics) - for idx, (ip, mac, mode, link, network) in enumerate(nics): + for idx, (ip, mac, mode, link, net, netinfo) in enumerate(nics): if ip is None: ip = "" env["INSTANCE_NIC%d_IP" % idx] = ip env["INSTANCE_NIC%d_MAC" % idx] = mac env["INSTANCE_NIC%d_MODE" % idx] = mode env["INSTANCE_NIC%d_LINK" % idx] = link - env["INSTANCE_NIC%d_NETWORK" % idx] = network + if network: + env["INSTANCE_NIC%d_NETWORK" % idx] = net + if netinfo: + nobj = objects.Network.FromDict(netinfo) + if nobj.network: + env["INSTANCE_NIC%d_NETWORK_SUBNET" % idx] = nobj.network + if nobj.gateway: + env["INSTANCE_NIC%d_NETWORK_GATEWAY" % idx] = nobj.gateway + if nobj.network6: + env["INSTANCE_NIC%d_NETWORK_SUBNET6" % idx] = nobj.network6 + if nobj.gateway6: + env["INSTANCE_NIC%d_NETWORK_GATEWAY6" % idx] = nobj.gateway6 + if nobj.mac_prefix: + env["INSTANCE_NIC%d_NETWORK_MAC_PREFIX" % idx] = nobj.mac_prefix + if nobj.network_type: + env["INSTANCE_NIC%d_NETWORK_TYPE" % idx] = nobj.network_type + if nobj.tags: + env["INSTANCE_NIC%d_NETWORK_TAGS" % idx] = " ".join(nobj.tags) if mode == constants.NIC_MODE_BRIDGED: env["INSTANCE_NIC%d_BRIDGE" % idx] = link else: @@ -1412,6 +1513,31 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, return env +def _NICToTuple(lu, nic): + """Build a tupple of nic information. + + @type lu: L{LogicalUnit} + @param lu: the logical unit on whose behalf we execute + @type nic: L{objects.NIC} + @param nic: nic to convert to hooks tuple + + """ + ip = nic.ip + mac = nic.mac + cluster = lu.cfg.GetClusterInfo() + filled_params = cluster.SimpleFillNIC(nic.nicparams) + mode = filled_params[constants.NIC_MODE] + link = filled_params[constants.NIC_LINK] + net = nic.network + netinfo = None + if net: + net_uuid = lu.cfg.LookupNetwork(net) + if net_uuid: + nobj = lu.cfg.GetNetwork(net_uuid) + netinfo = objects.Network.ToDict(nobj) + return (ip, mac, mode, link, net, netinfo) + + def _NICListToTuple(lu, nics): """Build a list of nic information tuples. 
@@ -1425,15 +1551,8 @@ def _NICListToTuple(lu, nics): """ hooks_nics = [] - cluster = lu.cfg.GetClusterInfo() for nic in nics: - ip = nic.ip - mac = nic.mac - filled_params = cluster.SimpleFillNIC(nic.nicparams) - mode = filled_params[constants.NIC_MODE] - link = filled_params[constants.NIC_LINK] - network = nic.network - hooks_nics.append((ip, mac, mode, link, network)) + hooks_nics.append(_NICToTuple(lu, nic)) return hooks_nics @@ -1935,7 +2054,7 @@ class LUClusterVerify(NoHooksLU): # Verify global configuration jobs.append([ - opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors) + opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors), ]) # Always depend on global verification @@ -1974,7 +2093,7 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): msg = ("hypervisor %s parameters syntax check (source %s): %%s" % (item, hv_name)) try: - hv_class = hypervisor.GetHypervisor(hv_name) + hv_class = hypervisor.GetHypervisorClass(hv_name) utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) hv_class.CheckParameterSyntax(hv_params) except errors.GenericError, err: @@ -2094,6 +2213,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): @ivar oslist: list of OSes as diagnosed by DiagnoseOS @type vm_capable: boolean @ivar vm_capable: whether the node can host instances + @type pv_min: float + @ivar pv_min: size in MiB of the smallest PVs + @type pv_max: float + @ivar pv_max: size in MiB of the biggest PVs """ def __init__(self, offline=False, name=None, vm_capable=True): @@ -2113,6 +2236,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): self.ghost = False self.os_fail = False self.oslist = {} + self.pv_min = None + self.pv_max = None def ExpandNames(self): # This raises errors.OpPrereqError on its own: @@ -2126,6 +2251,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): locking.LEVEL_INSTANCE: inst_names, locking.LEVEL_NODEGROUP: [self.group_uuid], locking.LEVEL_NODE: [], + + # This opcode is run by watcher every five minutes and acquires all nodes + # for a group. It doesn't run for a long time, so it's better to acquire + # the node allocation lock as well. + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } self.share_locks = _ShareAll() @@ -2309,13 +2439,15 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): "Node time diverges by at least %s from master node time", ntime_diff) - def _VerifyNodeLVM(self, ninfo, nresult, vg_name): - """Check the node LVM results. + def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg): + """Check the node LVM results and update info for cross-node checks. 
@type ninfo: L{objects.Node} @param ninfo: the node to check @param nresult: the remote results for the node @param vg_name: the configured VG name + @type nimg: L{NodeImage} + @param nimg: node image """ if vg_name is None: @@ -2333,19 +2465,56 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): constants.MIN_VG_SIZE) _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus) - # check pv names - pvlist = nresult.get(constants.NV_PVLIST, None) - test = pvlist is None + # check pv names (and possibly sizes) + pvlist_dict = nresult.get(constants.NV_PVLIST, None) + test = pvlist_dict is None _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node") if not test: + pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict) # check that ':' is not present in PV names, since it's a # special character for lvcreate (denotes the range of PEs to # use on the PV) - for _, pvname, owner_vg in pvlist: - test = ":" in pvname + for pv in pvlist: + test = ":" in pv.name _ErrorIf(test, constants.CV_ENODELVM, node, "Invalid character ':' in PV '%s' of VG '%s'", - pvname, owner_vg) + pv.name, pv.vg_name) + if self._exclusive_storage: + (errmsgs, (pvmin, pvmax)) = utils.LvmExclusiveCheckNodePvs(pvlist) + for msg in errmsgs: + self._Error(constants.CV_ENODELVM, node, msg) + nimg.pv_min = pvmin + nimg.pv_max = pvmax + + def _VerifyGroupLVM(self, node_image, vg_name): + """Check cross-node consistency in LVM. + + @type node_image: dict + @param node_image: info about nodes, mapping from node to names to + L{NodeImage} objects + @param vg_name: the configured VG name + + """ + if vg_name is None: + return + + # Only exlcusive storage needs this kind of checks + if not self._exclusive_storage: + return + + # exclusive_storage wants all PVs to have the same size (approximately), + # if the smallest and the biggest ones are okay, everything is fine. + # pv_min is None iff pv_max is None + vals = filter((lambda ni: ni.pv_min is not None), node_image.values()) + if not vals: + return + (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals) + (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals) + bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax) + self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name, + "PV sizes differ too much in the group; smallest (%s MB) is" + " on %s, biggest (%s MB) is on %s", + pvmin, minnode, pvmax, maxnode) def _VerifyNodeBridges(self, ninfo, nresult, bridges): """Check the node bridges. 
@@ -2450,7 +2619,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, self.group_info) err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig) - _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err)) + _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err), + code=self.ETYPE_WARNING) for node in node_vol_should: n_img = node_image[node] @@ -3041,7 +3211,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): len(s) == 2 for s in statuses) for inst, nnames in instdisk.items() for nname, statuses in nnames.items()) - assert set(instdisk) == set(instanceinfo), "instdisk consistency failure" + if __debug__: + instdisk_keys = set(instdisk) + instanceinfo_keys = set(instanceinfo) + assert instdisk_keys == instanceinfo_keys, \ + ("instdisk keys (%s) do not match instanceinfo keys (%s)" % + (instdisk_keys, instanceinfo_keys)) return instdisk @@ -3087,7 +3262,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ env = { - "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) + "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()), } env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags())) @@ -3240,6 +3415,23 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): nimg.sbp[pnode] = [] nimg.sbp[pnode].append(instance) + es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names) + es_unset_nodes = [] + # The value of exclusive_storage should be the same across the group, so if + # it's True for at least a node, we act as if it were set for all the nodes + self._exclusive_storage = compat.any(es_flags.values()) + if self._exclusive_storage: + es_unset_nodes = [n for (n, es) in es_flags.items() + if not es] + + if es_unset_nodes: + self._Error(constants.CV_EGROUPMIXEDESFLAG, self.group_info.name, + "The exclusive_storage flag should be uniform in a group," + " but these nodes have it unset: %s", + utils.CommaJoin(utils.NiceSort(es_unset_nodes))) + self.LogWarning("Some checks required by exclusive storage will be" + " performed also on nodes with the flag unset") + # At this point, we have the in-memory data structures complete, # except for the runtime information, which we'll gather next @@ -3343,7 +3535,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node == master_node) if nimg.vm_capable: - self._VerifyNodeLVM(node_i, nresult, vg_name) + self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg) self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper, all_drbd_map) @@ -3370,6 +3562,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name, "node is running unknown instance %s", inst) + self._VerifyGroupLVM(node_image, vg_name) + for node, result in extra_lv_nvinfo.items(): self._UpdateNodeVolumes(self.all_node_info[node], result.payload, node_image[node], vg_name) @@ -3572,6 +3766,12 @@ class LUGroupVerifyDisks(NoHooksLU): locking.LEVEL_INSTANCE: [], locking.LEVEL_NODEGROUP: [], locking.LEVEL_NODE: [], + + # This opcode is acquires all node locks in a group. LUClusterVerifyDisks + # starts one instance of this opcode for every group, which means all + # nodes will be locked for a short amount of time, so it's better to + # acquire the node allocation lock as well. 
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } def DeclareLocks(self, level): @@ -3678,6 +3878,8 @@ class LUClusterRepairDiskSizes(NoHooksLU): def ExpandNames(self): if self.op.instances: self.wanted_names = _GetWantedInstances(self, self.op.instances) + # Not getting the node allocation lock as only a specific set of + # instances (and their nodes) is going to be acquired self.needed_locks = { locking.LEVEL_NODE_RES: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -3688,10 +3890,15 @@ class LUClusterRepairDiskSizes(NoHooksLU): self.needed_locks = { locking.LEVEL_NODE_RES: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, + + # This opcode is acquires the node locks for all instances + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } + self.share_locks = { locking.LEVEL_NODE_RES: 1, locking.LEVEL_INSTANCE: 0, + locking.LEVEL_NODE_ALLOC: 1, } def DeclareLocks(self, level): @@ -3933,16 +4140,15 @@ class LUClusterSetParams(LogicalUnit): def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on # all nodes to be modified. + # FIXME: This opcode changes cluster-wide settings. Is acquiring all + # resource locks the right thing, shouldn't it be the BGL instead? self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, locking.LEVEL_NODEGROUP: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } - self.share_locks = { - locking.LEVEL_NODE: 1, - locking.LEVEL_INSTANCE: 1, - locking.LEVEL_NODEGROUP: 1, - } + self.share_locks = _ShareAll() def BuildHooksEnv(self): """Build hooks env. @@ -4121,7 +4327,10 @@ class LUClusterSetParams(LogicalUnit): self.new_os_hvp[os_name] = hvs else: for hv_name, hv_dict in hvs.items(): - if hv_name not in self.new_os_hvp[os_name]: + if hv_dict is None: + # Delete if it exists + self.new_os_hvp[os_name].pop(hv_name, None) + elif hv_name not in self.new_os_hvp[os_name]: self.new_os_hvp[os_name][hv_name] = hv_dict else: self.new_os_hvp[os_name][hv_name].update(hv_dict) @@ -4167,7 +4376,7 @@ class LUClusterSetParams(LogicalUnit): (self.op.enabled_hypervisors and hv_name in self.op.enabled_hypervisors)): # either this is a new hypervisor, or its parameters have changed - hv_class = hypervisor.GetHypervisor(hv_name) + hv_class = hypervisor.GetHypervisorClass(hv_name) utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) hv_class.CheckParameterSyntax(hv_params) _CheckHVParams(self, node_list, hv_name, hv_params) @@ -4181,7 +4390,7 @@ class LUClusterSetParams(LogicalUnit): # we need to fill in the new os_hvp on top of the actual hv_p cluster_defaults = self.new_hvparams.get(hv_name, {}) new_osp = objects.FillDict(cluster_defaults, hv_params) - hv_class = hypervisor.GetHypervisor(hv_name) + hv_class = hypervisor.GetHypervisorClass(hv_name) hv_class.CheckParameterSyntax(new_osp) _CheckHVParams(self, node_list, hv_name, new_osp) @@ -4347,7 +4556,7 @@ def _UploadHelper(lu, nodes, fname): if msg: msg = ("Copy of file %s to node %s failed: %s" % (fname, to_node, msg)) - lu.proc.LogWarning(msg) + lu.LogWarning(msg) def _ComputeAncillaryFiles(cluster, redist): @@ -4403,12 +4612,14 @@ def _ComputeAncillaryFiles(cluster, redist): files_vm = set( filename for hv_name in cluster.enabled_hypervisors - for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0]) + for filename in + hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0]) files_opt |= set( filename for hv_name in cluster.enabled_hypervisors - for filename in 
hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1]) + for filename in + hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1]) # Filenames in each category must be unique all_files_set = files_all | files_mc | files_vm @@ -4489,8 +4700,9 @@ class LUClusterRedistConf(NoHooksLU): def ExpandNames(self): self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } - self.share_locks[locking.LEVEL_NODE] = 1 + self.share_locks = _ShareAll() def Exec(self, feedback_fn): """Redistribute the configuration. @@ -4540,7 +4752,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): disks = _ExpandCheckDisks(instance, disks) if not oneshot: - lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name) + lu.LogInfo("Waiting for instance %s to sync disks", instance.name) node = instance.primary_node @@ -4583,8 +4795,8 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): max_time = mstat.estimated_time else: rem_time = "no time estimate" - lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % - (disks[i].iv_name, mstat.sync_percent, rem_time)) + lu.LogInfo("- device %s: %5.2f%% done, %s", + disks[i].iv_name, mstat.sync_percent, rem_time) # if we're done but degraded, let's do a few small retries, to # make sure we see a stable and not transient situation; therefore @@ -4601,7 +4813,8 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): time.sleep(min(60, max_time)) if done: - lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name) + lu.LogInfo("Instance %s's disks are in sync", instance.name) + return not cumul_degraded @@ -4687,6 +4900,12 @@ class LUOobCommand(NoHooksLU): locking.LEVEL_NODE: lock_names, } + self.share_locks[locking.LEVEL_NODE_ALLOC] = 1 + + if not self.op.node_names: + # Acquire node allocation lock only if all nodes are affected + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + def CheckPrereq(self): """Check prerequisites. 
@@ -5007,6 +5226,159 @@ class LUOsDiagnose(NoHooksLU): return self.oq.OldStyleQuery(self) +class _ExtStorageQuery(_QueryBase): + FIELDS = query.EXTSTORAGE_FIELDS + + def ExpandNames(self, lu): + # Lock all nodes in shared mode + # Temporary removal of locks, should be reverted later + # TODO: reintroduce locks when they are lighter-weight + lu.needed_locks = {} + #self.share_locks[locking.LEVEL_NODE] = 1 + #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + # The following variables interact with _QueryBase._GetNames + if self.names: + self.wanted = self.names + else: + self.wanted = locking.ALL_SET + + self.do_locking = self.use_locking + + def DeclareLocks(self, lu, level): + pass + + @staticmethod + def _DiagnoseByProvider(rlist): + """Remaps a per-node return list into an a per-provider per-node dictionary + + @param rlist: a map with node names as keys and ExtStorage objects as values + + @rtype: dict + @return: a dictionary with extstorage providers as keys and as + value another map, with nodes as keys and tuples of + (path, status, diagnose, parameters) as values, eg:: + + {"provider1": {"node1": [(/usr/lib/..., True, "", [])] + "node2": [(/srv/..., False, "missing file")] + "node3": [(/srv/..., True, "", [])] + } + + """ + all_es = {} + # we build here the list of nodes that didn't fail the RPC (at RPC + # level), so that nodes with a non-responding node daemon don't + # make all OSes invalid + good_nodes = [node_name for node_name in rlist + if not rlist[node_name].fail_msg] + for node_name, nr in rlist.items(): + if nr.fail_msg or not nr.payload: + continue + for (name, path, status, diagnose, params) in nr.payload: + if name not in all_es: + # build a list of nodes for this os containing empty lists + # for each node in node_list + all_es[name] = {} + for nname in good_nodes: + all_es[name][nname] = [] + # convert params from [name, help] to (name, help) + params = [tuple(v) for v in params] + all_es[name][node_name].append((path, status, diagnose, params)) + return all_es + + def _GetQueryData(self, lu): + """Computes the list of nodes and their attributes. + + """ + # Locking is not used + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) + + valid_nodes = [node.name + for node in lu.cfg.GetAllNodesInfo().values() + if not node.offline and node.vm_capable] + pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes)) + + data = {} + + nodegroup_list = lu.cfg.GetNodeGroupList() + + for (es_name, es_data) in pol.items(): + # For every provider compute the nodegroup validity. 
+ # To do this we need to check the validity of each node in es_data + # and then construct the corresponding nodegroup dict: + # { nodegroup1: status + # nodegroup2: status + # } + ndgrp_data = {} + for nodegroup in nodegroup_list: + ndgrp = lu.cfg.GetNodeGroup(nodegroup) + + nodegroup_nodes = ndgrp.members + nodegroup_name = ndgrp.name + node_statuses = [] + + for node in nodegroup_nodes: + if node in valid_nodes: + if es_data[node] != []: + node_status = es_data[node][0][1] + node_statuses.append(node_status) + else: + node_statuses.append(False) + + if False in node_statuses: + ndgrp_data[nodegroup_name] = False + else: + ndgrp_data[nodegroup_name] = True + + # Compute the provider's parameters + parameters = set() + for idx, esl in enumerate(es_data.values()): + valid = bool(esl and esl[0][1]) + if not valid: + break + + node_params = esl[0][3] + if idx == 0: + # First entry + parameters.update(node_params) + else: + # Filter out inconsistent values + parameters.intersection_update(node_params) + + params = list(parameters) + + # Now fill all the info for this provider + info = query.ExtStorageInfo(name=es_name, node_status=es_data, + nodegroup_status=ndgrp_data, + parameters=params) + + data[es_name] = info + + # Prepare data in requested order + return [data[name] for name in self._GetNames(lu, pol.keys(), None) + if name in data] + + +class LUExtStorageDiagnose(NoHooksLU): + """Logical unit for ExtStorage diagnose/query. + + """ + REQ_BGL = False + + def CheckArguments(self): + self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names), + self.op.output_fields, False) + + def ExpandNames(self): + self.eq.ExpandNames(self) + + def Exec(self, feedback_fn): + return self.eq.OldStyleQuery(self) + + class LUNodeRemove(LogicalUnit): """Logical unit for removing a node. @@ -5119,6 +5491,7 @@ class _NodeQuery(_QueryBase): if self.do_locking: # If any non-static field is requested we need to lock the nodes lu.needed_locks[locking.LEVEL_NODE] = self.wanted + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET def DeclareLocks(self, lu, level): pass @@ -5136,8 +5509,9 @@ class _NodeQuery(_QueryBase): # filter out non-vm_capable nodes toquery_nodes = [name for name in nodenames if all_info[name].vm_capable] + es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes) node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()], - [lu.cfg.GetHypervisorType()]) + [lu.cfg.GetHypervisorType()], es_flags) live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload)) for (name, nresult) in node_data.items() if not nresult.fail_msg and nresult.payload) @@ -5213,13 +5587,16 @@ class LUNodeQueryvols(NoHooksLU): def ExpandNames(self): self.share_locks = _ShareAll() - self.needed_locks = {} - if not self.op.nodes: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + if self.op.nodes: + self.needed_locks = { + locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), + } else: - self.needed_locks[locking.LEVEL_NODE] = \ - _GetWantedNodes(self, self.op.nodes) + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. 
@@ -5282,13 +5659,16 @@ class LUNodeQueryStorage(NoHooksLU): def ExpandNames(self): self.share_locks = _ShareAll() - self.needed_locks = {} if self.op.nodes: - self.needed_locks[locking.LEVEL_NODE] = \ - _GetWantedNodes(self, self.op.nodes) + self.needed_locks = { + locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), + } else: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -5914,19 +6294,28 @@ class LUNodeSetParams(LogicalUnit): def ExpandNames(self): if self.lock_all: - self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET} + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + + # Block allocations when all nodes are locked + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } else: - self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } # Since modifying a node can have severe effects on currently running # operations the resource lock is at least acquired in shared mode self.needed_locks[locking.LEVEL_NODE_RES] = \ self.needed_locks[locking.LEVEL_NODE] - # Get node resource and instance locks in shared mode; they are not used - # for anything but read-only access - self.share_locks[locking.LEVEL_NODE_RES] = 1 - self.share_locks[locking.LEVEL_INSTANCE] = 1 + # Get all locks except nodes in shared mode; they are not used for anything + # but read-only access + self.share_locks = _ShareAll() + self.share_locks[locking.LEVEL_NODE] = 0 + self.share_locks[locking.LEVEL_NODE_RES] = 0 + self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 if self.lock_instances: self.needed_locks[locking.LEVEL_INSTANCE] = \ @@ -6093,7 +6482,8 @@ class LUNodeSetParams(LogicalUnit): if master_singlehomed and self.op.secondary_ip != node.primary_ip: if self.op.force and node.name == master.name: self.LogWarning("Transitioning from single-homed to multi-homed" - " cluster. All nodes will require a secondary ip.") + " cluster; all nodes will require a secondary IP" + " address") else: raise errors.OpPrereqError("Changing the secondary ip on a" " single-homed cluster requires the" @@ -6103,7 +6493,8 @@ class LUNodeSetParams(LogicalUnit): elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: if self.op.force and node.name == master.name: self.LogWarning("Transitioning from multi-homed to single-homed" - " cluster. 
Secondary IPs will have to be removed.") + " cluster; secondary IP addresses will have to be" + " removed") else: raise errors.OpPrereqError("Cannot set the secondary IP to be the" " same as the primary IP on a multi-homed" @@ -6380,7 +6771,13 @@ class _ClusterQuery(_QueryBase): drain_flag = NotImplemented if query.CQ_WATCHER_PAUSE in self.requested_data: - watcher_pause = utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE) + master_name = lu.cfg.GetMasterNode() + + result = lu.rpc.call_get_watcher_pause(master_name) + result.Raise("Can't retrieve watcher pause from master node '%s'" % + master_name) + + watcher_pause = result.payload else: watcher_pause = NotImplemented @@ -6481,9 +6878,9 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, if msg: is_offline_secondary = (node in instance.secondary_nodes and result.offline) - lu.proc.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=False, pass=1): %s", - inst_disk.iv_name, node, msg) + lu.LogWarning("Could not prepare block device %s on node %s" + " (is_primary=False, pass=1): %s", + inst_disk.iv_name, node, msg) if not (ignore_secondaries or is_offline_secondary): disks_ok = False @@ -6504,9 +6901,9 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, True, idx) msg = result.fail_msg if msg: - lu.proc.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=True, pass=2): %s", - inst_disk.iv_name, node, msg) + lu.LogWarning("Could not prepare block device %s on node %s" + " (is_primary=True, pass=2): %s", + inst_disk.iv_name, node, msg) disks_ok = False else: dev_path = result.payload @@ -6531,9 +6928,9 @@ def _StartInstanceDisks(lu, instance, force): if not disks_ok: _ShutdownInstanceDisks(lu, instance) if force is not None and not force: - lu.proc.LogWarning("", hint="If the message above refers to a" - " secondary node," - " you can retry the operation using '--force'.") + lu.LogWarning("", + hint=("If the message above refers to a secondary node," + " you can retry the operation using '--force'")) raise errors.OpExecError("Disk consistency error") @@ -6631,9 +7028,9 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False): def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): """Checks if a node has enough free memory. - This function check if a given node has the needed amount of free + This function checks if a given node has the needed amount of free memory. In case the node has less memory or we cannot get the - information from the node, this function raise an OpPrereqError + information from the node, this function raises an OpPrereqError exception. @type lu: C{LogicalUnit} @@ -6652,7 +7049,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): we cannot check the node """ - nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name]) + nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False) nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) (_, _, (hv_info, )) = nodeinfo[node].payload @@ -6671,11 +7068,11 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): - """Checks if nodes have enough free disk space in the all VGs. + """Checks if nodes have enough free disk space in all the VGs. 
- This function check if all given nodes have the needed amount of + This function checks if all given nodes have the needed amount of free disk. In case any node has less disk or we cannot get the - information from the node, this function raise an OpPrereqError + information from the node, this function raises an OpPrereqError exception. @type lu: C{LogicalUnit} @@ -6696,9 +7093,9 @@ def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): """Checks if nodes have enough free disk space in the specified VG. - This function check if all given nodes have the needed amount of + This function checks if all given nodes have the needed amount of free disk. In case any node has less disk or we cannot get the - information from the node, this function raise an OpPrereqError + information from the node, this function raises an OpPrereqError exception. @type lu: C{LogicalUnit} @@ -6713,7 +7110,8 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): or we cannot check the node """ - nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None) + es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames) + nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags) for node in nodenames: info = nodeinfo[node] info.Raise("Cannot get current information from node %s" % node, @@ -6749,7 +7147,7 @@ def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name): or we cannot check the node """ - nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name]) + nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None) for node in nodenames: info = nodeinfo[node] info.Raise("Cannot get current information from node %s" % node, @@ -6827,7 +7225,7 @@ class LUInstanceStartup(LogicalUnit): utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) filled_hvp = cluster.FillHV(instance) filled_hvp.update(self.op.hvparams) - hv_type = hypervisor.GetHypervisor(instance.hypervisor) + hv_type = hypervisor.GetHypervisorClass(instance.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp) @@ -6836,10 +7234,10 @@ class LUInstanceStartup(LogicalUnit): self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline if self.primary_offline and self.op.ignore_offline_nodes: - self.proc.LogWarning("Ignoring offline primary node") + self.LogWarning("Ignoring offline primary node") if self.op.hvparams or self.op.beparams: - self.proc.LogWarning("Overridden parameters are ignored") + self.LogWarning("Overridden parameters are ignored") else: _CheckNodeOnline(self, instance.primary_node) @@ -6871,7 +7269,7 @@ class LUInstanceStartup(LogicalUnit): if self.primary_offline: assert self.op.ignore_offline_nodes - self.proc.LogInfo("Primary node offline, marked instance as started") + self.LogInfo("Primary node offline, marked instance as started") else: node_current = instance.primary_node @@ -7020,13 +7418,16 @@ class LUInstanceShutdown(LogicalUnit): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - _CheckInstanceState(self, self.instance, INSTANCE_ONLINE) + if not self.op.force: + _CheckInstanceState(self, self.instance, INSTANCE_ONLINE) + else: + self.LogWarning("Ignoring offline instance check") self.primary_offline = \ self.cfg.GetNodeInfo(self.instance.primary_node).offline if self.primary_offline and self.op.ignore_offline_nodes: - self.proc.LogWarning("Ignoring offline primary node") + 
self.LogWarning("Ignoring offline primary node") else: _CheckNodeOnline(self, self.instance.primary_node) @@ -7038,17 +7439,19 @@ class LUInstanceShutdown(LogicalUnit): node_current = instance.primary_node timeout = self.op.timeout - if not self.op.no_remember: + # If the instance is offline we shouldn't mark it as down, as that + # resets the offline flag. + if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE: self.cfg.MarkInstanceDown(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes - self.proc.LogInfo("Primary node offline, marked instance as stopped") + self.LogInfo("Primary node offline, marked instance as stopped") else: result = self.rpc.call_instance_shutdown(node_current, instance, timeout) msg = result.fail_msg if msg: - self.proc.LogWarning("Could not shutdown instance: %s" % msg) + self.LogWarning("Could not shutdown instance: %s", msg) _ShutdownInstanceDisks(self, instance) @@ -7149,7 +7552,7 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False - _MODIFYABLE = frozenset([ + _MODIFYABLE = compat.UniqueFrozenset([ constants.IDISK_SIZE, constants.IDISK_MODE, ]) @@ -7161,6 +7564,7 @@ class LUInstanceRecreateDisks(LogicalUnit): # TODO: Implement support changing VG while recreating constants.IDISK_VG, constants.IDISK_METAVG, + constants.IDISK_PROVIDER, ])) def _RunAllocator(self): @@ -7199,7 +7603,8 @@ class LUInstanceRecreateDisks(LogicalUnit): disks=[{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode} for d in self.instance.disks], - hypervisor=self.instance.hypervisor) + hypervisor=self.instance.hypervisor, + node_whitelist=None) ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.op.iallocator) @@ -7244,6 +7649,7 @@ class LUInstanceRecreateDisks(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + if self.op.nodes: self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) @@ -7252,6 +7658,8 @@ class LUInstanceRecreateDisks(LogicalUnit): if self.op.iallocator: # iallocator will select a new node in the same group self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_RES] = [] def DeclareLocks(self, level): @@ -7281,6 +7689,8 @@ class LUInstanceRecreateDisks(LogicalUnit): for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP): self.needed_locks[locking.LEVEL_NODE].extend( self.cfg.GetNodeGroup(group_uuid).members) + + assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) elif not self.op.nodes: self._LockInstancesNodes(primary_only=False) elif level == locking.LEVEL_NODE_RES: @@ -7371,6 +7781,9 @@ class LUInstanceRecreateDisks(LogicalUnit): # Release unneeded node and node resource locks _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes) _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes) + _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC) + + assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) def Exec(self, feedback_fn): """Recreate the disks. @@ -7514,6 +7927,7 @@ class LUInstanceRename(LogicalUnit): # Change the instance lock. This is definitely safe while we hold the BGL. # Otherwise the new lock would have to be added in acquired mode. 
assert self.REQ_BGL + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER) self.glm.remove(locking.LEVEL_INSTANCE, old_name) self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) @@ -7548,7 +7962,7 @@ class LUInstanceRename(LogicalUnit): msg = ("Could not run OS rename script for instance %s on node %s" " (but the instance has been renamed in Ganeti): %s" % (inst.name, inst.primary_node, msg)) - self.proc.LogWarning(msg) + self.LogWarning(msg) finally: _ShutdownInstanceDisks(self, inst) @@ -7676,6 +8090,62 @@ class LUInstanceQuery(NoHooksLU): return self.iq.OldStyleQuery(self) +def _ExpandNamesForMigration(lu): + """Expands names for use with L{TLMigrateInstance}. + + @type lu: L{LogicalUnit} + + """ + if lu.op.target_node is not None: + lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node) + + lu.needed_locks[locking.LEVEL_NODE] = [] + lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + lu.needed_locks[locking.LEVEL_NODE_RES] = [] + lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + + # The node allocation lock is actually only needed for replicated instances + # (e.g. DRBD8) and if an iallocator is used. + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = [] + + +def _DeclareLocksForMigration(lu, level): + """Declares locks for L{TLMigrateInstance}. + + @type lu: L{LogicalUnit} + @param level: Lock level + + """ + if level == locking.LEVEL_NODE_ALLOC: + assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE) + + instance = lu.cfg.GetInstanceInfo(lu.op.instance_name) + + # Node locks are already declared here rather than at LEVEL_NODE as we need + # the instance object anyway to declare the node allocation lock. + if instance.disk_template in constants.DTS_EXT_MIRROR: + if lu.op.target_node is None: + lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + else: + lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, + lu.op.target_node] + del lu.recalculate_locks[locking.LEVEL_NODE] + else: + lu._LockInstancesNodes() # pylint: disable=W0212 + + elif level == locking.LEVEL_NODE: + # Node locks are declared together with the node allocation lock + assert (lu.needed_locks[locking.LEVEL_NODE] or + lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET) + + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + lu.needed_locks[locking.LEVEL_NODE_RES] = \ + _CopyLockList(lu.needed_locks[locking.LEVEL_NODE]) + + class LUInstanceFailover(LogicalUnit): """Failover an instance. 
@@ -7693,42 +8163,17 @@ class LUInstanceFailover(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + _ExpandNamesForMigration(self) - if self.op.target_node is not None: - self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) - - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - - self.needed_locks[locking.LEVEL_NODE_RES] = [] - self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + self._migrater = \ + TLMigrateInstance(self, self.op.instance_name, False, True, False, + self.op.ignore_consistency, True, + self.op.shutdown_timeout, self.op.ignore_ipolicy) - ignore_consistency = self.op.ignore_consistency - shutdown_timeout = self.op.shutdown_timeout - self._migrater = TLMigrateInstance(self, self.op.instance_name, - cleanup=False, - failover=True, - ignore_consistency=ignore_consistency, - shutdown_timeout=shutdown_timeout, - ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - instance = self.context.cfg.GetInstanceInfo(self.op.instance_name) - if instance.disk_template in constants.DTS_EXT_MIRROR: - if self.op.target_node is None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, - self.op.target_node] - del self.recalculate_locks[locking.LEVEL_NODE] - else: - self._LockInstancesNodes() - elif level == locking.LEVEL_NODE_RES: - # Copy node locks - self.needed_locks[locking.LEVEL_NODE_RES] = \ - _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) + _DeclareLocksForMigration(self, level) def BuildHooksEnv(self): """Build hooks env. @@ -7778,41 +8223,19 @@ class LUInstanceMigrate(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() - - if self.op.target_node is not None: - self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) - - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + _ExpandNamesForMigration(self) self._migrater = \ - TLMigrateInstance(self, self.op.instance_name, - cleanup=self.op.cleanup, - failover=False, - fallback=self.op.allow_failover, - allow_runtime_changes=self.op.allow_runtime_changes, - ignore_ipolicy=self.op.ignore_ipolicy) + TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, + False, self.op.allow_failover, False, + self.op.allow_runtime_changes, + constants.DEFAULT_SHUTDOWN_TIMEOUT, + self.op.ignore_ipolicy) + self.tasklets = [self._migrater] def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - instance = self.context.cfg.GetInstanceInfo(self.op.instance_name) - if instance.disk_template in constants.DTS_EXT_MIRROR: - if self.op.target_node is None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, - self.op.target_node] - del self.recalculate_locks[locking.LEVEL_NODE] - else: - self._LockInstancesNodes() - elif level == locking.LEVEL_NODE_RES: - # Copy node locks - self.needed_locks[locking.LEVEL_NODE_RES] = \ - _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) + _DeclareLocksForMigration(self, level) def BuildHooksEnv(self): """Build hooks env. 
@@ -7969,10 +8392,10 @@ class LUInstanceMove(LogicalUnit): msg = result.fail_msg if msg: if self.op.ignore_consistency: - self.proc.LogWarning("Could not shutdown instance %s on node %s." - " Proceeding anyway. Please make sure node" - " %s is down. Error details: %s", - instance.name, source_node, source_node, msg) + self.LogWarning("Could not shutdown instance %s on node %s." + " Proceeding anyway. Please make sure node" + " %s is down. Error details: %s", + instance.name, source_node, source_node, msg) else: raise errors.OpExecError("Could not shutdown instance %s on" " node %s: %s" % @@ -8098,8 +8521,7 @@ class LUNodeMigrate(LogicalUnit): target_node=self.op.target_node, allow_runtime_changes=allow_runtime_changes, ignore_ipolicy=self.op.ignore_ipolicy)] - for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name) - ] + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)] # TODO: Run iallocator in this opcode and pass correct placement options to # OpInstanceMigrate. Since other jobs can modify the cluster between @@ -8143,12 +8565,9 @@ class TLMigrateInstance(Tasklet): _MIGRATION_POLL_INTERVAL = 1 # seconds _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds - def __init__(self, lu, instance_name, cleanup=False, - failover=False, fallback=False, - ignore_consistency=False, - allow_runtime_changes=True, - shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT, - ignore_ipolicy=False): + def __init__(self, lu, instance_name, cleanup, failover, fallback, + ignore_consistency, allow_runtime_changes, shutdown_timeout, + ignore_ipolicy): """Initializes this class. """ @@ -8194,6 +8613,8 @@ class TLMigrateInstance(Tasklet): errors.ECODE_STATE) if instance.disk_template in constants.DTS_EXT_MIRROR: + assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC) + _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node") if self.lu.op.iallocator: @@ -8225,8 +8646,11 @@ class TLMigrateInstance(Tasklet): # in the LU _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=[instance.primary_node, self.target_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC) else: + assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC) + secondary_nodes = instance.secondary_nodes if not secondary_nodes: raise errors.ConfigurationError("No secondary node but using" @@ -8328,6 +8752,8 @@ class TLMigrateInstance(Tasklet): """Run the allocator based on input opcode. """ + assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC) + # FIXME: add a self.ignore_ipolicy option req = iallocator.IAReqRelocate(name=self.instance_name, relocate_from=[self.instance.primary_node]) @@ -8538,7 +8964,7 @@ class TLMigrateInstance(Tasklet): # Check for hypervisor version mismatch and warn the user. nodeinfo = self.rpc.call_node_info([source_node, target_node], - None, [self.instance.hypervisor]) + None, [self.instance.hypervisor], False) for ninfo in nodeinfo.values(): ninfo.Raise("Unable to retrieve node information from node '%s'" % ninfo.node) @@ -8687,9 +9113,9 @@ class TLMigrateInstance(Tasklet): self._GoReconnect(False) self._WaitUntilSync() - # If the instance's disk template is `rbd' and there was a successful - # migration, unmap the device from the source node. - if self.instance.disk_template == constants.DT_RBD: + # If the instance's disk template is `rbd' or `ext' and there was a + # successful migration, unmap the device from the source node. 
+ if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT): disks = _ExpandCheckDisks(instance, instance.disks) self.feedback_fn("* unmapping instance's disks from %s" % source_node) for disk in disks: @@ -8820,12 +9246,13 @@ def _CreateBlockDev(lu, node, instance, device, force_create, info, """ (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg) + excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node) return _CreateBlockDevInner(lu, node, instance, disk, force_create, info, - force_open) + force_open, excl_stor) def _CreateBlockDevInner(lu, node, instance, device, force_create, - info, force_open): + info, force_open, excl_stor): """Create a tree of block devices on a given node. If this device type has to be created on secondaries, create it and @@ -8852,6 +9279,8 @@ def _CreateBlockDevInner(lu, node, instance, device, force_create, L{backend.BlockdevCreate} function where it specifies whether we run on primary or not, and it affects both the child assembly and the device own Open() execution + @type excl_stor: boolean + @param excl_stor: Whether exclusive_storage is active for the node """ if device.CreateOnSecondary(): @@ -8860,15 +9289,17 @@ def _CreateBlockDevInner(lu, node, instance, device, force_create, if device.children: for child in device.children: _CreateBlockDevInner(lu, node, instance, child, force_create, - info, force_open) + info, force_open, excl_stor) if not force_create: return - _CreateSingleBlockDev(lu, node, instance, device, info, force_open) + _CreateSingleBlockDev(lu, node, instance, device, info, force_open, + excl_stor) -def _CreateSingleBlockDev(lu, node, instance, device, info, force_open): +def _CreateSingleBlockDev(lu, node, instance, device, info, force_open, + excl_stor): """Create a single block device on a given node. This will not recurse over children of the device, so they must be @@ -8887,11 +9318,14 @@ def _CreateSingleBlockDev(lu, node, instance, device, info, force_open): L{backend.BlockdevCreate} function where it specifies whether we run on primary or not, and it affects both the child assembly and the device own Open() execution + @type excl_stor: boolean + @param excl_stor: Whether exclusive_storage is active for the node """ lu.cfg.SetDiskID(device, node) result = lu.rpc.call_blockdev_create(node, device, device.size, - instance.name, force_open, info) + instance.name, force_open, info, + excl_stor) result.Raise("Can't create block device %s on" " node %s for instance %s" % (device, node, instance.name)) if device.physical_id is None: @@ -8939,6 +9373,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, _DISK_TEMPLATE_NAME_PREFIX = { constants.DT_PLAIN: "", constants.DT_RBD: ".rbd", + constants.DT_EXT: ".ext", } @@ -8948,6 +9383,7 @@ _DISK_TEMPLATE_DEVICE_TYPE = { constants.DT_SHARED_FILE: constants.LD_FILE, constants.DT_BLOCK: constants.LD_BLOCKDEV, constants.DT_RBD: constants.LD_RBD, + constants.DT_EXT: constants.LD_EXT, } @@ -8959,8 +9395,6 @@ def _GenerateDiskTemplate( """Generate the entire disk layout for a given template type. 
""" - #TODO: compute space requirements - vgname = lu.cfg.GetVGName() disk_count = len(disk_info) disks = [] @@ -9013,9 +9447,11 @@ def _GenerateDiskTemplate( for i in range(disk_count)]) if template_name == constants.DT_PLAIN: + def logical_id_fn(idx, _, disk): vg = disk.get(constants.IDISK_VG, vgname) return (vg, names[idx]) + elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE): logical_id_fn = \ lambda _, disk_index, disk: (file_driver, @@ -9027,12 +9463,27 @@ def _GenerateDiskTemplate( disk[constants.IDISK_ADOPT]) elif template_name == constants.DT_RBD: logical_id_fn = lambda idx, _, disk: ("rbd", names[idx]) + elif template_name == constants.DT_EXT: + def logical_id_fn(idx, _, disk): + provider = disk.get(constants.IDISK_PROVIDER, None) + if provider is None: + raise errors.ProgrammerError("Disk template is %s, but '%s' is" + " not found", constants.DT_EXT, + constants.IDISK_PROVIDER) + return (provider, names[idx]) else: raise errors.ProgrammerError("Unknown disk template '%s'" % template_name) dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name] for idx, disk in enumerate(disk_info): + params = {} + # Only for the Ext template add disk_info to params + if template_name == constants.DT_EXT: + params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER] + for key in disk: + if key not in constants.IDISK_PARAMS: + params[key] = disk[key] disk_index = idx + base_index size = disk[constants.IDISK_SIZE] feedback_fn("* disk %s, size %s" % @@ -9041,7 +9492,7 @@ def _GenerateDiskTemplate( logical_id=logical_id_fn(idx, disk_index, disk), iv_name="disk/%d" % disk_index, mode=disk[constants.IDISK_MODE], - params={})) + params=params)) return disks @@ -9370,13 +9821,15 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams): osname, node) -def _CreateInstanceAllocRequest(op, disks, nics, beparams): +def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist): """Wrapper around IAReqInstanceAlloc. @param op: The instance opcode @param disks: The computed disks @param nics: The computed nics @param beparams: The full filled beparams + @param node_whitelist: List of nodes which should appear as online to the + allocator (unless the node is already marked offline) @returns: A filled L{iallocator.IAReqInstanceAlloc} @@ -9391,23 +9844,24 @@ def _CreateInstanceAllocRequest(op, disks, nics, beparams): spindle_use=spindle_use, disks=disks, nics=[n.ToDict() for n in nics], - hypervisor=op.hypervisor) + hypervisor=op.hypervisor, + node_whitelist=node_whitelist) -def _ComputeNics(op, cluster, default_ip, cfg, proc): +def _ComputeNics(op, cluster, default_ip, cfg, ec_id): """Computes the nics. 
@param op: The instance opcode @param cluster: Cluster configuration object @param default_ip: The default ip to assign @param cfg: An instance of the configuration object - @param proc: The executer instance + @param ec_id: Execution context ID @returns: The build up nics """ nics = [] - for idx, nic in enumerate(op.nics): + for nic in op.nics: nic_mode_req = nic.get(constants.INIC_MODE, None) nic_mode = nic_mode_req if nic_mode is None or nic_mode == constants.VALUE_AUTO: @@ -9461,7 +9915,7 @@ def _ComputeNics(op, cluster, default_ip, cfg, proc): try: # TODO: We need to factor this out - cfg.ReserveMAC(mac, proc.GetECId()) + cfg.ReserveMAC(mac, ec_id) except errors.ReservationError: raise errors.OpPrereqError("MAC address %s already in use" " in cluster" % mac, @@ -9476,7 +9930,8 @@ def _ComputeNics(op, cluster, default_ip, cfg, proc): check_params = cluster.SimpleFillNIC(nicparams) objects.NIC.CheckParameterSyntax(check_params) - nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) + nics.append(objects.NIC(mac=mac, ip=nic_ip, + network=net, nicparams=nicparams)) return nics @@ -9487,7 +9942,7 @@ def _ComputeDisks(op, default_vg): @param op: The instance opcode @param default_vg: The default_vg to assume - @return: The computer disks + @return: The computed disks """ disks = [] @@ -9505,17 +9960,38 @@ def _ComputeDisks(op, default_vg): raise errors.OpPrereqError("Invalid disk size '%s'" % size, errors.ECODE_INVAL) + ext_provider = disk.get(constants.IDISK_PROVIDER, None) + if ext_provider and op.disk_template != constants.DT_EXT: + raise errors.OpPrereqError("The '%s' option is only valid for the %s" + " disk template, not %s" % + (constants.IDISK_PROVIDER, constants.DT_EXT, + op.disk_template), errors.ECODE_INVAL) + data_vg = disk.get(constants.IDISK_VG, default_vg) new_disk = { constants.IDISK_SIZE: size, constants.IDISK_MODE: mode, constants.IDISK_VG: data_vg, } + if constants.IDISK_METAVG in disk: new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG] if constants.IDISK_ADOPT in disk: new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] - disks.append(new_disk) + + # For extstorage, demand the `provider' option and add any + # additional parameters (ext-params) to the dict + if op.disk_template == constants.DT_EXT: + if ext_provider: + new_disk[constants.IDISK_PROVIDER] = ext_provider + for key in disk: + if key not in constants.IDISK_PARAMS: + new_disk[key] = disk[key] + else: + raise errors.OpPrereqError("Missing provider for template '%s'" % + constants.DT_EXT, errors.ECODE_INVAL) + + disks.append(new_disk) return disks @@ -9538,6 +10014,16 @@ def _ComputeFullBeParams(op, cluster): return cluster.SimpleFillBE(op.beparams) +def _CheckOpportunisticLocking(op): + """Generate error if opportunistic locking is not possible. + + """ + if op.opportunistic_locking and not op.iallocator: + raise errors.OpPrereqError("Opportunistic locking is only available in" + " combination with an instance allocator", + errors.ECODE_INVAL) + + class LUInstanceCreate(LogicalUnit): """Create an instance. @@ -9571,7 +10057,8 @@ class LUInstanceCreate(LogicalUnit): # check disks. 
parameter names and consistent adopt/no-adopt strategy has_adopt = has_no_adopt = False for disk in self.op.disks: - utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES) + if self.op.disk_template != constants.DT_EXT: + utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES) if constants.IDISK_ADOPT in disk: has_adopt = True else: @@ -9633,6 +10120,8 @@ class LUInstanceCreate(LogicalUnit): " template") self.op.snode = None + _CheckOpportunisticLocking(self.op) + self._cds = _GetClusterDomainSecret() if self.op.mode == constants.INSTANCE_IMPORT: @@ -9723,7 +10212,11 @@ class LUInstanceCreate(LogicalUnit): # specifying a group on instance creation and then selecting nodes from # that group self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + + if self.op.opportunistic_locking: + self.opportunistic_locks[locking.LEVEL_NODE] = True + self.opportunistic_locks[locking.LEVEL_NODE_RES] = True else: self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode) nodelist = [self.op.pnode] @@ -9731,9 +10224,6 @@ class LUInstanceCreate(LogicalUnit): self.op.snode = _ExpandNodeName(self.cfg, self.op.snode) nodelist.append(self.op.snode) self.needed_locks[locking.LEVEL_NODE] = nodelist - # Lock resources of instance's primary and secondary nodes (copy to - # prevent accidential modification) - self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist) # in case of import lock the source node too if self.op.mode == constants.INSTANCE_IMPORT: @@ -9745,6 +10235,7 @@ class LUInstanceCreate(LogicalUnit): if src_node is None: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from a path" @@ -9758,23 +10249,40 @@ class LUInstanceCreate(LogicalUnit): self.op.src_path = src_path = \ utils.PathJoin(pathutils.EXPORT_DIR, src_path) + self.needed_locks[locking.LEVEL_NODE_RES] = \ + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) + def _RunAllocator(self): """Run the allocator based on input opcode. 
""" + if self.op.opportunistic_locking: + # Only consider nodes for which a lock is held + node_whitelist = self.owned_locks(locking.LEVEL_NODE) + else: + node_whitelist = None + #TODO Export network to iallocator so that it chooses a pnode # in a nodegroup that has the desired network connected to req = _CreateInstanceAllocRequest(self.op, self.disks, - self.nics, self.be_full) + self.nics, self.be_full, + node_whitelist) ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.op.iallocator) if not ial.success: + # When opportunistic locks are used only a temporary failure is generated + if self.op.opportunistic_locking: + ecode = errors.ECODE_TEMP_NORES + else: + ecode = errors.ECODE_NORES + raise errors.OpPrereqError("Can't compute nodes using" " iallocator '%s': %s" % (self.op.iallocator, ial.info), - errors.ECODE_NORES) + ecode) + self.op.pnode = ial.result[0] self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", self.op.instance_name, self.op.iallocator, @@ -10056,7 +10564,7 @@ class LUInstanceCreate(LogicalUnit): utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, self.op.hvparams) - hv_type = hypervisor.GetHypervisor(self.op.hypervisor) + hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) self.hv_full = filled_hvp # check that we don't specify global parameters on an instance @@ -10074,8 +10582,8 @@ class LUInstanceCreate(LogicalUnit): self._RevertToDefaults(cluster) # NIC buildup - self.nics = _ComputeNics(self.op, cluster, self.hostname1.ip, self.cfg, - self.proc) + self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg, + self.proc.GetECId()) # disk checks/pre-build default_vg = self.cfg.GetVGName() @@ -10120,7 +10628,7 @@ class LUInstanceCreate(LogicalUnit): # creation job will fail. 
for nic in self.nics: if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic.mac = self.cfg.GenerateMAC(self.proc.GetECId()) + nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId()) #### allocator run @@ -10128,12 +10636,14 @@ class LUInstanceCreate(LogicalUnit): self._RunAllocator() # Release all unneeded node locks - _ReleaseLocks(self, locking.LEVEL_NODE, - keep=filter(None, [self.op.pnode, self.op.snode, - self.op.src_node])) - _ReleaseLocks(self, locking.LEVEL_NODE_RES, - keep=filter(None, [self.op.pnode, self.op.snode, - self.op.src_node])) + keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node]) + _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks) + _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks) + _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC) + + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)), \ + "Node locks differ from node resource locks" #### node related checks @@ -10169,7 +10679,6 @@ class LUInstanceCreate(LogicalUnit): (idx, netparams.values())) nic.nicparams = dict(netparams) if nic.ip is not None: - filled_params = cluster.SimpleFillNIC(nic.nicparams) if nic.ip.lower() == constants.NIC_IP_POOL: try: nic.ip = self.cfg.GenerateIp(net, self.proc.GetECId()) @@ -10186,11 +10695,10 @@ class LUInstanceCreate(LogicalUnit): " or does not belong to network %s" % (nic.ip, net), errors.ECODE_NOTUNIQUE) - else: - # net is None, ip None or given - if self.op.conflicts_check: - _CheckForConflictingIp(self, nic.ip, self.pnode.name) + # net is None, ip None or given + elif self.op.conflicts_check: + _CheckForConflictingIp(self, nic.ip, self.pnode.name) # mirror node verification if self.op.disk_template in constants.DTS_INT_MIRROR: @@ -10209,6 +10717,16 @@ class LUInstanceCreate(LogicalUnit): " from the first disk's node group will be" " used") + if not self.op.disk_template in constants.DTS_EXCL_STORAGE: + nodes = [pnode] + if self.op.disk_template in constants.DTS_INT_MIRROR: + nodes.append(snode) + has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n) + if compat.any(map(has_es, nodes)): + raise errors.OpPrereqError("Disk template %s not supported with" + " exclusive storage" % self.op.disk_template, + errors.ECODE_STATE) + nodenames = [pnode.name] + self.secondaries # Verify instance specs @@ -10236,6 +10754,9 @@ class LUInstanceCreate(LogicalUnit): # Any function that checks prerequisites can be placed here. # Check if there is enough space on the RADOS cluster. 
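# --- Editor's sketch (not part of the patch): the exclusive-storage guard
# added above rejects the requested disk template as soon as any involved
# node runs with exclusive_storage enabled, unless the template is in the
# set of templates that support it.  A standalone version of the same check,
# with hypothetical argument names:

def reject_exclusive_storage_conflict(disk_template, templates_supporting_es,
                                      node_es_flags):
    """Raise if a node uses exclusive storage but the template can't cope."""
    if disk_template in templates_supporting_es:
        return
    offending = sorted(name for (name, es) in node_es_flags.items() if es)
    if offending:
        raise ValueError("Disk template %s not supported with exclusive"
                         " storage (nodes: %s)" %
                         (disk_template, ", ".join(offending)))

# Example (raises ValueError because node1 requires exclusive storage):
# reject_exclusive_storage_conflict("drbd", frozenset(["plain"]),
#                                   {"node1": True, "node2": False})
# --- end of editor's sketch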
_CheckRADOSFreeSpace() + elif self.op.disk_template == constants.DT_EXT: + # FIXME: Function that checks prereqs if needed + pass else: # Check lv size requirements, if not adopting req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) @@ -10293,7 +10814,7 @@ class LUInstanceCreate(LogicalUnit): if baddisks: raise errors.OpPrereqError("Device node(s) %s lie outside %s and" " cannot be adopted" % - (", ".join(baddisks), + (utils.CommaJoin(baddisks), constants.ADOPTABLE_BLOCKDEV_ROOT), errors.ECODE_INVAL) @@ -10340,6 +10861,9 @@ class LUInstanceCreate(LogicalUnit): _CheckNicsBridgesExist(self, self.nics, self.pnode.name) + #TODO: _CheckExtParams (remotely) + # Check parameters for extstorage + # memory check on primary node #TODO(dynmem): use MINMEM for checking if self.op.start: @@ -10360,6 +10884,7 @@ class LUInstanceCreate(LogicalUnit): assert not (self.owned_locks(locking.LEVEL_NODE_RES) - self.owned_locks(locking.LEVEL_NODE)), \ "Node locks differ from node resource locks" + assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) ht_kind = self.op.hypervisor if ht_kind in constants.HTS_REQ_PORT: @@ -10628,6 +11153,8 @@ class LUInstanceMultiAlloc(NoHooksLU): " or set a cluster-wide default iallocator", errors.ECODE_INVAL) + _CheckOpportunisticLocking(self.op) + dups = utils.FindDuplicates([op.instance_name for op in self.op.instances]) if dups: raise errors.OpPrereqError("There are duplicate instance names: %s" % @@ -10638,11 +11165,19 @@ class LUInstanceMultiAlloc(NoHooksLU): """ self.share_locks = _ShareAll() - self.needed_locks = {} + self.needed_locks = { + # iallocator will select nodes and even if no iallocator is used, + # collisions with LUInstanceCreate should be avoided + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } if self.op.iallocator: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET + + if self.op.opportunistic_locking: + self.opportunistic_locks[locking.LEVEL_NODE] = True + self.opportunistic_locks[locking.LEVEL_NODE_RES] = True else: nodeslist = [] for inst in self.op.instances: @@ -10663,11 +11198,21 @@ class LUInstanceMultiAlloc(NoHooksLU): """ cluster = self.cfg.GetClusterInfo() default_vg = self.cfg.GetVGName() + ec_id = self.proc.GetECId() + + if self.op.opportunistic_locking: + # Only consider nodes for which a lock is held + node_whitelist = self.owned_locks(locking.LEVEL_NODE) + else: + node_whitelist = None + insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg), _ComputeNics(op, cluster, None, - self.cfg, self.proc), - _ComputeFullBeParams(op, cluster)) + self.cfg, ec_id), + _ComputeFullBeParams(op, cluster), + node_whitelist) for op in self.op.instances] + req = iallocator.IAReqMultiInstanceAlloc(instances=insts) ial = iallocator.IAllocator(self.cfg, self.rpc, req) @@ -10682,7 +11227,7 @@ class LUInstanceMultiAlloc(NoHooksLU): self.ia_result = ial.result if self.op.dry_run: - self.dry_run_rsult = objects.FillDict(self._ConstructPartialResult(), { + self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), { constants.JOB_IDS_KEY: [], }) @@ -10789,7 +11334,7 @@ def _GetInstanceConsole(cluster, instance): @rtype: dict """ - hyper = hypervisor.GetHypervisor(instance.hypervisor) + hyper = hypervisor.GetHypervisorClass(instance.hypervisor) # beparams and hvparams are passed separately, to avoid editing the # instance and then saving the defaults in the instance itself. 
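# --- Editor's sketch (not part of the patch): the comment above explains why
# beparams/hvparams are passed to the console helper separately -- the
# cluster-level defaults are merged in on the fly and never written back to
# the instance object.  The general pattern, shown with plain dictionaries
# (hypothetical keys):

def fill_defaults(cluster_defaults, instance_overrides):
    """Merge per-instance overrides over cluster defaults, non-destructively."""
    filled = dict(cluster_defaults)     # copy, so the stored dict is untouched
    filled.update(instance_overrides or {})
    return filled

# Example: fill_defaults({"serial_console": True, "vnc_bind_address": ""},
#                        {"serial_console": False})
# -> {"serial_console": False, "vnc_bind_address": ""}
# --- end of editor's sketch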
hvparams = cluster.FillHV(instance) @@ -10856,12 +11401,13 @@ class LUInstanceReplaceDisks(LogicalUnit): if self.op.iallocator is not None: # iallocator will select a new node in the same group self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET self.needed_locks[locking.LEVEL_NODE_RES] = [] self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, - self.op.disks, False, self.op.early_release, + self.op.disks, self.op.early_release, self.op.ignore_ipolicy) self.tasklets = [self.replacer] @@ -10882,6 +11428,7 @@ class LUInstanceReplaceDisks(LogicalUnit): if self.op.iallocator is not None: assert self.op.remote_node is None assert not self.needed_locks[locking.LEVEL_NODE] + assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) # Lock member nodes of all locked groups self.needed_locks[locking.LEVEL_NODE] = \ @@ -10889,7 +11436,10 @@ class LUInstanceReplaceDisks(LogicalUnit): for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: + assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) + self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: # Reuse node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ @@ -10945,7 +11495,7 @@ class TLReplaceDisks(Tasklet): """ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks, delay_iallocator, early_release, ignore_ipolicy): + disks, early_release, ignore_ipolicy): """Initializes this class. """ @@ -10957,7 +11507,6 @@ class TLReplaceDisks(Tasklet): self.iallocator_name = iallocator_name self.remote_node = remote_node self.disks = disks - self.delay_iallocator = delay_iallocator self.early_release = early_release self.ignore_ipolicy = ignore_ipolicy @@ -11042,18 +11591,6 @@ class TLReplaceDisks(Tasklet): len(instance.secondary_nodes), errors.ECODE_FAULT) - if not self.delay_iallocator: - self._CheckPrereq2() - - def _CheckPrereq2(self): - """Check prerequisites, second part. - - This function should always be part of CheckPrereq. It was separated and is - now called from Exec because during node evacuation iallocator was only - called with an unmodified cluster model, not taking planned changes into - account. - - """ instance = self.instance secondary_node = instance.secondary_nodes[0] @@ -11174,10 +11711,10 @@ class TLReplaceDisks(Tasklet): # Release unneeded node and node resource locks _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC) # Release any owned node group - if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): - _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) + _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) # Check whether disks are valid for disk_idx in self.disks: @@ -11193,9 +11730,6 @@ class TLReplaceDisks(Tasklet): This dispatches the disk replacement to the appropriate handler. 
""" - if self.delay_iallocator: - self._CheckPrereq2() - if __debug__: # Verify owned locks before starting operation owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) @@ -11204,6 +11738,7 @@ class TLReplaceDisks(Tasklet): (owned_nodes, self.node_secondary_ip.keys())) assert (self.lu.owned_locks(locking.LEVEL_NODE) == self.lu.owned_locks(locking.LEVEL_NODE_RES)) + assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC) owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) assert list(owned_instances) == [self.instance_name], \ @@ -11219,8 +11754,8 @@ class TLReplaceDisks(Tasklet): feedback_fn("Replacing disk(s) %s for instance '%s'" % (utils.CommaJoin(self.disks), self.instance.name)) - feedback_fn("Current primary node: %s", self.instance.primary_node) - feedback_fn("Current seconary node: %s", + feedback_fn("Current primary node: %s" % self.instance.primary_node) + feedback_fn("Current seconary node: %s" % utils.CommaJoin(self.instance.secondary_nodes)) activate_disks = (self.instance.admin_state != constants.ADMINST_UP) @@ -11280,7 +11815,7 @@ class TLReplaceDisks(Tasklet): continue for node in nodes: - self.lu.LogInfo("Checking disk/%d on %s" % (idx, node)) + self.lu.LogInfo("Checking disk/%d on %s", idx, node) self.cfg.SetDiskID(dev, node) result = _BlockdevFind(self, node, dev, self.instance) @@ -11320,7 +11855,7 @@ class TLReplaceDisks(Tasklet): if idx not in self.disks: continue - self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx)) + self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx) self.cfg.SetDiskID(dev, node_name) @@ -11341,11 +11876,13 @@ class TLReplaceDisks(Tasklet): new_lvs = [lv_data, lv_meta] old_lvs = [child.Copy() for child in dev.children] iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) + excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name) # we pass force_create=True to force the LVM creation for new_lv in new_lvs: _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True, - _GetInstanceInfoText(self.instance), False) + _GetInstanceInfoText(self.instance), False, + excl_stor) return iv_names @@ -11367,14 +11904,14 @@ class TLReplaceDisks(Tasklet): def _RemoveOldStorage(self, node_name, iv_names): for name, (_, old_lvs, _) in iv_names.iteritems(): - self.lu.LogInfo("Remove logical volumes for %s" % name) + self.lu.LogInfo("Remove logical volumes for %s", name) for lv in old_lvs: self.cfg.SetDiskID(lv, node_name) msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg if msg: - self.lu.LogWarning("Can't remove old LV: %s" % msg, + self.lu.LogWarning("Can't remove old LV: %s", msg, hint="remove unused LVs manually") def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613 @@ -11419,7 +11956,7 @@ class TLReplaceDisks(Tasklet): # Step: for each lv, detach+rename*2+attach self.lu.LogStep(4, steps_total, "Changing drbd configuration") for dev, old_lvs, new_lvs in iv_names.itervalues(): - self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name) + self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name) result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs) @@ -11473,7 +12010,7 @@ class TLReplaceDisks(Tasklet): self.cfg.SetDiskID(disk, self.target_node) # Now that the new lvs have the old name, we can add them to the device - self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) + self.lu.LogInfo("Adding new mirror component on %s", self.target_node) result = self.rpc.call_blockdev_addchildren(self.target_node, (dev, 
self.instance), new_lvs) msg = result.fail_msg @@ -11554,13 +12091,15 @@ class TLReplaceDisks(Tasklet): # Step: create new storage self.lu.LogStep(3, steps_total, "Allocate new storage") disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg) + excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node) for idx, dev in enumerate(disks): self.lu.LogInfo("Adding new local storage on %s for disk/%d" % (self.new_node, idx)) # we pass force_create=True to force LVM creation for new_lv in dev.children: _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv, - True, _GetInstanceInfoText(self.instance), False) + True, _GetInstanceInfoText(self.instance), False, + excl_stor) # Step 4: dbrd minors and drbd setups changes # after this, we must manually remove the drbd minors on both the @@ -11604,14 +12143,15 @@ class TLReplaceDisks(Tasklet): try: _CreateSingleBlockDev(self.lu, self.new_node, self.instance, anno_new_drbd, - _GetInstanceInfoText(self.instance), False) + _GetInstanceInfoText(self.instance), False, + excl_stor) except errors.GenericError: self.cfg.ReleaseDRBDMinors(self.instance.name) raise # We have new devices, shutdown the drbd on the old secondary for idx, dev in enumerate(self.instance.disks): - self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx) + self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) self.cfg.SetDiskID(dev, self.target_node) msg = self.rpc.call_blockdev_shutdown(self.target_node, (dev, self.instance)).fail_msg @@ -11723,7 +12263,7 @@ class LURepairNodeStorage(NoHooksLU): errors.ECODE_STATE) except errors.OpPrereqError, err: if self.op.ignore_consistency: - self.proc.LogWarning(str(err.args[0])) + self.LogWarning(str(err.args[0])) else: raise @@ -11944,8 +12484,7 @@ class LUNodeEvacuate(NoHooksLU): disks=[], mode=constants.REPLACE_DISK_CHG, early_release=self.op.early_release)] - for instance_name in self.instance_names - ] + for instance_name in self.instance_names] else: raise errors.ProgrammerError("No iallocator or remote node") @@ -12109,13 +12648,22 @@ class LUInstanceGrowDisk(LogicalUnit): utils.FormatUnit(self.delta, "h"), errors.ECODE_INVAL) - if instance.disk_template not in (constants.DT_FILE, - constants.DT_SHARED_FILE, - constants.DT_RBD): + self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta)) + + def _CheckDiskSpace(self, nodenames, req_vgspace): + template = self.instance.disk_template + if template not in (constants.DTS_NO_FREE_SPACE_CHECK): # TODO: check the free disk space for file, when that feature will be # supported - _CheckNodesFreeDiskPerVG(self, nodenames, - self.disk.ComputeGrowth(self.delta)) + nodes = map(self.cfg.GetNodeInfo, nodenames) + es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n), + nodes) + if es_nodes: + # With exclusive storage we need to something smarter than just looking + # at free space; for now, let's simply abort the operation. + raise errors.OpPrereqError("Cannot grow disks when exclusive_storage" + " is enabled", errors.ECODE_STATE) + _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace) def Exec(self, feedback_fn): """Execute disk grow. 
@@ -12202,14 +12750,14 @@ class LUInstanceGrowDisk(LogicalUnit): if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: - self.proc.LogWarning("Disk sync-ing has not returned a good" - " status; please check the instance") + self.LogWarning("Disk syncing has not returned a good status; check" + " the instance") if instance.admin_state != constants.ADMINST_UP: _SafeShutdownInstanceDisks(self, instance, disks=[disk]) elif instance.admin_state != constants.ADMINST_UP: - self.proc.LogWarning("Not shutting down the disk even if the instance is" - " not supposed to be running because no wait for" - " sync mode was requested") + self.LogWarning("Not shutting down the disk even if the instance is" + " not supposed to be running because no wait for" + " sync mode was requested") assert self.owned_locks(locking.LEVEL_NODE_RES) assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) @@ -12611,7 +13159,10 @@ class LUInstanceSetParams(LogicalUnit): for (op, _, params) in mods: assert ht.TDict(params) - utils.ForceDictType(params, key_types) + # If 'key_types' is an empty dict, we assume we have an + # 'ext' template and thus do not ForceDictType + if key_types: + utils.ForceDictType(params, key_types) if op == constants.DDM_REMOVE: if params: @@ -12647,9 +13198,18 @@ class LUInstanceSetParams(LogicalUnit): params[constants.IDISK_SIZE] = size - elif op == constants.DDM_MODIFY and constants.IDISK_SIZE in params: - raise errors.OpPrereqError("Disk size change not possible, use" - " grow-disk", errors.ECODE_INVAL) + elif op == constants.DDM_MODIFY: + if constants.IDISK_SIZE in params: + raise errors.OpPrereqError("Disk size change not possible, use" + " grow-disk", errors.ECODE_INVAL) + if constants.IDISK_MODE not in params: + raise errors.OpPrereqError("Disk 'mode' is the only kind of" + " modification supported, but missing", + errors.ECODE_NOENT) + if len(params) > 1: + raise errors.OpPrereqError("Disk modification doesn't support" + " additional arbitrary parameters", + errors.ECODE_INVAL) @staticmethod def _VerifyNicModification(op, params): @@ -12658,29 +13218,37 @@ class LUInstanceSetParams(LogicalUnit): """ if op in (constants.DDM_ADD, constants.DDM_MODIFY): ip = params.get(constants.INIC_IP, None) - if ip is None: - pass - elif ip.lower() == constants.VALUE_NONE: - params[constants.INIC_IP] = None - elif not netutils.IPAddress.IsValid(ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % ip, - errors.ECODE_INVAL) - - bridge = params.get("bridge", None) - link = params.get(constants.INIC_LINK, None) - if bridge and link: - raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time", errors.ECODE_INVAL) - elif bridge and bridge.lower() == constants.VALUE_NONE: - params["bridge"] = None - elif link and link.lower() == constants.VALUE_NONE: - params[constants.INIC_LINK] = None + req_net = params.get(constants.INIC_NETWORK, None) + link = params.get(constants.NIC_LINK, None) + mode = params.get(constants.NIC_MODE, None) + if req_net is not None: + if req_net.lower() == constants.VALUE_NONE: + params[constants.INIC_NETWORK] = None + req_net = None + elif link is not None or mode is not None: + raise errors.OpPrereqError("If network is given" + " mode or link should not", + errors.ECODE_INVAL) if op == constants.DDM_ADD: macaddr = params.get(constants.INIC_MAC, None) if macaddr is None: params[constants.INIC_MAC] = constants.VALUE_AUTO + if ip is not None: + if ip.lower() == constants.VALUE_NONE: + 
params[constants.INIC_IP] = None + else: + if ip.lower() == constants.NIC_IP_POOL: + if op == constants.DDM_ADD and req_net is None: + raise errors.OpPrereqError("If ip=pool, parameter network" + " cannot be none", + errors.ECODE_INVAL) + else: + if not netutils.IPAddress.IsValid(ip): + raise errors.OpPrereqError("Invalid IP address '%s'" % ip, + errors.ECODE_INVAL) + if constants.INIC_MAC in params: macaddr = params[constants.INIC_MAC] if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): @@ -12705,10 +13273,6 @@ class LUInstanceSetParams(LogicalUnit): self.op.nics = self._UpgradeDiskNicMods( "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications) - # Check disk modifications - self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES, - self._VerifyDiskModification) - if self.op.disks and self.op.disk_template is not None: raise errors.OpPrereqError("Disk template conversion and other disk" " changes not supported at the same time", @@ -12727,15 +13291,23 @@ class LUInstanceSetParams(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODEGROUP] = [] # Can't even acquire node locks in shared mode as upcoming changes in # Ganeti 2.6 will start to modify the node object on disk conversion self.needed_locks[locking.LEVEL_NODE] = [] self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + # Look node group to look up the ipolicy + self.share_locks[locking.LEVEL_NODEGROUP] = 1 def DeclareLocks(self, level): - # TODO: Acquire group lock in shared mode (disk parameters) - if level == locking.LEVEL_NODE: + if level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + # Acquire locks for the instance's nodegroups optimistically. Needs + # to be verified in CheckPrereq + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name) + elif level == locking.LEVEL_NODE: self._LockInstancesNodes() if self.op.disk_template and self.op.remote_node: self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) @@ -12751,7 +13323,7 @@ class LUInstanceSetParams(LogicalUnit): This runs on the master, primary and secondaries. 
""" - args = dict() + args = {} if constants.BE_MINMEM in self.be_new: args["minmem"] = self.be_new[constants.BE_MINMEM] if constants.BE_MAXMEM in self.be_new: @@ -12765,10 +13337,10 @@ class LUInstanceSetParams(LogicalUnit): nics = [] for nic in self._new_nics: - nicparams = self.cluster.SimpleFillNIC(nic.nicparams) - mode = nicparams[constants.NIC_MODE] - link = nicparams[constants.NIC_LINK] - nics.append((nic.ip, nic.mac, mode, link)) + n = copy.deepcopy(nic) + nicparams = self.cluster.SimpleFillNIC(n.nicparams) + n.nicparams = nicparams + nics.append(_NICToTuple(self, n)) args["nics"] = nics @@ -12787,16 +13359,27 @@ class LUInstanceSetParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return (nl, nl) - def _PrepareNicModification(self, params, private, old_ip, old_params, - cluster, pnode): + def _PrepareNicModification(self, params, private, old_ip, old_net, + old_params, cluster, pnode): + update_params_dict = dict([(key, params[key]) for key in constants.NICS_PARAMETERS if key in params]) - if "bridge" in params: - update_params_dict[constants.NIC_LINK] = params["bridge"] + req_link = update_params_dict.get(constants.NIC_LINK, None) + req_mode = update_params_dict.get(constants.NIC_MODE, None) + + new_net = params.get(constants.INIC_NETWORK, old_net) + if new_net is not None: + netparams = self.cfg.GetGroupNetParams(new_net, pnode) + if netparams is None: + raise errors.OpPrereqError("No netparams found for the network" + " %s, probably not connected" % new_net, + errors.ECODE_INVAL) + new_params = dict(netparams) + else: + new_params = _GetUpdatedParams(old_params, update_params_dict) - new_params = _GetUpdatedParams(old_params, update_params_dict) utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES) new_filled_params = cluster.SimpleFillNIC(new_params) @@ -12819,6 +13402,10 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("Cannot set the NIC IP address to None" " on a routed NIC", errors.ECODE_INVAL) + elif new_mode == constants.NIC_MODE_OVS: + # TODO: check OVS link + self.LogInfo("OVS links are currently not checked for correctness") + if constants.INIC_MAC in params: mac = params[constants.INIC_MAC] if mac is None: @@ -12827,7 +13414,7 @@ class LUInstanceSetParams(LogicalUnit): elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the MAC address params[constants.INIC_MAC] = \ - self.cfg.GenerateMAC(self.proc.GetECId()) + self.cfg.GenerateMAC(new_net, self.proc.GetECId()) else: # or validate/reserve the current one try: @@ -12836,31 +13423,194 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("MAC address '%s' already in use" " in cluster" % mac, errors.ECODE_NOTUNIQUE) + elif new_net != old_net: + + def get_net_prefix(net): + if net: + uuid = self.cfg.LookupNetwork(net) + if uuid: + nobj = self.cfg.GetNetwork(uuid) + return nobj.mac_prefix + return None + + new_prefix = get_net_prefix(new_net) + old_prefix = get_net_prefix(old_net) + if old_prefix != new_prefix: + params[constants.INIC_MAC] = \ + self.cfg.GenerateMAC(new_net, self.proc.GetECId()) + + #if there is a change in nic-network configuration + new_ip = params.get(constants.INIC_IP, old_ip) + if (new_ip, new_net) != (old_ip, old_net): + if new_ip: + if new_net: + if new_ip.lower() == constants.NIC_IP_POOL: + try: + new_ip = self.cfg.GenerateIp(new_net, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("Unable to get a free IP" + " from the address pool", + 
errors.ECODE_STATE) + self.LogInfo("Chose IP %s from pool %s", new_ip, new_net) + params[constants.INIC_IP] = new_ip + elif new_ip != old_ip or new_net != old_net: + try: + self.LogInfo("Reserving IP %s in pool %s", new_ip, new_net) + self.cfg.ReserveIp(new_net, new_ip, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("IP %s not available in network %s" % + (new_ip, new_net), + errors.ECODE_NOTUNIQUE) + elif new_ip.lower() == constants.NIC_IP_POOL: + raise errors.OpPrereqError("ip=pool, but no network found", + errors.ECODE_INVAL) + + # new net is None + elif self.op.conflicts_check: + _CheckForConflictingIp(self, new_ip, pnode) + + if old_ip: + if old_net: + try: + self.cfg.ReleaseIp(old_net, old_ip, self.proc.GetECId()) + except errors.AddressPoolError: + logging.warning("Release IP %s not contained in network %s", + old_ip, old_net) + + # there are no changes in (net, ip) tuple + elif (old_net is not None and + (req_link is not None or req_mode is not None)): + raise errors.OpPrereqError("Not allowed to change link or mode of" + " a NIC that is connected to a network", + errors.ECODE_INVAL) private.params = new_params private.filled = new_filled_params + def _PreCheckDiskTemplate(self, pnode_info): + """CheckPrereq checks related to a new disk template.""" + # Arguments are passed to avoid configuration lookups + instance = self.instance + pnode = instance.primary_node + cluster = self.cluster + if instance.disk_template == self.op.disk_template: + raise errors.OpPrereqError("Instance already has disk template %s" % + instance.disk_template, errors.ECODE_INVAL) + + if (instance.disk_template, + self.op.disk_template) not in self._DISK_CONVERSIONS: + raise errors.OpPrereqError("Unsupported disk template conversion from" + " %s to %s" % (instance.disk_template, + self.op.disk_template), + errors.ECODE_INVAL) + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot change disk template") + if self.op.disk_template in constants.DTS_INT_MIRROR: + if self.op.remote_node == pnode: + raise errors.OpPrereqError("Given new secondary node %s is the same" + " as the primary node of the instance" % + self.op.remote_node, errors.ECODE_STATE) + _CheckNodeOnline(self, self.op.remote_node) + _CheckNodeNotDrained(self, self.op.remote_node) + # FIXME: here we assume that the old instance type is DT_PLAIN + assert instance.disk_template == constants.DT_PLAIN + disks = [{constants.IDISK_SIZE: d.size, + constants.IDISK_VG: d.logical_id[0]} + for d in instance.disks] + required = _ComputeDiskSizePerVG(self.op.disk_template, disks) + _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required) + + snode_info = self.cfg.GetNodeInfo(self.op.remote_node) + snode_group = self.cfg.GetNodeGroup(snode_info.group) + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + snode_group) + _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, + ignore=self.op.ignore_ipolicy) + if pnode_info.group != snode_info.group: + self.LogWarning("The primary and secondary nodes are in two" + " different node groups; the disk parameters" + " from the first disk's node group will be" + " used") + + if not self.op.disk_template in constants.DTS_EXCL_STORAGE: + # Make sure none of the nodes require exclusive storage + nodes = [pnode_info] + if self.op.disk_template in constants.DTS_INT_MIRROR: + assert snode_info + nodes.append(snode_info) + has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n) + if compat.any(map(has_es, nodes)): + errmsg = ("Cannot convert disk 
template from %s to %s when exclusive" + " storage is enabled" % (instance.disk_template, + self.op.disk_template)) + raise errors.OpPrereqError(errmsg, errors.ECODE_STATE) + def CheckPrereq(self): """Check prerequisites. This only checks the instance list against the existing names. """ - # checking the new params on the primary/secondary nodes - + assert self.op.instance_name in self.owned_locks(locking.LEVEL_INSTANCE) instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + cluster = self.cluster = self.cfg.GetClusterInfo() assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name + pnode = instance.primary_node + assert pnode in self.owned_locks(locking.LEVEL_NODE) nodelist = list(instance.all_nodes) pnode_info = self.cfg.GetNodeInfo(pnode) self.diskparams = self.cfg.GetInstanceDiskParams(instance) + #_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) + assert pnode_info.group in self.owned_locks(locking.LEVEL_NODEGROUP) + group_info = self.cfg.GetNodeGroup(pnode_info.group) + + # dictionary with instance information after the modification + ispec = {} + + # Check disk modifications. This is done here and not in CheckArguments + # (as with NICs), because we need to know the instance's disk template + if instance.disk_template == constants.DT_EXT: + self._CheckMods("disk", self.op.disks, {}, + self._VerifyDiskModification) + else: + self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES, + self._VerifyDiskModification) + # Prepare disk/NIC modifications self.diskmod = PrepareContainerMods(self.op.disks, None) self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate) + # Check the validity of the `provider' parameter + if instance.disk_template in constants.DT_EXT: + for mod in self.diskmod: + ext_provider = mod[2].get(constants.IDISK_PROVIDER, None) + if mod[0] == constants.DDM_ADD: + if ext_provider is None: + raise errors.OpPrereqError("Instance template is '%s' and parameter" + " '%s' missing, during disk add" % + (constants.DT_EXT, + constants.IDISK_PROVIDER), + errors.ECODE_NOENT) + elif mod[0] == constants.DDM_MODIFY: + if ext_provider: + raise errors.OpPrereqError("Parameter '%s' is invalid during disk" + " modification" % + constants.IDISK_PROVIDER, + errors.ECODE_INVAL) + else: + for mod in self.diskmod: + ext_provider = mod[2].get(constants.IDISK_PROVIDER, None) + if ext_provider is not None: + raise errors.OpPrereqError("Parameter '%s' is only valid for" + " instances of type '%s'" % + (constants.IDISK_PROVIDER, + constants.DT_EXT), + errors.ECODE_INVAL) + # OS change if self.op.os_name and not self.op.force: _CheckNodeHasOS(self, instance.primary_node, self.op.os_name, @@ -12873,44 +13623,7 @@ class LUInstanceSetParams(LogicalUnit): "Can't modify disk template and apply disk changes at the same time" if self.op.disk_template: - if instance.disk_template == self.op.disk_template: - raise errors.OpPrereqError("Instance already has disk template %s" % - instance.disk_template, errors.ECODE_INVAL) - - if (instance.disk_template, - self.op.disk_template) not in self._DISK_CONVERSIONS: - raise errors.OpPrereqError("Unsupported disk template conversion from" - " %s to %s" % (instance.disk_template, - self.op.disk_template), - errors.ECODE_INVAL) - _CheckInstanceState(self, instance, INSTANCE_DOWN, - msg="cannot change disk template") - if self.op.disk_template in constants.DTS_INT_MIRROR: - if self.op.remote_node == pnode: - raise errors.OpPrereqError("Given new secondary node %s 
is the same" - " as the primary node of the instance" % - self.op.remote_node, errors.ECODE_STATE) - _CheckNodeOnline(self, self.op.remote_node) - _CheckNodeNotDrained(self, self.op.remote_node) - # FIXME: here we assume that the old instance type is DT_PLAIN - assert instance.disk_template == constants.DT_PLAIN - disks = [{constants.IDISK_SIZE: d.size, - constants.IDISK_VG: d.logical_id[0]} - for d in instance.disks] - required = _ComputeDiskSizePerVG(self.op.disk_template, disks) - _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required) - - snode_info = self.cfg.GetNodeInfo(self.op.remote_node) - snode_group = self.cfg.GetNodeGroup(snode_info.group) - ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, - snode_group) - _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, - ignore=self.op.ignore_ipolicy) - if pnode_info.group != snode_info.group: - self.LogWarning("The primary and secondary nodes are in two" - " different node groups; the disk parameters" - " from the first disk's node group will be" - " used") + self._PreCheckDiskTemplate(pnode_info) # hvparams processing if self.op.hvparams: @@ -12920,7 +13633,7 @@ class LUInstanceSetParams(LogicalUnit): hv_new = cluster.SimpleFillHV(hv_type, instance.os, i_hvdict) # local check - hypervisor.GetHypervisor(hv_type).CheckParameterSyntax(hv_new) + hypervisor.GetHypervisorClass(hv_type).CheckParameterSyntax(hv_new) _CheckHVParams(self, nodelist, instance.hypervisor, hv_new) self.hv_proposed = self.hv_new = hv_new # the new actual values self.hv_inst = i_hvdict # the new dict (without defaults) @@ -12990,7 +13703,7 @@ class LUInstanceSetParams(LogicalUnit): instance_info = self.rpc.call_instance_info(pnode, instance.name, instance.hypervisor) nodeinfo = self.rpc.call_node_info(mem_check_list, None, - [instance.hypervisor]) + [instance.hypervisor], False) pninfo = nodeinfo[pnode] msg = pninfo.fail_msg if msg: @@ -13073,18 +13786,25 @@ class LUInstanceSetParams(LogicalUnit): " diskless instances", errors.ECODE_INVAL) def _PrepareNicCreate(_, params, private): - self._PrepareNicModification(params, private, None, {}, cluster, pnode) + self._PrepareNicModification(params, private, None, None, + {}, cluster, pnode) return (None, None) def _PrepareNicMod(_, nic, params, private): - self._PrepareNicModification(params, private, nic.ip, + self._PrepareNicModification(params, private, nic.ip, nic.network, nic.nicparams, cluster, pnode) return None + def _PrepareNicRemove(_, params, __): + ip = params.ip + net = params.network + if net is not None and ip is not None: + self.cfg.ReleaseIp(net, ip, self.proc.GetECId()) + # Verify NIC changes (operating on copy) nics = instance.nics[:] ApplyContainerMods("NIC", nics, None, self.nicmod, - _PrepareNicCreate, _PrepareNicMod, None) + _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove) if len(nics) > constants.MAX_NICS: raise errors.OpPrereqError("Instance has too many network interfaces" " (%d), cannot add more" % constants.MAX_NICS, @@ -13097,13 +13817,15 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("Instance has too many disks (%d), cannot add" " more" % constants.MAX_DISKS, errors.ECODE_STATE) + disk_sizes = [disk.size for disk in instance.disks] + disk_sizes.extend(params["size"] for (op, idx, params, private) in + self.diskmod if op == constants.DDM_ADD) + ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes) + ispec[constants.ISPEC_DISK_SIZE] = disk_sizes - if self.op.offline is not None: - if self.op.offline: - msg = "can't change to offline" - 
else: - msg = "can't change to online" - _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE, msg=msg) + if self.op.offline is not None and self.op.offline: + _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE, + msg="can't change to offline") # Pre-compute NIC changes (necessary to use result in hooks) self._nic_chgdesc = [] @@ -13113,8 +13835,38 @@ class LUInstanceSetParams(LogicalUnit): ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod, self._CreateNewNic, self._ApplyNicMods, None) self._new_nics = nics + ispec[constants.ISPEC_NIC_COUNT] = len(self._new_nics) else: self._new_nics = None + ispec[constants.ISPEC_NIC_COUNT] = len(instance.nics) + + if not self.op.ignore_ipolicy: + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + group_info) + + # Fill ispec with backend parameters + ispec[constants.ISPEC_SPINDLE_USE] = \ + self.be_new.get(constants.BE_SPINDLE_USE, None) + ispec[constants.ISPEC_CPU_COUNT] = self.be_new.get(constants.BE_VCPUS, + None) + + # Copy ispec to verify parameters with min/max values separately + ispec_max = ispec.copy() + ispec_max[constants.ISPEC_MEM_SIZE] = \ + self.be_new.get(constants.BE_MAXMEM, None) + res_max = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_max) + ispec_min = ispec.copy() + ispec_min[constants.ISPEC_MEM_SIZE] = \ + self.be_new.get(constants.BE_MINMEM, None) + res_min = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_min) + + if (res_max or res_min): + # FIXME: Improve error message by including information about whether + # the upper or lower limit of the parameter fails the ipolicy. + msg = ("Instance allocation to group %s (%s) violates policy: %s" % + (group_info, group_info.name, + utils.CommaJoin(set(res_max + res_min)))) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) def _ConvertPlainToDrbd(self, feedback_fn): """Converts an instance from plain to drbd. @@ -13137,15 +13889,18 @@ class LUInstanceSetParams(LogicalUnit): self.diskparams) anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks, self.diskparams) + p_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, pnode) + s_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, snode) info = _GetInstanceInfoText(instance) feedback_fn("Creating additional volumes...") # first, create the missing data and meta devices for disk in anno_disks: # unfortunately this is... 
not too nice _CreateSingleBlockDev(self, pnode, instance, disk.children[1], - info, True) + info, True, p_excl_stor) for child in disk.children: - _CreateSingleBlockDev(self, snode, instance, child, info, True) + _CreateSingleBlockDev(self, snode, instance, child, info, True, + s_excl_stor) # at this stage, all new LVs have been created, we can rename the # old ones feedback_fn("Renaming original volumes...") @@ -13157,9 +13912,10 @@ class LUInstanceSetParams(LogicalUnit): feedback_fn("Initializing DRBD devices...") # all child devices are in place, we can now create the DRBD devices for disk in anno_disks: - for node in [pnode, snode]: + for (node, excl_stor) in [(pnode, p_excl_stor), (snode, s_excl_stor)]: f_create = node == pnode - _CreateSingleBlockDev(self, node, instance, disk, info, f_create) + _CreateSingleBlockDev(self, node, instance, disk, info, f_create, + excl_stor) # at this point, the instance has been modified instance.disk_template = constants.DT_DRBD8 @@ -13301,13 +14057,16 @@ class LUInstanceSetParams(LogicalUnit): """ mac = params[constants.INIC_MAC] ip = params.get(constants.INIC_IP, None) - nicparams = private.params + net = params.get(constants.INIC_NETWORK, None) + #TODO: not private.filled?? can a nic have no nicparams?? + nicparams = private.filled - return (objects.NIC(mac=mac, ip=ip, nicparams=nicparams), [ + return (objects.NIC(mac=mac, ip=ip, network=net, nicparams=nicparams), [ ("nic.%d" % idx, - "add:mac=%s,ip=%s,mode=%s,link=%s" % + "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" % (mac, ip, private.filled[constants.NIC_MODE], - private.filled[constants.NIC_LINK])), + private.filled[constants.NIC_LINK], + net)), ]) @staticmethod @@ -13317,15 +14076,15 @@ class LUInstanceSetParams(LogicalUnit): """ changes = [] - for key in [constants.INIC_MAC, constants.INIC_IP]: + for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NETWORK]: if key in params: changes.append(("nic.%s/%d" % (key, idx), params[key])) setattr(nic, key, params[key]) - if private.params: - nic.nicparams = private.params + if private.filled: + nic.nicparams = private.filled - for (key, val) in params.items(): + for (key, val) in nic.nicparams.items(): changes.append(("nic.%s/%d" % (key, idx), val)) return changes @@ -13433,7 +14192,7 @@ class LUInstanceSetParams(LogicalUnit): self.cfg.MarkInstanceDown(instance.name) result.append(("admin_state", constants.ADMINST_DOWN)) - self.cfg.Update(instance, feedback_fn) + self.cfg.Update(instance, feedback_fn, self.proc.GetECId()) assert not (self.owned_locks(locking.LEVEL_NODE_RES) or self.owned_locks(locking.LEVEL_NODE)), \ @@ -13454,9 +14213,11 @@ class LUInstanceChangeGroup(LogicalUnit): def ExpandNames(self): self.share_locks = _ShareAll() + self.needed_locks = { locking.LEVEL_NODEGROUP: [], locking.LEVEL_NODE: [], + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } self._ExpandAndLockInstance() @@ -13639,6 +14400,9 @@ class _ExportQuery(_QueryBase): locking.LEVEL_NODE: self.wanted, } + if not self.names: + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + def DeclareLocks(self, lu, level): pass @@ -13756,6 +14520,11 @@ class LUBackupExport(LogicalUnit): # - removing the removal operation altogether self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + # Allocations should be stopped while this LU runs with node locks, but + # it doesn't have to be exclusive + self.share_locks[locking.LEVEL_NODE_ALLOC] = 1 + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + def DeclareLocks(self, level): """Last minute lock 
declaration.""" # All nodes are locked anyway, so nothing to do here. @@ -14022,11 +14791,19 @@ class LUBackupRemove(NoHooksLU): REQ_BGL = False def ExpandNames(self): - self.needed_locks = {} - # We need all nodes to be locked in order for RemoveExport to work, but we - # don't need to lock the instance itself, as nothing will happen to it (and - # we can remove exports also for a removed instance) - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks = { + # We need all nodes to be locked in order for RemoveExport to work, but + # we don't need to lock the instance itself, as nothing will happen to it + # (and we can remove exports also for a removed instance) + locking.LEVEL_NODE: locking.ALL_SET, + + # Removing backups is quick, so blocking allocations is justified + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } + + # Allocations should be stopped while this LU runs with node locks, but it + # doesn't have to be exclusive + self.share_locks[locking.LEVEL_NODE_ALLOC] = 1 def Exec(self, feedback_fn): """Remove any export. @@ -14866,6 +15643,10 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223 self.group_uuid = self.cfg.LookupNodeGroup(self.op.name) lock_level = locking.LEVEL_NODEGROUP lock_name = self.group_uuid + elif self.op.kind == constants.TAG_NETWORK: + self.network_uuid = self.cfg.LookupNetwork(self.op.name) + lock_level = locking.LEVEL_NETWORK + lock_name = self.network_uuid else: lock_level = None lock_name = None @@ -14888,6 +15669,8 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223 self.target = self.cfg.GetInstanceInfo(self.op.name) elif self.op.kind == constants.TAG_NODEGROUP: self.target = self.cfg.GetNodeGroup(self.group_uuid) + elif self.op.kind == constants.TAG_NETWORK: + self.target = self.cfg.GetNetwork(self.network_uuid) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % str(self.op.kind), errors.ECODE_INVAL) @@ -15059,7 +15842,7 @@ class LUTestDelay(NoHooksLU): else: top_value = self.op.repeat - 1 for i in range(self.op.repeat): - self.LogInfo("Test delay iteration %d/%d" % (i, top_value)) + self.LogInfo("Test delay iteration %d/%d", i, top_value) self._TestDelay() @@ -15300,7 +16083,7 @@ class LUTestAllocator(NoHooksLU): self.op.mode, errors.ECODE_INVAL) if self.op.direction == constants.IALLOCATOR_DIR_OUT: - if self.op.allocator is None: + if self.op.iallocator is None: raise errors.OpPrereqError("Missing allocator name", errors.ECODE_INVAL) elif self.op.direction != constants.IALLOCATOR_DIR_IN: @@ -15353,11 +16136,11 @@ class LUTestAllocator(NoHooksLU): if self.op.direction == constants.IALLOCATOR_DIR_IN: result = ial.in_text else: - ial.Run(self.op.allocator, validate=False) + ial.Run(self.op.iallocator, validate=False) result = ial.out_text return result -# Network LUs + class LUNetworkAdd(LogicalUnit): """Logical unit for creating networks. @@ -15373,18 +16156,27 @@ class LUNetworkAdd(LogicalUnit): mn = self.cfg.GetMasterNode() return ([mn], [mn]) + def CheckArguments(self): + if self.op.mac_prefix: + self.op.mac_prefix = \ + utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix) + def ExpandNames(self): self.network_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId()) - self.needed_locks = {} - self.add_locks[locking.LEVEL_NETWORK] = self.network_uuid - def CheckPrereq(self): - """Check prerequisites. 
+ if self.op.conflicts_check: + self.share_locks[locking.LEVEL_NODE] = 1 + self.share_locks[locking.LEVEL_NODE_ALLOC] = 1 + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } + else: + self.needed_locks = {} - This checks that the given group name is not an existing node group - already. + self.add_locks[locking.LEVEL_NETWORK] = self.network_uuid - """ + def CheckPrereq(self): if self.op.network is None: raise errors.OpPrereqError("Network must be given", errors.ECODE_INVAL) @@ -15392,24 +16184,28 @@ class LUNetworkAdd(LogicalUnit): uuid = self.cfg.LookupNetwork(self.op.network_name) if uuid: - raise errors.OpPrereqError("Network '%s' already defined" % - self.op.network, errors.ECODE_EXISTS) + raise errors.OpPrereqError(("Network with name '%s' already exists" % + self.op.network_name), errors.ECODE_EXISTS) + # Check tag validity + for tag in self.op.tags: + objects.TaggableObject.ValidateTag(tag) def BuildHooksEnv(self): """Build hooks env. """ - env = { - "NETWORK_NAME": self.op.network_name, - "NETWORK_SUBNET": self.op.network, - "NETWORK_GATEWAY": self.op.gateway, - "NETWORK_SUBNET6": self.op.network6, - "NETWORK_GATEWAY6": self.op.gateway6, - "NETWORK_MAC_PREFIX": self.op.mac_prefix, - "NETWORK_TYPE": self.op.network_type, + args = { + "name": self.op.network_name, + "subnet": self.op.network, + "gateway": self.op.gateway, + "network6": self.op.network6, + "gateway6": self.op.gateway6, + "mac_prefix": self.op.mac_prefix, + "network_type": self.op.network_type, + "tags": self.op.tags, } - return env + return _BuildNetworkHookEnv(**args) # pylint: disable=W0142 def Exec(self, feedback_fn): """Add the ip pool to the cluster. @@ -15423,31 +16219,36 @@ class LUNetworkAdd(LogicalUnit): mac_prefix=self.op.mac_prefix, network_type=self.op.network_type, uuid=self.network_uuid, - family=4) + family=constants.IP4_VERSION) # Initialize the associated address pool try: pool = network.AddressPool.InitializeNetwork(nobj) except errors.AddressPoolError, e: - raise errors.OpExecError("Cannot create IP pool for this network. %s" % e) + raise errors.OpExecError("Cannot create IP pool for this network: %s" % e) # Check if we need to reserve the nodes and the cluster master IP # These may not be allocated to any instances in routed mode, as # they wouldn't function anyway. 
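# --- Editor's sketch (not part of the patch): the rewritten loop below only
# reserves a node or master IP in the new address pool when the pool actually
# contains it, and turns reservation failures into warnings so network
# creation still succeeds.  A duck-typed approximation of that loop
# (hypothetical helper; 'pool' is expected to offer the Contains/IsReserved/
# Reserve calls used elsewhere in the patch):

def reserve_existing_ips(pool, candidate_ips, log_warning):
    """Reserve the candidate IPs that fall inside the pool's network."""
    reserved = []
    for ip in candidate_ips:
        if not pool.Contains(ip):
            continue                      # address lies outside the subnet
        if pool.IsReserved(ip):
            log_warning("IP address %s is already reserved" % ip)
            continue
        pool.Reserve(ip)
        reserved.append(ip)
    return reserved

# --- end of editor's sketch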
- for node in self.cfg.GetAllNodesInfo().values(): - for ip in [node.primary_ip, node.secondary_ip]: - try: - pool.Reserve(ip) - self.LogInfo("Reserved node %s's IP (%s)", node.name, ip) - - except errors.AddressPoolError: - pass - - master_ip = self.cfg.GetClusterInfo().master_ip - try: - pool.Reserve(master_ip) - self.LogInfo("Reserved cluster master IP (%s)", master_ip) - except errors.AddressPoolError: - pass + if self.op.conflicts_check: + for node in self.cfg.GetAllNodesInfo().values(): + for ip in [node.primary_ip, node.secondary_ip]: + try: + if pool.Contains(ip): + pool.Reserve(ip) + self.LogInfo("Reserved IP address of node '%s' (%s)", + node.name, ip) + except errors.AddressPoolError: + self.LogWarning("Cannot reserve IP address of node '%s' (%s)", + node.name, ip) + + master_ip = self.cfg.GetClusterInfo().master_ip + try: + if pool.Contains(master_ip): + pool.Reserve(master_ip) + self.LogInfo("Reserved cluster master IP address (%s)", master_ip) + except errors.AddressPoolError: + self.LogWarning("Cannot reserve cluster master IP address (%s)", + master_ip) if self.op.add_reserved_ips: for ip in self.op.add_reserved_ips: @@ -15456,6 +16257,10 @@ class LUNetworkAdd(LogicalUnit): except errors.AddressPoolError, e: raise errors.OpExecError("Cannot reserve IP %s. %s " % (ip, e)) + if self.op.tags: + for tag in self.op.tags: + nobj.AddTag(tag) + self.cfg.AddNetwork(nobj, self.proc.GetECId(), check_uuid=False) del self.remove_locks[locking.LEVEL_NETWORK] @@ -15468,11 +16273,16 @@ class LUNetworkRemove(LogicalUnit): def ExpandNames(self): self.network_uuid = self.cfg.LookupNetwork(self.op.network_name) + if not self.network_uuid: + raise errors.OpPrereqError(("Network '%s' not found" % + self.op.network_name), errors.ECODE_NOENT) + + self.share_locks[locking.LEVEL_NODEGROUP] = 1 self.needed_locks = { locking.LEVEL_NETWORK: [self.network_uuid], + locking.LEVEL_NODEGROUP: locking.ALL_SET, } - def CheckPrereq(self): """Check prerequisites. @@ -15481,22 +16291,17 @@ class LUNetworkRemove(LogicalUnit): cluster. """ - if not self.network_uuid: - raise errors.OpPrereqError("Network %s not found" % self.op.network_name, - errors.ECODE_INVAL) - # Verify that the network is not conncted. node_groups = [group.name for group in self.cfg.GetAllNodeGroupsInfo().values() - for network in group.networks.keys() - if network == self.network_uuid] + if self.network_uuid in group.networks] if node_groups: - self.LogWarning("Nework '%s' is connected to the following" - " node groups: %s" % (self.op.network_name, - utils.CommaJoin(utils.NiceSort(node_groups)))) - raise errors.OpPrereqError("Network still connected", - errors.ECODE_STATE) + self.LogWarning("Network '%s' is connected to the following" + " node groups: %s" % + (self.op.network_name, + utils.CommaJoin(utils.NiceSort(node_groups)))) + raise errors.OpPrereqError("Network still connected", errors.ECODE_STATE) def BuildHooksEnv(self): """Build hooks env. 
@@ -15538,29 +16343,27 @@ class LUNetworkSetParams(LogicalUnit): raise errors.OpPrereqError("Cannot modify gateway and reserved ips" " at once", errors.ECODE_INVAL) - def ExpandNames(self): self.network_uuid = self.cfg.LookupNetwork(self.op.network_name) - self.network = self.cfg.GetNetwork(self.network_uuid) + if self.network_uuid is None: + raise errors.OpPrereqError(("Network '%s' not found" % + self.op.network_name), errors.ECODE_NOENT) + self.needed_locks = { locking.LEVEL_NETWORK: [self.network_uuid], } - - if self.network is None: - raise errors.OpPrereqError("Could not retrieve network '%s' (UUID: %s)" % - (self.op.network_name, self.network_uuid), - errors.ECODE_INVAL) - def CheckPrereq(self): """Check prerequisites. """ + self.network = self.cfg.GetNetwork(self.network_uuid) self.gateway = self.network.gateway self.network_type = self.network.network_type self.mac_prefix = self.network.mac_prefix self.network6 = self.network.network6 self.gateway6 = self.network.gateway6 + self.tags = self.network.tags self.pool = network.AddressPool(self.network) @@ -15570,8 +16373,9 @@ class LUNetworkSetParams(LogicalUnit): else: self.gateway = self.op.gateway if self.pool.IsReserved(self.gateway): - raise errors.OpPrereqError("%s is already reserved" % - self.gateway, errors.ECODE_INVAL) + raise errors.OpPrereqError("Gateway IP address '%s' is already" + " reserved" % self.gateway, + errors.ECODE_STATE) if self.op.network_type: if self.op.network_type == constants.VALUE_NONE: @@ -15583,7 +16387,8 @@ class LUNetworkSetParams(LogicalUnit): if self.op.mac_prefix == constants.VALUE_NONE: self.mac_prefix = None else: - self.mac_prefix = self.op.mac_prefix + self.mac_prefix = \ + utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix) if self.op.gateway6: if self.op.gateway6 == constants.VALUE_NONE: @@ -15597,22 +16402,21 @@ class LUNetworkSetParams(LogicalUnit): else: self.network6 = self.op.network6 - - def BuildHooksEnv(self): """Build hooks env. """ - env = { - "NETWORK_NAME": self.op.network_name, - "NETWORK_SUBNET": self.network.network, - "NETWORK_GATEWAY": self.gateway, - "NETWORK_SUBNET6": self.network6, - "NETWORK_GATEWAY6": self.gateway6, - "NETWORK_MAC_PREFIX": self.mac_prefix, - "NETWORK_TYPE": self.network_type, + args = { + "name": self.op.network_name, + "subnet": self.network.network, + "gateway": self.gateway, + "network6": self.network6, + "gateway6": self.gateway6, + "mac_prefix": self.mac_prefix, + "network_type": self.network_type, + "tags": self.tags, } - return env + return _BuildNetworkHookEnv(**args) # pylint: disable=W0142 def BuildHooksNodes(self): """Build hooks nodes. @@ -15629,7 +16433,7 @@ class LUNetworkSetParams(LogicalUnit): # extend cfg.ReserveIp/ReleaseIp with the external flag if self.op.gateway: if self.gateway == self.network.gateway: - self.LogWarning("Gateway is already %s" % self.gateway) + self.LogWarning("Gateway is already %s", self.gateway) else: if self.gateway: self.pool.Reserve(self.gateway, external=True) @@ -15641,11 +16445,11 @@ class LUNetworkSetParams(LogicalUnit): for ip in self.op.add_reserved_ips: try: if self.pool.IsReserved(ip): - self.LogWarning("IP %s is already reserved" % ip) + self.LogWarning("IP address %s is already reserved", ip) else: self.pool.Reserve(ip, external=True) - except errors.AddressPoolError, e: - self.LogWarning("Cannot reserve ip %s. 
%s" % (ip, e)) + except errors.AddressPoolError, err: + self.LogWarning("Cannot reserve IP address %s: %s", ip, err) if self.op.remove_reserved_ips: for ip in self.op.remove_reserved_ips: @@ -15654,11 +16458,11 @@ class LUNetworkSetParams(LogicalUnit): continue try: if not self.pool.IsReserved(ip): - self.LogWarning("IP %s is already unreserved" % ip) + self.LogWarning("IP address %s is already unreserved", ip) else: self.pool.Release(ip, external=True) - except errors.AddressPoolError, e: - self.LogWarning("Cannot release ip %s. %s" % (ip, e)) + except errors.AddressPoolError, err: + self.LogWarning("Cannot release IP address %s: %s", ip, err) if self.op.mac_prefix: self.network.mac_prefix = self.mac_prefix @@ -15682,23 +16486,19 @@ class _NetworkQuery(_QueryBase): def ExpandNames(self, lu): lu.needed_locks = {} + lu.share_locks = _ShareAll() - self._all_networks = lu.cfg.GetAllNetworksInfo() - name_to_uuid = dict((n.name, n.uuid) for n in self._all_networks.values()) + self.do_locking = self.use_locking - if not self.names: - self.wanted = [name_to_uuid[name] - for name in utils.NiceSort(name_to_uuid.keys())] - else: - # Accept names to be either names or UUIDs. + all_networks = lu.cfg.GetAllNetworksInfo() + name_to_uuid = dict((n.name, n.uuid) for n in all_networks.values()) + + if self.names: missing = [] self.wanted = [] - all_uuid = frozenset(self._all_networks.keys()) for name in self.names: - if name in all_uuid: - self.wanted.append(name) - elif name in name_to_uuid: + if name in name_to_uuid: self.wanted.append(name_to_uuid[name]) else: missing.append(name) @@ -15706,6 +16506,15 @@ class _NetworkQuery(_QueryBase): if missing: raise errors.OpPrereqError("Some networks do not exist: %s" % missing, errors.ECODE_NOENT) + else: + self.wanted = locking.ALL_SET + + if self.do_locking: + lu.needed_locks[locking.LEVEL_NETWORK] = self.wanted + if query.NETQ_INST in self.requested_data: + lu.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET + if query.NETQ_GROUP in self.requested_data: + lu.needed_locks[locking.LEVEL_NODEGROUP] = locking.ALL_SET def DeclareLocks(self, lu, level): pass @@ -15714,67 +16523,70 @@ class _NetworkQuery(_QueryBase): """Computes the list of networks and their attributes. 
""" + all_networks = lu.cfg.GetAllNetworksInfo() + + network_uuids = self._GetNames(lu, all_networks.keys(), + locking.LEVEL_NETWORK) + + name_to_uuid = dict((n.name, n.uuid) for n in all_networks.values()) + do_instances = query.NETQ_INST in self.requested_data - do_groups = do_instances or (query.NETQ_GROUP in self.requested_data) - do_stats = query.NETQ_STATS in self.requested_data - cluster = lu.cfg.GetClusterInfo() + do_groups = query.NETQ_GROUP in self.requested_data - network_to_groups = None network_to_instances = None - stats = None + network_to_groups = None # For NETQ_GROUP, we need to map network->[groups] if do_groups: all_groups = lu.cfg.GetAllNodeGroupsInfo() - network_to_groups = dict((uuid, []) for uuid in self.wanted) - default_nicpp = cluster.nicparams[constants.PP_DEFAULT] + network_to_groups = dict((uuid, []) for uuid in network_uuids) + for _, group in all_groups.iteritems(): + for net_uuid in network_uuids: + netparams = group.networks.get(net_uuid, None) + if netparams: + info = (group.name, netparams[constants.NIC_MODE], + netparams[constants.NIC_LINK]) - if do_instances: - all_instances = lu.cfg.GetAllInstancesInfo() - all_nodes = lu.cfg.GetAllNodesInfo() - network_to_instances = dict((uuid, []) for uuid in self.wanted) - - - for group in all_groups.values(): - if do_instances: - group_nodes = [node.name for node in all_nodes.values() if - node.group == group.uuid] - group_instances = [instance for instance in all_instances.values() - if instance.primary_node in group_nodes] - - for net_uuid in group.networks.keys(): - if net_uuid in network_to_groups: - netparams = group.networks[net_uuid] - mode = netparams[constants.NIC_MODE] - link = netparams[constants.NIC_LINK] - info = group.name + '(' + mode + ', ' + link + ')' network_to_groups[net_uuid].append(info) - if do_instances: - for instance in group_instances: - for nic in instance.nics: - if nic.network == self._all_networks[net_uuid].name: - network_to_instances[net_uuid].append(instance.name) - break - - if do_stats: - stats = {} - for uuid, net in self._all_networks.items(): - if uuid in self.wanted: - pool = network.AddressPool(net) - stats[uuid] = { - "free_count": pool.GetFreeCount(), - "reserved_count": pool.GetReservedCount(), - "map": pool.GetMap(), - "external_reservations": ", ".join(pool.GetExternalReservations()), - } - - return query.NetworkQueryData([self._all_networks[uuid] - for uuid in self.wanted], + if do_instances: + all_instances = lu.cfg.GetAllInstancesInfo() + network_to_instances = dict((uuid, []) for uuid in network_uuids) + for instance in all_instances.values(): + for nic in instance.nics: + if nic.network: + net_uuid = name_to_uuid[nic.network] + if net_uuid in network_uuids: + network_to_instances[net_uuid].append(instance.name) + break + + if query.NETQ_STATS in self.requested_data: + stats = \ + dict((uuid, + self._GetStats(network.AddressPool(all_networks[uuid]))) + for uuid in network_uuids) + else: + stats = None + + return query.NetworkQueryData([all_networks[uuid] + for uuid in network_uuids], network_to_groups, network_to_instances, stats) + @staticmethod + def _GetStats(pool): + """Returns statistics for a network address pool. + + """ + return { + "free_count": pool.GetFreeCount(), + "reserved_count": pool.GetReservedCount(), + "map": pool.GetMap(), + "external_reservations": + utils.CommaJoin(pool.GetExternalReservations()), + } + class LUNetworkQuery(NoHooksLU): """Logical unit for querying networks. 
@@ -15784,7 +16596,7 @@ class LUNetworkQuery(NoHooksLU): def CheckArguments(self): self.nq = _NetworkQuery(qlang.MakeSimpleFilter("name", self.op.names), - self.op.output_fields, False) + self.op.output_fields, self.op.use_locking) def ExpandNames(self): self.nq.ExpandNames(self) @@ -15793,7 +16605,6 @@ class LUNetworkQuery(NoHooksLU): return self.nq.OldStyleQuery(self) - class LUNetworkConnect(LogicalUnit): """Connect a network to a nodegroup @@ -15809,9 +16620,14 @@ class LUNetworkConnect(LogicalUnit): self.network_link = self.op.network_link self.network_uuid = self.cfg.LookupNetwork(self.network_name) - self.network = self.cfg.GetNetwork(self.network_uuid) + if self.network_uuid is None: + raise errors.OpPrereqError("Network '%s' does not exist" % + self.network_name, errors.ECODE_NOENT) + self.group_uuid = self.cfg.LookupNodeGroup(self.group_name) - self.group = self.cfg.GetNodeGroup(self.group_uuid) + if self.group_uuid is None: + raise errors.OpPrereqError("Group '%s' does not exist" % + self.group_name, errors.ECODE_NOENT) self.needed_locks = { locking.LEVEL_INSTANCE: [], @@ -15819,41 +16635,44 @@ class LUNetworkConnect(LogicalUnit): } self.share_locks[locking.LEVEL_INSTANCE] = 1 + if self.op.conflicts_check: + self.needed_locks[locking.LEVEL_NETWORK] = [self.network_uuid] + self.share_locks[locking.LEVEL_NETWORK] = 1 + def DeclareLocks(self, level): if level == locking.LEVEL_INSTANCE: assert not self.needed_locks[locking.LEVEL_INSTANCE] # Lock instances optimistically, needs verification once group lock has # been acquired - self.needed_locks[locking.LEVEL_INSTANCE] = \ - self.cfg.GetNodeGroupInstances(self.group_uuid) + if self.op.conflicts_check: + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) def BuildHooksEnv(self): - ret = dict() - ret["GROUP_NAME"] = self.group_name - ret["GROUP_NETWORK_NAME"] = self.network_name - ret["GROUP_NETWORK_MODE"] = self.network_mode - ret["GROUP_NETWORK_LINK"] = self.network_link + ret = { + "GROUP_NAME": self.group_name, + "GROUP_NETWORK_MODE": self.network_mode, + "GROUP_NETWORK_LINK": self.network_link, + } return ret def BuildHooksNodes(self): nodes = self.cfg.GetNodeGroup(self.group_uuid).members return (nodes, nodes) - def CheckPrereq(self): - l = lambda value: ", ".join("%s: %s/%s" % (i[0], i[1], i[2]) - for i in value) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) - if self.network is None: - raise errors.OpPrereqError("Network %s does not exist" % - self.network_name, errors.ECODE_INVAL) + assert self.group_uuid in owned_groups - self.netparams = dict() - self.netparams[constants.NIC_MODE] = self.network_mode - self.netparams[constants.NIC_LINK] = self.network_link + self.netparams = { + constants.NIC_MODE: self.network_mode, + constants.NIC_LINK: self.network_link, + } objects.NIC.CheckParameterSyntax(self.netparams) + self.group = self.cfg.GetNodeGroup(self.group_uuid) #if self.network_mode == constants.NIC_MODE_BRIDGED: # _CheckNodeGroupBridgesExist(self, self.network_link, self.group_uuid) self.connected = False @@ -15863,24 +16682,11 @@ class LUNetworkConnect(LogicalUnit): self.connected = True return - pool = network.AddressPool(self.network) if self.op.conflicts_check: - groupinstances = [] - for n in self.cfg.GetNodeGroupInstances(self.group_uuid): - groupinstances.append(self.cfg.GetInstanceInfo(n)) - instances = [(instance.name, idx, nic.ip) - for instance in groupinstances - for idx, nic in enumerate(instance.nics) - if (not nic.network and 
pool._Contains(nic.ip))] - if instances: - self.LogWarning("Following occurences use IPs from network %s" - " that is about to connect to nodegroup %s: %s" % - (self.network_name, self.group.name, - l(instances))) - raise errors.OpPrereqError("Conflicting IPs found." - " Please remove/modify" - " corresponding NICs", - errors.ECODE_INVAL) + pool = network.AddressPool(self.cfg.GetNetwork(self.network_uuid)) + + _NetworkConflictCheck(self, lambda nic: pool.Contains(nic.ip), + "connect to") def Exec(self, feedback_fn): if self.connected: @@ -15890,6 +16696,53 @@ class LUNetworkConnect(LogicalUnit): self.cfg.Update(self.group, feedback_fn) +def _NetworkConflictCheck(lu, check_fn, action): + """Checks for network interface conflicts with a network. + + @type lu: L{LogicalUnit} + @type check_fn: callable receiving one parameter (L{objects.NIC}) and + returning boolean + @param check_fn: Function checking for conflict + @type action: string + @param action: Part of error message (see code) + @raise errors.OpPrereqError: If conflicting IP addresses are found. + + """ + # Check if locked instances are still correct + owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE)) + _CheckNodeGroupInstances(lu.cfg, lu.group_uuid, owned_instances) + + conflicts = [] + + for (_, instance) in lu.cfg.GetMultiInstanceInfo(owned_instances): + instconflicts = [(idx, nic.ip) + for (idx, nic) in enumerate(instance.nics) + if check_fn(nic)] + + if instconflicts: + conflicts.append((instance.name, instconflicts)) + + if conflicts: + lu.LogWarning("IP addresses from network '%s', which is about to %s" + " node group '%s', are in use: %s" % + (lu.network_name, action, lu.group.name, + utils.CommaJoin(("%s: %s" % + (name, _FmtNetworkConflict(details))) + for (name, details) in conflicts))) + + raise errors.OpPrereqError("Conflicting IP addresses found; " + " remove/modify the corresponding network" + " interfaces", errors.ECODE_STATE) + + +def _FmtNetworkConflict(details): + """Utility for L{_NetworkConflictCheck}. 
+ + """ + return utils.CommaJoin("nic%s/%s" % (idx, ipaddr) + for (idx, ipaddr) in details) + + class LUNetworkDisconnect(LogicalUnit): """Disconnect a network to a nodegroup @@ -15903,9 +16756,14 @@ class LUNetworkDisconnect(LogicalUnit): self.group_name = self.op.group_name self.network_uuid = self.cfg.LookupNetwork(self.network_name) - self.network = self.cfg.GetNetwork(self.network_uuid) + if self.network_uuid is None: + raise errors.OpPrereqError("Network '%s' does not exist" % + self.network_name, errors.ECODE_NOENT) + self.group_uuid = self.cfg.LookupNodeGroup(self.group_name) - self.group = self.cfg.GetNodeGroup(self.group_uuid) + if self.group_uuid is None: + raise errors.OpPrereqError("Group '%s' does not exist" % + self.group_name, errors.ECODE_NOENT) self.needed_locks = { locking.LEVEL_INSTANCE: [], @@ -15919,50 +16777,36 @@ class LUNetworkDisconnect(LogicalUnit): # Lock instances optimistically, needs verification once group lock has # been acquired - self.needed_locks[locking.LEVEL_INSTANCE] = \ + if self.op.conflicts_check: + self.needed_locks[locking.LEVEL_INSTANCE] = \ self.cfg.GetNodeGroupInstances(self.group_uuid) def BuildHooksEnv(self): - ret = dict() - ret["GROUP_NAME"] = self.group_name - ret["GROUP_NETWORK_NAME"] = self.network_name + ret = { + "GROUP_NAME": self.group_name, + } return ret def BuildHooksNodes(self): nodes = self.cfg.GetNodeGroup(self.group_uuid).members return (nodes, nodes) - def CheckPrereq(self): - l = lambda value: ", ".join("%s: %s/%s" % (i[0], i[1], i[2]) - for i in value) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + + assert self.group_uuid in owned_groups + self.group = self.cfg.GetNodeGroup(self.group_uuid) self.connected = True if self.network_uuid not in self.group.networks: - self.LogWarning("Network '%s' is" - " not mapped to group '%s'" % - (self.network_name, self.group.name)) + self.LogWarning("Network '%s' is not mapped to group '%s'", + self.network_name, self.group.name) self.connected = False return if self.op.conflicts_check: - groupinstances = [] - for n in self.cfg.GetNodeGroupInstances(self.group_uuid): - groupinstances.append(self.cfg.GetInstanceInfo(n)) - instances = [(instance.name, idx, nic.ip) - for instance in groupinstances - for idx, nic in enumerate(instance.nics) - if nic.network == self.network_name] - if instances: - self.LogWarning("Following occurences use IPs from network %s" - " that is about to disconnected from the nodegroup" - " %s: %s" % - (self.network_name, self.group.name, - l(instances))) - raise errors.OpPrereqError("Conflicting IPs." - " Please remove/modify" - " corresponding NICS", - errors.ECODE_INVAL) + _NetworkConflictCheck(self, lambda nic: nic.network == self.network_name, + "disconnect from") def Exec(self, feedback_fn): if not self.connected: @@ -15980,6 +16824,7 @@ _QUERY_IMPL = { constants.QR_GROUP: _GroupQuery, constants.QR_NETWORK: _NetworkQuery, constants.QR_OS: _OsQuery, + constants.QR_EXTSTORAGE: _ExtStorageQuery, constants.QR_EXPORT: _ExportQuery, } @@ -15998,19 +16843,20 @@ def _GetQueryImplementation(name): raise errors.OpPrereqError("Unknown query resource '%s'" % name, errors.ECODE_INVAL) + def _CheckForConflictingIp(lu, ip, node): - """In case of conflicting ip raise error. + """In case of conflicting IP address raise error. 
@type ip: string - @param ip: ip address + @param ip: IP address @type node: string @param node: node name """ - (conf_net, conf_netparams) = lu.cfg.CheckIPInNodeGroup(ip, node) + (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node) if conf_net is not None: - raise errors.OpPrereqError("Conflicting IP found:" - " %s <> %s." % (ip, conf_net), - errors.ECODE_INVAL) + raise errors.OpPrereqError(("Conflicting IP address found: '%s' != '%s'" % + (ip, conf_net)), + errors.ECODE_STATE) return (None, None)
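
The LUNetworkAdd hunk near the top of this patch reserves node and master IP addresses in the new network's pool only when conflicts_check is set and the address actually lies inside the network, and it demotes reservation failures to warnings instead of silently swallowing them. Below is a minimal, self-contained sketch of that pattern; FakeAddressPool (a plain-set stand-in for network.AddressPool), reserve_existing_ips and the log callables are illustrative assumptions and not part of the Ganeti API.

class FakeAddressPool(object):
  """Toy stand-in for network.AddressPool, backed by a plain set of strings."""

  def __init__(self, addresses):
    self._addresses = frozenset(addresses)  # every IP inside the network
    self._reserved = set()

  def Contains(self, address):
    return address in self._addresses

  def Reserve(self, address):
    # Stands in for errors.AddressPoolError on an invalid/double reservation
    if address not in self._addresses or address in self._reserved:
      raise ValueError("cannot reserve %s" % address)
    self._reserved.add(address)

  def GetFreeCount(self):
    return len(self._addresses) - len(self._reserved)

  def GetReservedCount(self):
    return len(self._reserved)


def reserve_existing_ips(pool, node_ips, master_ip, log_info, log_warning):
  """Reserve an IP only if the pool contains it; turn failures into warnings.

  Mirrors the conflicts_check branch of the LUNetworkAdd hunk above.

  """
  for (name, ip) in node_ips:
    try:
      if pool.Contains(ip):
        pool.Reserve(ip)
        log_info("Reserved IP address of node '%s' (%s)" % (name, ip))
    except ValueError:
      log_warning("Cannot reserve IP address of node '%s' (%s)" % (name, ip))

  try:
    if pool.Contains(master_ip):
      pool.Reserve(master_ip)
      log_info("Reserved cluster master IP address (%s)" % master_ip)
  except ValueError:
    log_warning("Cannot reserve cluster master IP address (%s)" % master_ip)


if __name__ == "__main__":
  pool = FakeAddressPool(["192.168.1.%d" % i for i in range(1, 255)])
  reserve_existing_ips(pool,
                       [("node1", "192.168.1.10"), ("node2", "10.0.0.5")],
                       "192.168.1.1",
                       log_info=lambda msg: None,
                       log_warning=lambda msg: None)
  print("free=%d reserved=%d" % (pool.GetFreeCount(), pool.GetReservedCount()))

Checking Contains() before Reserve() is what keeps addresses outside the network (such as node2's 10.0.0.5 above) from producing spurious warnings; only genuine pool errors are logged.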
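
The LUNetworkConnect and LUNetworkDisconnect hunks share the conflict-detection logic that this patch factors out into _NetworkConflictCheck and _FmtNetworkConflict: walk the instances of the node group, collect per-instance lists of (nic_index, ip) pairs that a caller-supplied predicate flags, and abort with a readable summary if any were found. The following self-contained sketch reproduces that pattern with stand-in Nic/Instance classes and a plain set for the network's address range; these names, as well as find_conflicts and format_conflicts, are illustrative assumptions rather than Ganeti code.

class Nic(object):
  def __init__(self, ip, network=None):
    self.ip = ip            # IP address as a string, or None
    self.network = network  # name of the connected network, or None


class Instance(object):
  def __init__(self, name, nics):
    self.name = name
    self.nics = nics


def find_conflicts(instances, check_fn):
  """Collects (instance_name, [(nic_index, ip), ...]) for every NIC that
  check_fn flags, mirroring the data structure built by _NetworkConflictCheck.

  """
  conflicts = []
  for inst in instances:
    bad = [(idx, nic.ip)
           for (idx, nic) in enumerate(inst.nics)
           if check_fn(nic)]
    if bad:
      conflicts.append((inst.name, bad))
  return conflicts


def format_conflicts(conflicts):
  """Formats conflicts like _FmtNetworkConflict, e.g. "inst1: nic0/10.0.0.4"."""
  return ", ".join("%s: %s" % (name,
                               ", ".join("nic%s/%s" % (idx, ip)
                                         for (idx, ip) in details))
                   for (name, details) in conflicts)


if __name__ == "__main__":
  net_ips = frozenset(["10.0.0.%d" % i for i in range(1, 255)])
  instances = [Instance("inst1", [Nic("10.0.0.4"), Nic("192.168.0.9")]),
               Instance("inst2", [Nic("172.16.0.2")])]
  # "Connect" direction: a NIC conflicts if it is not yet on any network but
  # its IP lies inside the network being connected.
  conflicts = find_conflicts(instances,
                             lambda nic: not nic.network and nic.ip in net_ips)
  if conflicts:
    print("Conflicting IP addresses found: " + format_conflicts(conflicts))

Only the predicate changes between the two callers in the patch: the connect path checks whether an unattached NIC's IP falls inside the network's address pool, while the disconnect path checks nic.network == network_name to find NICs still attached to the network being disconnected.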