X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/33e2603829c43c0849c1eb3d71e77f8dc3496e42..dc7d923e222361ed176abd841f47f62900d4ac65:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 71881c9..7b6d571 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -60,6 +60,8 @@ from ganeti import ht from ganeti import rpc from ganeti import runtime from ganeti import pathutils +from ganeti import vcluster +from ganeti import network from ganeti.masterd import iallocator import ganeti.masterd.instance # pylint: disable=W0611 @@ -136,13 +138,18 @@ class LogicalUnit(object): self.owned_locks = context.glm.list_owned self.context = context self.rpc = rpc_runner - # Dicts used to declare locking needs to mcpu + + # Dictionaries used to declare locking needs to mcpu self.needed_locks = None self.share_locks = dict.fromkeys(locking.LEVELS, 0) + self.opportunistic_locks = dict.fromkeys(locking.LEVELS, False) + self.add_locks = {} self.remove_locks = {} + # Used to force good behavior when calling helper functions self.recalculate_locks = {} + # logging self.Log = processor.Log # pylint: disable=C0103 self.LogWarning = processor.LogWarning # pylint: disable=C0103 @@ -690,6 +697,18 @@ def _SupportsOob(cfg, node): return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] +def _CopyLockList(names): + """Makes a copy of a list of lock names. + + Handles L{locking.ALL_SET} correctly. + + """ + if names == locking.ALL_SET: + return locking.ALL_SET + else: + return names[:] + + def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -945,7 +964,8 @@ def _RunPostHook(lu, node_name): try: hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) except Exception, err: # pylint: disable=W0703 - lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err)) + lu.LogWarning("Errors occurred running hooks on %s: %s", + node_name, err) def _CheckOutputFields(static, dynamic, selected): @@ -1086,7 +1106,8 @@ def _CheckInstanceState(lu, instance, req_states, msg=None): """ if msg is None: - msg = "can't use instance from outside %s states" % ", ".join(req_states) + msg = ("can't use instance from outside %s states" % + utils.CommaJoin(req_states)) if instance.admin_state not in req_states: raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % (instance.name, instance.admin_state, msg), @@ -1301,6 +1322,72 @@ def _ExpandInstanceName(cfg, name): return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance") +def _BuildNetworkHookEnv(name, subnet, gateway, network6, gateway6, + network_type, mac_prefix, tags): + """Builds network related env variables for hooks + + This builds the hook environment from individual variables. 
+ + @type name: string + @param name: the name of the network + @type subnet: string + @param subnet: the ipv4 subnet + @type gateway: string + @param gateway: the ipv4 gateway + @type network6: string + @param network6: the ipv6 subnet + @type gateway6: string + @param gateway6: the ipv6 gateway + @type network_type: string + @param network_type: the type of the network + @type mac_prefix: string + @param mac_prefix: the mac_prefix + @type tags: list + @param tags: the tags of the network + + """ + env = {} + if name: + env["NETWORK_NAME"] = name + if subnet: + env["NETWORK_SUBNET"] = subnet + if gateway: + env["NETWORK_GATEWAY"] = gateway + if network6: + env["NETWORK_SUBNET6"] = network6 + if gateway6: + env["NETWORK_GATEWAY6"] = gateway6 + if mac_prefix: + env["NETWORK_MAC_PREFIX"] = mac_prefix + if network_type: + env["NETWORK_TYPE"] = network_type + if tags: + env["NETWORK_TAGS"] = " ".join(tags) + + return env + + +def _BuildNetworkHookEnvByObject(net): + """Builds network related env varliables for hooks + + @type net: L{objects.Network} + @param net: the network object + + """ + args = { + "name": net.name, + "subnet": net.network, + "gateway": net.gateway, + "network6": net.network6, + "gateway6": net.gateway6, + "network_type": net.network_type, + "mac_prefix": net.mac_prefix, + "tags": net.tags, + } + + return _BuildNetworkHookEnv(**args) # pylint: disable=W0142 + + def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, minmem, maxmem, vcpus, nics, disk_template, disks, bep, hvp, hypervisor_name, tags): @@ -1325,7 +1412,7 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @type vcpus: string @param vcpus: the count of VCPUs the instance has @type nics: list - @param nics: list of tuples (ip, mac, mode, link) representing + @param nics: list of tuples (ip, mac, mode, link, network) representing the NICs the instance has @type disk_template: string @param disk_template: the disk template of the instance @@ -1360,13 +1447,31 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, } if nics: nic_count = len(nics) - for idx, (ip, mac, mode, link) in enumerate(nics): + for idx, (ip, mac, mode, link, net, netinfo) in enumerate(nics): if ip is None: ip = "" env["INSTANCE_NIC%d_IP" % idx] = ip env["INSTANCE_NIC%d_MAC" % idx] = mac env["INSTANCE_NIC%d_MODE" % idx] = mode env["INSTANCE_NIC%d_LINK" % idx] = link + if network: + env["INSTANCE_NIC%d_NETWORK" % idx] = net + if netinfo: + nobj = objects.Network.FromDict(netinfo) + if nobj.network: + env["INSTANCE_NIC%d_NETWORK_SUBNET" % idx] = nobj.network + if nobj.gateway: + env["INSTANCE_NIC%d_NETWORK_GATEWAY" % idx] = nobj.gateway + if nobj.network6: + env["INSTANCE_NIC%d_NETWORK_SUBNET6" % idx] = nobj.network6 + if nobj.gateway6: + env["INSTANCE_NIC%d_NETWORK_GATEWAY6" % idx] = nobj.gateway6 + if nobj.mac_prefix: + env["INSTANCE_NIC%d_NETWORK_MAC_PREFIX" % idx] = nobj.mac_prefix + if nobj.network_type: + env["INSTANCE_NIC%d_NETWORK_TYPE" % idx] = nobj.network_type + if nobj.tags: + env["INSTANCE_NIC%d_NETWORK_TAGS" % idx] = " ".join(nobj.tags) if mode == constants.NIC_MODE_BRIDGED: env["INSTANCE_NIC%d_BRIDGE" % idx] = link else: @@ -1396,6 +1501,31 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, return env +def _NICToTuple(lu, nic): + """Build a tupple of nic information. 
+ + @type lu: L{LogicalUnit} + @param lu: the logical unit on whose behalf we execute + @type nic: L{objects.NIC} + @param nic: nic to convert to hooks tuple + + """ + ip = nic.ip + mac = nic.mac + cluster = lu.cfg.GetClusterInfo() + filled_params = cluster.SimpleFillNIC(nic.nicparams) + mode = filled_params[constants.NIC_MODE] + link = filled_params[constants.NIC_LINK] + net = nic.network + netinfo = None + if net: + net_uuid = lu.cfg.LookupNetwork(net) + if net_uuid: + nobj = lu.cfg.GetNetwork(net_uuid) + netinfo = objects.Network.ToDict(nobj) + return (ip, mac, mode, link, net, netinfo) + + def _NICListToTuple(lu, nics): """Build a list of nic information tuples. @@ -1409,14 +1539,8 @@ def _NICListToTuple(lu, nics): """ hooks_nics = [] - cluster = lu.cfg.GetClusterInfo() for nic in nics: - ip = nic.ip - mac = nic.mac - filled_params = cluster.SimpleFillNIC(nic.nicparams) - mode = filled_params[constants.NIC_MODE] - link = filled_params[constants.NIC_LINK] - hooks_nics.append((ip, mac, mode, link)) + hooks_nics.append(_NICToTuple(lu, nic)) return hooks_nics @@ -1610,8 +1734,9 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): cluster-wide iallocator if appropriate. Check that at most one of (iallocator, node) is specified. If none is - specified, then the LU's opcode's iallocator slot is filled with the - cluster-wide default iallocator. + specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT}, + then the LU's opcode's iallocator slot is filled with the cluster-wide + default iallocator. @type iallocator_slot: string @param iallocator_slot: the name of the opcode iallocator slot @@ -1621,11 +1746,14 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): """ node = getattr(lu.op, node_slot, None) ialloc = getattr(lu.op, iallocator_slot, None) + if node == []: + node = None if node is not None and ialloc is not None: raise errors.OpPrereqError("Do not specify both, iallocator and node", errors.ECODE_INVAL) - elif node is None and ialloc is None: + elif ((node is None and ialloc is None) or + ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT): default_iallocator = lu.cfg.GetDefaultIAllocator() if default_iallocator: setattr(lu.op, iallocator_slot, default_iallocator) @@ -1660,6 +1788,27 @@ def _GetDefaultIAllocator(cfg, ialloc): return ialloc +def _CheckHostnameSane(lu, name): + """Ensures that a given hostname resolves to a 'sane' name. + + The given name is required to be a prefix of the resolved hostname, + to prevent accidental mismatches. + + @param lu: the logical unit on behalf of which we're checking + @param name: the name we should resolve and check + @return: the resolved hostname object + + """ + hostname = netutils.GetHostname(name=name) + if hostname.name != name: + lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name) + if not utils.MatchNameComponent(name, [hostname.name]): + raise errors.OpPrereqError(("Resolved hostname '%s' does not look the" + " same as given hostname '%s'") % + (hostname.name, name), errors.ECODE_INVAL) + return hostname + + class LUClusterPostInit(LogicalUnit): """Logical unit for running hooks after cluster initialization. 
@@ -1893,7 +2042,7 @@ class LUClusterVerify(NoHooksLU): # Verify global configuration jobs.append([ - opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors) + opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors), ]) # Always depend on global verification @@ -2084,6 +2233,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): locking.LEVEL_INSTANCE: inst_names, locking.LEVEL_NODEGROUP: [self.group_uuid], locking.LEVEL_NODE: [], + + # This opcode is run by watcher every five minutes and acquires all nodes + # for a group. It doesn't run for a long time, so it's better to acquire + # the node allocation lock as well. + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } self.share_locks = _ShareAll() @@ -2408,7 +2562,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, self.group_info) err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig) - _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err)) + _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err), + code=self.ETYPE_WARNING) for node in node_vol_should: n_img = node_image[node] @@ -2549,7 +2704,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if nresult.fail_msg or not nresult.payload: node_files = None else: - node_files = nresult.payload.get(constants.NV_FILELIST, None) + fingerprints = nresult.payload.get(constants.NV_FILELIST, None) + node_files = dict((vcluster.LocalizeVirtualPath(key), value) + for (key, value) in fingerprints.items()) + del fingerprints test = not (node_files and isinstance(node_files, dict)) errorif(test, constants.CV_ENODEFILECHECK, node.name, @@ -2764,6 +2922,37 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): "OSes present on reference node %s but missing on this node: %s", base.name, utils.CommaJoin(missing)) + def _VerifyFileStoragePaths(self, ninfo, nresult, is_master): + """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}. + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @type is_master: bool + @param is_master: Whether node is the master node + + """ + node = ninfo.name + + if (is_master and + (constants.ENABLE_FILE_STORAGE or + constants.ENABLE_SHARED_FILE_STORAGE)): + try: + fspaths = nresult[constants.NV_FILE_STORAGE_PATHS] + except KeyError: + # This should never happen + self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node, + "Node did not return forbidden file storage paths") + else: + self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node, + "Found forbidden file storage paths: %s", + utils.CommaJoin(fspaths)) + else: + self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult, + constants.CV_ENODEFILESTORAGEPATHS, node, + "Node should not have returned forbidden file storage" + " paths") + def _VerifyOob(self, ninfo, nresult): """Verifies out of band functionality of a node. 
@@ -3011,7 +3200,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ env = { - "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) + "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()), } env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags())) @@ -3073,9 +3262,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_verify_param = { constants.NV_FILELIST: - utils.UniqueSequence(filename - for files in filemap - for filename in files), + map(vcluster.MakeVirtualPath, + utils.UniqueSequence(filename + for files in filemap + for filename in files)), constants.NV_NODELIST: self._SelectSshCheckNodes(node_data_list, self.group_uuid, self.all_node_info.values()), @@ -3105,6 +3295,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_verify_param[constants.NV_DRBDLIST] = None node_verify_param[constants.NV_DRBDHELPER] = drbd_helper + if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE: + # Load file storage paths only from master node + node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node + # bridge checks # FIXME: this needs to be changed per node-group, not cluster-wide bridges = set() @@ -3258,6 +3452,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): self._VerifyNodeNetwork(node_i, nresult) self._VerifyNodeUserScripts(node_i, nresult) self._VerifyOob(node_i, nresult) + self._VerifyFileStoragePaths(node_i, nresult, + node == master_node) if nimg.vm_capable: self._VerifyNodeLVM(node_i, nresult, vg_name) @@ -3313,11 +3509,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): inst_config.primary_node) # If the instance is non-redundant we cannot survive losing its primary - # node, so we are not N+1 compliant. On the other hand we have no disk - # templates with more than one secondary so that situation is not well - # supported either. - # FIXME: does not support file-backed instances - if not inst_config.secondary_nodes: + # node, so we are not N+1 compliant. + if inst_config.disk_template not in constants.DTS_MIRRORED: i_non_redundant.append(instance) _ErrorIf(len(inst_config.secondary_nodes) > 1, @@ -3492,6 +3685,12 @@ class LUGroupVerifyDisks(NoHooksLU): locking.LEVEL_INSTANCE: [], locking.LEVEL_NODEGROUP: [], locking.LEVEL_NODE: [], + + # This opcode is acquires all node locks in a group. LUClusterVerifyDisks + # starts one instance of this opcode for every group, which means all + # nodes will be locked for a short amount of time, so it's better to + # acquire the node allocation lock as well. 
+ locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } def DeclareLocks(self, level): @@ -3598,6 +3797,8 @@ class LUClusterRepairDiskSizes(NoHooksLU): def ExpandNames(self): if self.op.instances: self.wanted_names = _GetWantedInstances(self, self.op.instances) + # Not getting the node allocation lock as only a specific set of + # instances (and their nodes) is going to be acquired self.needed_locks = { locking.LEVEL_NODE_RES: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -3608,10 +3809,15 @@ class LUClusterRepairDiskSizes(NoHooksLU): self.needed_locks = { locking.LEVEL_NODE_RES: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, + + # This opcode is acquires the node locks for all instances + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } + self.share_locks = { locking.LEVEL_NODE_RES: 1, locking.LEVEL_INSTANCE: 0, + locking.LEVEL_NODE_ALLOC: 1, } def DeclareLocks(self, level): @@ -3853,16 +4059,15 @@ class LUClusterSetParams(LogicalUnit): def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on # all nodes to be modified. + # FIXME: This opcode changes cluster-wide settings. Is acquiring all + # resource locks the right thing, shouldn't it be the BGL instead? self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, locking.LEVEL_NODEGROUP: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } - self.share_locks = { - locking.LEVEL_NODE: 1, - locking.LEVEL_INSTANCE: 1, - locking.LEVEL_NODEGROUP: 1, - } + self.share_locks = _ShareAll() def BuildHooksEnv(self): """Build hooks env. @@ -4267,7 +4472,7 @@ def _UploadHelper(lu, nodes, fname): if msg: msg = ("Copy of file %s to node %s failed: %s" % (fname, to_node, msg)) - lu.proc.LogWarning(msg) + lu.LogWarning(msg) def _ComputeAncillaryFiles(cluster, redist): @@ -4287,15 +4492,15 @@ def _ComputeAncillaryFiles(cluster, redist): pathutils.RAPI_USERS_FILE, ]) - if not redist: - files_all.update(pathutils.ALL_CERT_FILES) - files_all.update(ssconf.SimpleStore().GetFileList()) - else: + if redist: # we need to ship at least the RAPI certificate files_all.add(pathutils.RAPI_CERT_FILE) + else: + files_all.update(pathutils.ALL_CERT_FILES) + files_all.update(ssconf.SimpleStore().GetFileList()) if cluster.modify_etc_hosts: - files_all.add(constants.ETC_HOSTS) + files_all.add(pathutils.ETC_HOSTS) if cluster.use_external_mip_script: files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT) @@ -4313,6 +4518,12 @@ def _ComputeAncillaryFiles(cluster, redist): if not redist: files_mc.add(pathutils.CLUSTER_CONF_FILE) + # File storage + if (not redist and + (constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE)): + files_all.add(pathutils.FILE_STORAGE_PATHS_FILE) + files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE) + # Files which should only be on VM-capable nodes files_vm = set( filename @@ -4334,6 +4545,10 @@ def _ComputeAncillaryFiles(cluster, redist): assert all_files_set.issuperset(files_opt), \ "Optional file not in a different required list" + # This one file should never ever be re-distributed via RPC + assert not (redist and + pathutils.FILE_STORAGE_PATHS_FILE in all_files_set) + return (files_all, files_opt, files_mc, files_vm) @@ -4399,8 +4614,9 @@ class LUClusterRedistConf(NoHooksLU): def ExpandNames(self): self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } - self.share_locks[locking.LEVEL_NODE] = 1 + self.share_locks = _ShareAll() def Exec(self, feedback_fn): """Redistribute the 
configuration. @@ -4450,7 +4666,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): disks = _ExpandCheckDisks(instance, disks) if not oneshot: - lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name) + lu.LogInfo("Waiting for instance %s to sync disks", instance.name) node = instance.primary_node @@ -4493,8 +4709,8 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): max_time = mstat.estimated_time else: rem_time = "no time estimate" - lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % - (disks[i].iv_name, mstat.sync_percent, rem_time)) + lu.LogInfo("- device %s: %5.2f%% done, %s", + disks[i].iv_name, mstat.sync_percent, rem_time) # if we're done but degraded, let's do a few small retries, to # make sure we see a stable and not transient situation; therefore @@ -4511,7 +4727,8 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): time.sleep(min(60, max_time)) if done: - lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name) + lu.LogInfo("Instance %s's disks are in sync", instance.name) + return not cumul_degraded @@ -4597,6 +4814,11 @@ class LUOobCommand(NoHooksLU): locking.LEVEL_NODE: lock_names, } + if not self.op.node_names: + # Acquire node allocation lock only if all nodes are affected + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + self.share_locks[locking.LEVEL_NODE_ALLOC] = 1 + def CheckPrereq(self): """Check prerequisites. @@ -5029,6 +5251,7 @@ class _NodeQuery(_QueryBase): if self.do_locking: # If any non-static field is requested we need to lock the nodes lu.needed_locks[locking.LEVEL_NODE] = self.wanted + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET def DeclareLocks(self, lu, level): pass @@ -5123,13 +5346,16 @@ class LUNodeQueryvols(NoHooksLU): def ExpandNames(self): self.share_locks = _ShareAll() - self.needed_locks = {} - if not self.op.nodes: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + if self.op.nodes: + self.needed_locks = { + locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), + } else: - self.needed_locks[locking.LEVEL_NODE] = \ - _GetWantedNodes(self, self.op.nodes) + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -5192,13 +5418,16 @@ class LUNodeQueryStorage(NoHooksLU): def ExpandNames(self): self.share_locks = _ShareAll() - self.needed_locks = {} if self.op.nodes: - self.needed_locks[locking.LEVEL_NODE] = \ - _GetWantedNodes(self, self.op.nodes) + self.needed_locks = { + locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes), + } else: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. 
@@ -5824,19 +6053,28 @@ class LUNodeSetParams(LogicalUnit): def ExpandNames(self): if self.lock_all: - self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET} + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + + # Block allocations when all nodes are locked + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } else: - self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } # Since modifying a node can have severe effects on currently running # operations the resource lock is at least acquired in shared mode self.needed_locks[locking.LEVEL_NODE_RES] = \ self.needed_locks[locking.LEVEL_NODE] - # Get node resource and instance locks in shared mode; they are not used - # for anything but read-only access - self.share_locks[locking.LEVEL_NODE_RES] = 1 - self.share_locks[locking.LEVEL_INSTANCE] = 1 + # Get all locks except nodes in shared mode; they are not used for anything + # but read-only access + self.share_locks = _ShareAll() + self.share_locks[locking.LEVEL_NODE] = 0 + self.share_locks[locking.LEVEL_NODE_RES] = 0 + self.share_locks[locking.LEVEL_NODE_ALLOC] = 0 if self.lock_instances: self.needed_locks[locking.LEVEL_INSTANCE] = \ @@ -6003,7 +6241,8 @@ class LUNodeSetParams(LogicalUnit): if master_singlehomed and self.op.secondary_ip != node.primary_ip: if self.op.force and node.name == master.name: self.LogWarning("Transitioning from single-homed to multi-homed" - " cluster. All nodes will require a secondary ip.") + " cluster; all nodes will require a secondary IP" + " address") else: raise errors.OpPrereqError("Changing the secondary ip on a" " single-homed cluster requires the" @@ -6013,7 +6252,8 @@ class LUNodeSetParams(LogicalUnit): elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: if self.op.force and node.name == master.name: self.LogWarning("Transitioning from multi-homed to single-homed" - " cluster. 
Secondary IPs will have to be removed.") + " cluster; secondary IP addresses will have to be" + " removed") else: raise errors.OpPrereqError("Cannot set the secondary IP to be the" " same as the primary IP on a multi-homed" @@ -6391,9 +6631,9 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, if msg: is_offline_secondary = (node in instance.secondary_nodes and result.offline) - lu.proc.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=False, pass=1): %s", - inst_disk.iv_name, node, msg) + lu.LogWarning("Could not prepare block device %s on node %s" + " (is_primary=False, pass=1): %s", + inst_disk.iv_name, node, msg) if not (ignore_secondaries or is_offline_secondary): disks_ok = False @@ -6414,9 +6654,9 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, True, idx) msg = result.fail_msg if msg: - lu.proc.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=True, pass=2): %s", - inst_disk.iv_name, node, msg) + lu.LogWarning("Could not prepare block device %s on node %s" + " (is_primary=True, pass=2): %s", + inst_disk.iv_name, node, msg) disks_ok = False else: dev_path = result.payload @@ -6441,9 +6681,9 @@ def _StartInstanceDisks(lu, instance, force): if not disks_ok: _ShutdownInstanceDisks(lu, instance) if force is not None and not force: - lu.proc.LogWarning("", hint="If the message above refers to a" - " secondary node," - " you can retry the operation using '--force'.") + lu.LogWarning("", + hint=("If the message above refers to a secondary node," + " you can retry the operation using '--force'")) raise errors.OpExecError("Disk consistency error") @@ -6746,10 +6986,10 @@ class LUInstanceStartup(LogicalUnit): self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline if self.primary_offline and self.op.ignore_offline_nodes: - self.proc.LogWarning("Ignoring offline primary node") + self.LogWarning("Ignoring offline primary node") if self.op.hvparams or self.op.beparams: - self.proc.LogWarning("Overridden parameters are ignored") + self.LogWarning("Overridden parameters are ignored") else: _CheckNodeOnline(self, instance.primary_node) @@ -6781,7 +7021,7 @@ class LUInstanceStartup(LogicalUnit): if self.primary_offline: assert self.op.ignore_offline_nodes - self.proc.LogInfo("Primary node offline, marked instance as started") + self.LogInfo("Primary node offline, marked instance as started") else: node_current = instance.primary_node @@ -6936,7 +7176,7 @@ class LUInstanceShutdown(LogicalUnit): self.cfg.GetNodeInfo(self.instance.primary_node).offline if self.primary_offline and self.op.ignore_offline_nodes: - self.proc.LogWarning("Ignoring offline primary node") + self.LogWarning("Ignoring offline primary node") else: _CheckNodeOnline(self, self.instance.primary_node) @@ -6953,12 +7193,12 @@ class LUInstanceShutdown(LogicalUnit): if self.primary_offline: assert self.op.ignore_offline_nodes - self.proc.LogInfo("Primary node offline, marked instance as stopped") + self.LogInfo("Primary node offline, marked instance as stopped") else: result = self.rpc.call_instance_shutdown(node_current, instance, timeout) msg = result.fail_msg if msg: - self.proc.LogWarning("Could not shutdown instance: %s" % msg) + self.LogWarning("Could not shutdown instance: %s", msg) _ShutdownInstanceDisks(self, instance) @@ -7127,7 +7367,7 @@ class LUInstanceRecreateDisks(LogicalUnit): utils.CommaJoin(ial.result)) def CheckArguments(self): - if self.op.disks and ht.TPositiveInt(self.op.disks[0]): + if 
self.op.disks and ht.TNonNegativeInt(self.op.disks[0]): # Normalize and convert deprecated list of disk indices self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] @@ -7137,9 +7377,10 @@ class LUInstanceRecreateDisks(LogicalUnit): " once: %s" % utils.CommaJoin(duplicates), errors.ECODE_INVAL) - if self.op.iallocator and self.op.nodes: - raise errors.OpPrereqError("Give either the iallocator or the new" - " nodes, not both", errors.ECODE_INVAL) + # We don't want _CheckIAllocatorOrNode selecting the default iallocator + # when neither iallocator nor nodes are specified + if self.op.iallocator or self.op.nodes: + _CheckIAllocatorOrNode(self, "iallocator", "nodes") for (idx, params) in self.op.disks: utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) @@ -7153,6 +7394,7 @@ class LUInstanceRecreateDisks(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + if self.op.nodes: self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) @@ -7161,6 +7403,8 @@ class LUInstanceRecreateDisks(LogicalUnit): if self.op.iallocator: # iallocator will select a new node in the same group self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_RES] = [] def DeclareLocks(self, level): @@ -7190,12 +7434,14 @@ class LUInstanceRecreateDisks(LogicalUnit): for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP): self.needed_locks[locking.LEVEL_NODE].extend( self.cfg.GetNodeGroup(group_uuid).members) + + assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) elif not self.op.nodes: self._LockInstancesNodes(primary_only=False) elif level == locking.LEVEL_NODE_RES: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) def BuildHooksEnv(self): """Build hooks env. @@ -7280,6 +7526,9 @@ class LUInstanceRecreateDisks(LogicalUnit): # Release unneeded node and node resource locks _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes) _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes) + _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC) + + assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) def Exec(self, feedback_fn): """Recreate the disks. @@ -7393,15 +7642,7 @@ class LUInstanceRename(LogicalUnit): new_name = self.op.new_name if self.op.name_check: - hostname = netutils.GetHostname(name=new_name) - if hostname.name != new_name: - self.LogInfo("Resolved given name '%s' to '%s'", new_name, - hostname.name) - if not utils.MatchNameComponent(self.op.new_name, [hostname.name]): - raise errors.OpPrereqError(("Resolved hostname '%s' does not look the" - " same as given hostname '%s'") % - (hostname.name, self.op.new_name), - errors.ECODE_INVAL) + hostname = _CheckHostnameSane(self, new_name) new_name = self.op.new_name = hostname.name if (self.op.ip_check and netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)): @@ -7431,6 +7672,7 @@ class LUInstanceRename(LogicalUnit): # Change the instance lock. This is definitely safe while we hold the BGL. # Otherwise the new lock would have to be added in acquired mode. 
assert self.REQ_BGL + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER) self.glm.remove(locking.LEVEL_INSTANCE, old_name) self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) @@ -7448,6 +7690,15 @@ class LUInstanceRename(LogicalUnit): new_file_storage_dir)) _StartInstanceDisks(self, inst, None) + # update info on disks + info = _GetInstanceInfoText(inst) + for (idx, disk) in enumerate(inst.disks): + for node in inst.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_setinfo(node, disk, info) + if result.fail_msg: + self.LogWarning("Error setting info on node %s for disk %s: %s", + node, idx, result.fail_msg) try: result = self.rpc.call_instance_run_rename(inst.primary_node, inst, old_name, self.op.debug_level) @@ -7456,7 +7707,7 @@ class LUInstanceRename(LogicalUnit): msg = ("Could not run OS rename script for instance %s on node %s" " (but the instance has been renamed in Ganeti): %s" % (inst.name, inst.primary_node, msg)) - self.proc.LogWarning(msg) + self.LogWarning(msg) finally: _ShutdownInstanceDisks(self, inst) @@ -7483,7 +7734,7 @@ class LUInstanceRemove(LogicalUnit): elif level == locking.LEVEL_NODE_RES: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) def BuildHooksEnv(self): """Build hooks env. @@ -7584,6 +7835,59 @@ class LUInstanceQuery(NoHooksLU): return self.iq.OldStyleQuery(self) +def _ExpandNamesForMigration(lu): + """Expands names for use with L{TLMigrateInstance}. + + @type lu: L{LogicalUnit} + + """ + if lu.op.target_node is not None: + lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node) + + lu.needed_locks[locking.LEVEL_NODE] = [] + lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + lu.needed_locks[locking.LEVEL_NODE_RES] = [] + lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + + # The node allocation lock is actually only needed for replicated instances + # (e.g. DRBD8) and if an iallocator is used. + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = [] + + +def _DeclareLocksForMigration(lu, level): + """Declares locks for L{TLMigrateInstance}. + + @type lu: L{LogicalUnit} + @param level: Lock level + + """ + if level == locking.LEVEL_NODE_ALLOC: + assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE) + + instance = lu.cfg.GetInstanceInfo(lu.op.instance_name) + + if instance.disk_template in constants.DTS_EXT_MIRROR: + if lu.op.target_node is None: + lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + else: + lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, + lu.op.target_node] + del lu.recalculate_locks[locking.LEVEL_NODE] + else: + lu._LockInstancesNodes() # pylint: disable=W0212 + + elif level == locking.LEVEL_NODE: + # Node locks are declared together with the node allocation lock + assert lu.needed_locks[locking.LEVEL_NODE] + + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + lu.needed_locks[locking.LEVEL_NODE_RES] = \ + _CopyLockList(lu.needed_locks[locking.LEVEL_NODE]) + + class LUInstanceFailover(LogicalUnit): """Failover an instance. 
@@ -7601,42 +7905,17 @@ class LUInstanceFailover(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + _ExpandNamesForMigration(self) - if self.op.target_node is not None: - self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) - - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - - self.needed_locks[locking.LEVEL_NODE_RES] = [] - self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + self._migrater = \ + TLMigrateInstance(self, self.op.instance_name, False, True, False, + self.op.ignore_consistency, True, + self.op.shutdown_timeout, self.op.ignore_ipolicy) - ignore_consistency = self.op.ignore_consistency - shutdown_timeout = self.op.shutdown_timeout - self._migrater = TLMigrateInstance(self, self.op.instance_name, - cleanup=False, - failover=True, - ignore_consistency=ignore_consistency, - shutdown_timeout=shutdown_timeout, - ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - instance = self.context.cfg.GetInstanceInfo(self.op.instance_name) - if instance.disk_template in constants.DTS_EXT_MIRROR: - if self.op.target_node is None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, - self.op.target_node] - del self.recalculate_locks[locking.LEVEL_NODE] - else: - self._LockInstancesNodes() - elif level == locking.LEVEL_NODE_RES: - # Copy node locks - self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _DeclareLocksForMigration(self, level) def BuildHooksEnv(self): """Build hooks env. @@ -7686,41 +7965,19 @@ class LUInstanceMigrate(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() - - if self.op.target_node is not None: - self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) - - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + _ExpandNamesForMigration(self) self._migrater = \ - TLMigrateInstance(self, self.op.instance_name, - cleanup=self.op.cleanup, - failover=False, - fallback=self.op.allow_failover, - allow_runtime_changes=self.op.allow_runtime_changes, - ignore_ipolicy=self.op.ignore_ipolicy) + TLMigrateInstance(self, self.op.instance_name, self.op.cleanup, + False, self.op.allow_failover, False, + self.op.allow_runtime_changes, + constants.DEFAULT_SHUTDOWN_TIMEOUT, + self.op.ignore_ipolicy) + self.tasklets = [self._migrater] def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - instance = self.context.cfg.GetInstanceInfo(self.op.instance_name) - if instance.disk_template in constants.DTS_EXT_MIRROR: - if self.op.target_node is None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, - self.op.target_node] - del self.recalculate_locks[locking.LEVEL_NODE] - else: - self._LockInstancesNodes() - elif level == locking.LEVEL_NODE_RES: - # Copy node locks - self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _DeclareLocksForMigration(self, level) def BuildHooksEnv(self): """Build hooks env. 
@@ -7779,7 +8036,7 @@ class LUInstanceMove(LogicalUnit): elif level == locking.LEVEL_NODE_RES: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) def BuildHooksEnv(self): """Build hooks env. @@ -7877,10 +8134,10 @@ class LUInstanceMove(LogicalUnit): msg = result.fail_msg if msg: if self.op.ignore_consistency: - self.proc.LogWarning("Could not shutdown instance %s on node %s." - " Proceeding anyway. Please make sure node" - " %s is down. Error details: %s", - instance.name, source_node, source_node, msg) + self.LogWarning("Could not shutdown instance %s on node %s." + " Proceeding anyway. Please make sure node" + " %s is down. Error details: %s", + instance.name, source_node, source_node, msg) else: raise errors.OpExecError("Could not shutdown instance %s on" " node %s: %s" % @@ -8006,8 +8263,7 @@ class LUNodeMigrate(LogicalUnit): target_node=self.op.target_node, allow_runtime_changes=allow_runtime_changes, ignore_ipolicy=self.op.ignore_ipolicy)] - for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name) - ] + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)] # TODO: Run iallocator in this opcode and pass correct placement options to # OpInstanceMigrate. Since other jobs can modify the cluster between @@ -8051,12 +8307,9 @@ class TLMigrateInstance(Tasklet): _MIGRATION_POLL_INTERVAL = 1 # seconds _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds - def __init__(self, lu, instance_name, cleanup=False, - failover=False, fallback=False, - ignore_consistency=False, - allow_runtime_changes=True, - shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT, - ignore_ipolicy=False): + def __init__(self, lu, instance_name, cleanup, failover, fallback, + ignore_consistency, allow_runtime_changes, shutdown_timeout, + ignore_ipolicy): """Initializes this class. """ @@ -8102,6 +8355,8 @@ class TLMigrateInstance(Tasklet): errors.ECODE_STATE) if instance.disk_template in constants.DTS_EXT_MIRROR: + assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC) + _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node") if self.lu.op.iallocator: @@ -8133,8 +8388,11 @@ class TLMigrateInstance(Tasklet): # in the LU _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=[instance.primary_node, self.target_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC) else: + assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC) + secondary_nodes = instance.secondary_nodes if not secondary_nodes: raise errors.ConfigurationError("No secondary node but using" @@ -8236,6 +8494,8 @@ class TLMigrateInstance(Tasklet): """Run the allocator based on input opcode. 
""" + assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC) + # FIXME: add a self.ignore_ipolicy option req = iallocator.IAReqRelocate(name=self.instance_name, relocate_from=[self.instance.primary_node]) @@ -8543,6 +8803,8 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("Migration failed, aborting") self._AbortMigration() self._RevertDiskStatus() + if not msg: + msg = "hypervisor returned failure" raise errors.OpExecError("Could not migrate instance %s: %s" % (instance.name, msg)) @@ -8919,9 +9181,11 @@ def _GenerateDiskTemplate( for i in range(disk_count)]) if template_name == constants.DT_PLAIN: + def logical_id_fn(idx, _, disk): vg = disk.get(constants.IDISK_VG, vgname) return (vg, names[idx]) + elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE): logical_id_fn = \ lambda _, disk_index, disk: (file_driver, @@ -8972,7 +9236,7 @@ def _CalcEta(time_taken, written, total_size): return (total_size - written) * avg_time -def _WipeDisks(lu, instance): +def _WipeDisks(lu, instance, disks=None): """Wipes instance disks. @type lu: L{LogicalUnit} @@ -8984,13 +9248,18 @@ def _WipeDisks(lu, instance): """ node = instance.primary_node - for device in instance.disks: + if disks is None: + disks = [(idx, disk, 0) + for (idx, disk) in enumerate(instance.disks)] + + for (_, device, _) in disks: lu.cfg.SetDiskID(device, node) logging.info("Pausing synchronization of disks of instance '%s'", instance.name) result = lu.rpc.call_blockdev_pause_resume_sync(node, - (instance.disks, instance), + (map(compat.snd, disks), + instance), True) result.Raise("Failed to pause disk synchronization on node '%s'" % node) @@ -9000,45 +9269,54 @@ def _WipeDisks(lu, instance): " failed", idx, instance.name) try: - for idx, device in enumerate(instance.disks): + for (idx, device, offset) in disks: # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but - # MAX_WIPE_CHUNK at max - wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 * - constants.MIN_WIPE_CHUNK_PERCENT) - # we _must_ make this an int, otherwise rounding errors will - # occur - wipe_chunk_size = int(wipe_chunk_size) - - lu.LogInfo("* Wiping disk %d", idx) - logging.info("Wiping disk %d for instance %s on node %s using" - " chunk size %s", idx, instance.name, node, wipe_chunk_size) + # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors. 
+ wipe_chunk_size = \ + int(min(constants.MAX_WIPE_CHUNK, + device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)) - offset = 0 size = device.size last_output = 0 start_time = time.time() + if offset == 0: + info_text = "" + else: + info_text = (" (from %s to %s)" % + (utils.FormatUnit(offset, "h"), + utils.FormatUnit(size, "h"))) + + lu.LogInfo("* Wiping disk %s%s", idx, info_text) + + logging.info("Wiping disk %d for instance %s on node %s using" + " chunk size %s", idx, instance.name, node, wipe_chunk_size) + while offset < size: wipe_size = min(wipe_chunk_size, size - offset) + logging.debug("Wiping disk %d, offset %s, chunk %s", idx, offset, wipe_size) + result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset, wipe_size) result.Raise("Could not wipe disk %d at offset %d for size %d" % (idx, offset, wipe_size)) + now = time.time() offset += wipe_size if now - last_output >= 60: eta = _CalcEta(now - start_time, offset, size) - lu.LogInfo(" - done: %.1f%% ETA: %s" % - (offset / float(size) * 100, utils.FormatSeconds(eta))) + lu.LogInfo(" - done: %.1f%% ETA: %s", + offset / float(size) * 100, utils.FormatSeconds(eta)) last_output = now finally: logging.info("Resuming synchronization of disks for instance '%s'", instance.name) result = lu.rpc.call_blockdev_pause_resume_sync(node, - (instance.disks, instance), + (map(compat.snd, disks), + instance), False) if result.fail_msg: @@ -9140,7 +9418,7 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False): for port in ports_to_release: lu.cfg.AddTcpUdpPort(port) - if instance.disk_template == constants.DT_FILE: + if instance.disk_template in constants.DTS_FILEBASED: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) if target_node: tgt = target_node @@ -9286,33 +9564,38 @@ def _CreateInstanceAllocRequest(op, disks, nics, beparams): hypervisor=op.hypervisor) -def _ComputeNics(op, cluster, default_ip, cfg, proc): +def _ComputeNics(op, cluster, default_ip, cfg, ec_id): """Computes the nics. 
@param op: The instance opcode @param cluster: Cluster configuration object @param default_ip: The default ip to assign @param cfg: An instance of the configuration object - @param proc: The executer instance + @param ec_id: Execution context ID @returns: The build up nics """ nics = [] - for idx, nic in enumerate(op.nics): + for nic in op.nics: nic_mode_req = nic.get(constants.INIC_MODE, None) nic_mode = nic_mode_req if nic_mode is None or nic_mode == constants.VALUE_AUTO: nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] - # in routed mode, for the first nic, the default ip is 'auto' - if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: - default_ip_mode = constants.VALUE_AUTO + net = nic.get(constants.INIC_NETWORK, None) + link = nic.get(constants.NIC_LINK, None) + ip = nic.get(constants.INIC_IP, None) + + if net is None or net.lower() == constants.VALUE_NONE: + net = None else: - default_ip_mode = constants.VALUE_NONE + if nic_mode_req is not None or link is not None: + raise errors.OpPrereqError("If network is given, no mode or link" + " is allowed to be passed", + errors.ECODE_INVAL) # ip validity checks - ip = nic.get(constants.INIC_IP, default_ip_mode) if ip is None or ip.lower() == constants.VALUE_NONE: nic_ip = None elif ip.lower() == constants.VALUE_AUTO: @@ -9322,9 +9605,18 @@ def _ComputeNics(op, cluster, default_ip, cfg, proc): errors.ECODE_INVAL) nic_ip = default_ip else: - if not netutils.IPAddress.IsValid(ip): + # We defer pool operations until later, so that the iallocator has + # filled in the instance's node(s) dimara + if ip.lower() == constants.NIC_IP_POOL: + if net is None: + raise errors.OpPrereqError("if ip=pool, parameter network" + " must be passed too", + errors.ECODE_INVAL) + + elif not netutils.IPAddress.IsValid(ip): raise errors.OpPrereqError("Invalid IP address '%s'" % ip, errors.ECODE_INVAL) + nic_ip = ip # TODO: check the ip address for uniqueness @@ -9339,16 +9631,13 @@ def _ComputeNics(op, cluster, default_ip, cfg, proc): try: # TODO: We need to factor this out - cfg.ReserveMAC(mac, proc.GetECId()) + cfg.ReserveMAC(mac, ec_id) except errors.ReservationError: raise errors.OpPrereqError("MAC address %s already in use" " in cluster" % mac, errors.ECODE_NOTUNIQUE) # Build nic parameters - link = nic.get(constants.INIC_LINK, None) - if link == constants.VALUE_AUTO: - link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK] nicparams = {} if nic_mode_req: nicparams[constants.NIC_MODE] = nic_mode @@ -9357,7 +9646,8 @@ def _ComputeNics(op, cluster, default_ip, cfg, proc): check_params = cluster.SimpleFillNIC(nicparams) objects.NIC.CheckParameterSyntax(check_params) - nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) + nics.append(objects.NIC(mac=mac, ip=nic_ip, + network=net, nicparams=nicparams)) return nics @@ -9483,7 +9773,7 @@ class LUInstanceCreate(LogicalUnit): # instance name verification if self.op.name_check: - self.hostname1 = netutils.GetHostname(name=self.op.instance_name) + self.hostname1 = _CheckHostnameSane(self, self.op.instance_name) self.op.instance_name = self.hostname1.name # used in CheckPrereq for ip ping check self.check_ip = self.hostname1.ip @@ -9604,7 +9894,7 @@ class LUInstanceCreate(LogicalUnit): # specifying a group on instance creation and then selecting nodes from # that group self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET else: self.op.pnode = 
_ExpandNodeName(self.cfg, self.op.pnode) nodelist = [self.op.pnode] @@ -9612,9 +9902,6 @@ class LUInstanceCreate(LogicalUnit): self.op.snode = _ExpandNodeName(self.cfg, self.op.snode) nodelist.append(self.op.snode) self.needed_locks[locking.LEVEL_NODE] = nodelist - # Lock resources of instance's primary and secondary nodes (copy to - # prevent accidential modification) - self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist) # in case of import lock the source node too if self.op.mode == constants.INSTANCE_IMPORT: @@ -9626,6 +9913,7 @@ class LUInstanceCreate(LogicalUnit): if src_node is None: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from a path" @@ -9639,10 +9927,15 @@ class LUInstanceCreate(LogicalUnit): self.op.src_path = src_path = \ utils.PathJoin(pathutils.EXPORT_DIR, src_path) + self.needed_locks[locking.LEVEL_NODE_RES] = \ + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) + def _RunAllocator(self): """Run the allocator based on input opcode. """ + #TODO Export network to iallocator so that it chooses a pnode + # in a nodegroup that has the desired network connected to req = _CreateInstanceAllocRequest(self.op, self.disks, self.nics, self.be_full) ial = iallocator.IAllocator(self.cfg, self.rpc, req) @@ -9954,7 +10247,7 @@ class LUInstanceCreate(LogicalUnit): # NIC buildup self.nics = _ComputeNics(self.op, cluster, self.hostname1.ip, self.cfg, - self.proc) + self.proc.GetECId()) # disk checks/pre-build default_vg = self.cfg.GetVGName() @@ -9999,7 +10292,7 @@ class LUInstanceCreate(LogicalUnit): # creation job will fail. for nic in self.nics: if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic.mac = self.cfg.GenerateMAC(self.proc.GetECId()) + nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId()) #### allocator run @@ -10007,12 +10300,14 @@ class LUInstanceCreate(LogicalUnit): self._RunAllocator() # Release all unneeded node locks - _ReleaseLocks(self, locking.LEVEL_NODE, - keep=filter(None, [self.op.pnode, self.op.snode, - self.op.src_node])) - _ReleaseLocks(self, locking.LEVEL_NODE_RES, - keep=filter(None, [self.op.pnode, self.op.snode, - self.op.src_node])) + keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node]) + _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks) + _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks) + _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC) + + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)), \ + "Node locks differ from node resource locks" #### node related checks @@ -10032,6 +10327,43 @@ class LUInstanceCreate(LogicalUnit): self.secondaries = [] + # Fill in any IPs from IP pools. This must happen here, because we need to + # know the nic's primary node, as specified by the iallocator + for idx, nic in enumerate(self.nics): + net = nic.network + if net is not None: + netparams = self.cfg.GetGroupNetParams(net, self.pnode.name) + if netparams is None: + raise errors.OpPrereqError("No netparams found for network" + " %s. 
Propably not connected to" + " node's %s nodegroup" % + (net, self.pnode.name), + errors.ECODE_INVAL) + self.LogInfo("NIC/%d inherits netparams %s" % + (idx, netparams.values())) + nic.nicparams = dict(netparams) + if nic.ip is not None: + if nic.ip.lower() == constants.NIC_IP_POOL: + try: + nic.ip = self.cfg.GenerateIp(net, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("Unable to get a free IP for NIC %d" + " from the address pool" % idx, + errors.ECODE_STATE) + self.LogInfo("Chose IP %s from network %s", nic.ip, net) + else: + try: + self.cfg.ReserveIp(net, nic.ip, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("IP address %s already in use" + " or does not belong to network %s" % + (nic.ip, net), + errors.ECODE_NOTUNIQUE) + else: + # net is None, ip None or given + if self.op.conflicts_check: + _CheckForConflictingIp(self, nic.ip, self.pnode.name) + # mirror node verification if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.snode == pnode.name: @@ -10133,7 +10465,7 @@ class LUInstanceCreate(LogicalUnit): if baddisks: raise errors.OpPrereqError("Device node(s) %s lie outside %s and" " cannot be adopted" % - (", ".join(baddisks), + (utils.CommaJoin(baddisks), constants.ADOPTABLE_BLOCKDEV_ROOT), errors.ECODE_INVAL) @@ -10151,6 +10483,27 @@ class LUInstanceCreate(LogicalUnit): dsk[constants.IDISK_SIZE] = \ int(float(node_disks[dsk[constants.IDISK_ADOPT]])) + # Verify instance specs + spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None) + ispec = { + constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), + constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), + constants.ISPEC_DISK_COUNT: len(self.disks), + constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE] + for disk in self.disks], + constants.ISPEC_NIC_COUNT: len(self.nics), + constants.ISPEC_SPINDLE_USE: spindle_use, + } + + group_info = self.cfg.GetNodeGroup(pnode.group) + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info) + res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec) + if not self.op.ignore_ipolicy and res: + raise errors.OpPrereqError(("Instance allocation to group %s violates" + " policy: %s") % (pnode.group, + utils.CommaJoin(res)), + errors.ECODE_INVAL) + _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant) @@ -10179,6 +10532,7 @@ class LUInstanceCreate(LogicalUnit): assert not (self.owned_locks(locking.LEVEL_NODE_RES) - self.owned_locks(locking.LEVEL_NODE)), \ "Node locks differ from node resource locks" + assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) ht_kind = self.op.hypervisor if ht_kind in constants.HTS_REQ_PORT: @@ -10457,7 +10811,11 @@ class LUInstanceMultiAlloc(NoHooksLU): """ self.share_locks = _ShareAll() - self.needed_locks = {} + self.needed_locks = { + # iallocator will select nodes and even if no iallocator is used, + # collisions with LUInstanceCreate should be avoided + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } if self.op.iallocator: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET @@ -10482,11 +10840,14 @@ class LUInstanceMultiAlloc(NoHooksLU): """ cluster = self.cfg.GetClusterInfo() default_vg = self.cfg.GetVGName() + ec_id = self.proc.GetECId() + insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg), _ComputeNics(op, cluster, None, - self.cfg, self.proc), + self.cfg, ec_id), _ComputeFullBeParams(op, cluster)) 
for op in self.op.instances] + req = iallocator.IAReqMultiInstanceAlloc(instances=insts) ial = iallocator.IAllocator(self.cfg, self.rpc, req) @@ -10630,8 +10991,24 @@ class LUInstanceReplaceDisks(LogicalUnit): REQ_BGL = False def CheckArguments(self): - TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, - self.op.iallocator) + """Check arguments. + + """ + remote_node = self.op.remote_node + ialloc = self.op.iallocator + if self.op.mode == constants.REPLACE_DISK_CHG: + if remote_node is None and ialloc is None: + raise errors.OpPrereqError("When changing the secondary either an" + " iallocator script must be used or the" + " new node given", errors.ECODE_INVAL) + else: + _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + + elif remote_node is not None or ialloc is not None: + # Not replacing the secondary + raise errors.OpPrereqError("The iallocator and new node options can" + " only be used when changing the" + " secondary node", errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -10659,12 +11036,13 @@ class LUInstanceReplaceDisks(LogicalUnit): if self.op.iallocator is not None: # iallocator will select a new node in the same group self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET self.needed_locks[locking.LEVEL_NODE_RES] = [] self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, - self.op.disks, False, self.op.early_release, + self.op.disks, self.op.early_release, self.op.ignore_ipolicy) self.tasklets = [self.replacer] @@ -10685,6 +11063,7 @@ class LUInstanceReplaceDisks(LogicalUnit): if self.op.iallocator is not None: assert self.op.remote_node is None assert not self.needed_locks[locking.LEVEL_NODE] + assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) # Lock member nodes of all locked groups self.needed_locks[locking.LEVEL_NODE] = \ @@ -10692,7 +11071,10 @@ class LUInstanceReplaceDisks(LogicalUnit): for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: + assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) + self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: # Reuse node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ @@ -10748,7 +11130,7 @@ class TLReplaceDisks(Tasklet): """ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks, delay_iallocator, early_release, ignore_ipolicy): + disks, early_release, ignore_ipolicy): """Initializes this class. """ @@ -10760,7 +11142,6 @@ class TLReplaceDisks(Tasklet): self.iallocator_name = iallocator_name self.remote_node = remote_node self.disks = disks - self.delay_iallocator = delay_iallocator self.early_release = early_release self.ignore_ipolicy = ignore_ipolicy @@ -10773,28 +11154,6 @@ class TLReplaceDisks(Tasklet): self.node_secondary_ip = None @staticmethod - def CheckArguments(mode, remote_node, ialloc): - """Helper function for users of this class. 
- - """ - # check for valid parameter combination - if mode == constants.REPLACE_DISK_CHG: - if remote_node is None and ialloc is None: - raise errors.OpPrereqError("When changing the secondary either an" - " iallocator script must be used or the" - " new node given", errors.ECODE_INVAL) - - if remote_node is not None and ialloc is not None: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both", errors.ECODE_INVAL) - - elif remote_node is not None or ialloc is not None: - # Not replacing the secondary - raise errors.OpPrereqError("The iallocator and new node options can" - " only be used when changing the" - " secondary node", errors.ECODE_INVAL) - - @staticmethod def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): """Compute a new secondary node using an IAllocator. @@ -10867,18 +11226,6 @@ class TLReplaceDisks(Tasklet): len(instance.secondary_nodes), errors.ECODE_FAULT) - if not self.delay_iallocator: - self._CheckPrereq2() - - def _CheckPrereq2(self): - """Check prerequisites, second part. - - This function should always be part of CheckPrereq. It was separated and is - now called from Exec because during node evacuation iallocator was only - called with an unmodified cluster model, not taking planned changes into - account. - - """ instance = self.instance secondary_node = instance.secondary_nodes[0] @@ -10999,10 +11346,10 @@ class TLReplaceDisks(Tasklet): # Release unneeded node and node resource locks _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC) # Release any owned node group - if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): - _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) + _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) # Check whether disks are valid for disk_idx in self.disks: @@ -11018,9 +11365,6 @@ class TLReplaceDisks(Tasklet): This dispatches the disk replacement to the appropriate handler. 
""" - if self.delay_iallocator: - self._CheckPrereq2() - if __debug__: # Verify owned locks before starting operation owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) @@ -11029,6 +11373,7 @@ class TLReplaceDisks(Tasklet): (owned_nodes, self.node_secondary_ip.keys())) assert (self.lu.owned_locks(locking.LEVEL_NODE) == self.lu.owned_locks(locking.LEVEL_NODE_RES)) + assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC) owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) assert list(owned_instances) == [self.instance_name], \ @@ -11038,11 +11383,15 @@ class TLReplaceDisks(Tasklet): "Should not own any node group lock at this point" if not self.disks: - feedback_fn("No disks need replacement") + feedback_fn("No disks need replacement for instance '%s'" % + self.instance.name) return - feedback_fn("Replacing disk(s) %s for %s" % + feedback_fn("Replacing disk(s) %s for instance '%s'" % (utils.CommaJoin(self.disks), self.instance.name)) + feedback_fn("Current primary node: %s", self.instance.primary_node) + feedback_fn("Current seconary node: %s", + utils.CommaJoin(self.instance.secondary_nodes)) activate_disks = (self.instance.admin_state != constants.ADMINST_UP) @@ -11101,7 +11450,7 @@ class TLReplaceDisks(Tasklet): continue for node in nodes: - self.lu.LogInfo("Checking disk/%d on %s" % (idx, node)) + self.lu.LogInfo("Checking disk/%d on %s", idx, node) self.cfg.SetDiskID(dev, node) result = _BlockdevFind(self, node, dev, self.instance) @@ -11141,7 +11490,7 @@ class TLReplaceDisks(Tasklet): if idx not in self.disks: continue - self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx)) + self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx) self.cfg.SetDiskID(dev, node_name) @@ -11188,14 +11537,14 @@ class TLReplaceDisks(Tasklet): def _RemoveOldStorage(self, node_name, iv_names): for name, (_, old_lvs, _) in iv_names.iteritems(): - self.lu.LogInfo("Remove logical volumes for %s" % name) + self.lu.LogInfo("Remove logical volumes for %s", name) for lv in old_lvs: self.cfg.SetDiskID(lv, node_name) msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg if msg: - self.lu.LogWarning("Can't remove old LV: %s" % msg, + self.lu.LogWarning("Can't remove old LV: %s", msg, hint="remove unused LVs manually") def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613 @@ -11240,7 +11589,7 @@ class TLReplaceDisks(Tasklet): # Step: for each lv, detach+rename*2+attach self.lu.LogStep(4, steps_total, "Changing drbd configuration") for dev, old_lvs, new_lvs in iv_names.itervalues(): - self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name) + self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name) result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs) @@ -11294,7 +11643,7 @@ class TLReplaceDisks(Tasklet): self.cfg.SetDiskID(disk, self.target_node) # Now that the new lvs have the old name, we can add them to the device - self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) + self.lu.LogInfo("Adding new mirror component on %s", self.target_node) result = self.rpc.call_blockdev_addchildren(self.target_node, (dev, self.instance), new_lvs) msg = result.fail_msg @@ -11432,7 +11781,7 @@ class TLReplaceDisks(Tasklet): # We have new devices, shutdown the drbd on the old secondary for idx, dev in enumerate(self.instance.disks): - self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx) + self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) self.cfg.SetDiskID(dev, 
self.target_node) msg = self.rpc.call_blockdev_shutdown(self.target_node, (dev, self.instance)).fail_msg @@ -11544,7 +11893,7 @@ class LURepairNodeStorage(NoHooksLU): errors.ECODE_STATE) except errors.OpPrereqError, err: if self.op.ignore_consistency: - self.proc.LogWarning(str(err.args[0])) + self.LogWarning(str(err.args[0])) else: raise @@ -11765,8 +12114,7 @@ class LUNodeEvacuate(NoHooksLU): disks=[], mode=constants.REPLACE_DISK_CHG, early_release=self.op.early_release)] - for instance_name in self.instance_names - ] + for instance_name in self.instance_names] else: raise errors.ProgrammerError("No iallocator or remote node") @@ -11831,6 +12179,23 @@ def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes): for ops in jobs] +def _DiskSizeInBytesToMebibytes(lu, size): + """Converts a disk size in bytes to mebibytes. + + Warns and rounds up if the size isn't an even multiple of 1 MiB. + + """ + (mib, remainder) = divmod(size, 1024 * 1024) + + if remainder != 0: + lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" + " to not overwrite existing data (%s bytes will not be" + " wiped)", (1024 * 1024) - remainder) + mib += 1 + + return mib + + class LUInstanceGrowDisk(LogicalUnit): """Grow a disk of an instance. @@ -11852,7 +12217,7 @@ class LUInstanceGrowDisk(LogicalUnit): elif level == locking.LEVEL_NODE_RES: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) def BuildHooksEnv(self): """Build hooks env. @@ -11932,6 +12297,8 @@ class LUInstanceGrowDisk(LogicalUnit): assert (self.owned_locks(locking.LEVEL_NODE) == self.owned_locks(locking.LEVEL_NODE_RES)) + wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks + disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") @@ -11946,7 +12313,27 @@ class LUInstanceGrowDisk(LogicalUnit): self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, True, True) - result.Raise("Grow request failed to node %s" % node) + result.Raise("Dry-run grow request failed to node %s" % node) + + if wipe_disks: + # Get disk size from primary node for wiping + result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk]) + result.Raise("Failed to retrieve disk size from node '%s'" % + instance.primary_node) + + (disk_size_in_bytes, ) = result.payload + + if disk_size_in_bytes is None: + raise errors.OpExecError("Failed to retrieve disk size from primary" + " node '%s'" % instance.primary_node) + + old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes) + + assert old_disk_size >= disk.size, \ + ("Retrieved disk size too small (got %s, should be at least %s)" % + (old_disk_size, disk.size)) + else: + old_disk_size = None # We know that (as far as we can test) operations across different # nodes will succeed, time to run it for real on the backing storage @@ -11972,17 +12359,26 @@ class LUInstanceGrowDisk(LogicalUnit): # Downgrade lock while waiting for sync self.glm.downgrade(locking.LEVEL_INSTANCE) + assert wipe_disks ^ (old_disk_size is None) + + if wipe_disks: + assert instance.disks[self.op.disk] == disk + + # Wipe newly added disk space + _WipeDisks(self, instance, + disks=[(self.op.disk, disk, old_disk_size)]) + if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: - self.proc.LogWarning("Disk sync-ing has not 
returned a good" - " status; please check the instance") + self.LogWarning("Disk syncing has not returned a good status; check" + " the instance") if instance.admin_state != constants.ADMINST_UP: _SafeShutdownInstanceDisks(self, instance, disks=[disk]) elif instance.admin_state != constants.ADMINST_UP: - self.proc.LogWarning("Not shutting down the disk even if the instance is" - " not supposed to be running because no wait for" - " sync mode was requested") + self.LogWarning("Not shutting down the disk even if the instance is" + " not supposed to be running because no wait for" + " sync mode was requested") assert self.owned_locks(locking.LEVEL_NODE_RES) assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) @@ -12431,29 +12827,37 @@ class LUInstanceSetParams(LogicalUnit): """ if op in (constants.DDM_ADD, constants.DDM_MODIFY): ip = params.get(constants.INIC_IP, None) - if ip is None: - pass - elif ip.lower() == constants.VALUE_NONE: - params[constants.INIC_IP] = None - elif not netutils.IPAddress.IsValid(ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % ip, - errors.ECODE_INVAL) - - bridge = params.get("bridge", None) - link = params.get(constants.INIC_LINK, None) - if bridge and link: - raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time", errors.ECODE_INVAL) - elif bridge and bridge.lower() == constants.VALUE_NONE: - params["bridge"] = None - elif link and link.lower() == constants.VALUE_NONE: - params[constants.INIC_LINK] = None + req_net = params.get(constants.INIC_NETWORK, None) + link = params.get(constants.NIC_LINK, None) + mode = params.get(constants.NIC_MODE, None) + if req_net is not None: + if req_net.lower() == constants.VALUE_NONE: + params[constants.INIC_NETWORK] = None + req_net = None + elif link is not None or mode is not None: + raise errors.OpPrereqError("If network is given" + " mode or link should not", + errors.ECODE_INVAL) if op == constants.DDM_ADD: macaddr = params.get(constants.INIC_MAC, None) if macaddr is None: params[constants.INIC_MAC] = constants.VALUE_AUTO + if ip is not None: + if ip.lower() == constants.VALUE_NONE: + params[constants.INIC_IP] = None + else: + if ip.lower() == constants.NIC_IP_POOL: + if op == constants.DDM_ADD and req_net is None: + raise errors.OpPrereqError("If ip=pool, parameter network" + " cannot be none", + errors.ECODE_INVAL) + else: + if not netutils.IPAddress.IsValid(ip): + raise errors.OpPrereqError("Invalid IP address '%s'" % ip, + errors.ECODE_INVAL) + if constants.INIC_MAC in params: macaddr = params[constants.INIC_MAC] if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): @@ -12500,15 +12904,23 @@ class LUInstanceSetParams(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODEGROUP] = [] # Can't even acquire node locks in shared mode as upcoming changes in # Ganeti 2.6 will start to modify the node object on disk conversion self.needed_locks[locking.LEVEL_NODE] = [] self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + # Look node group to look up the ipolicy + self.share_locks[locking.LEVEL_NODEGROUP] = 1 def DeclareLocks(self, level): - # TODO: Acquire group lock in shared mode (disk parameters) - if level == locking.LEVEL_NODE: + if level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + # Acquire locks for the instance's nodegroups optimistically. 
Needs + # to be verified in CheckPrereq + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name) + elif level == locking.LEVEL_NODE: self._LockInstancesNodes() if self.op.disk_template and self.op.remote_node: self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) @@ -12516,7 +12928,7 @@ class LUInstanceSetParams(LogicalUnit): elif level == locking.LEVEL_NODE_RES and self.op.disk_template: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) def BuildHooksEnv(self): """Build hooks env. @@ -12524,7 +12936,7 @@ class LUInstanceSetParams(LogicalUnit): This runs on the master, primary and secondaries. """ - args = dict() + args = {} if constants.BE_MINMEM in self.be_new: args["minmem"] = self.be_new[constants.BE_MINMEM] if constants.BE_MAXMEM in self.be_new: @@ -12538,10 +12950,10 @@ class LUInstanceSetParams(LogicalUnit): nics = [] for nic in self._new_nics: - nicparams = self.cluster.SimpleFillNIC(nic.nicparams) - mode = nicparams[constants.NIC_MODE] - link = nicparams[constants.NIC_LINK] - nics.append((nic.ip, nic.mac, mode, link)) + n = copy.deepcopy(nic) + nicparams = self.cluster.SimpleFillNIC(n.nicparams) + n.nicparams = nicparams + nics.append(_NICToTuple(self, n)) args["nics"] = nics @@ -12560,16 +12972,27 @@ class LUInstanceSetParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return (nl, nl) - def _PrepareNicModification(self, params, private, old_ip, old_params, - cluster, pnode): + def _PrepareNicModification(self, params, private, old_ip, old_net, + old_params, cluster, pnode): + update_params_dict = dict([(key, params[key]) for key in constants.NICS_PARAMETERS if key in params]) - if "bridge" in params: - update_params_dict[constants.NIC_LINK] = params["bridge"] + req_link = update_params_dict.get(constants.NIC_LINK, None) + req_mode = update_params_dict.get(constants.NIC_MODE, None) + + new_net = params.get(constants.INIC_NETWORK, old_net) + if new_net is not None: + netparams = self.cfg.GetGroupNetParams(new_net, pnode) + if netparams is None: + raise errors.OpPrereqError("No netparams found for the network" + " %s, probably not connected" % new_net, + errors.ECODE_INVAL) + new_params = dict(netparams) + else: + new_params = _GetUpdatedParams(old_params, update_params_dict) - new_params = _GetUpdatedParams(old_params, update_params_dict) utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES) new_filled_params = cluster.SimpleFillNIC(new_params) @@ -12600,7 +13023,7 @@ class LUInstanceSetParams(LogicalUnit): elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the MAC address params[constants.INIC_MAC] = \ - self.cfg.GenerateMAC(self.proc.GetECId()) + self.cfg.GenerateMAC(new_net, self.proc.GetECId()) else: # or validate/reserve the current one try: @@ -12609,6 +13032,66 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("MAC address '%s' already in use" " in cluster" % mac, errors.ECODE_NOTUNIQUE) + elif new_net != old_net: + + def get_net_prefix(net): + if net: + uuid = self.cfg.LookupNetwork(net) + if uuid: + nobj = self.cfg.GetNetwork(uuid) + return nobj.mac_prefix + return None + + new_prefix = get_net_prefix(new_net) + old_prefix = get_net_prefix(old_net) + if old_prefix != new_prefix: + params[constants.INIC_MAC] = \ + self.cfg.GenerateMAC(new_net, self.proc.GetECId()) + + #if there is a change in nic-network 
configuration + new_ip = params.get(constants.INIC_IP, old_ip) + if (new_ip, new_net) != (old_ip, old_net): + if new_ip: + if new_net: + if new_ip.lower() == constants.NIC_IP_POOL: + try: + new_ip = self.cfg.GenerateIp(new_net, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("Unable to get a free IP" + " from the address pool", + errors.ECODE_STATE) + self.LogInfo("Chose IP %s from pool %s", new_ip, new_net) + params[constants.INIC_IP] = new_ip + elif new_ip != old_ip or new_net != old_net: + try: + self.LogInfo("Reserving IP %s in pool %s", new_ip, new_net) + self.cfg.ReserveIp(new_net, new_ip, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("IP %s not available in network %s" % + (new_ip, new_net), + errors.ECODE_NOTUNIQUE) + elif new_ip.lower() == constants.NIC_IP_POOL: + raise errors.OpPrereqError("ip=pool, but no network found", + errors.ECODE_INVAL) + else: + # new net is None + if self.op.conflicts_check: + _CheckForConflictingIp(self, new_ip, pnode) + + if old_ip: + if old_net: + try: + self.cfg.ReleaseIp(old_net, old_ip, self.proc.GetECId()) + except errors.AddressPoolError: + logging.warning("Release IP %s not contained in network %s", + old_ip, old_net) + + # there are no changes in (net, ip) tuple + elif (old_net is not None and + (req_link is not None or req_mode is not None)): + raise errors.OpPrereqError("Not allowed to change link or mode of" + " a NIC that is connected to a network", + errors.ECODE_INVAL) private.params = new_params private.filled = new_filled_params @@ -12619,17 +13102,26 @@ class LUInstanceSetParams(LogicalUnit): This only checks the instance list against the existing names. """ - # checking the new params on the primary/secondary nodes - + assert self.op.instance_name in self.owned_locks(locking.LEVEL_INSTANCE) instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + cluster = self.cluster = self.cfg.GetClusterInfo() assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name + pnode = instance.primary_node + assert pnode in self.owned_locks(locking.LEVEL_NODE) nodelist = list(instance.all_nodes) pnode_info = self.cfg.GetNodeInfo(pnode) self.diskparams = self.cfg.GetInstanceDiskParams(instance) + #_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) + assert pnode_info.group in self.owned_locks(locking.LEVEL_NODEGROUP) + group_info = self.cfg.GetNodeGroup(pnode_info.group) + + # dictionary with instance information after the modification + ispec = {} + # Prepare disk/NIC modifications self.diskmod = PrepareContainerMods(self.op.disks, None) self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate) @@ -12835,30 +13327,36 @@ class LUInstanceSetParams(LogicalUnit): self.be_proposed[constants.BE_MAXMEM]), errors.ECODE_INVAL) - if self.op.runtime_mem > current_memory: + delta = self.op.runtime_mem - current_memory + if delta > 0: _CheckNodeFreeMemory(self, instance.primary_node, "ballooning memory for instance %s" % - instance.name, - self.op.memory - current_memory, - instance.hypervisor) + instance.name, delta, instance.hypervisor) if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for" " diskless instances", errors.ECODE_INVAL) def _PrepareNicCreate(_, params, private): - self._PrepareNicModification(params, private, None, {}, cluster, pnode) + self._PrepareNicModification(params, private, None, None, + {}, cluster, pnode) 
return (None, None) def _PrepareNicMod(_, nic, params, private): - self._PrepareNicModification(params, private, nic.ip, + self._PrepareNicModification(params, private, nic.ip, nic.network, nic.nicparams, cluster, pnode) return None + def _PrepareNicRemove(_, params, __): + ip = params.ip + net = params.network + if net is not None and ip is not None: + self.cfg.ReleaseIp(net, ip, self.proc.GetECId()) + # Verify NIC changes (operating on copy) nics = instance.nics[:] ApplyContainerMods("NIC", nics, None, self.nicmod, - _PrepareNicCreate, _PrepareNicMod, None) + _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove) if len(nics) > constants.MAX_NICS: raise errors.OpPrereqError("Instance has too many network interfaces" " (%d), cannot add more" % constants.MAX_NICS, @@ -12871,6 +13369,11 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("Instance has too many disks (%d), cannot add" " more" % constants.MAX_DISKS, errors.ECODE_STATE) + disk_sizes = [disk.size for disk in instance.disks] + disk_sizes.extend(params["size"] for (op, idx, params, private) in + self.diskmod if op == constants.DDM_ADD) + ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes) + ispec[constants.ISPEC_DISK_SIZE] = disk_sizes if self.op.offline is not None: if self.op.offline: @@ -12887,8 +13390,38 @@ class LUInstanceSetParams(LogicalUnit): ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod, self._CreateNewNic, self._ApplyNicMods, None) self._new_nics = nics + ispec[constants.ISPEC_NIC_COUNT] = len(self._new_nics) else: self._new_nics = None + ispec[constants.ISPEC_NIC_COUNT] = len(instance.nics) + + if not self.op.ignore_ipolicy: + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + group_info) + + # Fill ispec with backend parameters + ispec[constants.ISPEC_SPINDLE_USE] = \ + self.be_new.get(constants.BE_SPINDLE_USE, None) + ispec[constants.ISPEC_CPU_COUNT] = self.be_new.get(constants.BE_VCPUS, + None) + + # Copy ispec to verify parameters with min/max values separately + ispec_max = ispec.copy() + ispec_max[constants.ISPEC_MEM_SIZE] = \ + self.be_new.get(constants.BE_MAXMEM, None) + res_max = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_max) + ispec_min = ispec.copy() + ispec_min[constants.ISPEC_MEM_SIZE] = \ + self.be_new.get(constants.BE_MINMEM, None) + res_min = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_min) + + if (res_max or res_min): + # FIXME: Improve error message by including information about whether + # the upper or lower limit of the parameter fails the ipolicy. + msg = ("Instance allocation to group %s (%s) violates policy: %s" % + (group_info, group_info.name, + utils.CommaJoin(set(res_max + res_min)))) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) def _ConvertPlainToDrbd(self, feedback_fn): """Converts an instance from plain to drbd. @@ -13075,13 +13608,16 @@ class LUInstanceSetParams(LogicalUnit): """ mac = params[constants.INIC_MAC] ip = params.get(constants.INIC_IP, None) - nicparams = private.params + net = params.get(constants.INIC_NETWORK, None) + #TODO: not private.filled?? can a nic have no nicparams?? 
+ nicparams = private.filled - return (objects.NIC(mac=mac, ip=ip, nicparams=nicparams), [ + return (objects.NIC(mac=mac, ip=ip, network=net, nicparams=nicparams), [ ("nic.%d" % idx, - "add:mac=%s,ip=%s,mode=%s,link=%s" % + "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" % (mac, ip, private.filled[constants.NIC_MODE], - private.filled[constants.NIC_LINK])), + private.filled[constants.NIC_LINK], + net)), ]) @staticmethod @@ -13091,15 +13627,15 @@ class LUInstanceSetParams(LogicalUnit): """ changes = [] - for key in [constants.INIC_MAC, constants.INIC_IP]: + for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NETWORK]: if key in params: changes.append(("nic.%s/%d" % (key, idx), params[key])) setattr(nic, key, params[key]) - if private.params: - nic.nicparams = private.params + if private.filled: + nic.nicparams = private.filled - for (key, val) in params.items(): + for (key, val) in nic.nicparams.items(): changes.append(("nic.%s/%d" % (key, idx), val)) return changes @@ -13207,7 +13743,7 @@ class LUInstanceSetParams(LogicalUnit): self.cfg.MarkInstanceDown(instance.name) result.append(("admin_state", constants.ADMINST_DOWN)) - self.cfg.Update(instance, feedback_fn) + self.cfg.Update(instance, feedback_fn, self.proc.GetECId()) assert not (self.owned_locks(locking.LEVEL_NODE_RES) or self.owned_locks(locking.LEVEL_NODE)), \ @@ -13228,9 +13764,11 @@ class LUInstanceChangeGroup(LogicalUnit): def ExpandNames(self): self.share_locks = _ShareAll() + self.needed_locks = { locking.LEVEL_NODEGROUP: [], locking.LEVEL_NODE: [], + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, } self._ExpandAndLockInstance() @@ -13796,11 +14334,19 @@ class LUBackupRemove(NoHooksLU): REQ_BGL = False def ExpandNames(self): - self.needed_locks = {} - # We need all nodes to be locked in order for RemoveExport to work, but we - # don't need to lock the instance itself, as nothing will happen to it (and - # we can remove exports also for a removed instance) - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks = { + # We need all nodes to be locked in order for RemoveExport to work, but + # we don't need to lock the instance itself, as nothing will happen to it + # (and we can remove exports also for a removed instance) + locking.LEVEL_NODE: locking.ALL_SET, + + # Removing backups is quick, so blocking allocations is justified + locking.LEVEL_NODE_ALLOC: locking.ALL_SET, + } + + # Allocations should be stopped while this LU runs with node locks, but it + # doesn't have to be exclusive + self.share_locks[locking.LEVEL_NODE_ALLOC] = 1 def Exec(self, feedback_fn): """Remove any export. 
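Several of the hunks above repeat one locking pattern: logical units that operate on many nodes now also declare the new node-allocation lock level, so instance allocations cannot race with them while node locks are held. Below is a minimal illustrative sketch of that pattern only, not part of the patch; the LU name is hypothetical, and the NoHooksLU/locking names are assumed to be the ones used throughout this file.

# Illustrative sketch (hypothetical LU); mirrors the LUBackupRemove and
# LUInstanceChangeGroup hunks above.
class LUExampleNodeWide(NoHooksLU):
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      # The operation itself needs all node locks ...
      locking.LEVEL_NODE: locking.ALL_SET,
      # ... and takes the allocation lock so that new instance allocations
      # are blocked while the node locks are held
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }
    # The work is quick, so the allocation lock does not have to be exclusive
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1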
@@ -14640,6 +15186,10 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223 self.group_uuid = self.cfg.LookupNodeGroup(self.op.name) lock_level = locking.LEVEL_NODEGROUP lock_name = self.group_uuid + elif self.op.kind == constants.TAG_NETWORK: + self.network_uuid = self.cfg.LookupNetwork(self.op.name) + lock_level = locking.LEVEL_NETWORK + lock_name = self.network_uuid else: lock_level = None lock_name = None @@ -14662,6 +15212,8 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223 self.target = self.cfg.GetInstanceInfo(self.op.name) elif self.op.kind == constants.TAG_NODEGROUP: self.target = self.cfg.GetNodeGroup(self.group_uuid) + elif self.op.kind == constants.TAG_NETWORK: + self.target = self.cfg.GetNetwork(self.network_uuid) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % str(self.op.kind), errors.ECODE_INVAL) @@ -14833,10 +15385,57 @@ class LUTestDelay(NoHooksLU): else: top_value = self.op.repeat - 1 for i in range(self.op.repeat): - self.LogInfo("Test delay iteration %d/%d" % (i, top_value)) + self.LogInfo("Test delay iteration %d/%d", i, top_value) self._TestDelay() +class LURestrictedCommand(NoHooksLU): + """Logical unit for executing restricted commands. + + """ + REQ_BGL = False + + def ExpandNames(self): + if self.op.nodes: + self.op.nodes = _GetWantedNodes(self, self.op.nodes) + + self.needed_locks = { + locking.LEVEL_NODE: self.op.nodes, + } + self.share_locks = { + locking.LEVEL_NODE: not self.op.use_locking, + } + + def CheckPrereq(self): + """Check prerequisites. + + """ + + def Exec(self, feedback_fn): + """Execute restricted command and return output. + + """ + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + + # Check if correct locks are held + assert set(self.op.nodes).issubset(owned_nodes) + + rpcres = self.rpc.call_restricted_command(self.op.nodes, self.op.command) + + result = [] + + for node_name in self.op.nodes: + nres = rpcres[node_name] + if nres.fail_msg: + msg = ("Command '%s' on node '%s' failed: %s" % + (self.op.command, node_name, nres.fail_msg)) + result.append((False, msg)) + else: + result.append((True, nres.payload)) + + return result + + class LUTestJqueue(NoHooksLU): """Utility LU to test some aspects of the job queue. @@ -15027,7 +15626,7 @@ class LUTestAllocator(NoHooksLU): self.op.mode, errors.ECODE_INVAL) if self.op.direction == constants.IALLOCATOR_DIR_OUT: - if self.op.allocator is None: + if self.op.iallocator is None: raise errors.OpPrereqError("Missing allocator name", errors.ECODE_INVAL) elif self.op.direction != constants.IALLOCATOR_DIR_IN: @@ -15080,17 +15679,678 @@ class LUTestAllocator(NoHooksLU): if self.op.direction == constants.IALLOCATOR_DIR_IN: result = ial.in_text else: - ial.Run(self.op.allocator, validate=False) + ial.Run(self.op.iallocator, validate=False) result = ial.out_text return result +class LUNetworkAdd(LogicalUnit): + """Logical unit for creating networks. + + """ + HPATH = "network-add" + HTYPE = constants.HTYPE_NETWORK + REQ_BGL = False + + def BuildHooksNodes(self): + """Build hooks nodes. 
+ + """ + mn = self.cfg.GetMasterNode() + return ([mn], [mn]) + + def CheckArguments(self): + if self.op.mac_prefix: + self.op.mac_prefix = \ + utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix) + + def ExpandNames(self): + self.network_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId()) + + if self.op.conflicts_check: + self.share_locks[locking.LEVEL_NODE] = 1 + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + } + else: + self.needed_locks = {} + + self.add_locks[locking.LEVEL_NETWORK] = self.network_uuid + + def CheckPrereq(self): + if self.op.network is None: + raise errors.OpPrereqError("Network must be given", + errors.ECODE_INVAL) + + uuid = self.cfg.LookupNetwork(self.op.network_name) + + if uuid: + raise errors.OpPrereqError("Network '%s' already defined" % + self.op.network, errors.ECODE_EXISTS) + + # Check tag validity + for tag in self.op.tags: + objects.TaggableObject.ValidateTag(tag) + + def BuildHooksEnv(self): + """Build hooks env. + + """ + args = { + "name": self.op.network_name, + "subnet": self.op.network, + "gateway": self.op.gateway, + "network6": self.op.network6, + "gateway6": self.op.gateway6, + "mac_prefix": self.op.mac_prefix, + "network_type": self.op.network_type, + "tags": self.op.tags, + } + return _BuildNetworkHookEnv(**args) # pylint: disable=W0142 + + def Exec(self, feedback_fn): + """Add the ip pool to the cluster. + + """ + nobj = objects.Network(name=self.op.network_name, + network=self.op.network, + gateway=self.op.gateway, + network6=self.op.network6, + gateway6=self.op.gateway6, + mac_prefix=self.op.mac_prefix, + network_type=self.op.network_type, + uuid=self.network_uuid, + family=constants.IP4_VERSION) + # Initialize the associated address pool + try: + pool = network.AddressPool.InitializeNetwork(nobj) + except errors.AddressPoolError, e: + raise errors.OpExecError("Cannot create IP pool for this network. %s" % e) + + # Check if we need to reserve the nodes and the cluster master IP + # These may not be allocated to any instances in routed mode, as + # they wouldn't function anyway. + if self.op.conflicts_check: + for node in self.cfg.GetAllNodesInfo().values(): + for ip in [node.primary_ip, node.secondary_ip]: + try: + if pool.Contains(ip): + pool.Reserve(ip) + self.LogInfo("Reserved IP address of node '%s' (%s)", + node.name, ip) + except errors.AddressPoolError: + self.LogWarning("Cannot reserve IP address of node '%s' (%s)", + node.name, ip) + + master_ip = self.cfg.GetClusterInfo().master_ip + try: + if pool.Contains(master_ip): + pool.Reserve(master_ip) + self.LogInfo("Reserved cluster master IP address (%s)", master_ip) + except errors.AddressPoolError: + self.LogWarning("Cannot reserve cluster master IP address (%s)", + master_ip) + + if self.op.add_reserved_ips: + for ip in self.op.add_reserved_ips: + try: + pool.Reserve(ip, external=True) + except errors.AddressPoolError, e: + raise errors.OpExecError("Cannot reserve IP %s. 
%s " % (ip, e)) + + if self.op.tags: + for tag in self.op.tags: + nobj.AddTag(tag) + + self.cfg.AddNetwork(nobj, self.proc.GetECId(), check_uuid=False) + del self.remove_locks[locking.LEVEL_NETWORK] + + +class LUNetworkRemove(LogicalUnit): + HPATH = "network-remove" + HTYPE = constants.HTYPE_NETWORK + REQ_BGL = False + + def ExpandNames(self): + self.network_uuid = self.cfg.LookupNetwork(self.op.network_name) + + if not self.network_uuid: + raise errors.OpPrereqError(("Network '%s' not found" % + self.op.network_name), + errors.ECODE_INVAL) + + self.share_locks[locking.LEVEL_NODEGROUP] = 1 + self.needed_locks = { + locking.LEVEL_NETWORK: [self.network_uuid], + locking.LEVEL_NODEGROUP: locking.ALL_SET, + } + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the given network name exists as a network, that is + empty (i.e., contains no nodes), and that is not the last group of the + cluster. + + """ + # Verify that the network is not conncted. + node_groups = [group.name + for group in self.cfg.GetAllNodeGroupsInfo().values() + if self.network_uuid in group.networks] + + if node_groups: + self.LogWarning("Network '%s' is connected to the following" + " node groups: %s" % + (self.op.network_name, + utils.CommaJoin(utils.NiceSort(node_groups)))) + raise errors.OpPrereqError("Network still connected", errors.ECODE_STATE) + + def BuildHooksEnv(self): + """Build hooks env. + + """ + return { + "NETWORK_NAME": self.op.network_name, + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + mn = self.cfg.GetMasterNode() + return ([mn], [mn]) + + def Exec(self, feedback_fn): + """Remove the network. + + """ + try: + self.cfg.RemoveNetwork(self.network_uuid) + except errors.ConfigurationError: + raise errors.OpExecError("Network '%s' with UUID %s disappeared" % + (self.op.network_name, self.network_uuid)) + + +class LUNetworkSetParams(LogicalUnit): + """Modifies the parameters of a network. + + """ + HPATH = "network-modify" + HTYPE = constants.HTYPE_NETWORK + REQ_BGL = False + + def CheckArguments(self): + if (self.op.gateway and + (self.op.add_reserved_ips or self.op.remove_reserved_ips)): + raise errors.OpPrereqError("Cannot modify gateway and reserved ips" + " at once", errors.ECODE_INVAL) + + def ExpandNames(self): + self.network_uuid = self.cfg.LookupNetwork(self.op.network_name) + if self.network_uuid is None: + raise errors.OpPrereqError(("Network '%s' not found" % + self.op.network_name), + errors.ECODE_INVAL) + + self.needed_locks = { + locking.LEVEL_NETWORK: [self.network_uuid], + } + + def CheckPrereq(self): + """Check prerequisites. 
+ + """ + self.network = self.cfg.GetNetwork(self.network_uuid) + self.gateway = self.network.gateway + self.network_type = self.network.network_type + self.mac_prefix = self.network.mac_prefix + self.network6 = self.network.network6 + self.gateway6 = self.network.gateway6 + self.tags = self.network.tags + + self.pool = network.AddressPool(self.network) + + if self.op.gateway: + if self.op.gateway == constants.VALUE_NONE: + self.gateway = None + else: + self.gateway = self.op.gateway + if self.pool.IsReserved(self.gateway): + raise errors.OpPrereqError("%s is already reserved" % + self.gateway, errors.ECODE_INVAL) + + if self.op.network_type: + if self.op.network_type == constants.VALUE_NONE: + self.network_type = None + else: + self.network_type = self.op.network_type + + if self.op.mac_prefix: + if self.op.mac_prefix == constants.VALUE_NONE: + self.mac_prefix = None + else: + self.mac_prefix = \ + utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix) + + if self.op.gateway6: + if self.op.gateway6 == constants.VALUE_NONE: + self.gateway6 = None + else: + self.gateway6 = self.op.gateway6 + + if self.op.network6: + if self.op.network6 == constants.VALUE_NONE: + self.network6 = None + else: + self.network6 = self.op.network6 + + def BuildHooksEnv(self): + """Build hooks env. + + """ + args = { + "name": self.op.network_name, + "subnet": self.network.network, + "gateway": self.gateway, + "network6": self.network6, + "gateway6": self.gateway6, + "mac_prefix": self.mac_prefix, + "network_type": self.network_type, + "tags": self.tags, + } + return _BuildNetworkHookEnv(**args) # pylint: disable=W0142 + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + mn = self.cfg.GetMasterNode() + return ([mn], [mn]) + + def Exec(self, feedback_fn): + """Modifies the network. 
+ + """ + #TODO: reserve/release via temporary reservation manager + # extend cfg.ReserveIp/ReleaseIp with the external flag + if self.op.gateway: + if self.gateway == self.network.gateway: + self.LogWarning("Gateway is already %s", self.gateway) + else: + if self.gateway: + self.pool.Reserve(self.gateway, external=True) + if self.network.gateway: + self.pool.Release(self.network.gateway, external=True) + self.network.gateway = self.gateway + + if self.op.add_reserved_ips: + for ip in self.op.add_reserved_ips: + try: + if self.pool.IsReserved(ip): + self.LogWarning("IP address %s is already reserved", ip) + else: + self.pool.Reserve(ip, external=True) + except errors.AddressPoolError, err: + self.LogWarning("Cannot reserve IP address %s: %s", ip, err) + + if self.op.remove_reserved_ips: + for ip in self.op.remove_reserved_ips: + if ip == self.network.gateway: + self.LogWarning("Cannot unreserve Gateway's IP") + continue + try: + if not self.pool.IsReserved(ip): + self.LogWarning("IP address %s is already unreserved", ip) + else: + self.pool.Release(ip, external=True) + except errors.AddressPoolError, err: + self.LogWarning("Cannot release IP address %s: %s", ip, err) + + if self.op.mac_prefix: + self.network.mac_prefix = self.mac_prefix + + if self.op.network6: + self.network.network6 = self.network6 + + if self.op.gateway6: + self.network.gateway6 = self.gateway6 + + if self.op.network_type: + self.network.network_type = self.network_type + + self.pool.Validate() + + self.cfg.Update(self.network, feedback_fn) + + +class _NetworkQuery(_QueryBase): + FIELDS = query.NETWORK_FIELDS + + def ExpandNames(self, lu): + lu.needed_locks = {} + + self._all_networks = lu.cfg.GetAllNetworksInfo() + name_to_uuid = dict((n.name, n.uuid) for n in self._all_networks.values()) + + if not self.names: + self.wanted = [name_to_uuid[name] + for name in utils.NiceSort(name_to_uuid.keys())] + else: + # Accept names to be either names or UUIDs. + missing = [] + self.wanted = [] + all_uuid = frozenset(self._all_networks.keys()) + + for name in self.names: + if name in all_uuid: + self.wanted.append(name) + elif name in name_to_uuid: + self.wanted.append(name_to_uuid[name]) + else: + missing.append(name) + + if missing: + raise errors.OpPrereqError("Some networks do not exist: %s" % missing, + errors.ECODE_NOENT) + + def DeclareLocks(self, lu, level): + pass + + def _GetQueryData(self, lu): + """Computes the list of networks and their attributes. 
+ + """ + do_instances = query.NETQ_INST in self.requested_data + do_groups = do_instances or (query.NETQ_GROUP in self.requested_data) + do_stats = query.NETQ_STATS in self.requested_data + + network_to_groups = None + network_to_instances = None + stats = None + + # For NETQ_GROUP, we need to map network->[groups] + if do_groups: + all_groups = lu.cfg.GetAllNodeGroupsInfo() + network_to_groups = dict((uuid, []) for uuid in self.wanted) + + if do_instances: + all_instances = lu.cfg.GetAllInstancesInfo() + all_nodes = lu.cfg.GetAllNodesInfo() + network_to_instances = dict((uuid, []) for uuid in self.wanted) + + for group in all_groups.values(): + if do_instances: + group_nodes = [node.name for node in all_nodes.values() if + node.group == group.uuid] + group_instances = [instance for instance in all_instances.values() + if instance.primary_node in group_nodes] + + for net_uuid in group.networks.keys(): + if net_uuid in network_to_groups: + netparams = group.networks[net_uuid] + mode = netparams[constants.NIC_MODE] + link = netparams[constants.NIC_LINK] + info = group.name + "(" + mode + ", " + link + ")" + network_to_groups[net_uuid].append(info) + + if do_instances: + for instance in group_instances: + for nic in instance.nics: + if nic.network == self._all_networks[net_uuid].name: + network_to_instances[net_uuid].append(instance.name) + break + + if do_stats: + stats = {} + for uuid, net in self._all_networks.items(): + if uuid in self.wanted: + pool = network.AddressPool(net) + stats[uuid] = { + "free_count": pool.GetFreeCount(), + "reserved_count": pool.GetReservedCount(), + "map": pool.GetMap(), + "external_reservations": + utils.CommaJoin(pool.GetExternalReservations()), + } + + return query.NetworkQueryData([self._all_networks[uuid] + for uuid in self.wanted], + network_to_groups, + network_to_instances, + stats) + + +class LUNetworkQuery(NoHooksLU): + """Logical unit for querying networks. 
+ + """ + REQ_BGL = False + + def CheckArguments(self): + self.nq = _NetworkQuery(qlang.MakeSimpleFilter("name", self.op.names), + self.op.output_fields, False) + + def ExpandNames(self): + self.nq.ExpandNames(self) + + def Exec(self, feedback_fn): + return self.nq.OldStyleQuery(self) + + +class LUNetworkConnect(LogicalUnit): + """Connect a network to a nodegroup + + """ + HPATH = "network-connect" + HTYPE = constants.HTYPE_NETWORK + REQ_BGL = False + + def ExpandNames(self): + self.network_name = self.op.network_name + self.group_name = self.op.group_name + self.network_mode = self.op.network_mode + self.network_link = self.op.network_link + + self.network_uuid = self.cfg.LookupNetwork(self.network_name) + if self.network_uuid is None: + raise errors.OpPrereqError("Network %s does not exist" % + self.network_name, errors.ECODE_INVAL) + + self.group_uuid = self.cfg.LookupNodeGroup(self.group_name) + if self.group_uuid is None: + raise errors.OpPrereqError("Group %s does not exist" % + self.group_name, errors.ECODE_INVAL) + + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [self.group_uuid], + } + self.share_locks[locking.LEVEL_INSTANCE] = 1 + + if self.op.conflicts_check: + self.needed_locks[locking.LEVEL_NETWORK] = [self.network_uuid] + self.share_locks[locking.LEVEL_NETWORK] = 1 + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once group lock has + # been acquired + if self.op.conflicts_check: + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + + def BuildHooksEnv(self): + ret = { + "GROUP_NAME": self.group_name, + "GROUP_NETWORK_MODE": self.network_mode, + "GROUP_NETWORK_LINK": self.network_link, + } + return ret + + def BuildHooksNodes(self): + nodes = self.cfg.GetNodeGroup(self.group_uuid).members + return (nodes, nodes) + + def CheckPrereq(self): + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + + assert self.group_uuid in owned_groups + + l = lambda value: utils.CommaJoin("%s: %s/%s" % (i[0], i[1], i[2]) + for i in value) + + self.netparams = { + constants.NIC_MODE: self.network_mode, + constants.NIC_LINK: self.network_link, + } + objects.NIC.CheckParameterSyntax(self.netparams) + + self.group = self.cfg.GetNodeGroup(self.group_uuid) + #if self.network_mode == constants.NIC_MODE_BRIDGED: + # _CheckNodeGroupBridgesExist(self, self.network_link, self.group_uuid) + self.connected = False + if self.network_uuid in self.group.networks: + self.LogWarning("Network '%s' is already mapped to group '%s'" % + (self.network_name, self.group.name)) + self.connected = True + return + + if self.op.conflicts_check: + # Check if locked instances are still correct + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + + nobj = self.cfg.GetNetwork(self.network_uuid) + pool = network.AddressPool(nobj) + conflicting_instances = [] + + for (_, instance) in self.cfg.GetMultiInstanceInfo(owned_instances): + for idx, nic in enumerate(instance.nics): + if pool.Contains(nic.ip): + conflicting_instances.append((instance.name, idx, nic.ip)) + + if conflicting_instances: + self.LogWarning("Following occurences use IPs from network %s" + " that is about to connect to nodegroup %s: %s" % + (self.network_name, self.group.name, + l(conflicting_instances))) + raise 
errors.OpPrereqError("Conflicting IPs found." + " Please remove/modify" + " corresponding NICs", + errors.ECODE_INVAL) + + def Exec(self, feedback_fn): + if self.connected: + return + + self.group.networks[self.network_uuid] = self.netparams + self.cfg.Update(self.group, feedback_fn) + + +class LUNetworkDisconnect(LogicalUnit): + """Disconnect a network to a nodegroup + + """ + HPATH = "network-disconnect" + HTYPE = constants.HTYPE_NETWORK + REQ_BGL = False + + def ExpandNames(self): + self.network_name = self.op.network_name + self.group_name = self.op.group_name + + self.network_uuid = self.cfg.LookupNetwork(self.network_name) + if self.network_uuid is None: + raise errors.OpPrereqError("Network %s does not exist" % + self.network_name, errors.ECODE_INVAL) + + self.group_uuid = self.cfg.LookupNodeGroup(self.group_name) + if self.group_uuid is None: + raise errors.OpPrereqError("Group %s does not exist" % + self.group_name, errors.ECODE_INVAL) + + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [self.group_uuid], + } + self.share_locks[locking.LEVEL_INSTANCE] = 1 + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once group lock has + # been acquired + if self.op.conflicts_check: + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + + def BuildHooksEnv(self): + ret = { + "GROUP_NAME": self.group_name, + } + return ret + + def BuildHooksNodes(self): + nodes = self.cfg.GetNodeGroup(self.group_uuid).members + return (nodes, nodes) + + def CheckPrereq(self): + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + + assert self.group_uuid in owned_groups + + l = lambda value: utils.CommaJoin("%s: %s/%s" % (i[0], i[1], i[2]) + for i in value) + + self.group = self.cfg.GetNodeGroup(self.group_uuid) + self.connected = True + if self.network_uuid not in self.group.networks: + self.LogWarning("Network '%s' is not mapped to group '%s'", + self.network_name, self.group.name) + self.connected = False + return + + if self.op.conflicts_check: + # Check if locked instances are still correct + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + + conflicting_instances = [] + + for (_, instance) in self.cfg.GetMultiInstanceInfo(owned_instances): + for idx, nic in enumerate(instance.nics): + if nic.network == self.network_name: + conflicting_instances.append((instance.name, idx, nic.ip)) + + if conflicting_instances: + self.LogWarning("Following occurences use IPs from network %s" + " that is about to disconnected from the nodegroup" + " %s: %s" % + (self.network_name, self.group.name, + l(conflicting_instances))) + raise errors.OpPrereqError("Conflicting IPs." 
+ " Please remove/modify" + " corresponding NICS", + errors.ECODE_INVAL) + + def Exec(self, feedback_fn): + if not self.connected: + return + + del self.group.networks[self.network_uuid] + self.cfg.Update(self.group, feedback_fn) + + #: Query type implementations _QUERY_IMPL = { constants.QR_CLUSTER: _ClusterQuery, constants.QR_INSTANCE: _InstanceQuery, constants.QR_NODE: _NodeQuery, constants.QR_GROUP: _GroupQuery, + constants.QR_NETWORK: _NetworkQuery, constants.QR_OS: _OsQuery, constants.QR_EXPORT: _ExportQuery, } @@ -15109,3 +16369,21 @@ def _GetQueryImplementation(name): except KeyError: raise errors.OpPrereqError("Unknown query resource '%s'" % name, errors.ECODE_INVAL) + + +def _CheckForConflictingIp(lu, ip, node): + """In case of conflicting ip raise error. + + @type ip: string + @param ip: ip address + @type node: string + @param node: node name + + """ + (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node) + if conf_net is not None: + raise errors.OpPrereqError("Conflicting IP found:" + " %s <> %s." % (ip, conf_net), + errors.ECODE_INVAL) + + return (None, None)