4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too
40 from ganeti import errors
41 from ganeti import locking
42 from ganeti import utils
43 from ganeti import constants
44 from ganeti import rpc
45 from ganeti import objects
46 from ganeti import serializer
# Module-level lock shared by all ConfigWriter instances; readers take it
# shared, writers take it exclusively (see the ssynchronized decorators below).
_config_lock = locking.SharedLock()
def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))
69 """The interface to the cluster configuration.
def __init__(self, cfg_file=None, offline=False):
  """Initialize the configuration writer.

  @type cfg_file: string or None
  @param cfg_file: path to the configuration file; if None, the default
      cluster configuration file path is used
  @type offline: boolean
  @param offline: when True, the configuration is never distributed to
      the other nodes on write

  """
  self.write_count = 0
  self._lock = _config_lock
  self._config_data = None
  self._offline = offline
  if cfg_file is None:
    self._cfg_file = constants.CLUSTER_CONF_FILE
  else:
    self._cfg_file = cfg_file
  # reservations for not-yet-committed IDs/minors/MACs
  self._temporary_ids = set()
  self._temporary_drbds = {}
  self._temporary_macs = set()
  # Note: in order to prevent errors when resolving our name in
  # _DistributeConfig, we compute it here once and reuse it; it's
  # better to raise an error before starting to modify the config
  # file than after it was modified
  self._my_hostname = utils.HostInfo().name
  self._last_cluster_serial = -1
  self._OpenConfig()
# this method needs to be static, so that we can call it on the class
@staticmethod
def IsCluster():
  """Check if the cluster is configured.

  @rtype: boolean
  @return: True if the cluster configuration file exists on disk

  """
  return os.path.exists(constants.CLUSTER_CONF_FILE)
@locking.ssynchronized(_config_lock, shared=1)
def GenerateMAC(self):
  """Generate a MAC for an instance.

  This should check the current instances for duplicates.

  @rtype: string
  @return: a MAC address unique among configured and reserved MACs

  @raise errors.ConfigurationError: if no unique MAC could be
      generated after a bounded number of attempts

  """
  prefix = self._config_data.cluster.mac_prefix
  all_macs = self._AllMACs()
  retries = 64
  while retries > 0:
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    if mac not in all_macs and mac not in self._temporary_macs:
      break
    retries -= 1
  else:
    raise errors.ConfigurationError("Can't generate unique MAC")
  # reserve the MAC so concurrent generations do not hand it out twice
  self._temporary_macs.add(mac)
  return mac
@locking.ssynchronized(_config_lock, shared=1)
def IsMacInUse(self, mac):
  """Predicate: check if the specified MAC is in use in the Ganeti cluster.

  This only checks instances managed by this cluster, it does not
  check for potential collisions elsewhere.

  @rtype: boolean
  @return: True when the MAC is either configured or reserved

  """
  return mac in self._AllMACs() or mac in self._temporary_macs
@locking.ssynchronized(_config_lock, shared=1)
def GenerateDRBDSecret(self):
  """Generate a DRBD secret.

  This checks the current disks for duplicates.

  @rtype: string
  @return: a secret not yet used by any configured DRBD disk

  @raise errors.ConfigurationError: if no unique secret could be
      generated after a bounded number of attempts

  """
  all_secrets = self._AllDRBDSecrets()
  retries = 64
  while retries > 0:
    secret = utils.GenerateSecret()
    if secret not in all_secrets:
      break
    retries -= 1
  else:
    raise errors.ConfigurationError("Can't generate unique DRBD secret")
  return secret
def _AllLVs(self):
  """Compute the list of all LVs.

  @rtype: set
  @return: the set of all LV names used by instance disks

  """
  lvnames = set()
  for instance in self._config_data.instances.values():
    node_data = instance.MapLVsByNode()
    for lv_list in node_data.values():
      lvnames.update(lv_list)
  return lvnames
def _AllIDs(self, include_temporary):
  """Compute the list of all UUIDs and names we have.

  @type include_temporary: boolean
  @param include_temporary: whether to include the _temporary_ids set
  @rtype: set
  @return: a set of IDs

  """
  existing = set()
  if include_temporary:
    existing.update(self._temporary_ids)
  existing.update(self._AllLVs())
  existing.update(self._config_data.instances.keys())
  existing.update(self._config_data.nodes.keys())
  existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
  return existing
def _GenerateUniqueID(self, exceptions=None):
  """Generate an unique UUID.

  This checks the current node, instances and disk names for
  duplicates.

  @param exceptions: a list with some other names which should be
      checked for uniqueness (used for example when you want to get
      more than one id at one time without adding each one in turn
      to the config file)

  @rtype: string
  @return: the unique id

  @raise errors.ConfigurationError: if no unique ID could be
      generated after a bounded number of attempts

  """
  existing = self._AllIDs(include_temporary=True)
  if exceptions is not None:
    existing.update(exceptions)
  retries = 64
  while retries > 0:
    unique_id = utils.NewUUID()
    if unique_id not in existing and unique_id is not None:
      break
    retries -= 1
  else:
    raise errors.ConfigurationError("Not able generate an unique ID"
                                    " (last tried ID: %s)" % unique_id)
  # reserve the ID until it is committed to the configuration
  self._temporary_ids.add(unique_id)
  return unique_id
@locking.ssynchronized(_config_lock, shared=1)
def GenerateUniqueID(self, exceptions=None):
  """Generate an unique ID.

  This is just a wrapper over the unlocked version.

  @param exceptions: extra names to be checked for uniqueness

  """
  return self._GenerateUniqueID(exceptions=exceptions)
def _CleanupTemporaryIDs(self):
  """Cleanups the _temporary_ids structure.

  Reservations that have since been committed to the configuration
  are dropped, so the set does not grow without bound.

  """
  committed = self._AllIDs(include_temporary=False)
  self._temporary_ids = self._temporary_ids - committed
def _AllMACs(self):
  """Return all MACs present in the config.

  @rtype: list
  @return: the list of all MACs

  """
  result = []
  for instance in self._config_data.instances.values():
    for nic in instance.nics:
      result.append(nic.mac)
  return result
def _AllDRBDSecrets(self):
  """Return all DRBD secrets present in the config.

  @rtype: list
  @return: the list of all DRBD secrets

  """
  def helper(disk, result):
    """Recursively gather secrets from this disk."""
    if disk.dev_type == constants.DT_DRBD8:
      # the secret is the sixth element of a DRBD8 logical_id
      result.append(disk.logical_id[5])
    if disk.children:
      for child in disk.children:
        helper(child, result)

  result = []
  for instance in self._config_data.instances.values():
    for disk in instance.disks:
      helper(disk, result)
  return result
def _CheckDiskIDs(self, disk, l_ids, p_ids):
  """Compute duplicate disk IDs

  @type disk: L{objects.Disk}
  @param disk: the disk at which to start searching
  @type l_ids: list
  @param l_ids: list of current logical ids
  @type p_ids: list
  @param p_ids: list of current physical ids
  @rtype: list
  @return: a list of error messages

  """
  result = []
  if disk.logical_id is not None:
    if disk.logical_id in l_ids:
      result.append("duplicate logical id %s" % str(disk.logical_id))
    else:
      l_ids.append(disk.logical_id)
  if disk.physical_id is not None:
    if disk.physical_id in p_ids:
      result.append("duplicate physical id %s" % str(disk.physical_id))
    else:
      p_ids.append(disk.physical_id)

  if disk.children:
    for child in disk.children:
      result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
  return result
def _UnlockedVerifyConfig(self):
  """Verify the configuration for consistency.

  This function is for internal use, when the config lock is already
  held.

  @rtype: list
  @return: a list of error messages; a non-empty list signifies
      configuration errors

  """
  result = []
  seen_macs = []
  ports = {}
  data = self._config_data
  seen_lids = []
  seen_pids = []

  # global cluster checks
  if not data.cluster.enabled_hypervisors:
    result.append("enabled hypervisors list doesn't have any entries")
  invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
  if invalid_hvs:
    result.append("enabled hypervisors contains invalid entries: %s" %
                  invalid_hvs)

  if data.cluster.master_node not in data.nodes:
    result.append("cluster has invalid primary node '%s'" %
                  data.cluster.master_node)

  # per-instance checks
  for instance_name in data.instances:
    instance = data.instances[instance_name]
    if instance.primary_node not in data.nodes:
      result.append("instance '%s' has invalid primary node '%s'" %
                    (instance_name, instance.primary_node))
    for snode in instance.secondary_nodes:
      if snode not in data.nodes:
        result.append("instance '%s' has invalid secondary node '%s'" %
                      (instance_name, snode))
    for idx, nic in enumerate(instance.nics):
      if nic.mac in seen_macs:
        result.append("instance '%s' has NIC %d mac %s duplicate" %
                      (instance_name, idx, nic.mac))
      else:
        seen_macs.append(nic.mac)

    # gather the drbd ports for duplicate checks
    for dsk in instance.disks:
      if dsk.dev_type in constants.LDS_DRBD:
        tcp_port = dsk.logical_id[2]
        if tcp_port not in ports:
          ports[tcp_port] = []
        ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
    # gather network port reservation
    net_port = getattr(instance, "network_port", None)
    if net_port is not None:
      if net_port not in ports:
        ports[net_port] = []
      ports[net_port].append((instance.name, "network port"))

    # instance disk verify
    for idx, disk in enumerate(instance.disks):
      result.extend(["instance '%s' disk %d error: %s" %
                     (instance.name, idx, msg) for msg in disk.Verify()])
      result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

  # cluster-wide pool of free ports
  for free_port in data.cluster.tcpudp_port_pool:
    if free_port not in ports:
      ports[free_port] = []
    ports[free_port].append(("cluster", "port marked as free"))

  # compute tcp/udp duplicate ports
  keys = ports.keys()
  keys.sort()
  for pnum in keys:
    pdata = ports[pnum]
    if len(pdata) > 1:
      txt = ", ".join(["%s/%s" % val for val in pdata])
      result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

  # highest used tcp port check
  if keys:
    if keys[-1] > data.cluster.highest_used_port:
      result.append("Highest used port mismatch, saved %s, computed %s" %
                    (data.cluster.highest_used_port, keys[-1]))

  if not data.nodes[data.cluster.master_node].master_candidate:
    result.append("Master node is not a master candidate")

  # master candidate checks
  mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
  if mc_now < mc_max:
    result.append("Not enough master candidates: actual %d, target %d" %
                  (mc_now, mc_max))

  # node checks
  for node in data.nodes.values():
    # a node must be at most one of master_candidate/drained/offline;
    # note: use node.drained here (there is no 'drain' attribute)
    if [node.master_candidate, node.drained, node.offline].count(True) > 1:
      result.append("Node %s state is invalid: master_candidate=%s,"
                    " drain=%s, offline=%s" %
                    (node.name, node.master_candidate, node.drained,
                     node.offline))

  # drbd minors check
  d_map, duplicates = self._UnlockedComputeDRBDMap()
  for node, minor, instance_a, instance_b in duplicates:
    result.append("DRBD minor %d on node %s is assigned twice to instances"
                  " %s and %s" % (minor, node, instance_a, instance_b))

  return result
@locking.ssynchronized(_config_lock, shared=1)
def VerifyConfig(self):
  """Verify function.

  This is just a wrapper over L{_UnlockedVerifyConfig}.

  @rtype: list
  @return: a list of error messages; a non-empty list signifies
      configuration errors

  """
  return self._UnlockedVerifyConfig()
def _UnlockedSetDiskID(self, disk, node_name):
  """Convert the unique ID to the ID needed on the target nodes.

  This is used only for drbd, which needs ip/port configuration.

  The routine descends down and updates its children also, because
  this helps when the only the top device is passed to the remote
  node.

  This function is for internal use, when the config lock is already held.

  @param disk: the disk whose physical_id should be (re)computed
  @param node_name: the node from whose point of view the id is set

  """
  if disk.children:
    for child in disk.children:
      self._UnlockedSetDiskID(child, node_name)

  if disk.logical_id is None and disk.physical_id is not None:
    return
  if disk.dev_type == constants.LD_DRBD8:
    pnode, snode, port, pminor, sminor, secret = disk.logical_id
    if node_name not in (pnode, snode):
      raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                      node_name)
    pnode_info = self._UnlockedGetNodeInfo(pnode)
    snode_info = self._UnlockedGetNodeInfo(snode)
    if pnode_info is None or snode_info is None:
      raise errors.ConfigurationError("Can't find primary or secondary node"
                                      " for %s" % str(disk))
    p_data = (pnode_info.secondary_ip, port)
    s_data = (snode_info.secondary_ip, port)
    if pnode == node_name:
      disk.physical_id = p_data + s_data + (pminor, secret)
    else: # it must be secondary, we tested above
      disk.physical_id = s_data + p_data + (sminor, secret)
  else:
    disk.physical_id = disk.logical_id
  return
@locking.ssynchronized(_config_lock)
def SetDiskID(self, disk, node_name):
  """Convert the unique ID to the ID needed on the target nodes.

  This is used only for drbd, which needs ip/port configuration.

  The routine descends down and updates its children also, because
  this helps when the only the top device is passed to the remote
  node.

  """
  return self._UnlockedSetDiskID(disk, node_name)
@locking.ssynchronized(_config_lock)
def AddTcpUdpPort(self, port):
  """Adds a new port to the available port pool.

  @type port: int
  @param port: the port to return to the pool

  @raise errors.ProgrammerError: if the port is not an integer

  """
  if not isinstance(port, int):
    raise errors.ProgrammerError("Invalid type passed for port")

  self._config_data.cluster.tcpudp_port_pool.add(port)
  # persist the updated pool immediately
  self._WriteConfig()
@locking.ssynchronized(_config_lock, shared=1)
def GetPortList(self):
  """Returns a copy of the current port list.

  A copy is returned so callers cannot mutate the pool directly.

  """
  return self._config_data.cluster.tcpudp_port_pool.copy()
@locking.ssynchronized(_config_lock)
def AllocatePort(self):
  """Allocate a port.

  The port will be taken from the available port pool or from the
  default port range (and in this case we increase
  highest_used_port).

  @rtype: int
  @return: the allocated port

  @raise errors.ConfigurationError: if the port range is exhausted

  """
  # If there are TCP/IP ports configured, we use them first.
  if self._config_data.cluster.tcpudp_port_pool:
    port = self._config_data.cluster.tcpudp_port_pool.pop()
  else:
    port = self._config_data.cluster.highest_used_port + 1
    if port >= constants.LAST_DRBD_PORT:
      raise errors.ConfigurationError("The highest used port is greater"
                                      " than %s. Aborting." %
                                      constants.LAST_DRBD_PORT)
    self._config_data.cluster.highest_used_port = port

  self._WriteConfig()
  return port
def _UnlockedComputeDRBDMap(self):
  """Compute the used DRBD minor/nodes.

  @rtype: (dict, list)
  @return: dictionary of node_name: dict of minor: instance_name;
      the returned dict will have all the nodes in it (even if with
      an empty list), and a list of duplicates; if the duplicates
      list is not empty, the configuration is corrupted and its caller
      should raise an exception

  """
  def _AppendUsedPorts(instance_name, disk, used):
    """Collect minors used by one disk (recursively), returning duplicates."""
    duplicates = []
    if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
      node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
      for node, port in ((node_a, minor_a), (node_b, minor_b)):
        assert node in used, ("Node '%s' of instance '%s' not found"
                              " in node list" % (node, instance_name))
        if port in used[node]:
          duplicates.append((node, port, instance_name, used[node][port]))
        else:
          used[node][port] = instance_name
    if disk.children:
      for child in disk.children:
        duplicates.extend(_AppendUsedPorts(instance_name, child, used))
    return duplicates

  duplicates = []
  my_dict = dict((node, {}) for node in self._config_data.nodes)
  for instance in self._config_data.instances.itervalues():
    for disk in instance.disks:
      duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
  # also account for minors that are only reserved, not yet committed
  for (node, minor), instance in self._temporary_drbds.iteritems():
    if minor in my_dict[node] and my_dict[node][minor] != instance:
      duplicates.append((node, minor, instance, my_dict[node][minor]))
    else:
      my_dict[node][minor] = instance
  return my_dict, duplicates
@locking.ssynchronized(_config_lock)
def ComputeDRBDMap(self):
  """Compute the used DRBD minor/nodes.

  This is just a wrapper over L{_UnlockedComputeDRBDMap}.

  @return: dictionary of node_name: dict of minor: instance_name;
      the returned dict will have all the nodes in it (even if with
      an empty list)

  @raise errors.ConfigurationError: if duplicate minors are found

  """
  d_map, duplicates = self._UnlockedComputeDRBDMap()
  if duplicates:
    raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                    str(duplicates))
  return d_map
@locking.ssynchronized(_config_lock)
def AllocateDRBDMinor(self, nodes, instance):
  """Allocate a drbd minor.

  The free minor will be automatically computed from the existing
  devices. A node can be given multiple times in order to allocate
  multiple minors. The result is the list of minors, in the same
  order as the passed nodes.

  @type instance: string
  @param instance: the instance for which we allocate minors

  @rtype: list of int
  @return: one free minor per requested node, in input order

  """
  assert isinstance(instance, basestring), \
         "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

  d_map, duplicates = self._UnlockedComputeDRBDMap()
  if duplicates:
    raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                    str(duplicates))
  result = []
  for nname in nodes:
    ndata = d_map[nname]
    if not ndata:
      # no minors used, we can start at 0
      result.append(0)
      ndata[0] = instance
      self._temporary_drbds[(nname, 0)] = instance
      continue
    keys = ndata.keys()
    keys.sort()
    ffree = utils.FirstFree(keys)
    if ffree is None:
      # return the next minor
      # TODO: implement high-limit check
      minor = keys[-1] + 1
    else:
      minor = ffree
    # double-check minor against current instances
    assert minor not in d_map[nname], \
           ("Attempt to reuse allocated DRBD minor %d on node %s,"
            " already allocated to instance %s" %
            (minor, nname, d_map[nname][minor]))
    ndata[minor] = instance
    # double-check minor against reservation
    r_key = (nname, minor)
    assert r_key not in self._temporary_drbds, \
           ("Attempt to reuse reserved DRBD minor %d on node %s,"
            " reserved for instance %s" %
            (minor, nname, self._temporary_drbds[r_key]))
    self._temporary_drbds[r_key] = instance
    result.append(minor)
  logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                nodes, result)
  return result
def _UnlockedReleaseDRBDMinors(self, instance):
  """Release temporary drbd minors allocated for a given instance.

  @type instance: string
  @param instance: the instance for which temporary minors should be
      released

  """
  assert isinstance(instance, basestring), \
         "Invalid argument passed to ReleaseDRBDMinors"
  # iterate over a snapshot (items()) since we delete during the loop
  for key, name in self._temporary_drbds.items():
    if name == instance:
      del self._temporary_drbds[key]
@locking.ssynchronized(_config_lock)
def ReleaseDRBDMinors(self, instance):
  """Release temporary drbd minors allocated for a given instance.

  This should be called on the error paths, on the success paths
  it's automatically called by the ConfigWriter add and update
  functions.

  This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

  @type instance: string
  @param instance: the instance for which temporary minors should be
      released

  """
  self._UnlockedReleaseDRBDMinors(instance)
@locking.ssynchronized(_config_lock, shared=1)
def GetConfigVersion(self):
  """Get the configuration version.

  @return: Config version

  """
  return self._config_data.version
@locking.ssynchronized(_config_lock, shared=1)
def GetClusterName(self):
  """Get the cluster name.

  @return: Cluster name

  """
  return self._config_data.cluster.cluster_name
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNode(self):
  """Get the hostname of the master node for this cluster.

  @return: Master hostname

  """
  return self._config_data.cluster.master_node
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterIP(self):
  """Get the IP of the master node for this cluster.

  @return: Master IP

  """
  return self._config_data.cluster.master_ip
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNetdev(self):
  """Get the master network device for this cluster.

  @return: the network device name

  """
  return self._config_data.cluster.master_netdev
@locking.ssynchronized(_config_lock, shared=1)
def GetFileStorageDir(self):
  """Get the file storage dir for this cluster.

  @return: the base directory for file-backed disks

  """
  return self._config_data.cluster.file_storage_dir
@locking.ssynchronized(_config_lock, shared=1)
def GetHypervisorType(self):
  """Get the hypervisor type for this cluster.

  @return: the first (default) enabled hypervisor

  """
  return self._config_data.cluster.enabled_hypervisors[0]
@locking.ssynchronized(_config_lock, shared=1)
def GetHostKey(self):
  """Return the rsa hostkey from the config.

  @rtype: string
  @return: the rsa hostkey

  """
  return self._config_data.cluster.rsahostkeypub
@locking.ssynchronized(_config_lock)
def AddInstance(self, instance):
  """Add an instance to the config.

  This should be used after creating a new instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object

  @raise errors.ProgrammerError: if the argument is not an Instance
  @raise errors.ConfigurationError: if a NIC's MAC is already in use

  """
  if not isinstance(instance, objects.Instance):
    raise errors.ProgrammerError("Invalid type passed to AddInstance")

  if instance.disk_template != constants.DT_DISKLESS:
    all_lvs = instance.MapLVsByNode()
    logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

  all_macs = self._AllMACs()
  for nic in instance.nics:
    if nic.mac in all_macs:
      raise errors.ConfigurationError("Cannot add instance %s:"
                                      " MAC address '%s' already in use." %
                                      (instance.name, nic.mac))

  self._EnsureUUID(instance)

  instance.serial_no = 1
  instance.ctime = instance.mtime = time.time()
  self._config_data.instances[instance.name] = instance
  self._config_data.cluster.serial_no += 1
  # the instance is committed now, drop its reservations
  self._UnlockedReleaseDRBDMinors(instance.name)
  for nic in instance.nics:
    self._temporary_macs.discard(nic.mac)
  self._WriteConfig()
def _EnsureUUID(self, item):
  """Ensures a given object has a valid UUID.

  @param item: the instance or node to be checked

  @raise errors.ConfigurationError: if the object's UUID is already
      used by something else in the configuration

  """
  if item.uuid is None:
    item.uuid = self._GenerateUniqueID()
  # _AllIDs takes 'include_temporary' (see its definition), not 'temporary'
  elif item.uuid in self._AllIDs(include_temporary=True):
    raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                    " in use" % (item.name, item.uuid))
def _SetInstanceStatus(self, instance_name, status):
  """Set the instance's status to a given value.

  @type instance_name: string
  @param instance_name: the name of the instance to modify
  @type status: boolean
  @param status: the new admin_up value

  @raise errors.ConfigurationError: if the instance is unknown

  """
  assert isinstance(status, bool), \
         "Invalid status '%s' passed to SetInstanceStatus" % (status,)

  if instance_name not in self._config_data.instances:
    raise errors.ConfigurationError("Unknown instance '%s'" %
                                    instance_name)
  instance = self._config_data.instances[instance_name]
  if instance.admin_up != status:
    instance.admin_up = status
    instance.serial_no += 1
    instance.mtime = time.time()
    # only write when something actually changed
    self._WriteConfig()
@locking.ssynchronized(_config_lock)
def MarkInstanceUp(self, instance_name):
  """Mark the instance status to up in the config.

  """
  self._SetInstanceStatus(instance_name, True)
@locking.ssynchronized(_config_lock)
def RemoveInstance(self, instance_name):
  """Remove the instance from the configuration.

  @raise errors.ConfigurationError: if the instance is unknown

  """
  if instance_name not in self._config_data.instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
  del self._config_data.instances[instance_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def RenameInstance(self, old_name, new_name):
  """Rename an instance.

  This needs to be done in ConfigWriter and not by RemoveInstance
  combined with AddInstance as only we can guarantee an atomic
  rename.

  @raise errors.ConfigurationError: if the old name is unknown

  """
  if old_name not in self._config_data.instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
  inst = self._config_data.instances[old_name]
  del self._config_data.instances[old_name]
  inst.name = new_name

  for disk in inst.disks:
    if disk.dev_type == constants.LD_FILE:
      # rename the file paths in logical and physical id
      file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
      disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                            os.path.join(file_storage_dir,
                                                         inst.name,
                                                         disk.iv_name))

  self._config_data.instances[inst.name] = inst
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def MarkInstanceDown(self, instance_name):
  """Mark the status of an instance to down in the configuration.

  """
  self._SetInstanceStatus(instance_name, False)
def _UnlockedGetInstanceList(self):
  """Get the list of instances.

  This function is for internal use, when the config lock is already held.

  """
  return self._config_data.instances.keys()
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceList(self):
  """Get the list of instances.

  @return: array of instances, ex. ['instance2.example.com',
      'instance1.example.com']

  """
  return self._UnlockedGetInstanceList()
@locking.ssynchronized(_config_lock, shared=1)
def ExpandInstanceName(self, short_name):
  """Attempt to expand an incomplete instance name.

  """
  candidates = self._config_data.instances.keys()
  return utils.MatchNameComponent(short_name, candidates)
def _UnlockedGetInstanceInfo(self, instance_name):
  """Returns information about an instance.

  This function is for internal use, when the config lock is already held.

  @return: the instance object, or None if the instance is unknown

  """
  if instance_name not in self._config_data.instances:
    return None

  return self._config_data.instances[instance_name]
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceInfo(self, instance_name):
  """Returns information about an instance.

  It takes the information from the configuration file. Other information of
  an instance are taken from the live systems.

  @param instance_name: name of the instance, e.g.
      I{instance1.example.com}

  @rtype: L{objects.Instance}
  @return: the instance object

  """
  return self._UnlockedGetInstanceInfo(instance_name)
@locking.ssynchronized(_config_lock, shared=1)
def GetAllInstancesInfo(self):
  """Get the configuration of all instances.

  @rtype: dict
  @return: dict of (instance, instance_info), where instance_info is what
      would GetInstanceInfo return for the node

  """
  my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                  for instance in self._UnlockedGetInstanceList()])
  return my_dict
@locking.ssynchronized(_config_lock)
def AddNode(self, node):
  """Add a node to the configuration.

  @type node: L{objects.Node}
  @param node: a Node instance

  """
  # use lazy %-args, not eager string interpolation
  logging.info("Adding node %s to configuration", node.name)

  self._EnsureUUID(node)

  node.serial_no = 1
  node.ctime = node.mtime = time.time()
  self._config_data.nodes[node.name] = node
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock)
def RemoveNode(self, node_name):
  """Remove a node from the configuration.

  @raise errors.ConfigurationError: if the node is unknown

  """
  # use lazy %-args, not eager string interpolation
  logging.info("Removing node %s from configuration", node_name)

  if node_name not in self._config_data.nodes:
    raise errors.ConfigurationError("Unknown node '%s'" % node_name)

  del self._config_data.nodes[node_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()
@locking.ssynchronized(_config_lock, shared=1)
def ExpandNodeName(self, short_name):
  """Attempt to expand an incomplete node name.

  """
  return utils.MatchNameComponent(short_name,
                                  self._config_data.nodes.keys())
def _UnlockedGetNodeInfo(self, node_name):
  """Get the configuration of a node, as stored in the config.

  This function is for internal use, when the config lock is already
  held.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: L{objects.Node}
  @return: the node object, or None if the node is unknown

  """
  if node_name not in self._config_data.nodes:
    return None

  return self._config_data.nodes[node_name]
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeInfo(self, node_name):
  """Get the configuration of a node, as stored in the config.

  This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: L{objects.Node}
  @return: the node object

  """
  return self._UnlockedGetNodeInfo(node_name)
def _UnlockedGetNodeList(self):
  """Return the list of nodes which are in the configuration.

  This function is for internal use, when the config lock is already
  held.

  """
  return self._config_data.nodes.keys()
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeList(self):
  """Return the list of nodes which are in the configuration.

  """
  return self._UnlockedGetNodeList()
@locking.ssynchronized(_config_lock, shared=1)
def GetOnlineNodeList(self):
  """Return the list of nodes which are online.

  """
  names = []
  for node_name in self._UnlockedGetNodeList():
    node = self._UnlockedGetNodeInfo(node_name)
    if not node.offline:
      names.append(node.name)
  return names
@locking.ssynchronized(_config_lock, shared=1)
def GetAllNodesInfo(self):
  """Get the configuration of all nodes.

  @rtype: dict
  @return: dict of (node, node_info), where node_info is what
      would GetNodeInfo return for the node

  """
  my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                  for node in self._UnlockedGetNodeList()])
  return my_dict
def _UnlockedGetMasterCandidateStats(self, exceptions=None):
  """Get the number of current and maximum desired and possible candidates.

  @type exceptions: list
  @param exceptions: if passed, list of nodes that should be ignored
  @rtype: tuple
  @return: tuple of (current, desired and possible)

  """
  mc_now = mc_max = 0
  for node in self._config_data.nodes.values():
    if exceptions and node.name in exceptions:
      continue
    # only nodes that are neither offline nor drained can be candidates
    if not (node.offline or node.drained):
      mc_max += 1
    if node.master_candidate:
      mc_now += 1
  mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
  return (mc_now, mc_max)
@locking.ssynchronized(_config_lock, shared=1)
def GetMasterCandidateStats(self, exceptions=None):
  """Get the number of current and maximum possible candidates.

  This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

  @type exceptions: list
  @param exceptions: if passed, list of nodes that should be ignored
  @rtype: tuple
  @return: tuple of (current, max)

  """
  return self._UnlockedGetMasterCandidateStats(exceptions)
@locking.ssynchronized(_config_lock)
def MaintainCandidatePool(self):
  """Try to grow the candidate pool to the desired size.

  @rtype: list
  @return: list with the adjusted nodes (L{objects.Node} instances)

  """
  mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
  mod_list = []
  if mc_now < mc_max:
    node_list = self._config_data.nodes.keys()
    # shuffle so the promotion choice is not biased by dict order
    random.shuffle(node_list)
    for name in node_list:
      if mc_now >= mc_max:
        break
      node = self._config_data.nodes[name]
      if node.master_candidate or node.offline or node.drained:
        continue
      mod_list.append(node)
      node.master_candidate = True
      node.serial_no += 1
      mc_now += 1
    if mc_now != mc_max:
      # this should not happen
      logging.warning("Warning: MaintainCandidatePool didn't manage to"
                      " fill the candidate pool (%d/%d)", mc_now, mc_max)
    if mod_list:
      self._config_data.cluster.serial_no += 1
      self._WriteConfig()

  return mod_list
def _BumpSerialNo(self):
  """Bump up the serial number of the config.

  """
  data = self._config_data
  data.serial_no += 1
  data.mtime = time.time()
def _AllUUIDObjects(self):
  """Returns all objects with uuid attributes.

  """
  objs = list(self._config_data.instances.values())
  objs.extend(self._config_data.nodes.values())
  objs.append(self._config_data.cluster)
  return objs
1094 def _OpenConfig(self):
1095 """Read the config data from disk.
1098 raw_data = utils.ReadFile(self._cfg_file)
1101 data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1102 except Exception, err:
1103 raise errors.ConfigurationError(err)
1105 # Make sure the configuration has the right version
1106 _ValidateConfig(data)
1108 if (not hasattr(data, 'cluster') or
1109 not hasattr(data.cluster, 'rsahostkeypub')):
1110 raise errors.ConfigurationError("Incomplete configuration"
1111 " (missing cluster.rsahostkeypub)")
1113 # Upgrade configuration if needed
1114 data.UpgradeConfig()
1116 self._config_data = data
1117 # reset the last serial as -1 so that the next write will cause
1119 self._last_cluster_serial = -1
1121 # And finally run our (custom) config upgrade sequence
1122 self._UpgradeConfig()
def _UpgradeConfig(self):
  """Run upgrade steps that cannot be done purely in the objects.

  This is because some data elements need uniqueness across the
  whole configuration, etc.

  @warning: this function will call L{_WriteConfig()}, so it needs
      to either be called with the lock held or from a safe place
      (the constructor)

  """
  modified = False
  for item in self._AllUUIDObjects():
    if item.uuid is None:
      item.uuid = self._GenerateUniqueID()
      modified = True
  if modified:
    # only rewrite the config when something actually changed
    self._WriteConfig()
def _DistributeConfig(self):
  """Distribute the configuration to the other nodes.

  Currently, this only copies the configuration file. In the future,
  it could be used to encapsulate the 2/3-phase update mechanism.

  @rtype: boolean
  @return: True if distribution succeeded everywhere (or was skipped
      in offline mode), False if at least one node failed

  """
  if self._offline:
    return True
  bad = False

  node_list = []
  addr_list = []
  myhostname = self._my_hostname
  # we can skip checking whether _UnlockedGetNodeInfo returns None
  # since the node list comes from _UnlocketGetNodeList, and we are
  # called with the lock held, so no modifications should take place
  # in between
  for node_name in self._UnlockedGetNodeList():
    if node_name == myhostname:
      continue
    node_info = self._UnlockedGetNodeInfo(node_name)
    if not node_info.master_candidate:
      continue
    node_list.append(node_info.name)
    addr_list.append(node_info.primary_ip)

  result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                          address_list=addr_list)
  for to_node, to_result in result.items():
    msg = to_result.fail_msg
    if msg:
      msg = ("Copy of file %s to node %s failed: %s" %
             (self._cfg_file, to_node, msg))
      logging.error(msg)
      bad = True
  return not bad
def _WriteConfig(self, destination=None):
  """Write the configuration data to persistent storage.

  @type destination: str or None
  @param destination: target file path; defaults to the standard
      cluster configuration file (C{self._cfg_file})
  @raise errors.ConfigurationError: if the in-memory configuration
      fails the internal consistency checks

  """
  # first, cleanup the _temporary_ids set, if an ID is now in the
  # other objects it should be discarded to prevent unbounded growth
  self._CleanupTemporaryIDs()
  config_errors = self._UnlockedVerifyConfig()
  if config_errors:
    raise errors.ConfigurationError("Configuration data is not"
                                    " consistent: %s" %
                                    (", ".join(config_errors)))
  if destination is None:
    destination = self._cfg_file
  self._BumpSerialNo()
  txt = serializer.Dump(self._config_data.ToDict())

  utils.WriteFile(destination, data=txt)

  self.write_count += 1

  # and redistribute the config file to master candidates
  self._DistributeConfig()

  # Write ssconf files on all nodes (including locally); only done
  # when the cluster serial advanced since the last push, to avoid
  # re-sending identical data
  if self._last_cluster_serial < self._config_data.cluster.serial_no:
    if not self._offline:
      result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetNodeList(),
          self._UnlockedGetSsconfValues())
      for nname, nresu in result.items():
        msg = nresu.fail_msg
        if msg:
          # best-effort: log and continue, don't fail the local write
          logging.warning("Error while uploading ssconf files to"
                          " node %s: %s", nname, msg)
    self._last_cluster_serial = self._config_data.cluster.serial_no
def _UnlockedGetSsconfValues(self):
  """Return the values needed by ssconf.

  @rtype: dict
  @return: a dictionary with keys the ssconf names and values their
      associated value

  """
  # all multi-valued ssconf entries are newline-joined strings
  fn = "\n".join
  instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
  node_names = utils.NiceSort(self._UnlockedGetNodeList())
  node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
  node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                  for ninfo in node_info]
  node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                  for ninfo in node_info]

  instance_data = fn(instance_names)
  off_data = fn(node.name for node in node_info if node.offline)
  on_data = fn(node.name for node in node_info if not node.offline)
  mc_data = fn(node.name for node in node_info if node.master_candidate)
  mc_ips_data = fn(node.primary_ip for node in node_info
                   if node.master_candidate)
  node_data = fn(node_names)
  node_pri_ips_data = fn(node_pri_ips)
  node_snd_ips_data = fn(node_snd_ips)

  cluster = self._config_data.cluster
  cluster_tags = fn(cluster.GetTags())
  return {
    constants.SS_CLUSTER_NAME: cluster.cluster_name,
    constants.SS_CLUSTER_TAGS: cluster_tags,
    constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
    constants.SS_MASTER_CANDIDATES: mc_data,
    constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
    constants.SS_MASTER_IP: cluster.master_ip,
    constants.SS_MASTER_NETDEV: cluster.master_netdev,
    constants.SS_MASTER_NODE: cluster.master_node,
    constants.SS_NODE_LIST: node_data,
    constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
    constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
    constants.SS_OFFLINE_NODES: off_data,
    constants.SS_ONLINE_NODES: on_data,
    constants.SS_INSTANCE_LIST: instance_data,
    constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
    }
@locking.ssynchronized(_config_lock, shared=1)
def GetVGName(self):
  """Return the cluster-wide volume group name.

  @rtype: string
  @return: the name of the configured LVM volume group

  """
  cluster = self._config_data.cluster
  return cluster.volume_group_name
@locking.ssynchronized(_config_lock)
def SetVGName(self, vg_name):
  """Set the volume group name.

  @type vg_name: string
  @param vg_name: the new volume group name

  """
  self._config_data.cluster.volume_group_name = vg_name
  self._config_data.cluster.serial_no += 1
  # persist and distribute the change, consistent with the other
  # mutators in this class; without this the update is lost
  self._WriteConfig()
@locking.ssynchronized(_config_lock, shared=1)
def GetMACPrefix(self):
  """Return the MAC address prefix configured for the cluster.

  @rtype: string
  @return: the cluster's MAC prefix

  """
  cluster = self._config_data.cluster
  return cluster.mac_prefix
@locking.ssynchronized(_config_lock, shared=1)
def GetClusterInfo(self):
  """Returns information about the cluster

  @rtype: L{objects.Cluster}
  @return: the cluster object

  """
  cluster_obj = self._config_data.cluster
  return cluster_obj
1299 @locking.ssynchronized(_config_lock)
1300 def Update(self, target):
1301 """Notify function to be called after updates.
1303 This function must be called when an object (as returned by
1304 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1305 caller wants the modifications saved to the backing store. Note
1306 that all modified objects will be saved, but the target argument
1307 is the one the caller wants to ensure that it's saved.
1309 @param target: an instance of either L{objects.Cluster},
1310 L{objects.Node} or L{objects.Instance} which is existing in
1314 if self._config_data is None:
1315 raise errors.ProgrammerError("Configuration file not read,"
1317 update_serial = False
1318 if isinstance(target, objects.Cluster):
1319 test = target == self._config_data.cluster
1320 elif isinstance(target, objects.Node):
1321 test = target in self._config_data.nodes.values()
1322 update_serial = True
1323 elif isinstance(target, objects.Instance):
1324 test = target in self._config_data.instances.values()
1326 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1327 " ConfigWriter.Update" % type(target))
1329 raise errors.ConfigurationError("Configuration updated since object"
1330 " has been read or unknown object")
1331 target.serial_no += 1
1332 target.mtime = now = time.time()
1335 # for node updates, we need to increase the cluster serial too
1336 self._config_data.cluster.serial_no += 1
1337 self._config_data.cluster.mtime = now
1339 if isinstance(target, objects.Instance):
1340 self._UnlockedReleaseDRBDMinors(target.name)
1341 for nic in target.nics:
1342 self._temporary_macs.discard(nic.mac)