# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer


_config_lock = locking.SharedLock()
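
# All public ConfigWriter methods take this module-level lock through the
# @locking.ssynchronized decorator: shared=1 for read-only accessors,
# exclusive (the default) for methods that modify the configuration.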


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))
69 """The interface to the cluster configuration.

  def __init__(self, cfg_file=None, offline=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._temporary_ids = set()
    self._temporary_drbds = {}
    self._temporary_macs = set()
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    prefix = self._config_data.cluster.mac_prefix
    all_macs = self._AllMACs()
    retries = 64
    while retries > 0:
      byte1 = random.randrange(0, 256)
      byte2 = random.randrange(0, 256)
      byte3 = random.randrange(0, 256)
      mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
      if mac not in all_macs and mac not in self._temporary_macs:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Can't generate unique MAC")
    self._temporary_macs.add(mac)
    return mac
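
  # GenerateMAC example: with a cluster mac_prefix of "aa:00:00" (hypothetical
  # value), it returns strings such as "aa:00:00:3f:9a:c1"; the MAC stays
  # reserved in _temporary_macs until AddInstance() commits or discards it.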

  @locking.ssynchronized(_config_lock, shared=1)
  def IsMacInUse(self, mac):
    """Predicate: check if the specified MAC is in use in the Ganeti cluster.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    return mac in all_macs or mac in self._temporary_macs

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    all_secrets = self._AllDRBDSecrets()
    retries = 64
    while retries > 0:
      secret = utils.GenerateSecret()
      if secret not in all_secrets:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Can't generate unique DRBD secret")
    return secret
153 """Compute the list of all LVs.
157 for instance in self._config_data.instances.values():
158 node_data = instance.MapLVsByNode()
159 for lv_list in node_data.values():
160 lvnames.update(lv_list)

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set

    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids)
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, exceptions=None):
    """Generate a unique disk name.

    This checks the current node, instances and disk names for
    duplicates.

    @param exceptions: a list with some other names which should be checked
        for uniqueness (used for example when you want to get
        more than one id at one time without adding each one in
        turn to the config file)

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=True)
    if exceptions is not None:
      existing.update(exceptions)
    retries = 64
    while retries > 0:
      unique_id = utils.NewUUID()
      if unique_id not in existing and unique_id is not None:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate a unique ID"
                                      " (last tried ID: %s)" % unique_id)
    self._temporary_ids.add(unique_id)
    return unique_id

  def _CleanupTemporaryIDs(self):
    """Clean up the _temporary_ids structure.

    """
    existing = self._AllIDs(include_temporary=False)
    self._temporary_ids = self._temporary_ids - existing
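
  # IDs handed out by GenerateUniqueID are only reservations: they live in
  # _temporary_ids until _WriteConfig() runs, at which point any reservation
  # that now appears in the real configuration is dropped from the set.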
219 """Return all MACs present in the config.
222 @return: the list of all MACs
226 for instance in self._config_data.instances.values():
227 for nic in instance.nics:
228 result.append(nic.mac)

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids

    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify the configuration and return any errors.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = ", ".join(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node state checks
    for node in data.nodes.values():
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify the configuration and return any errors.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device doesn't know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else:  # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
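
  # Ports released by AddTcpUdpPort() are preferred over extending the range;
  # only when the pool is empty does highest_used_port grow, and it is capped
  # at constants.LAST_DRBD_PORT.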

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
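
  # The computed map looks like (node and instance names are illustrative):
  #   {"node1.example.com": {0: "inst1.example.com", 1: "inst2.example.com"},
  #    "node2.example.com": {}}
  # and each duplicate entry is a (node, minor, instance, other_instance)
  # tuple describing a doubly-assigned minor.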

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list)

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
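
  # Illustrative use when building a DRBD8 disk (names are hypothetical):
  #   minors = cfg.AllocateDRBDMinor(["node1.example.com",
  #                                   "node2.example.com"],
  #                                  "inst1.example.com")
  #   # e.g. [0, 3] -- one minor per requested node, in the same order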

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get the cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    for nic in instance.nics:
      self._temporary_macs.discard(nic.mac)
    self._WriteConfig()
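
  # AddInstance() turns the reservations made while building the instance into
  # permanent configuration entries: the reserved MACs and DRBD minors are
  # dropped from the temporary sets once the instance object is stored, and
  # the new configuration is written out and distributed.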

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              os.path.join(file_storage_dir,
                                                           inst.name,
                                                           disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys())

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
        GetInstanceInfo would return for that instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys())

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        GetNodeInfo would return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored

    @rtype: tuple
    @return: tuple of (current, desired and possible)

    """
    mc_now = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_max)
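
  # mc_now counts nodes currently flagged as master candidates; mc_max is the
  # number of candidates the cluster should have, i.e. candidate_pool_size
  # capped by the number of nodes that are neither offline nor drained.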

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored

    @return: tuple of (current, max)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self):
    """Try to grow the candidate pool to the desired size.

    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if node.master_candidate or node.offline or node.drained:
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
    if mod_list:
      self._config_data.cluster.serial_no += 1
      self._WriteConfig()

    return mod_list

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # an ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, so it needs
        to either be called with the lock held or from a safe place
        (the constructor)

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self.GenerateUniqueID()
        modified = True
    if modified:
      self._WriteConfig()

  def _DistributeConfig(self):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True
    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)
        bad = True
    return not bad

  def _WriteConfig(self, destination=None):
    """Write the configuration data to persistent storage.

    """
    # first, cleanup the _temporary_ids set, if an ID is now in the
    # other objects it should be discarded to prevent unbounded growth
    # of that structure
    self._CleanupTemporaryIDs()
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      raise errors.ConfigurationError("Configuration data is not"
                                      " consistent: %s" %
                                      (", ".join(config_errors)))
    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    utils.WriteFile(destination, data=txt)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig()

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(\
          self._UnlockedGetNodeList(),
          self._UnlockedGetSsconfValues())
        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            logging.warning("Error while uploading ssconf files to"
                            " node %s: %s", nname, msg)
      self._last_cluster_serial = self._config_data.cluster.serial_no
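
  # The write path is therefore: verify the in-memory data, bump the config
  # serial number, write the local file, push it to all master candidates via
  # RPC, and refresh the ssconf files whenever the cluster serial number has
  # changed since the last write.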

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())
    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      }
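
  # Every ssconf value is a single newline-joined string; for example the
  # SS_NODE_PRIMARY_IPS entry contains one "node_name primary_ip" pair per
  # line (node names here are illustrative):
  #   node1.example.com 192.0.2.1
  #   node2.example.com 192.0.2.2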

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock)
  def Update(self, target):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)
      for nic in target.nics:
        self._temporary_macs.discard(nic.mac)

    self._WriteConfig()
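

# Typical read-modify-write usage of Update() (names are illustrative):
#   cfg = ConfigWriter()
#   node = cfg.GetNodeInfo("node1.example.com")
#   node.offline = True
#   cfg.Update(node)   # persists the change and bumps the serial numbers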