4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46 from ganeti import ssconf
# Module-level lock serializing all access to the cluster configuration;
# taken shared for reads and exclusively for writes via locking.ssynchronized.
_config_lock = locking.SharedLock()
def ValidateConfig():
  """Check that the on-disk configuration version is usable.

  Compares the version recorded in the simple store against
  constants.CONFIG_VERSION and raises errors.ConfigurationError on
  mismatch.

  """
  store = ssconf.SimpleStore()

  found = store.GetConfigVersion()
  if found != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (found, constants.CONFIG_VERSION))
63 """The interface to the cluster configuration.
66 def __init__(self, cfg_file=None, offline=False):
68 self._lock = _config_lock
69 self._config_data = None
70 self._config_time = None
71 self._config_size = None
72 self._config_inode = None
73 self._offline = offline
75 self._cfg_file = constants.CLUSTER_CONF_FILE
77 self._cfg_file = cfg_file
78 self._temporary_ids = set()
79 self._temporary_drbds = {}
80 # Note: in order to prevent errors when resolving our name in
81 # _DistributeConfig, we compute it here once and reuse it; it's
82 # better to raise an error before starting to modify the config
83 # file than after it was modified
84 self._my_hostname = utils.HostInfo().name
86 # this method needs to be static, so that we can call it on the class
89 """Check if the cluster is configured.
92 return os.path.exists(constants.CLUSTER_CONF_FILE)
94 @locking.ssynchronized(_config_lock, shared=1)
95 def GenerateMAC(self):
96 """Generate a MAC for an instance.
98 This should check the current instances for duplicates.
102 prefix = self._config_data.cluster.mac_prefix
103 all_macs = self._AllMACs()
106 byte1 = random.randrange(0, 256)
107 byte2 = random.randrange(0, 256)
108 byte3 = random.randrange(0, 256)
109 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
110 if mac not in all_macs:
114 raise errors.ConfigurationError("Can't generate unique MAC")
117 @locking.ssynchronized(_config_lock, shared=1)
118 def IsMacInUse(self, mac):
119 """Predicate: check if the specified MAC is in use in the Ganeti cluster.
121 This only checks instances managed by this cluster, it does not
122 check for potential collisions elsewhere.
126 all_macs = self._AllMACs()
127 return mac in all_macs
129 @locking.ssynchronized(_config_lock, shared=1)
130 def GenerateDRBDSecret(self):
131 """Generate a DRBD secret.
133 This checks the current disks for duplicates.
137 all_secrets = self._AllDRBDSecrets()
140 secret = utils.GenerateSecret()
141 if secret not in all_secrets:
145 raise errors.ConfigurationError("Can't generate unique DRBD secret")
148 def _ComputeAllLVs(self):
149 """Compute the list of all LVs.
154 for instance in self._config_data.instances.values():
155 node_data = instance.MapLVsByNode()
156 for lv_list in node_data.values():
157 lvnames.update(lv_list)
160 @locking.ssynchronized(_config_lock, shared=1)
161 def GenerateUniqueID(self, exceptions=None):
162 """Generate an unique disk name.
164 This checks the current node, instances and disk names for
168 - exceptions: a list with some other names which should be checked
169 for uniqueness (used for example when you want to get
170 more than one id at one time without adding each one in
171 turn to the config file
173 Returns: the unique id as a string
177 existing.update(self._temporary_ids)
178 existing.update(self._ComputeAllLVs())
179 existing.update(self._config_data.instances.keys())
180 existing.update(self._config_data.nodes.keys())
181 if exceptions is not None:
182 existing.update(exceptions)
185 unique_id = utils.NewUUID()
186 if unique_id not in existing and unique_id is not None:
189 raise errors.ConfigurationError("Not able generate an unique ID"
190 " (last tried ID: %s" % unique_id)
191 self._temporary_ids.add(unique_id)
195 """Return all MACs present in the config.
201 for instance in self._config_data.instances.values():
202 for nic in instance.nics:
203 result.append(nic.mac)
207 def _AllDRBDSecrets(self):
208 """Return all DRBD secrets present in the config.
211 def helper(disk, result):
212 """Recursively gather secrets from this disk."""
213 if disk.dev_type == constants.DT_DRBD8:
214 result.append(disk.logical_id[5])
216 for child in disk.children:
217 helper(child, result)
220 for instance in self._config_data.instances.values():
221 for disk in instance.disks:
226 @locking.ssynchronized(_config_lock, shared=1)
227 def VerifyConfig(self):
228 """Stub verify function.
235 data = self._config_data
236 for instance_name in data.instances:
237 instance = data.instances[instance_name]
238 if instance.primary_node not in data.nodes:
239 result.append("instance '%s' has invalid primary node '%s'" %
240 (instance_name, instance.primary_node))
241 for snode in instance.secondary_nodes:
242 if snode not in data.nodes:
243 result.append("instance '%s' has invalid secondary node '%s'" %
244 (instance_name, snode))
245 for idx, nic in enumerate(instance.nics):
246 if nic.mac in seen_macs:
247 result.append("instance '%s' has NIC %d mac %s duplicate" %
248 (instance_name, idx, nic.mac))
250 seen_macs.append(nic.mac)
252 # gather the drbd ports for duplicate checks
253 for dsk in instance.disks:
254 if dsk.dev_type in constants.LDS_DRBD:
255 tcp_port = dsk.logical_id[2]
256 if tcp_port not in ports:
258 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
259 # gather network port reservation
260 net_port = getattr(instance, "network_port", None)
261 if net_port is not None:
262 if net_port not in ports:
264 ports[net_port].append((instance.name, "network port"))
266 # cluster-wide pool of free ports
267 for free_port in self._config_data.cluster.tcpudp_port_pool:
268 if free_port not in ports:
269 ports[free_port] = []
270 ports[free_port].append(("cluster", "port marked as free"))
272 # compute tcp/udp duplicate ports
278 txt = ", ".join(["%s/%s" % val for val in pdata])
279 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
281 # highest used tcp port check
283 if keys[-1] > self._config_data.cluster.highest_used_port:
284 result.append("Highest used port mismatch, saved %s, computed %s" %
285 (self._config_data.cluster.highest_used_port,
290 def _UnlockedSetDiskID(self, disk, node_name):
291 """Convert the unique ID to the ID needed on the target nodes.
293 This is used only for drbd, which needs ip/port configuration.
295 The routine descends down and updates its children also, because
296 this helps when the only the top device is passed to the remote
299 This function is for internal use, when the config lock is already held.
303 for child in disk.children:
304 self._UnlockedSetDiskID(child, node_name)
306 if disk.logical_id is None and disk.physical_id is not None:
308 if disk.dev_type == constants.LD_DRBD8:
309 pnode, snode, port, pminor, sminor, secret = disk.logical_id
310 if node_name not in (pnode, snode):
311 raise errors.ConfigurationError("DRBD device not knowing node %s" %
313 pnode_info = self._UnlockedGetNodeInfo(pnode)
314 snode_info = self._UnlockedGetNodeInfo(snode)
315 if pnode_info is None or snode_info is None:
316 raise errors.ConfigurationError("Can't find primary or secondary node"
317 " for %s" % str(disk))
318 p_data = (pnode_info.secondary_ip, port)
319 s_data = (snode_info.secondary_ip, port)
320 if pnode == node_name:
321 disk.physical_id = p_data + s_data + (pminor, secret)
322 else: # it must be secondary, we tested above
323 disk.physical_id = s_data + p_data + (sminor, secret)
325 disk.physical_id = disk.logical_id
328 @locking.ssynchronized(_config_lock)
329 def SetDiskID(self, disk, node_name):
330 """Convert the unique ID to the ID needed on the target nodes.
332 This is used only for drbd, which needs ip/port configuration.
334 The routine descends down and updates its children also, because
335 this helps when the only the top device is passed to the remote
339 return self._UnlockedSetDiskID(disk, node_name)
341 @locking.ssynchronized(_config_lock)
342 def AddTcpUdpPort(self, port):
343 """Adds a new port to the available port pool.
346 if not isinstance(port, int):
347 raise errors.ProgrammerError("Invalid type passed for port")
350 self._config_data.cluster.tcpudp_port_pool.add(port)
353 @locking.ssynchronized(_config_lock, shared=1)
354 def GetPortList(self):
355 """Returns a copy of the current port list.
359 return self._config_data.cluster.tcpudp_port_pool.copy()
361 @locking.ssynchronized(_config_lock)
362 def AllocatePort(self):
365 The port will be taken from the available port pool or from the
366 default port range (and in this case we increase
372 # If there are TCP/IP ports configured, we use them first.
373 if self._config_data.cluster.tcpudp_port_pool:
374 port = self._config_data.cluster.tcpudp_port_pool.pop()
376 port = self._config_data.cluster.highest_used_port + 1
377 if port >= constants.LAST_DRBD_PORT:
378 raise errors.ConfigurationError("The highest used port is greater"
379 " than %s. Aborting." %
380 constants.LAST_DRBD_PORT)
381 self._config_data.cluster.highest_used_port = port
386 def _ComputeDRBDMap(self, instance):
387 """Compute the used DRBD minor/nodes.
389 Return: dictionary of node_name: dict of minor: instance_name. The
390 returned dict will have all the nodes in it (even if with an empty
394 def _AppendUsedPorts(instance_name, disk, used):
395 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
396 nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
397 for node, port in ((nodeA, minorA), (nodeB, minorB)):
398 assert node in used, "Instance node not found in node list"
399 if port in used[node]:
400 raise errors.ProgrammerError("DRBD minor already used:"
402 (node, port, instance_name,
405 used[node][port] = instance_name
407 for child in disk.children:
408 _AppendUsedPorts(instance_name, child, used)
410 my_dict = dict((node, {}) for node in self._config_data.nodes)
411 for (node, minor), instance in self._temporary_drbds.iteritems():
412 my_dict[node][minor] = instance
413 for instance in self._config_data.instances.itervalues():
414 for disk in instance.disks:
415 _AppendUsedPorts(instance.name, disk, my_dict)
418 @locking.ssynchronized(_config_lock)
419 def AllocateDRBDMinor(self, nodes, instance):
420 """Allocate a drbd minor.
422 The free minor will be automatically computed from the existing
423 devices. A node can be given multiple times in order to allocate
424 multiple minors. The result is the list of minors, in the same
425 order as the passed nodes.
430 d_map = self._ComputeDRBDMap(instance)
435 # no minors used, we can start at 0
438 self._temporary_drbds[(nname, 0)] = instance
442 ffree = utils.FirstFree(keys)
444 # return the next minor
445 # TODO: implement high-limit check
450 ndata[minor] = instance
451 assert (nname, minor) not in self._temporary_drbds, \
452 "Attempt to reuse reserved DRBD minor"
453 self._temporary_drbds[(nname, minor)] = instance
454 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
458 @locking.ssynchronized(_config_lock)
459 def ReleaseDRBDMinors(self, instance):
460 """Release temporary drbd minors allocated for a given instance.
462 This should be called on both the error paths and on the success
463 paths (after the instance has been added or updated).
465 @type instance: string
466 @param instance: the instance for which temporary minors should be
470 for key, name in self._temporary_drbds.items():
472 del self._temporary_drbds[key]
474 @locking.ssynchronized(_config_lock, shared=1)
475 def GetHostKey(self):
476 """Return the rsa hostkey from the config.
483 return self._config_data.cluster.rsahostkeypub
485 @locking.ssynchronized(_config_lock)
486 def AddInstance(self, instance):
487 """Add an instance to the config.
489 This should be used after creating a new instance.
492 instance: the instance object
494 if not isinstance(instance, objects.Instance):
495 raise errors.ProgrammerError("Invalid type passed to AddInstance")
497 if instance.disk_template != constants.DT_DISKLESS:
498 all_lvs = instance.MapLVsByNode()
499 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
502 instance.serial_no = 1
503 self._config_data.instances[instance.name] = instance
504 self._config_data.cluster.serial_no += 1
507 def _SetInstanceStatus(self, instance_name, status):
508 """Set the instance's status to a given value.
511 if status not in ("up", "down"):
512 raise errors.ProgrammerError("Invalid status '%s' passed to"
513 " ConfigWriter._SetInstanceStatus()" %
517 if instance_name not in self._config_data.instances:
518 raise errors.ConfigurationError("Unknown instance '%s'" %
520 instance = self._config_data.instances[instance_name]
521 if instance.status != status:
522 instance.status = status
523 instance.serial_no += 1
526 @locking.ssynchronized(_config_lock)
527 def MarkInstanceUp(self, instance_name):
528 """Mark the instance status to up in the config.
531 self._SetInstanceStatus(instance_name, "up")
533 @locking.ssynchronized(_config_lock)
534 def RemoveInstance(self, instance_name):
535 """Remove the instance from the configuration.
540 if instance_name not in self._config_data.instances:
541 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
542 del self._config_data.instances[instance_name]
543 self._config_data.cluster.serial_no += 1
546 @locking.ssynchronized(_config_lock)
547 def RenameInstance(self, old_name, new_name):
548 """Rename an instance.
550 This needs to be done in ConfigWriter and not by RemoveInstance
551 combined with AddInstance as only we can guarantee an atomic
556 if old_name not in self._config_data.instances:
557 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
558 inst = self._config_data.instances[old_name]
559 del self._config_data.instances[old_name]
562 for disk in inst.disks:
563 if disk.dev_type == constants.LD_FILE:
564 # rename the file paths in logical and physical id
565 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
566 disk.physical_id = disk.logical_id = (disk.logical_id[0],
567 os.path.join(file_storage_dir,
571 self._config_data.instances[inst.name] = inst
572 self._config_data.cluster.serial_no += 1
575 @locking.ssynchronized(_config_lock)
576 def MarkInstanceDown(self, instance_name):
577 """Mark the status of an instance to down in the configuration.
580 self._SetInstanceStatus(instance_name, "down")
582 def _UnlockedGetInstanceList(self):
583 """Get the list of instances.
585 This function is for internal use, when the config lock is already held.
589 return self._config_data.instances.keys()
591 @locking.ssynchronized(_config_lock, shared=1)
592 def GetInstanceList(self):
593 """Get the list of instances.
596 array of instances, ex. ['instance2.example.com','instance1.example.com']
597 these contains all the instances, also the ones in Admin_down state
600 return self._UnlockedGetInstanceList()
602 @locking.ssynchronized(_config_lock, shared=1)
603 def ExpandInstanceName(self, short_name):
604 """Attempt to expand an incomplete instance name.
609 return utils.MatchNameComponent(short_name,
610 self._config_data.instances.keys())
612 def _UnlockedGetInstanceInfo(self, instance_name):
613 """Returns informations about an instance.
615 This function is for internal use, when the config lock is already held.
620 if instance_name not in self._config_data.instances:
623 return self._config_data.instances[instance_name]
625 @locking.ssynchronized(_config_lock, shared=1)
626 def GetInstanceInfo(self, instance_name):
627 """Returns informations about an instance.
629 It takes the information from the configuration file. Other informations of
630 an instance are taken from the live systems.
633 instance: name of the instance, ex instance1.example.com
639 return self._UnlockedGetInstanceInfo(instance_name)
641 @locking.ssynchronized(_config_lock, shared=1)
642 def GetAllInstancesInfo(self):
643 """Get the configuration of all instances.
646 @returns: dict of (instance, instance_info), where instance_info is what
647 would GetInstanceInfo return for the node
650 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
651 for instance in self._UnlockedGetInstanceList()])
654 @locking.ssynchronized(_config_lock)
655 def AddNode(self, node):
656 """Add a node to the configuration.
659 node: an object.Node instance
662 logging.info("Adding node %s to configuration" % node.name)
666 self._config_data.nodes[node.name] = node
667 self._config_data.cluster.serial_no += 1
670 @locking.ssynchronized(_config_lock)
671 def RemoveNode(self, node_name):
672 """Remove a node from the configuration.
675 logging.info("Removing node %s from configuration" % node_name)
678 if node_name not in self._config_data.nodes:
679 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
681 del self._config_data.nodes[node_name]
682 self._config_data.cluster.serial_no += 1
685 @locking.ssynchronized(_config_lock, shared=1)
686 def ExpandNodeName(self, short_name):
687 """Attempt to expand an incomplete instance name.
692 return utils.MatchNameComponent(short_name,
693 self._config_data.nodes.keys())
695 def _UnlockedGetNodeInfo(self, node_name):
696 """Get the configuration of a node, as stored in the config.
698 This function is for internal use, when the config lock is already held.
700 Args: node: nodename (tuple) of the node
702 Returns: the node object
707 if node_name not in self._config_data.nodes:
710 return self._config_data.nodes[node_name]
713 @locking.ssynchronized(_config_lock, shared=1)
714 def GetNodeInfo(self, node_name):
715 """Get the configuration of a node, as stored in the config.
717 Args: node: nodename (tuple) of the node
719 Returns: the node object
722 return self._UnlockedGetNodeInfo(node_name)
724 def _UnlockedGetNodeList(self):
725 """Return the list of nodes which are in the configuration.
727 This function is for internal use, when the config lock is already held.
731 return self._config_data.nodes.keys()
734 @locking.ssynchronized(_config_lock, shared=1)
735 def GetNodeList(self):
736 """Return the list of nodes which are in the configuration.
739 return self._UnlockedGetNodeList()
741 @locking.ssynchronized(_config_lock, shared=1)
742 def GetAllNodesInfo(self):
743 """Get the configuration of all nodes.
746 @returns: dict of (node, node_info), where node_info is what
747 would GetNodeInfo return for the node
750 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
751 for node in self._UnlockedGetNodeList()])
754 @locking.ssynchronized(_config_lock, shared=1)
755 def DumpConfig(self):
756 """Return the entire configuration of the cluster.
759 return self._config_data
761 def _BumpSerialNo(self):
762 """Bump up the serial number of the config.
765 self._config_data.serial_no += 1
767 def _OpenConfig(self):
768 """Read the config data from disk.
770 In case we already have configuration data and the config file has
771 the same mtime as when we read it, we skip the parsing of the
772 file, since de-serialisation could be slow.
776 st = os.stat(self._cfg_file)
778 raise errors.ConfigurationError("Can't stat config file: %s" % err)
779 if (self._config_data is not None and
780 self._config_time is not None and
781 self._config_time == st.st_mtime and
782 self._config_size == st.st_size and
783 self._config_inode == st.st_ino):
784 # data is current, so skip loading of config file
787 # Make sure the configuration has the right version
790 f = open(self._cfg_file, 'r')
793 data = objects.ConfigData.FromDict(serializer.Load(f.read()))
794 except Exception, err:
795 raise errors.ConfigurationError(err)
798 if (not hasattr(data, 'cluster') or
799 not hasattr(data.cluster, 'rsahostkeypub')):
800 raise errors.ConfigurationError("Incomplete configuration"
801 " (missing cluster.rsahostkeypub)")
802 self._config_data = data
803 self._config_time = st.st_mtime
804 self._config_size = st.st_size
805 self._config_inode = st.st_ino
807 def _DistributeConfig(self):
808 """Distribute the configuration to the other nodes.
810 Currently, this only copies the configuration file. In the future,
811 it could be used to encapsulate the 2/3-phase update mechanism.
817 nodelist = self._UnlockedGetNodeList()
818 myhostname = self._my_hostname
821 nodelist.remove(myhostname)
825 result = rpc.call_upload_file(nodelist, self._cfg_file)
826 for node in nodelist:
828 logging.error("copy of file %s to node %s failed",
829 self._cfg_file, node)
833 def _WriteConfig(self, destination=None):
834 """Write the configuration data to persistent storage.
837 if destination is None:
838 destination = self._cfg_file
840 txt = serializer.Dump(self._config_data.ToDict())
841 dir_name, file_name = os.path.split(destination)
842 fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
843 f = os.fdopen(fd, 'w')
849 # we don't need to do os.close(fd) as f.close() did it
850 os.rename(name, destination)
851 self.write_count += 1
852 # re-set our cache as not to re-read the config file
854 st = os.stat(destination)
856 raise errors.ConfigurationError("Can't stat config file: %s" % err)
857 self._config_time = st.st_mtime
858 self._config_size = st.st_size
859 self._config_inode = st.st_ino
860 # and redistribute the config file
861 self._DistributeConfig()
863 @locking.ssynchronized(_config_lock)
864 def InitConfig(self, version, cluster_config, master_node_config):
865 """Create the initial cluster configuration.
867 It will contain the current node, which will also be the master
868 node, and no instances.
871 @param version: Configuration version
872 @type cluster_config: objects.Cluster
873 @param cluster_config: Cluster configuration
874 @type master_node_config: objects.Node
875 @param master_node_config: Master node configuration
879 master_node_config.name: master_node_config,
882 self._config_data = objects.ConfigData(version=version,
883 cluster=cluster_config,
889 @locking.ssynchronized(_config_lock, shared=1)
891 """Return the volume group name.
895 return self._config_data.cluster.volume_group_name
897 @locking.ssynchronized(_config_lock)
898 def SetVGName(self, vg_name):
899 """Set the volume group name.
903 self._config_data.cluster.volume_group_name = vg_name
904 self._config_data.cluster.serial_no += 1
907 @locking.ssynchronized(_config_lock, shared=1)
908 def GetDefBridge(self):
909 """Return the default bridge.
913 return self._config_data.cluster.default_bridge
915 @locking.ssynchronized(_config_lock, shared=1)
916 def GetMACPrefix(self):
917 """Return the mac prefix.
921 return self._config_data.cluster.mac_prefix
923 @locking.ssynchronized(_config_lock, shared=1)
924 def GetClusterInfo(self):
925 """Returns informations about the cluster
933 return self._config_data.cluster
935 @locking.ssynchronized(_config_lock)
936 def Update(self, target):
937 """Notify function to be called after updates.
939 This function must be called when an object (as returned by
940 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
941 caller wants the modifications saved to the backing store. Note
942 that all modified objects will be saved, but the target argument
943 is the one the caller wants to ensure that it's saved.
946 if self._config_data is None:
947 raise errors.ProgrammerError("Configuration file not read,"
949 if isinstance(target, objects.Cluster):
950 test = target == self._config_data.cluster
951 elif isinstance(target, objects.Node):
952 test = target in self._config_data.nodes.values()
953 elif isinstance(target, objects.Instance):
954 test = target in self._config_data.instances.values()
956 raise errors.ProgrammerError("Invalid object type (%s) passed to"
957 " ConfigWriter.Update" % type(target))
959 raise errors.ConfigurationError("Configuration updated since object"
960 " has been read or unknown object")
961 target.serial_no += 1