X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f56618e0b11301e9a314b7285bc462ee2f7f0bd0..f12eadb345d80b51c0aa667b0e7cf62d97925448:/lib/config.py diff --git a/lib/config.py b/lib/config.py index 9138e79..9256bb9 100644 --- a/lib/config.py +++ b/lib/config.py @@ -49,6 +49,14 @@ _config_lock = locking.SharedLock() def _ValidateConfig(data): + """Verifies that a configuration objects looks valid. + + This only verifies the version of the configuration. + + @raise errors.ConfigurationError: if the version differs from what + we expect + + """ if data.version != constants.CONFIG_VERSION: raise errors.ConfigurationError("Cluster configuration version" " mismatch, got %s instead of %s" % @@ -76,6 +84,7 @@ class ConfigWriter: # better to raise an error before starting to modify the config # file than after it was modified self._my_hostname = utils.HostInfo().name + self._last_cluster_serial = -1 self._OpenConfig() # this method needs to be static, so that we can call it on the class @@ -155,13 +164,13 @@ class ConfigWriter: This checks the current node, instances and disk names for duplicates. - Args: - - exceptions: a list with some other names which should be checked - for uniqueness (used for example when you want to get - more than one id at one time without adding each one in - turn to the config file + @param exceptions: a list with some other names which should be checked + for uniqueness (used for example when you want to get + more than one id at one time without adding each one in + turn to the config file) - Returns: the unique id as a string + @rtype: string + @return: the unique id """ existing = set() @@ -185,6 +194,9 @@ class ConfigWriter: def _AllMACs(self): """Return all MACs present in the config. + @rtype: list + @return: the list of all MACs + """ result = [] for instance in self._config_data.instances.values(): @@ -196,6 +208,9 @@ class ConfigWriter: def _AllDRBDSecrets(self): """Return all DRBD secrets present in the config. + @rtype: list + @return: the list of all DRBD secrets + """ def helper(disk, result): """Recursively gather secrets from this disk.""" @@ -212,9 +227,13 @@ class ConfigWriter: return result - @locking.ssynchronized(_config_lock, shared=1) - def VerifyConfig(self): - """Stub verify function. + def _UnlockedVerifyConfig(self): + """Verify function. + + @rtype: list + @return: a list of error messages; a non-empty list signifies + configuration errors + """ result = [] seen_macs = [] @@ -250,8 +269,13 @@ class ConfigWriter: ports[net_port] = [] ports[net_port].append((instance.name, "network port")) + # instance disk verify + for idx, disk in enumerate(instance.disks): + result.extend(["instance '%s' disk %d error: %s" % + (instance.name, idx, msg) for msg in disk.Verify()]) + # cluster-wide pool of free ports - for free_port in self._config_data.cluster.tcpudp_port_pool: + for free_port in data.cluster.tcpudp_port_pool: if free_port not in ports: ports[free_port] = [] ports[free_port].append(("cluster", "port marked as free")) @@ -267,13 +291,40 @@ class ConfigWriter: # highest used tcp port check if keys: - if keys[-1] > self._config_data.cluster.highest_used_port: + if keys[-1] > data.cluster.highest_used_port: result.append("Highest used port mismatch, saved %s, computed %s" % - (self._config_data.cluster.highest_used_port, - keys[-1])) + (data.cluster.highest_used_port, keys[-1])) + + if not data.nodes[data.cluster.master_node].master_candidate: + result.append("Master node is not a master candidate") + + # master candidate checks + mc_now, mc_max = self._UnlockedGetMasterCandidateStats() + if mc_now < mc_max: + result.append("Not enough master candidates: actual %d, target %d" % + (mc_now, mc_max)) + + # drbd minors check + d_map, duplicates = self._UnlockedComputeDRBDMap() + for node, minor, instance_a, instance_b in duplicates: + result.append("DRBD minor %d on node %s is assigned twice to instances" + " %s and %s" % (minor, node, instance_a, instance_b)) return result + @locking.ssynchronized(_config_lock, shared=1) + def VerifyConfig(self): + """Verify function. + + This is just a wrapper over L{_UnlockedVerifyConfig}. + + @rtype: list + @return: a list of error messages; a non-empty list signifies + configuration errors + + """ + return self._UnlockedVerifyConfig() + def _UnlockedSetDiskID(self, disk, node_name): """Convert the unique ID to the ID needed on the target nodes. @@ -366,37 +417,61 @@ class ConfigWriter: self._WriteConfig() return port - def _ComputeDRBDMap(self, instance): + def _UnlockedComputeDRBDMap(self): """Compute the used DRBD minor/nodes. - Return: dictionary of node_name: dict of minor: instance_name. The - returned dict will have all the nodes in it (even if with an empty - list). + @rtype: (dict, list) + @return: dictionary of node_name: dict of minor: instance_name; + the returned dict will have all the nodes in it (even if with + an empty list), and a list of duplicates; if the duplicates + list is not empty, the configuration is corrupted and its caller + should raise an exception """ def _AppendUsedPorts(instance_name, disk, used): + duplicates = [] if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5: nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5] for node, port in ((nodeA, minorA), (nodeB, minorB)): - assert node in used, "Instance node not found in node list" + assert node in used, ("Node '%s' of instance '%s' not found" + " in node list" % (node, instance_name)) if port in used[node]: - raise errors.ProgrammerError("DRBD minor already used:" - " %s/%s, %s/%s" % - (node, port, instance_name, - used[node][port])) - - used[node][port] = instance_name + duplicates.append((node, port, instance_name, used[node][port])) + else: + used[node][port] = instance_name if disk.children: for child in disk.children: - _AppendUsedPorts(instance_name, child, used) + duplicates.extend(_AppendUsedPorts(instance_name, child, used)) + return duplicates + duplicates = [] my_dict = dict((node, {}) for node in self._config_data.nodes) - for (node, minor), instance in self._temporary_drbds.iteritems(): - my_dict[node][minor] = instance for instance in self._config_data.instances.itervalues(): for disk in instance.disks: - _AppendUsedPorts(instance.name, disk, my_dict) - return my_dict + duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict)) + for (node, minor), instance in self._temporary_drbds.iteritems(): + if minor in my_dict[node] and my_dict[node][minor] != instance: + duplicates.append((node, minor, instance, my_dict[node][minor])) + else: + my_dict[node][minor] = instance + return my_dict, duplicates + + @locking.ssynchronized(_config_lock) + def ComputeDRBDMap(self): + """Compute the used DRBD minor/nodes. + + This is just a wrapper over L{_UnlockedComputeDRBDMap}. + + @return: dictionary of node_name: dict of minor: instance_name; + the returned dict will have all the nodes in it (even if with + an empty list). + + """ + d_map, duplicates = self._UnlockedComputeDRBDMap() + if duplicates: + raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" % + str(duplicates)) + return d_map @locking.ssynchronized(_config_lock) def AllocateDRBDMinor(self, nodes, instance): @@ -407,8 +482,17 @@ class ConfigWriter: multiple minors. The result is the list of minors, in the same order as the passed nodes. + @type instance: string + @param instance: the instance for which we allocate minors + """ - d_map = self._ComputeDRBDMap(instance) + assert isinstance(instance, basestring), \ + "Invalid argument '%s' passed to AllocateDRBDMinor" % instance + + d_map, duplicates = self._UnlockedComputeDRBDMap() + if duplicates: + raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" % + str(duplicates)) result = [] for nname in nodes: ndata = d_map[nname] @@ -427,31 +511,55 @@ class ConfigWriter: minor = keys[-1] + 1 else: minor = ffree - result.append(minor) + # double-check minor against current instances + assert minor not in d_map[nname], \ + ("Attempt to reuse allocated DRBD minor %d on node %s," + " already allocated to instance %s" % + (minor, nname, d_map[nname][minor])) ndata[minor] = instance - assert (nname, minor) not in self._temporary_drbds, \ - "Attempt to reuse reserved DRBD minor" - self._temporary_drbds[(nname, minor)] = instance + # double-check minor against reservation + r_key = (nname, minor) + assert r_key not in self._temporary_drbds, \ + ("Attempt to reuse reserved DRBD minor %d on node %s," + " reserved for instance %s" % + (minor, nname, self._temporary_drbds[r_key])) + self._temporary_drbds[r_key] = instance + result.append(minor) logging.debug("Request to allocate drbd minors, input: %s, returning %s", nodes, result) return result - @locking.ssynchronized(_config_lock) - def ReleaseDRBDMinors(self, instance): + def _UnlockedReleaseDRBDMinors(self, instance): """Release temporary drbd minors allocated for a given instance. - This should be called on both the error paths and on the success - paths (after the instance has been added or updated). - @type instance: string @param instance: the instance for which temporary minors should be released """ + assert isinstance(instance, basestring), \ + "Invalid argument passed to ReleaseDRBDMinors" for key, name in self._temporary_drbds.items(): if name == instance: del self._temporary_drbds[key] + @locking.ssynchronized(_config_lock) + def ReleaseDRBDMinors(self, instance): + """Release temporary drbd minors allocated for a given instance. + + This should be called on the error paths, on the success paths + it's automatically called by the ConfigWriter add and update + functions. + + This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}. + + @type instance: string + @param instance: the instance for which temporary minors should be + released + + """ + self._UnlockedReleaseDRBDMinors(instance) + @locking.ssynchronized(_config_lock, shared=1) def GetConfigVersion(self): """Get the configuration version. @@ -513,9 +621,9 @@ class ConfigWriter: def GetHostKey(self): """Return the rsa hostkey from the config. - Args: None + @rtype: string + @return: the rsa hostkey - Returns: rsa hostkey """ return self._config_data.cluster.rsahostkeypub @@ -525,8 +633,9 @@ class ConfigWriter: This should be used after creating a new instance. - Args: - instance: the instance object + @type instance: L{objects.Instance} + @param instance: the instance object + """ if not isinstance(instance, objects.Instance): raise errors.ProgrammerError("Invalid type passed to AddInstance") @@ -537,23 +646,22 @@ class ConfigWriter: instance.serial_no = 1 self._config_data.instances[instance.name] = instance + self._UnlockedReleaseDRBDMinors(instance.name) self._WriteConfig() def _SetInstanceStatus(self, instance_name, status): """Set the instance's status to a given value. """ - if status not in ("up", "down"): - raise errors.ProgrammerError("Invalid status '%s' passed to" - " ConfigWriter._SetInstanceStatus()" % - status) + assert isinstance(status, bool), \ + "Invalid status '%s' passed to SetInstanceStatus" % (status,) if instance_name not in self._config_data.instances: raise errors.ConfigurationError("Unknown instance '%s'" % instance_name) instance = self._config_data.instances[instance_name] - if instance.status != status: - instance.status = status + if instance.admin_up != status: + instance.admin_up = status instance.serial_no += 1 self._WriteConfig() @@ -562,7 +670,7 @@ class ConfigWriter: """Mark the instance status to up in the config. """ - self._SetInstanceStatus(instance_name, "up") + self._SetInstanceStatus(instance_name, True) @locking.ssynchronized(_config_lock) def RemoveInstance(self, instance_name): @@ -606,7 +714,7 @@ class ConfigWriter: """Mark the status of an instance to down in the configuration. """ - self._SetInstanceStatus(instance_name, "down") + self._SetInstanceStatus(instance_name, False) def _UnlockedGetInstanceList(self): """Get the list of instances. @@ -620,9 +728,8 @@ class ConfigWriter: def GetInstanceList(self): """Get the list of instances. - Returns: - array of instances, ex. ['instance2.example.com','instance1.example.com'] - these contains all the instances, also the ones in Admin_down state + @return: array of instances, ex. ['instance2.example.com', + 'instance1.example.com'] """ return self._UnlockedGetInstanceList() @@ -653,11 +760,11 @@ class ConfigWriter: It takes the information from the configuration file. Other informations of an instance are taken from the live systems. - Args: - instance: name of the instance, ex instance1.example.com + @param instance_name: name of the instance, e.g. + I{instance1.example.com} - Returns: - the instance object + @rtype: L{objects.Instance} + @return: the instance object """ return self._UnlockedGetInstanceInfo(instance_name) @@ -679,8 +786,8 @@ class ConfigWriter: def AddNode(self, node): """Add a node to the configuration. - Args: - node: an object.Node instance + @type node: L{objects.Node} + @param node: a Node instance """ logging.info("Adding node %s to configuration" % node.name) @@ -715,11 +822,13 @@ class ConfigWriter: def _UnlockedGetNodeInfo(self, node_name): """Get the configuration of a node, as stored in the config. - This function is for internal use, when the config lock is already held. + This function is for internal use, when the config lock is already + held. - Args: node: nodename (tuple) of the node + @param node_name: the node name, e.g. I{node1.example.com} - Returns: the node object + @rtype: L{objects.Node} + @return: the node object """ if node_name not in self._config_data.nodes: @@ -732,9 +841,12 @@ class ConfigWriter: def GetNodeInfo(self, node_name): """Get the configuration of a node, as stored in the config. - Args: node: nodename (tuple) of the node + This is just a locked wrapper over L{_UnlockedGetNodeInfo}. + + @param node_name: the node name, e.g. I{node1.example.com} - Returns: the node object + @rtype: L{objects.Node} + @return: the node object """ return self._UnlockedGetNodeInfo(node_name) @@ -742,7 +854,10 @@ class ConfigWriter: def _UnlockedGetNodeList(self): """Return the list of nodes which are in the configuration. - This function is for internal use, when the config lock is already held. + This function is for internal use, when the config lock is already + held. + + @rtype: list """ return self._config_data.nodes.keys() @@ -756,11 +871,20 @@ class ConfigWriter: return self._UnlockedGetNodeList() @locking.ssynchronized(_config_lock, shared=1) + def GetOnlineNodeList(self): + """Return the list of nodes which are online. + + """ + all_nodes = [self._UnlockedGetNodeInfo(node) + for node in self._UnlockedGetNodeList()] + return [node.name for node in all_nodes if not node.offline] + + @locking.ssynchronized(_config_lock, shared=1) def GetAllNodesInfo(self): """Get the configuration of all nodes. @rtype: dict - @returns: dict of (node, node_info), where node_info is what + @return: dict of (node, node_info), where node_info is what would GetNodeInfo return for the node """ @@ -768,6 +892,67 @@ class ConfigWriter: for node in self._UnlockedGetNodeList()]) return my_dict + def _UnlockedGetMasterCandidateStats(self): + """Get the number of current and maximum desired and possible candidates. + + @rtype: tuple + @return: tuple of (current, desired and possible) + + """ + mc_now = mc_max = 0 + for node in self._config_data.nodes.itervalues(): + if not node.offline: + mc_max += 1 + if node.master_candidate: + mc_now += 1 + mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size) + return (mc_now, mc_max) + + @locking.ssynchronized(_config_lock, shared=1) + def GetMasterCandidateStats(self): + """Get the number of current and maximum possible candidates. + + This is just a wrapper over L{_UnlockedGetMasterCandidateStats}. + + @rtype: tuple + @return: tuple of (current, max) + + """ + return self._UnlockedGetMasterCandidateStats() + + @locking.ssynchronized(_config_lock) + def MaintainCandidatePool(self): + """Try to grow the candidate pool to the desired size. + + @rtype: list + @return: list with the adjusted nodes (L{objects.Node} instances) + + """ + mc_now, mc_max = self._UnlockedGetMasterCandidateStats() + mod_list = [] + if mc_now < mc_max: + node_list = self._config_data.nodes.keys() + random.shuffle(node_list) + for name in node_list: + if mc_now >= mc_max: + break + node = self._config_data.nodes[name] + if node.master_candidate or node.offline: + continue + mod_list.append(node) + node.master_candidate = True + node.serial_no += 1 + mc_now += 1 + if mc_now != mc_max: + # this should not happen + logging.warning("Warning: MaintainCandidatePool didn't manage to" + " fill the candidate pool (%d/%d)", mc_now, mc_max) + if mod_list: + self._config_data.cluster.serial_no += 1 + self._WriteConfig() + + return mod_list + def _BumpSerialNo(self): """Bump up the serial number of the config. @@ -777,10 +962,6 @@ class ConfigWriter: def _OpenConfig(self): """Read the config data from disk. - In case we already have configuration data and the config file has - the same mtime as when we read it, we skip the parsing of the - file, since de-serialisation could be slow. - """ f = open(self._cfg_file, 'r') try: @@ -799,7 +980,7 @@ class ConfigWriter: raise errors.ConfigurationError("Incomplete configuration" " (missing cluster.rsahostkeypub)") self._config_data = data - # init the last serial as -1 so that the next write will cause + # reset the last serial as -1 so that the next write will cause # ssconf update self._last_cluster_serial = -1 @@ -843,6 +1024,11 @@ class ConfigWriter: """Write the configuration data to persistent storage. """ + config_errors = self._UnlockedVerifyConfig() + if config_errors: + raise errors.ConfigurationError("Configuration data is not" + " consistent: %s" % + (", ".join(config_errors))) if destination is None: destination = self._cfg_file self._BumpSerialNo() @@ -877,21 +1063,25 @@ class ConfigWriter: associated value """ - node_list = utils.NiceSort(self._UnlockedGetNodeList()) - mc_list = [self._UnlockedGetNodeInfo(name) for name in node_list] - mc_list = [node.name for node in mc_list if node.master_candidate] - node_list = "\n".join(node_list) - mc_list = "\n".join(mc_list) + fn = "\n".join + node_names = utils.NiceSort(self._UnlockedGetNodeList()) + node_info = [self._UnlockedGetNodeInfo(name) for name in node_names] + + off_data = fn(node.name for node in node_info if node.offline) + mc_data = fn(node.name for node in node_info if node.master_candidate) + node_data = fn(node_names) cluster = self._config_data.cluster return { constants.SS_CLUSTER_NAME: cluster.cluster_name, constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir, - constants.SS_MASTER_CANDIDATES: mc_list, + constants.SS_MASTER_CANDIDATES: mc_data, constants.SS_MASTER_IP: cluster.master_ip, constants.SS_MASTER_NETDEV: cluster.master_netdev, constants.SS_MASTER_NODE: cluster.master_node, - constants.SS_NODE_LIST: node_list, + constants.SS_NODE_LIST: node_data, + constants.SS_OFFLINE_NODES: off_data, + constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION, } @locking.ssynchronized(_config_lock) @@ -954,8 +1144,8 @@ class ConfigWriter: def GetClusterInfo(self): """Returns informations about the cluster - Returns: - the cluster object + @rtype: L{objects.Cluster} + @return: the cluster object """ return self._config_data.cluster @@ -970,6 +1160,10 @@ class ConfigWriter: that all modified objects will be saved, but the target argument is the one the caller wants to ensure that it's saved. + @param target: an instance of either L{objects.Cluster}, + L{objects.Node} or L{objects.Instance} which is existing in + the cluster + """ if self._config_data is None: raise errors.ProgrammerError("Configuration file not read," @@ -994,4 +1188,7 @@ class ConfigWriter: # for node updates, we need to increase the cluster serial too self._config_data.cluster.serial_no += 1 + if isinstance(target, objects.Instance): + self._UnlockedReleaseDRBDMinors(target.name) + self._WriteConfig()