X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/5fcdc80db13233a8718074a18064c40b0d3a1e20..8161a64679c5b3c46a275bc3ae8e13b69c902993:/lib/config.py

diff --git a/lib/config.py b/lib/config.py
index f83fab6..d3c40cd 100644
--- a/lib/config.py
+++ b/lib/config.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -21,30 +21,42 @@
 
 """Configuration management for Ganeti
 
-This module provides the interface to the ganeti cluster configuration.
-
+This module provides the interface to the Ganeti cluster configuration.
 
-The configuration data is stored on every node but is updated on the
-master only. After each update, the master distributes the data to the
-other nodes.
+The configuration data is stored on every node but is updated on the master
+only. After each update, the master distributes the data to the other nodes.
 
-Currently the data storage format is pickle as yaml was initially not
-available, then we used it but it was a memory-eating slow beast, so
-we reverted to pickle using custom Unpicklers.
+Currently, the data storage format is JSON. YAML was slow and consuming too
+much memory.
 
 """
 
 import os
-import socket
 import tempfile
 import random
 
 from ganeti import errors
+from ganeti import locking
 from ganeti import logger
 from ganeti import utils
 from ganeti import constants
 from ganeti import rpc
 from ganeti import objects
+from ganeti import serializer
+from ganeti import ssconf
+
+
+_config_lock = locking.SharedLock()
+
+
+def ValidateConfig():
+  sstore = ssconf.SimpleStore()
+
+  if sstore.GetConfigVersion() != constants.CONFIG_VERSION:
+    raise errors.ConfigurationError("Cluster configuration version"
+                                    " mismatch, got %s instead of %s" %
+                                    (sstore.GetConfigVersion(),
+                                     constants.CONFIG_VERSION))
 
 
 class ConfigWriter:
@@ -52,6 +64,8 @@ class ConfigWriter:
   """
 
   def __init__(self, cfg_file=None, offline=False):
+    self.write_count = 0
+    self._lock = _config_lock
     self._config_data = None
     self._config_time = None
     self._config_size = None
@@ -61,6 +75,12 @@ class ConfigWriter:
       self._cfg_file = constants.CLUSTER_CONF_FILE
     else:
       self._cfg_file = cfg_file
+    self._temporary_ids = set()
+    # Note: in order to prevent errors when resolving our name in
+    # _DistributeConfig, we compute it here once and reuse it; it's
+    # better to raise an error before starting to modify the config
+    # file than after it was modified
+    self._my_hostname = utils.HostInfo().name
 
   # this method needs to be static, so that we can call it on the class
   @staticmethod
@@ -70,6 +90,7 @@ class ConfigWriter:
     """
     return os.path.exists(constants.CLUSTER_CONF_FILE)
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def GenerateMAC(self):
     """Generate a MAC for an instance.
 
@@ -77,7 +98,6 @@ class ConfigWriter:
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
     prefix = self._config_data.cluster.mac_prefix
     all_macs = self._AllMACs()
     retries = 64
@@ -90,15 +110,72 @@ class ConfigWriter:
         break
       retries -= 1
     else:
-      raise errors.ConfigurationError, ("Can't generate unique MAC")
+      raise errors.ConfigurationError("Can't generate unique MAC")
     return mac
 
+  @locking.ssynchronized(_config_lock, shared=1)
+  def IsMacInUse(self, mac):
+    """Predicate: check if the specified MAC is in use in the Ganeti cluster.
+
+    This only checks instances managed by this cluster, it does not
+    check for potential collisions elsewhere.
+
+    """
+    self._OpenConfig()
+    all_macs = self._AllMACs()
+    return mac in all_macs
+
+  def _ComputeAllLVs(self):
+    """Compute the list of all LVs.
+
+    """
+    self._OpenConfig()
+    lvnames = set()
+    for instance in self._config_data.instances.values():
+      node_data = instance.MapLVsByNode()
+      for lv_list in node_data.values():
+        lvnames.update(lv_list)
+    return lvnames
+
+  @locking.ssynchronized(_config_lock, shared=1)
+  def GenerateUniqueID(self, exceptions=None):
+    """Generate an unique disk name.
+
+    This checks the current node, instances and disk names for
+    duplicates.
+
+    Args:
+      - exceptions: a list with some other names which should be checked
+                    for uniqueness (used for example when you want to get
+                    more than one id at one time without adding each one in
+                    turn to the config file
+
+    Returns: the unique id as a string
+
+    """
+    existing = set()
+    existing.update(self._temporary_ids)
+    existing.update(self._ComputeAllLVs())
+    existing.update(self._config_data.instances.keys())
+    existing.update(self._config_data.nodes.keys())
+    if exceptions is not None:
+      existing.update(exceptions)
+    retries = 64
+    while retries > 0:
+      unique_id = utils.NewUUID()
+      if unique_id not in existing and unique_id is not None:
+        break
+    else:
+      raise errors.ConfigurationError("Not able generate an unique ID"
+                                      " (last tried ID: %s" % unique_id)
+    self._temporary_ids.add(unique_id)
+    return unique_id
+
   def _AllMACs(self):
     """Return all MACs present in the config.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
     result = []
     for instance in self._config_data.instances.values():
@@ -107,11 +184,11 @@ class ConfigWriter:
     return result
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def VerifyConfig(self):
     """Stub verify function.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
     result = []
     seen_macs = []
@@ -119,22 +196,21 @@ class ConfigWriter:
     for instance_name in data.instances:
       instance = data.instances[instance_name]
       if instance.primary_node not in data.nodes:
-        result.append("Instance '%s' has invalid primary node '%s'" %
+        result.append("instance '%s' has invalid primary node '%s'" %
                       (instance_name, instance.primary_node))
       for snode in instance.secondary_nodes:
         if snode not in data.nodes:
-          result.append("Instance '%s' has invalid secondary node '%s'" %
+          result.append("instance '%s' has invalid secondary node '%s'" %
                         (instance_name, snode))
       for idx, nic in enumerate(instance.nics):
         if nic.mac in seen_macs:
-          result.append("Instance '%s' has NIC %d mac %s duplicate" %
+          result.append("instance '%s' has NIC %d mac %s duplicate" %
                         (instance_name, idx, nic.mac))
         else:
           seen_macs.append(nic.mac)
     return result
 
-
-  def SetDiskID(self, disk, node_name):
+  def _UnlockedSetDiskID(self, disk, node_name):
     """Convert the unique ID to the ID needed on the target nodes.
 
     This is used only for drbd, which needs ip/port configuration.
@@ -143,20 +219,22 @@ class ConfigWriter:
     this helps when the only the top device is passed to the remote
     node.
 
+    This function is for internal use, when the config lock is already held.
+
     """
     if disk.children:
       for child in disk.children:
-        self.SetDiskID(child, node_name)
+        self._UnlockedSetDiskID(child, node_name)
 
     if disk.logical_id is None and disk.physical_id is not None:
       return
-    if disk.dev_type == "drbd":
+    if disk.dev_type in constants.LDS_DRBD:
       pnode, snode, port = disk.logical_id
       if node_name not in (pnode, snode):
-        raise errors.ConfigurationError, ("DRBD device not knowing node %s" %
-                                          node_name)
-      pnode_info = self.GetNodeInfo(pnode)
-      snode_info = self.GetNodeInfo(snode)
+        raise errors.ConfigurationError("DRBD device not knowing node %s" %
+                                        node_name)
+      pnode_info = self._UnlockedGetNodeInfo(pnode)
+      snode_info = self._UnlockedGetNodeInfo(snode)
       if pnode_info is None or snode_info is None:
         raise errors.ConfigurationError("Can't find primary or secondary node"
                                         " for %s" % str(disk))
@@ -170,25 +248,40 @@ class ConfigWriter:
       disk.physical_id = disk.logical_id
     return
 
+  @locking.ssynchronized(_config_lock)
+  def SetDiskID(self, disk, node_name):
+    """Convert the unique ID to the ID needed on the target nodes.
+
+    This is used only for drbd, which needs ip/port configuration.
+
+    The routine descends down and updates its children also, because
+    this helps when the only the top device is passed to the remote
+    node.
+
+    """
+    return self._UnlockedSetDiskID(disk, node_name)
+
+  @locking.ssynchronized(_config_lock)
   def AddTcpUdpPort(self, port):
     """Adds a new port to the available port pool.
 
     """
     if not isinstance(port, int):
-      raise errors.ProgrammerError, ("Invalid type passed for port")
+      raise errors.ProgrammerError("Invalid type passed for port")
 
     self._OpenConfig()
     self._config_data.cluster.tcpudp_port_pool.add(port)
     self._WriteConfig()
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def GetPortList(self):
     """Returns a copy of the current port list.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
     return self._config_data.cluster.tcpudp_port_pool.copy()
 
+  @locking.ssynchronized(_config_lock)
   def AllocatePort(self):
     """Allocate a port.
 
@@ -205,14 +298,15 @@ class ConfigWriter:
     else:
       port = self._config_data.cluster.highest_used_port + 1
       if port >= constants.LAST_DRBD_PORT:
-        raise errors.ConfigurationError, ("The highest used port is greater"
-                                          " than %s. Aborting." %
-                                          constants.LAST_DRBD_PORT)
+        raise errors.ConfigurationError("The highest used port is greater"
+                                        " than %s. Aborting." %
+                                        constants.LAST_DRBD_PORT)
       self._config_data.cluster.highest_used_port = port
 
     self._WriteConfig()
     return port
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def GetHostKey(self):
     """Return the rsa hostkey from the config.
 
@@ -221,9 +315,9 @@ class ConfigWriter:
     Returns: rsa hostkey
     """
     self._OpenConfig()
-    self._ReleaseLock()
     return self._config_data.cluster.rsahostkeypub
 
+  @locking.ssynchronized(_config_lock)
   def AddInstance(self, instance):
     """Add an instance to the config.
 
@@ -235,23 +329,40 @@ class ConfigWriter:
     if not isinstance(instance, objects.Instance):
       raise errors.ProgrammerError("Invalid type passed to AddInstance")
 
+    if instance.disk_template != constants.DT_DISKLESS:
+      all_lvs = instance.MapLVsByNode()
+      logger.Info("Instance '%s' DISK_LAYOUT: %s" % (instance.name, all_lvs))
+
     self._OpenConfig()
     self._config_data.instances[instance.name] = instance
     self._WriteConfig()
 
-  def MarkInstanceUp(self, instance_name):
-    """Mark the instance status to up in the config.
+  def _SetInstanceStatus(self, instance_name, status):
+    """Set the instance's status to a given value.
 
     """
+    if status not in ("up", "down"):
+      raise errors.ProgrammerError("Invalid status '%s' passed to"
+                                   " ConfigWriter._SetInstanceStatus()" %
+                                   status)
     self._OpenConfig()
 
     if instance_name not in self._config_data.instances:
-      raise errors.ConfigurationError, ("Unknown instance '%s'" %
-                                        instance_name)
+      raise errors.ConfigurationError("Unknown instance '%s'" %
+                                      instance_name)
     instance = self._config_data.instances[instance_name]
-    instance.status = "up"
-    self._WriteConfig()
+    if instance.status != status:
+      instance.status = status
+      self._WriteConfig()
+
+  @locking.ssynchronized(_config_lock)
+  def MarkInstanceUp(self, instance_name):
+    """Mark the instance status to up in the config.
+    """
+    self._SetInstanceStatus(instance_name, "up")
+
+  @locking.ssynchronized(_config_lock)
   def RemoveInstance(self, instance_name):
     """Remove the instance from the configuration.
 
@@ -259,24 +370,46 @@ class ConfigWriter:
     self._OpenConfig()
 
     if instance_name not in self._config_data.instances:
-      raise errors.ConfigurationError, ("Unknown instance '%s'" %
-                                        instance_name)
+      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
     del self._config_data.instances[instance_name]
     self._WriteConfig()
 
-  def MarkInstanceDown(self, instance_name):
-    """Mark the status of an instance to down in the configuration.
+  @locking.ssynchronized(_config_lock)
+  def RenameInstance(self, old_name, new_name):
+    """Rename an instance.
+
+    This needs to be done in ConfigWriter and not by RemoveInstance
+    combined with AddInstance as only we can guarantee an atomic
+    rename.
 
     """
     self._OpenConfig()
-
-    if instance_name not in self._config_data.instances:
-      raise errors.ConfigurationError, ("Unknown instance '%s'" %
-                                        instance_name)
-    instance = self._config_data.instances[instance_name]
-    instance.status = "down"
+    if old_name not in self._config_data.instances:
+      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
+    inst = self._config_data.instances[old_name]
+    del self._config_data.instances[old_name]
+    inst.name = new_name
+
+    for disk in inst.disks:
+      if disk.dev_type == constants.LD_FILE:
+        # rename the file paths in logical and physical id
+        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
+        disk.physical_id = disk.logical_id = (disk.logical_id[0],
+                                              os.path.join(file_storage_dir,
                                                            inst.name,
                                                            disk.iv_name))
+
+    self._config_data.instances[inst.name] = inst
     self._WriteConfig()
 
+  @locking.ssynchronized(_config_lock)
+  def MarkInstanceDown(self, instance_name):
+    """Mark the status of an instance to down in the configuration.
+
+    """
+    self._SetInstanceStatus(instance_name, "down")
+
+  @locking.ssynchronized(_config_lock, shared=1)
   def GetInstanceList(self):
     """Get the list of instances.
 
@@ -286,20 +419,20 @@ class ConfigWriter:
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
    return self._config_data.instances.keys()
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def ExpandInstanceName(self, short_name):
     """Attempt to expand an incomplete instance name.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
     return utils.MatchNameComponent(short_name,
                                     self._config_data.instances.keys())
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def GetInstanceInfo(self, instance_name):
     """Returns informations about an instance.
 
@@ -314,13 +447,13 @@ class ConfigWriter:
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
     if instance_name not in self._config_data.instances:
       return None
 
     return self._config_data.instances[instance_name]
 
+  @locking.ssynchronized(_config_lock)
   def AddNode(self, node):
     """Add a node to the configuration.
 
@@ -332,56 +465,79 @@ class ConfigWriter:
     self._config_data.nodes[node.name] = node
     self._WriteConfig()
 
+  @locking.ssynchronized(_config_lock)
   def RemoveNode(self, node_name):
     """Remove a node from the configuration.
 
     """
     self._OpenConfig()
     if node_name not in self._config_data.nodes:
-      raise errors.ConfigurationError, ("Unknown node '%s'" % node_name)
+      raise errors.ConfigurationError("Unknown node '%s'" % node_name)
 
     del self._config_data.nodes[node_name]
     self._WriteConfig()
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def ExpandNodeName(self, short_name):
     """Attempt to expand an incomplete instance name.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
     return utils.MatchNameComponent(short_name,
                                     self._config_data.nodes.keys())
 
-  def GetNodeInfo(self, node_name):
+  def _UnlockedGetNodeInfo(self, node_name):
     """Get the configuration of a node, as stored in the config.
 
+    This function is for internal use, when the config lock is already held.
+
     Args: node: nodename (tuple) of the node
 
     Returns: the node object
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
     if node_name not in self._config_data.nodes:
       return None
 
     return self._config_data.nodes[node_name]
 
-  def GetNodeList(self):
+
+  @locking.ssynchronized(_config_lock, shared=1)
+  def GetNodeInfo(self, node_name):
+    """Get the configuration of a node, as stored in the config.
+
+    Args: node: nodename (tuple) of the node
+
+    Returns: the node object
+
+    """
+    return self._UnlockedGetNodeInfo(node_name)
+
+  def _UnlockedGetNodeList(self):
     """Return the list of nodes which are in the configuration.
 
+    This function is for internal use, when the config lock is already held.
+
     """
     self._OpenConfig()
-    self._ReleaseLock()
 
     return self._config_data.nodes.keys()
 
+
+  @locking.ssynchronized(_config_lock, shared=1)
+  def GetNodeList(self):
+    """Return the list of nodes which are in the configuration.
+
+    """
+    return self._UnlockedGetNodeList()
+
+  @locking.ssynchronized(_config_lock, shared=1)
   def DumpConfig(self):
     """Return the entire configuration of the cluster.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
     return self._config_data
 
   def _BumpSerialNo(self):
@@ -401,7 +557,7 @@ class ConfigWriter:
     try:
       st = os.stat(self._cfg_file)
     except OSError, err:
-      raise errors.ConfigurationError, "Can't stat config file: %s" % err
+      raise errors.ConfigurationError("Can't stat config file: %s" % err)
     if (self._config_data is not None and
         self._config_time is not None and
         self._config_time == st.st_mtime and
@@ -409,32 +565,27 @@ class ConfigWriter:
         self._config_inode == st.st_ino):
       # data is current, so skip loading of config file
       return
+
+    # Make sure the configuration has the right version
+    ValidateConfig()
+
     f = open(self._cfg_file, 'r')
     try:
       try:
-        data = objects.ConfigObject.Load(f)
+        data = objects.ConfigData.FromDict(serializer.Load(f.read()))
      except Exception, err:
-        raise errors.ConfigurationError, err
+        raise errors.ConfigurationError(err)
     finally:
       f.close()
     if (not hasattr(data, 'cluster') or
-        not hasattr(data.cluster, 'config_version')):
-      raise errors.ConfigurationError, ("Incomplete configuration"
-                                        " (missing cluster.config_version)")
-    if data.cluster.config_version != constants.CONFIG_VERSION:
-      raise errors.ConfigurationError, ("Cluster configuration version"
-                                        " mismatch, got %s instead of %s" %
-                                        (data.cluster.config_version,
-                                         constants.CONFIG_VERSION))
+        not hasattr(data.cluster, 'rsahostkeypub')):
+      raise errors.ConfigurationError("Incomplete configuration"
+                                      " (missing cluster.rsahostkeypub)")
     self._config_data = data
     self._config_time = st.st_mtime
     self._config_size = st.st_size
     self._config_inode = st.st_ino
 
-  def _ReleaseLock(self):
-    """xxxx
-    """
-
   def _DistributeConfig(self):
     """Distribute the configuration to the other nodes.
 
@@ -445,18 +596,16 @@ class ConfigWriter:
     if self._offline:
       return True
     bad = False
-    nodelist = self.GetNodeList()
-    myhostname = socket.gethostname()
+    nodelist = self._UnlockedGetNodeList()
+    myhostname = self._my_hostname
 
-    tgt_list = []
-    for node in nodelist:
-      nodeinfo = self.GetNodeInfo(node)
-      if nodeinfo.name == myhostname:
-        continue
-      tgt_list.append(node)
+    try:
+      nodelist.remove(myhostname)
+    except ValueError:
+      pass
 
-    result = rpc.call_upload_file(tgt_list, self._cfg_file)
-    for node in tgt_list:
+    result = rpc.call_upload_file(nodelist, self._cfg_file)
+    for node in nodelist:
       if not result[node]:
         logger.Error("copy of file %s to node %s failed" %
                      (self._cfg_file, node))
@@ -470,18 +619,30 @@ class ConfigWriter:
     if destination is None:
       destination = self._cfg_file
     self._BumpSerialNo()
+    txt = serializer.Dump(self._config_data.ToDict())
     dir_name, file_name = os.path.split(destination)
     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
     f = os.fdopen(fd, 'w')
     try:
-      self._config_data.Dump(f)
+      f.write(txt)
       os.fsync(f.fileno())
     finally:
       f.close()
     # we don't need to do os.close(fd) as f.close() did it
     os.rename(name, destination)
+    self.write_count += 1
+    # re-set our cache as not to re-read the config file
+    try:
+      st = os.stat(destination)
+    except OSError, err:
+      raise errors.ConfigurationError("Can't stat config file: %s" % err)
+    self._config_time = st.st_mtime
+    self._config_size = st.st_size
+    self._config_inode = st.st_ino
+    # and redistribute the config file
     self._DistributeConfig()
 
+  @locking.ssynchronized(_config_lock)
   def InitConfig(self, node, primary_ip, secondary_ip,
                  hostkeypub, mac_prefix, vg_name, def_bridge):
     """Create the initial cluster configuration.
 
@@ -497,8 +658,7 @@ class ConfigWriter:
 
     """
     hu_port = constants.FIRST_DRBD_PORT - 1
-    globalconfig = objects.Cluster(config_version=constants.CONFIG_VERSION,
-                                   serial_no=1,
+    globalconfig = objects.Cluster(serial_no=1,
                                    rsahostkeypub=hostkeypub,
                                    highest_used_port=hu_port,
                                    mac_prefix=mac_prefix,
@@ -515,26 +675,75 @@ class ConfigWriter:
                                    cluster=globalconfig)
     self._WriteConfig()
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def GetVGName(self):
     """Return the volume group name.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
     return self._config_data.cluster.volume_group_name
 
+  @locking.ssynchronized(_config_lock)
+  def SetVGName(self, vg_name):
+    """Set the volume group name.
+
+    """
+    self._OpenConfig()
+    self._config_data.cluster.volume_group_name = vg_name
+    self._WriteConfig()
+
+  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefBridge(self):
     """Return the default bridge.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
     return self._config_data.cluster.default_bridge
 
+  @locking.ssynchronized(_config_lock, shared=1)
   def GetMACPrefix(self):
     """Return the mac prefix.
 
     """
     self._OpenConfig()
-    self._ReleaseLock()
     return self._config_data.cluster.mac_prefix
+
+  @locking.ssynchronized(_config_lock, shared=1)
+  def GetClusterInfo(self):
+    """Returns informations about the cluster
+
+    Returns:
+      the cluster object
+
+    """
+    self._OpenConfig()
+
+    return self._config_data.cluster
+
+  @locking.ssynchronized(_config_lock)
+  def Update(self, target):
+    """Notify function to be called after updates.
+
+    This function must be called when an object (as returned by
+    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
+    caller wants the modifications saved to the backing store. Note
+    that all modified objects will be saved, but the target argument
+    is the one the caller wants to ensure that it's saved.
+
+    """
+    if self._config_data is None:
+      raise errors.ProgrammerError("Configuration file not read,"
+                                   " cannot save.")
+    if isinstance(target, objects.Cluster):
+      test = target == self._config_data.cluster
+    elif isinstance(target, objects.Node):
+      test = target in self._config_data.nodes.values()
+    elif isinstance(target, objects.Instance):
+      test = target in self._config_data.instances.values()
+    else:
+      raise errors.ProgrammerError("Invalid object type (%s) passed to"
+                                   " ConfigWriter.Update" % type(target))
+    if not test:
+      raise errors.ConfigurationError("Configuration updated since object"
+                                      " has been read or unknown object")
+    self._WriteConfig()
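
Note on the locking scheme introduced above: every public ConfigWriter entry point is now wrapped with locking.ssynchronized on the module-level _config_lock (shared for read-only calls, exclusive for calls that write the configuration), and methods such as SetDiskID, GetNodeInfo and GetNodeList gained _Unlocked* variants that may only be called while the lock is already held. The following is a minimal standalone sketch of that pattern, not code from the Ganeti tree: it substitutes threading.RLock for locking.SharedLock (which, unlike RLock, really distinguishes shared from exclusive acquisition), and the MiniConfig class and its node data are made up purely for illustration.

import functools
import threading

# Stand-in for the module-level "_config_lock = locking.SharedLock()".
_config_lock = threading.RLock()


def ssynchronized(lock, shared=0):
  """Rough equivalent of locking.ssynchronized: run the method under 'lock'.

  The 'shared' flag is accepted only for symmetry with the real decorator;
  RLock has no shared mode, so readers and writers serialize alike here.
  """
  def decorator(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
      lock.acquire()
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()
    return wrapper
  return decorator


class MiniConfig(object):
  """Toy configuration store mirroring the ConfigWriter method layout."""

  def __init__(self):
    self._nodes = {"node1.example.com": {"ip": "192.0.2.1"}}

  def _UnlockedGetNodeInfo(self, name):
    # Internal variant: only safe to call while the lock is already held,
    # e.g. from another synchronized method.
    return self._nodes.get(name)

  @ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, name):
    # Public read path: take the lock, then delegate to the unlocked helper.
    return self._UnlockedGetNodeInfo(name)

  @ssynchronized(_config_lock)
  def RemoveNode(self, name):
    # Public write path: exclusive access for the whole update.
    if name not in self._nodes:
      raise KeyError("Unknown node '%s'" % name)
    del self._nodes[name]


if __name__ == "__main__":
  cfg = MiniConfig()
  assert cfg.GetNodeInfo("node1.example.com") == {"ip": "192.0.2.1"}
  cfg.RemoveNode("node1.example.com")
  assert cfg.GetNodeInfo("node1.example.com") is None

Splitting each operation into a decorated public method plus an _Unlocked* helper is what lets one synchronized method (for example _UnlockedSetDiskID walking a DRBD disk tree, or _DistributeConfig needing the node list) call another without deadlocking on a non-reentrant lock.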
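
The other substantial change is the storage format and write path: _WriteConfig now serializes the ConfigData object to JSON text (serializer.Dump of ToDict()), writes it to a temporary file created next to the destination, fsyncs and renames it over the live file, then refreshes the cached stat data (mtime/size/inode) before redistributing the file to the other nodes. Below is a rough standalone sketch of just that write sequence; the standard json module stands in for ganeti's serializer, and atomic_write_config is an illustrative helper name, not a function from the codebase.

import json
import os
import tempfile


def atomic_write_config(data, destination):
  """Write 'data' (a plain dict) to 'destination' atomically.

  Mirrors the tempfile.mkstemp + fsync + os.rename sequence used by
  ConfigWriter._WriteConfig, so readers never observe a half-written file.
  """
  txt = json.dumps(data)  # serializer.Dump(self._config_data.ToDict())
  dir_name, file_name = os.path.split(os.path.abspath(destination))
  fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
  f = os.fdopen(fd, 'w')
  try:
    f.write(txt)
    f.flush()
    os.fsync(f.fileno())  # make sure the bytes are on disk...
  finally:
    f.close()
  os.rename(name, destination)  # ...before the new name becomes visible
  # The caller can now re-stat the destination to refresh its cached
  # mtime/size/inode, as _WriteConfig does before calling _DistributeConfig.
  return os.stat(destination)


if __name__ == "__main__":
  dest = os.path.join(tempfile.gettempdir(), "config.data.example")
  st = atomic_write_config({"cluster": {"serial_no": 1}}, dest)
  print("wrote %d bytes" % st.st_size)

Because the rename is atomic on POSIX filesystems, a concurrent reader sees either the old or the new configuration, never a mixture, which is the property the stat-based cache check in _OpenConfig relies on.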