X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/32388e6d2cc05a9b93b28c36f002462bba98c5c4..fc8a6b8f3334c1352caafcede1e31d801fc146ba:/lib/config.py?ds=sidebyside

diff --git a/lib/config.py b/lib/config.py
index 2d849f1..712f599 100644
--- a/lib/config.py
+++ b/lib/config.py
@@ -79,6 +79,7 @@ class ConfigWriter:
     self._cfg_file = cfg_file
     self._temporary_ids = set()
     self._temporary_drbds = {}
+    self._temporary_macs = set()
     # Note: in order to prevent errors when resolving our name in
     # _DistributeConfig, we compute it here once and reuse it; it's
     # better to raise an error before starting to modify the config
@@ -110,11 +111,12 @@ class ConfigWriter:
       byte2 = random.randrange(0, 256)
       byte3 = random.randrange(0, 256)
       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
-      if mac not in all_macs:
+      if mac not in all_macs and mac not in self._temporary_macs:
         break
       retries -= 1
     else:
       raise errors.ConfigurationError("Can't generate unique MAC")
+    self._temporary_macs.add(mac)
     return mac

   @locking.ssynchronized(_config_lock, shared=1)
@@ -126,7 +128,7 @@ class ConfigWriter:

     """
     all_macs = self._AllMACs()
-    return mac in all_macs
+    return mac in all_macs or mac in self._temporary_macs

   @locking.ssynchronized(_config_lock, shared=1)
   def GenerateDRBDSecret(self):
@@ -146,7 +148,7 @@ class ConfigWriter:
       raise errors.ConfigurationError("Can't generate unique DRBD secret")
     return secret

-  def _ComputeAllLVs(self):
+  def _AllLVs(self):
     """Compute the list of all LVs.

     """
@@ -157,6 +159,23 @@ class ConfigWriter:
         lvnames.update(lv_list)
     return lvnames

+  def _AllIDs(self, include_temporary):
+    """Compute the list of all UUIDs and names we have.
+
+    @type include_temporary: boolean
+    @param include_temporary: whether to include the _temporary_ids set
+    @rtype: set
+    @return: a set of IDs
+
+    """
+    existing = set()
+    if include_temporary:
+      existing.update(self._temporary_ids)
+    existing.update(self._AllLVs())
+    existing.update(self._config_data.instances.keys())
+    existing.update(self._config_data.nodes.keys())
+    return existing
+
   @locking.ssynchronized(_config_lock, shared=1)
   def GenerateUniqueID(self, exceptions=None):
     """Generate an unique disk name.
@@ -173,11 +192,7 @@ class ConfigWriter:
     @return: the unique id

     """
-    existing = set()
-    existing.update(self._temporary_ids)
-    existing.update(self._ComputeAllLVs())
-    existing.update(self._config_data.instances.keys())
-    existing.update(self._config_data.nodes.keys())
+    existing = self._AllIDs(include_temporary=True)
     if exceptions is not None:
       existing.update(exceptions)
     retries = 64
@@ -191,6 +206,13 @@ class ConfigWriter:
     self._temporary_ids.add(unique_id)
     return unique_id

+  def _CleanupTemporaryIDs(self):
+    """Cleanups the _temporary_ids structure.
+
+    """
+    existing = self._AllIDs(include_temporary=False)
+    self._temporary_ids = self._temporary_ids - existing
+
   def _AllMACs(self):
     """Return all MACs present in the config.
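The hunks above thread a new _temporary_macs set through GenerateMAC() and IsMacInUse(): a freshly generated MAC is reserved until the instance using it is committed (later hunks make AddInstance() and Update() discard the reservation). The following self-contained sketch is not part of this patch and uses hypothetical names (MacReserver, commit); it only illustrates the reserve-then-commit pattern in isolation, assuming a single process and no locking.

import random

class MacReserver(object):
  def __init__(self, prefix="aa:00:00"):
    self.prefix = prefix
    self.committed = set()   # stands in for MACs already in the config
    self.temporary = set()   # stands in for ConfigWriter._temporary_macs

  def generate(self, retries=64):
    for _ in range(retries):
      mac = "%s:%02x:%02x:%02x" % (self.prefix,
                                   random.randrange(0, 256),
                                   random.randrange(0, 256),
                                   random.randrange(0, 256))
      if mac not in self.committed and mac not in self.temporary:
        self.temporary.add(mac)   # reserve until commit()
        return mac
    raise RuntimeError("Can't generate unique MAC")

  def commit(self, mac):
    # mirrors AddInstance()/Update(): once the MAC is in the config proper,
    # the temporary reservation can be dropped
    self.committed.add(mac)
    self.temporary.discard(mac)

if __name__ == "__main__":
  reserver = MacReserver()
  mac = reserver.generate()
  reserver.commit(mac)
  print(mac)

Without the temporary set, two allocations made before either instance reaches AddInstance() could hand out the same MAC; the reservation closes that window.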
@@ -227,15 +249,64 @@ class ConfigWriter:

     return result

-  @locking.ssynchronized(_config_lock, shared=1)
-  def VerifyConfig(self):
+  def _CheckDiskIDs(self, disk, l_ids, p_ids):
+    """Compute duplicate disk IDs
+
+    @type disk: L{objects.Disk}
+    @param disk: the disk at which to start searching
+    @type l_ids: list
+    @param l_ids: list of current logical ids
+    @type p_ids: list
+    @param p_ids: list of current physical ids
+    @rtype: list
+    @return: a list of error messages
+
+    """
+    result = []
+    if disk.logical_id is not None:
+      if disk.logical_id in l_ids:
+        result.append("duplicate logical id %s" % str(disk.logical_id))
+      else:
+        l_ids.append(disk.logical_id)
+    if disk.physical_id is not None:
+      if disk.physical_id in p_ids:
+        result.append("duplicate physical id %s" % str(disk.physical_id))
+      else:
+        p_ids.append(disk.physical_id)
+
+    if disk.children:
+      for child in disk.children:
+        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
+    return result
+
+  def _UnlockedVerifyConfig(self):
     """Verify function.

+    @rtype: list
+    @return: a list of error messages; a non-empty list signifies
+        configuration errors
+
     """
     result = []
     seen_macs = []
     ports = {}
     data = self._config_data
+    seen_lids = []
+    seen_pids = []
+
+    # global cluster checks
+    if not data.cluster.enabled_hypervisors:
+      result.append("enabled hypervisors list doesn't have any entries")
+    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
+    if invalid_hvs:
+      result.append("enabled hypervisors contains invalid entries: %s" %
+                    invalid_hvs)
+
+    if data.cluster.master_node not in data.nodes:
+      result.append("cluster has invalid primary node '%s'" %
+                    data.cluster.master_node)
+
+    # per-instance checks
     for instance_name in data.instances:
       instance = data.instances[instance_name]
       if instance.primary_node not in data.nodes:
@@ -266,6 +337,12 @@ class ConfigWriter:
           ports[net_port] = []
         ports[net_port].append((instance.name, "network port"))

+      # instance disk verify
+      for idx, disk in enumerate(instance.disks):
+        result.extend(["instance '%s' disk %d error: %s" %
+                       (instance.name, idx, msg) for msg in disk.Verify()])
+        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
+
     # cluster-wide pool of free ports
     for free_port in data.cluster.tcpudp_port_pool:
       if free_port not in ports:
@@ -290,13 +367,41 @@ class ConfigWriter:
     if not data.nodes[data.cluster.master_node].master_candidate:
       result.append("Master node is not a master candidate")

+    # master candidate checks
     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
     if mc_now < mc_max:
       result.append("Not enough master candidates: actual %d, target %d" %
                     (mc_now, mc_max))

+    # node checks
+    for node in data.nodes.values():
+      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
+        result.append("Node %s state is invalid: master_candidate=%s,"
+                      " drain=%s, offline=%s" %
+                      (node.name, node.master_candidate, node.drained,
+                       node.offline))
+
+    # drbd minors check
+    d_map, duplicates = self._UnlockedComputeDRBDMap()
+    for node, minor, instance_a, instance_b in duplicates:
+      result.append("DRBD minor %d on node %s is assigned twice to instances"
+                    " %s and %s" % (minor, node, instance_a, instance_b))
+
     return result

+  @locking.ssynchronized(_config_lock, shared=1)
+  def VerifyConfig(self):
+    """Verify function.
+
+    This is just a wrapper over L{_UnlockedVerifyConfig}.
+
+    @rtype: list
+    @return: a list of error messages; a non-empty list signifies
+        configuration errors
+
+    """
+    return self._UnlockedVerifyConfig()
+
   def _UnlockedSetDiskID(self, disk, node_name):
     """Convert the unique ID to the ID needed on the target nodes.

@@ -392,34 +497,41 @@ class ConfigWriter:
   def _UnlockedComputeDRBDMap(self):
     """Compute the used DRBD minor/nodes.

+    @rtype: (dict, list)
     @return: dictionary of node_name: dict of minor: instance_name;
         the returned dict will have all the nodes in it (even if with
-        an empty list).
+        an empty list), and a list of duplicates; if the duplicates
+        list is not empty, the configuration is corrupted and its caller
+        should raise an exception

     """
     def _AppendUsedPorts(instance_name, disk, used):
+      duplicates = []
       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
-        nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
-        for node, port in ((nodeA, minorA), (nodeB, minorB)):
-          assert node in used, "Instance node not found in node list"
+        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
+        for node, port in ((node_a, minor_a), (node_b, minor_b)):
+          assert node in used, ("Node '%s' of instance '%s' not found"
+                                " in node list" % (node, instance_name))
           if port in used[node]:
-            raise errors.ProgrammerError("DRBD minor already used:"
-                                         " %s/%s, %s/%s" %
-                                         (node, port, instance_name,
-                                          used[node][port]))
-
-          used[node][port] = instance_name
+            duplicates.append((node, port, instance_name, used[node][port]))
+          else:
+            used[node][port] = instance_name
       if disk.children:
         for child in disk.children:
-          _AppendUsedPorts(instance_name, child, used)
+          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
+      return duplicates

+    duplicates = []
     my_dict = dict((node, {}) for node in self._config_data.nodes)
-    for (node, minor), instance in self._temporary_drbds.iteritems():
-      my_dict[node][minor] = instance
     for instance in self._config_data.instances.itervalues():
       for disk in instance.disks:
-        _AppendUsedPorts(instance.name, disk, my_dict)
-    return my_dict
+        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
+    for (node, minor), instance in self._temporary_drbds.iteritems():
+      if minor in my_dict[node] and my_dict[node][minor] != instance:
+        duplicates.append((node, minor, instance, my_dict[node][minor]))
+      else:
+        my_dict[node][minor] = instance
+    return my_dict, duplicates

   @locking.ssynchronized(_config_lock)
   def ComputeDRBDMap(self):
@@ -432,7 +544,11 @@ class ConfigWriter:
         an empty list).
""" - return self._UnlockedComputeDRBDMap() + d_map, duplicates = self._UnlockedComputeDRBDMap() + if duplicates: + raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" % + str(duplicates)) + return d_map @locking.ssynchronized(_config_lock) def AllocateDRBDMinor(self, nodes, instance): @@ -448,9 +564,12 @@ class ConfigWriter: """ assert isinstance(instance, basestring), \ - "Invalid argument passed to AllocateDRBDMinor" + "Invalid argument '%s' passed to AllocateDRBDMinor" % instance - d_map = self._UnlockedComputeDRBDMap() + d_map, duplicates = self._UnlockedComputeDRBDMap() + if duplicates: + raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" % + str(duplicates)) result = [] for nname in nodes: ndata = d_map[nname] @@ -469,22 +588,27 @@ class ConfigWriter: minor = keys[-1] + 1 else: minor = ffree - result.append(minor) + # double-check minor against current instances + assert minor not in d_map[nname], \ + ("Attempt to reuse allocated DRBD minor %d on node %s," + " already allocated to instance %s" % + (minor, nname, d_map[nname][minor])) ndata[minor] = instance - assert (nname, minor) not in self._temporary_drbds, \ - "Attempt to reuse reserved DRBD minor" - self._temporary_drbds[(nname, minor)] = instance + # double-check minor against reservation + r_key = (nname, minor) + assert r_key not in self._temporary_drbds, \ + ("Attempt to reuse reserved DRBD minor %d on node %s," + " reserved for instance %s" % + (minor, nname, self._temporary_drbds[r_key])) + self._temporary_drbds[r_key] = instance + result.append(minor) logging.debug("Request to allocate drbd minors, input: %s, returning %s", nodes, result) return result - @locking.ssynchronized(_config_lock) - def ReleaseDRBDMinors(self, instance): + def _UnlockedReleaseDRBDMinors(self, instance): """Release temporary drbd minors allocated for a given instance. - This should be called on both the error paths and on the success - paths (after the instance has been added or updated). - @type instance: string @param instance: the instance for which temporary minors should be released @@ -496,6 +620,23 @@ class ConfigWriter: if name == instance: del self._temporary_drbds[key] + @locking.ssynchronized(_config_lock) + def ReleaseDRBDMinors(self, instance): + """Release temporary drbd minors allocated for a given instance. + + This should be called on the error paths, on the success paths + it's automatically called by the ConfigWriter add and update + functions. + + This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}. + + @type instance: string + @param instance: the instance for which temporary minors should be + released + + """ + self._UnlockedReleaseDRBDMinors(instance) + @locking.ssynchronized(_config_lock, shared=1) def GetConfigVersion(self): """Get the configuration version. @@ -580,25 +721,33 @@ class ConfigWriter: all_lvs = instance.MapLVsByNode() logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs) + all_macs = self._AllMACs() + for nic in instance.nics: + if nic.mac in all_macs: + raise errors.ConfigurationError("Cannot add instance %s:" + " MAC address '%s' already in use." % (instance.name, nic.mac)) + instance.serial_no = 1 self._config_data.instances[instance.name] = instance + self._config_data.cluster.serial_no += 1 + self._UnlockedReleaseDRBDMinors(instance.name) + for nic in instance.nics: + self._temporary_macs.discard(nic.mac) self._WriteConfig() def _SetInstanceStatus(self, instance_name, status): """Set the instance's status to a given value. 
""" - if status not in ("up", "down"): - raise errors.ProgrammerError("Invalid status '%s' passed to" - " ConfigWriter._SetInstanceStatus()" % - status) + assert isinstance(status, bool), \ + "Invalid status '%s' passed to SetInstanceStatus" % (status,) if instance_name not in self._config_data.instances: raise errors.ConfigurationError("Unknown instance '%s'" % instance_name) instance = self._config_data.instances[instance_name] - if instance.status != status: - instance.status = status + if instance.admin_up != status: + instance.admin_up = status instance.serial_no += 1 self._WriteConfig() @@ -607,7 +756,7 @@ class ConfigWriter: """Mark the instance status to up in the config. """ - self._SetInstanceStatus(instance_name, "up") + self._SetInstanceStatus(instance_name, True) @locking.ssynchronized(_config_lock) def RemoveInstance(self, instance_name): @@ -617,6 +766,7 @@ class ConfigWriter: if instance_name not in self._config_data.instances: raise errors.ConfigurationError("Unknown instance '%s'" % instance_name) del self._config_data.instances[instance_name] + self._config_data.cluster.serial_no += 1 self._WriteConfig() @locking.ssynchronized(_config_lock) @@ -651,7 +801,7 @@ class ConfigWriter: """Mark the status of an instance to down in the configuration. """ - self._SetInstanceStatus(instance_name, "down") + self._SetInstanceStatus(instance_name, False) def _UnlockedGetInstanceList(self): """Get the list of instances. @@ -680,7 +830,7 @@ class ConfigWriter: self._config_data.instances.keys()) def _UnlockedGetInstanceInfo(self, instance_name): - """Returns informations about an instance. + """Returns information about an instance. This function is for internal use, when the config lock is already held. @@ -692,9 +842,9 @@ class ConfigWriter: @locking.ssynchronized(_config_lock, shared=1) def GetInstanceInfo(self, instance_name): - """Returns informations about an instance. + """Returns information about an instance. - It takes the information from the configuration file. Other informations of + It takes the information from the configuration file. Other information of an instance are taken from the live systems. @param instance_name: name of the instance, e.g. @@ -711,7 +861,7 @@ class ConfigWriter: """Get the configuration of all instances. @rtype: dict - @returns: dict of (instance, instance_info), where instance_info is what + @return: dict of (instance, instance_info), where instance_info is what would GetInstanceInfo return for the node """ @@ -829,16 +979,20 @@ class ConfigWriter: for node in self._UnlockedGetNodeList()]) return my_dict - def _UnlockedGetMasterCandidateStats(self): + def _UnlockedGetMasterCandidateStats(self, exceptions=None): """Get the number of current and maximum desired and possible candidates. + @type exceptions: list + @param exceptions: if passed, list of nodes that should be ignored @rtype: tuple @return: tuple of (current, desired and possible) """ mc_now = mc_max = 0 - for node in self._config_data.nodes.itervalues(): - if not node.offline: + for node in self._config_data.nodes.values(): + if exceptions and node.name in exceptions: + continue + if not (node.offline or node.drained): mc_max += 1 if node.master_candidate: mc_now += 1 @@ -846,16 +1000,18 @@ class ConfigWriter: return (mc_now, mc_max) @locking.ssynchronized(_config_lock, shared=1) - def GetMasterCandidateStats(self): + def GetMasterCandidateStats(self, exceptions=None): """Get the number of current and maximum possible candidates. 

     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

+    @type exceptions: list
+    @param exceptions: if passed, list of nodes that should be ignored
     @rtype: tuple
     @return: tuple of (current, max)

     """
-    return self._UnlockedGetMasterCandidateStats()
+    return self._UnlockedGetMasterCandidateStats(exceptions)

   @locking.ssynchronized(_config_lock)
   def MaintainCandidatePool(self):
@@ -874,7 +1030,7 @@ class ConfigWriter:
         if mc_now >= mc_max:
           break
         node = self._config_data.nodes[name]
-        if node.master_candidate or node.offline:
+        if node.master_candidate or node.offline or node.drained:
          continue
        mod_list.append(node)
        node.master_candidate = True
@@ -916,6 +1072,10 @@ class ConfigWriter:
         not hasattr(data.cluster, 'rsahostkeypub')):
       raise errors.ConfigurationError("Incomplete configuration"
                                       " (missing cluster.rsahostkeypub)")
+
+    # Upgrade configuration if needed
+    data.UpgradeConfig()
+
     self._config_data = data
     # reset the last serial as -1 so that the next write will cause
     # ssconf update
@@ -961,6 +1121,15 @@ class ConfigWriter:
     """Write the configuration data to persistent storage.

     """
+    # first, cleanup the _temporary_ids set, if an ID is now in the
+    # other objects it should be discarded to prevent unbounded growth
+    # of that structure
+    self._CleanupTemporaryIDs()
+    config_errors = self._UnlockedVerifyConfig()
+    if config_errors:
+      raise errors.ConfigurationError("Configuration data is not"
+                                      " consistent: %s" %
+                                      (", ".join(config_errors)))
     if destination is None:
       destination = self._cfg_file
     self._BumpSerialNo()
@@ -996,16 +1165,21 @@ class ConfigWriter:

     """
     fn = "\n".join
+    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
     node_names = utils.NiceSort(self._UnlockedGetNodeList())
     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]

+    instance_data = fn(instance_names)
     off_data = fn(node.name for node in node_info if node.offline)
+    on_data = fn(node.name for node in node_info if not node.offline)
     mc_data = fn(node.name for node in node_info if node.master_candidate)
     node_data = fn(node_names)

     cluster = self._config_data.cluster
+    cluster_tags = fn(cluster.GetTags())
     return {
       constants.SS_CLUSTER_NAME: cluster.cluster_name,
+      constants.SS_CLUSTER_TAGS: cluster_tags,
       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
       constants.SS_MASTER_CANDIDATES: mc_data,
       constants.SS_MASTER_IP: cluster.master_ip,
@@ -1013,35 +1187,11 @@ class ConfigWriter:
       constants.SS_MASTER_NODE: cluster.master_node,
       constants.SS_NODE_LIST: node_data,
       constants.SS_OFFLINE_NODES: off_data,
+      constants.SS_ONLINE_NODES: on_data,
+      constants.SS_INSTANCE_LIST: instance_data,
       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
       }

-  @locking.ssynchronized(_config_lock)
-  def InitConfig(self, version, cluster_config, master_node_config):
-    """Create the initial cluster configuration.
-
-    It will contain the current node, which will also be the master
-    node, and no instances.
-
-    @type version: int
-    @param version: Configuration version
-    @type cluster_config: objects.Cluster
-    @param cluster_config: Cluster configuration
-    @type master_node_config: objects.Node
-    @param master_node_config: Master node configuration
-
-    """
-    nodes = {
-      master_node_config.name: master_node_config,
-    }
-
-    self._config_data = objects.ConfigData(version=version,
-                                           cluster=cluster_config,
-                                           nodes=nodes,
-                                           instances={},
-                                           serial_no=1)
-    self._WriteConfig()
-
   @locking.ssynchronized(_config_lock, shared=1)
   def GetVGName(self):
     """Return the volume group name.
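The _WriteConfig() hunk above makes every write pass through _UnlockedVerifyConfig() first, so an inconsistent configuration is rejected instead of being persisted. Below is a minimal, self-contained sketch of that guard; the names (verify, write_config, ConfigError) and the dict-based config are hypothetical stand-ins for Ganeti's objects, shown only to make the control flow concrete.

import json

class ConfigError(Exception):
  pass

def verify(config):
  """Return a list of error messages; an empty list means the data is sane."""
  messages = []
  if not config.get("enabled_hypervisors"):
    messages.append("enabled hypervisors list doesn't have any entries")
  if config.get("master_node") not in config.get("nodes", {}):
    messages.append("cluster has invalid primary node '%s'" %
                    config.get("master_node"))
  return messages

def write_config(config, path):
  messages = verify(config)
  if messages:
    # refuse to persist a configuration that fails its own checks
    raise ConfigError("Configuration data is not consistent: %s" %
                      ", ".join(messages))
  with open(path, "w") as out:
    json.dump(config, out)

Doing the check at write time means a bug anywhere in the callers surfaces as a refused write rather than as a corrupt configuration file on disk.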
@@ -1074,7 +1224,7 @@ class ConfigWriter:

   @locking.ssynchronized(_config_lock, shared=1)
   def GetClusterInfo(self):
-    """Returns informations about the cluster
+    """Returns information about the cluster

     @rtype: L{objects.Cluster}
     @return: the cluster object
@@ -1120,4 +1270,9 @@ class ConfigWriter:
       # for node updates, we need to increase the cluster serial too
       self._config_data.cluster.serial_no += 1

+    if isinstance(target, objects.Instance):
+      self._UnlockedReleaseDRBDMinors(target.name)
+      for nic in target.nics:
+        self._temporary_macs.discard(nic.mac)
+
     self._WriteConfig()
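The other recurring pattern in this patch is the new two-value return of _UnlockedComputeDRBDMap(): the unlocked helper reports conflicts in a duplicates list instead of raising, and the locked callers (ComputeDRBDMap, AllocateDRBDMinor, _UnlockedVerifyConfig) decide what to do with them. The sketch below is hypothetical and heavily simplified (plain dicts, (node, minor) pairs, invented names compute_minor_map and ConfigurationError); it only illustrates that convention.

class ConfigurationError(Exception):
  pass

def compute_minor_map(instances):
  """instances: dict of instance name -> list of (node, minor) pairs."""
  minor_map = {}    # node -> {minor: instance}
  duplicates = []   # (node, minor, new_owner, existing_owner)
  for name, devs in instances.items():
    for node, minor in devs:
      node_map = minor_map.setdefault(node, {})
      if minor in node_map:
        duplicates.append((node, minor, name, node_map[minor]))
      else:
        node_map[minor] = name
  return minor_map, duplicates

def compute_minor_map_checked(instances):
  # mirrors the locked wrapper: a non-empty duplicates list is a hard error
  minor_map, duplicates = compute_minor_map(instances)
  if duplicates:
    raise ConfigurationError("Duplicate DRBD minors detected: %s" %
                             str(duplicates))
  return minor_map

if __name__ == "__main__":
  good = {"inst1": [("node1", 0)], "inst2": [("node1", 1)]}
  bad = {"inst1": [("node1", 0)], "inst2": [("node1", 0)]}
  print(compute_minor_map_checked(good))
  print(compute_minor_map(bad)[1])   # reports the clash instead of raising

Keeping the unlocked helper non-raising lets the verification code list every conflict in one pass, while the public entry points still fail fast on a corrupted configuration.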