4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
Currently, the data storage format is JSON. YAML was slow and consuming too
much memory.
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
48 _config_lock = locking.SharedLock()
def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @type data: L{objects.ConfigData}
  @param data: the parsed configuration
  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    # report both the on-disk and the expected version so mismatches
    # are actionable from the error message alone
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))
68 """The interface to the cluster configuration.
71 def __init__(self, cfg_file=None, offline=False):
73 self._lock = _config_lock
74 self._config_data = None
75 self._offline = offline
77 self._cfg_file = constants.CLUSTER_CONF_FILE
79 self._cfg_file = cfg_file
80 self._temporary_ids = set()
81 self._temporary_drbds = {}
82 self._temporary_macs = set()
83 # Note: in order to prevent errors when resolving our name in
84 # _DistributeConfig, we compute it here once and reuse it; it's
85 # better to raise an error before starting to modify the config
86 # file than after it was modified
87 self._my_hostname = utils.HostInfo().name
88 self._last_cluster_serial = -1
91 # this method needs to be static, so that we can call it on the class
94 """Check if the cluster is configured.
97 return os.path.exists(constants.CLUSTER_CONF_FILE)
99 @locking.ssynchronized(_config_lock, shared=1)
100 def GenerateMAC(self):
101 """Generate a MAC for an instance.
103 This should check the current instances for duplicates.
106 prefix = self._config_data.cluster.mac_prefix
107 all_macs = self._AllMACs()
110 byte1 = random.randrange(0, 256)
111 byte2 = random.randrange(0, 256)
112 byte3 = random.randrange(0, 256)
113 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
114 if mac not in all_macs and mac not in self._temporary_macs:
118 raise errors.ConfigurationError("Can't generate unique MAC")
119 self._temporary_macs.add(mac)
122 @locking.ssynchronized(_config_lock, shared=1)
123 def IsMacInUse(self, mac):
124 """Predicate: check if the specified MAC is in use in the Ganeti cluster.
126 This only checks instances managed by this cluster, it does not
127 check for potential collisions elsewhere.
130 all_macs = self._AllMACs()
131 return mac in all_macs or mac in self._temporary_macs
133 @locking.ssynchronized(_config_lock, shared=1)
134 def GenerateDRBDSecret(self):
135 """Generate a DRBD secret.
137 This checks the current disks for duplicates.
140 all_secrets = self._AllDRBDSecrets()
143 secret = utils.GenerateSecret()
144 if secret not in all_secrets:
148 raise errors.ConfigurationError("Can't generate unique DRBD secret")
152 """Compute the list of all LVs.
156 for instance in self._config_data.instances.values():
157 node_data = instance.MapLVsByNode()
158 for lv_list in node_data.values():
159 lvnames.update(lv_list)
162 def _AllIDs(self, include_temporary):
163 """Compute the list of all UUIDs and names we have.
165 @type include_temporary: boolean
166 @param include_temporary: whether to include the _temporary_ids set
168 @return: a set of IDs
172 if include_temporary:
173 existing.update(self._temporary_ids)
174 existing.update(self._AllLVs())
175 existing.update(self._config_data.instances.keys())
176 existing.update(self._config_data.nodes.keys())
179 @locking.ssynchronized(_config_lock, shared=1)
180 def GenerateUniqueID(self, exceptions=None):
181 """Generate an unique disk name.
183 This checks the current node, instances and disk names for
186 @param exceptions: a list with some other names which should be checked
187 for uniqueness (used for example when you want to get
188 more than one id at one time without adding each one in
189 turn to the config file)
192 @return: the unique id
195 existing = self._AllIDs(include_temporary=True)
196 if exceptions is not None:
197 existing.update(exceptions)
200 unique_id = utils.NewUUID()
201 if unique_id not in existing and unique_id is not None:
204 raise errors.ConfigurationError("Not able generate an unique ID"
205 " (last tried ID: %s" % unique_id)
206 self._temporary_ids.add(unique_id)
210 """Return all MACs present in the config.
213 @return: the list of all MACs
217 for instance in self._config_data.instances.values():
218 for nic in instance.nics:
219 result.append(nic.mac)
223 def _AllDRBDSecrets(self):
224 """Return all DRBD secrets present in the config.
227 @return: the list of all DRBD secrets
230 def helper(disk, result):
231 """Recursively gather secrets from this disk."""
232 if disk.dev_type == constants.DT_DRBD8:
233 result.append(disk.logical_id[5])
235 for child in disk.children:
236 helper(child, result)
239 for instance in self._config_data.instances.values():
240 for disk in instance.disks:
245 def _CheckDiskIDs(self, disk, l_ids, p_ids):
246 """Compute duplicate disk IDs
248 @type disk: L{objects.Disk}
249 @param disk: the disk at which to start searching
251 @param l_ids: list of current logical ids
253 @param p_ids: list of current physical ids
255 @return: a list of error messages
259 if disk.logical_id is not None:
260 if disk.logical_id in l_ids:
261 result.append("duplicate logical id %s" % str(disk.logical_id))
263 l_ids.append(disk.logical_id)
264 if disk.physical_id is not None:
265 if disk.physical_id in p_ids:
266 result.append("duplicate physical id %s" % str(disk.physical_id))
268 p_ids.append(disk.physical_id)
271 for child in disk.children:
272 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
275 def _UnlockedVerifyConfig(self):
279 @return: a list of error messages; a non-empty list signifies
286 data = self._config_data
290 # global cluster checks
291 if not data.cluster.enabled_hypervisors:
292 result.append("enabled hypervisors list doesn't have any entries")
293 invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
295 result.append("enabled hypervisors contains invalid entries: %s" %
298 if data.cluster.master_node not in data.nodes:
299 result.append("cluster has invalid primary node '%s'" %
300 data.cluster.master_node)
302 # per-instance checks
303 for instance_name in data.instances:
304 instance = data.instances[instance_name]
305 if instance.primary_node not in data.nodes:
306 result.append("instance '%s' has invalid primary node '%s'" %
307 (instance_name, instance.primary_node))
308 for snode in instance.secondary_nodes:
309 if snode not in data.nodes:
310 result.append("instance '%s' has invalid secondary node '%s'" %
311 (instance_name, snode))
312 for idx, nic in enumerate(instance.nics):
313 if nic.mac in seen_macs:
314 result.append("instance '%s' has NIC %d mac %s duplicate" %
315 (instance_name, idx, nic.mac))
317 seen_macs.append(nic.mac)
319 # gather the drbd ports for duplicate checks
320 for dsk in instance.disks:
321 if dsk.dev_type in constants.LDS_DRBD:
322 tcp_port = dsk.logical_id[2]
323 if tcp_port not in ports:
325 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
326 # gather network port reservation
327 net_port = getattr(instance, "network_port", None)
328 if net_port is not None:
329 if net_port not in ports:
331 ports[net_port].append((instance.name, "network port"))
333 # instance disk verify
334 for idx, disk in enumerate(instance.disks):
335 result.extend(["instance '%s' disk %d error: %s" %
336 (instance.name, idx, msg) for msg in disk.Verify()])
337 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
339 # cluster-wide pool of free ports
340 for free_port in data.cluster.tcpudp_port_pool:
341 if free_port not in ports:
342 ports[free_port] = []
343 ports[free_port].append(("cluster", "port marked as free"))
345 # compute tcp/udp duplicate ports
351 txt = ", ".join(["%s/%s" % val for val in pdata])
352 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
354 # highest used tcp port check
356 if keys[-1] > data.cluster.highest_used_port:
357 result.append("Highest used port mismatch, saved %s, computed %s" %
358 (data.cluster.highest_used_port, keys[-1]))
360 if not data.nodes[data.cluster.master_node].master_candidate:
361 result.append("Master node is not a master candidate")
363 # master candidate checks
364 mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
366 result.append("Not enough master candidates: actual %d, target %d" %
370 for node in data.nodes.values():
371 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
372 result.append("Node %s state is invalid: master_candidate=%s,"
373 " drain=%s, offline=%s" %
374 (node.name, node.master_candidate, node.drain,
378 d_map, duplicates = self._UnlockedComputeDRBDMap()
379 for node, minor, instance_a, instance_b in duplicates:
380 result.append("DRBD minor %d on node %s is assigned twice to instances"
381 " %s and %s" % (minor, node, instance_a, instance_b))
385 @locking.ssynchronized(_config_lock, shared=1)
386 def VerifyConfig(self):
389 This is just a wrapper over L{_UnlockedVerifyConfig}.
392 @return: a list of error messages; a non-empty list signifies
396 return self._UnlockedVerifyConfig()
398 def _UnlockedSetDiskID(self, disk, node_name):
399 """Convert the unique ID to the ID needed on the target nodes.
401 This is used only for drbd, which needs ip/port configuration.
403 The routine descends down and updates its children also, because
404 this helps when the only the top device is passed to the remote
407 This function is for internal use, when the config lock is already held.
411 for child in disk.children:
412 self._UnlockedSetDiskID(child, node_name)
414 if disk.logical_id is None and disk.physical_id is not None:
416 if disk.dev_type == constants.LD_DRBD8:
417 pnode, snode, port, pminor, sminor, secret = disk.logical_id
418 if node_name not in (pnode, snode):
419 raise errors.ConfigurationError("DRBD device not knowing node %s" %
421 pnode_info = self._UnlockedGetNodeInfo(pnode)
422 snode_info = self._UnlockedGetNodeInfo(snode)
423 if pnode_info is None or snode_info is None:
424 raise errors.ConfigurationError("Can't find primary or secondary node"
425 " for %s" % str(disk))
426 p_data = (pnode_info.secondary_ip, port)
427 s_data = (snode_info.secondary_ip, port)
428 if pnode == node_name:
429 disk.physical_id = p_data + s_data + (pminor, secret)
430 else: # it must be secondary, we tested above
431 disk.physical_id = s_data + p_data + (sminor, secret)
433 disk.physical_id = disk.logical_id
436 @locking.ssynchronized(_config_lock)
437 def SetDiskID(self, disk, node_name):
438 """Convert the unique ID to the ID needed on the target nodes.
440 This is used only for drbd, which needs ip/port configuration.
442 The routine descends down and updates its children also, because
443 this helps when the only the top device is passed to the remote
447 return self._UnlockedSetDiskID(disk, node_name)
449 @locking.ssynchronized(_config_lock)
450 def AddTcpUdpPort(self, port):
451 """Adds a new port to the available port pool.
454 if not isinstance(port, int):
455 raise errors.ProgrammerError("Invalid type passed for port")
457 self._config_data.cluster.tcpudp_port_pool.add(port)
460 @locking.ssynchronized(_config_lock, shared=1)
461 def GetPortList(self):
462 """Returns a copy of the current port list.
465 return self._config_data.cluster.tcpudp_port_pool.copy()
467 @locking.ssynchronized(_config_lock)
468 def AllocatePort(self):
471 The port will be taken from the available port pool or from the
472 default port range (and in this case we increase
476 # If there are TCP/IP ports configured, we use them first.
477 if self._config_data.cluster.tcpudp_port_pool:
478 port = self._config_data.cluster.tcpudp_port_pool.pop()
480 port = self._config_data.cluster.highest_used_port + 1
481 if port >= constants.LAST_DRBD_PORT:
482 raise errors.ConfigurationError("The highest used port is greater"
483 " than %s. Aborting." %
484 constants.LAST_DRBD_PORT)
485 self._config_data.cluster.highest_used_port = port
490 def _UnlockedComputeDRBDMap(self):
491 """Compute the used DRBD minor/nodes.
494 @return: dictionary of node_name: dict of minor: instance_name;
495 the returned dict will have all the nodes in it (even if with
496 an empty list), and a list of duplicates; if the duplicates
497 list is not empty, the configuration is corrupted and its caller
498 should raise an exception
501 def _AppendUsedPorts(instance_name, disk, used):
503 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
504 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
505 for node, port in ((node_a, minor_a), (node_b, minor_b)):
506 assert node in used, ("Node '%s' of instance '%s' not found"
507 " in node list" % (node, instance_name))
508 if port in used[node]:
509 duplicates.append((node, port, instance_name, used[node][port]))
511 used[node][port] = instance_name
513 for child in disk.children:
514 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
518 my_dict = dict((node, {}) for node in self._config_data.nodes)
519 for instance in self._config_data.instances.itervalues():
520 for disk in instance.disks:
521 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
522 for (node, minor), instance in self._temporary_drbds.iteritems():
523 if minor in my_dict[node] and my_dict[node][minor] != instance:
524 duplicates.append((node, minor, instance, my_dict[node][minor]))
526 my_dict[node][minor] = instance
527 return my_dict, duplicates
529 @locking.ssynchronized(_config_lock)
530 def ComputeDRBDMap(self):
531 """Compute the used DRBD minor/nodes.
533 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
535 @return: dictionary of node_name: dict of minor: instance_name;
536 the returned dict will have all the nodes in it (even if with
540 d_map, duplicates = self._UnlockedComputeDRBDMap()
542 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
546 @locking.ssynchronized(_config_lock)
547 def AllocateDRBDMinor(self, nodes, instance):
548 """Allocate a drbd minor.
550 The free minor will be automatically computed from the existing
551 devices. A node can be given multiple times in order to allocate
552 multiple minors. The result is the list of minors, in the same
553 order as the passed nodes.
555 @type instance: string
556 @param instance: the instance for which we allocate minors
559 assert isinstance(instance, basestring), \
560 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
562 d_map, duplicates = self._UnlockedComputeDRBDMap()
564 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
570 # no minors used, we can start at 0
573 self._temporary_drbds[(nname, 0)] = instance
577 ffree = utils.FirstFree(keys)
579 # return the next minor
580 # TODO: implement high-limit check
584 # double-check minor against current instances
585 assert minor not in d_map[nname], \
586 ("Attempt to reuse allocated DRBD minor %d on node %s,"
587 " already allocated to instance %s" %
588 (minor, nname, d_map[nname][minor]))
589 ndata[minor] = instance
590 # double-check minor against reservation
591 r_key = (nname, minor)
592 assert r_key not in self._temporary_drbds, \
593 ("Attempt to reuse reserved DRBD minor %d on node %s,"
594 " reserved for instance %s" %
595 (minor, nname, self._temporary_drbds[r_key]))
596 self._temporary_drbds[r_key] = instance
598 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
602 def _UnlockedReleaseDRBDMinors(self, instance):
603 """Release temporary drbd minors allocated for a given instance.
605 @type instance: string
606 @param instance: the instance for which temporary minors should be
610 assert isinstance(instance, basestring), \
611 "Invalid argument passed to ReleaseDRBDMinors"
612 for key, name in self._temporary_drbds.items():
614 del self._temporary_drbds[key]
616 @locking.ssynchronized(_config_lock)
617 def ReleaseDRBDMinors(self, instance):
618 """Release temporary drbd minors allocated for a given instance.
620 This should be called on the error paths, on the success paths
621 it's automatically called by the ConfigWriter add and update
624 This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
626 @type instance: string
627 @param instance: the instance for which temporary minors should be
631 self._UnlockedReleaseDRBDMinors(instance)
633 @locking.ssynchronized(_config_lock, shared=1)
634 def GetConfigVersion(self):
635 """Get the configuration version.
637 @return: Config version
640 return self._config_data.version
642 @locking.ssynchronized(_config_lock, shared=1)
643 def GetClusterName(self):
646 @return: Cluster name
649 return self._config_data.cluster.cluster_name
651 @locking.ssynchronized(_config_lock, shared=1)
652 def GetMasterNode(self):
653 """Get the hostname of the master node for this cluster.
655 @return: Master hostname
658 return self._config_data.cluster.master_node
660 @locking.ssynchronized(_config_lock, shared=1)
661 def GetMasterIP(self):
662 """Get the IP of the master node for this cluster.
667 return self._config_data.cluster.master_ip
669 @locking.ssynchronized(_config_lock, shared=1)
670 def GetMasterNetdev(self):
671 """Get the master network device for this cluster.
674 return self._config_data.cluster.master_netdev
676 @locking.ssynchronized(_config_lock, shared=1)
677 def GetFileStorageDir(self):
678 """Get the file storage dir for this cluster.
681 return self._config_data.cluster.file_storage_dir
683 @locking.ssynchronized(_config_lock, shared=1)
684 def GetHypervisorType(self):
685 """Get the hypervisor type for this cluster.
688 return self._config_data.cluster.default_hypervisor
690 @locking.ssynchronized(_config_lock, shared=1)
691 def GetHostKey(self):
692 """Return the rsa hostkey from the config.
695 @return: the rsa hostkey
698 return self._config_data.cluster.rsahostkeypub
700 @locking.ssynchronized(_config_lock)
701 def AddInstance(self, instance):
702 """Add an instance to the config.
704 This should be used after creating a new instance.
706 @type instance: L{objects.Instance}
707 @param instance: the instance object
710 if not isinstance(instance, objects.Instance):
711 raise errors.ProgrammerError("Invalid type passed to AddInstance")
713 if instance.disk_template != constants.DT_DISKLESS:
714 all_lvs = instance.MapLVsByNode()
715 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
717 all_macs = self._AllMACs()
718 for nic in instance.nics:
719 if nic.mac in all_macs:
720 raise errors.ConfigurationError("Cannot add instance %s:"
721 " MAC address '%s' already in use." % (instance.name, nic.mac))
723 instance.serial_no = 1
724 self._config_data.instances[instance.name] = instance
725 self._config_data.cluster.serial_no += 1
726 self._UnlockedReleaseDRBDMinors(instance.name)
727 for nic in instance.nics:
728 self._temporary_macs.discard(nic.mac)
731 def _SetInstanceStatus(self, instance_name, status):
732 """Set the instance's status to a given value.
735 assert isinstance(status, bool), \
736 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
738 if instance_name not in self._config_data.instances:
739 raise errors.ConfigurationError("Unknown instance '%s'" %
741 instance = self._config_data.instances[instance_name]
742 if instance.admin_up != status:
743 instance.admin_up = status
744 instance.serial_no += 1
747 @locking.ssynchronized(_config_lock)
748 def MarkInstanceUp(self, instance_name):
749 """Mark the instance status to up in the config.
752 self._SetInstanceStatus(instance_name, True)
754 @locking.ssynchronized(_config_lock)
755 def RemoveInstance(self, instance_name):
756 """Remove the instance from the configuration.
759 if instance_name not in self._config_data.instances:
760 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
761 del self._config_data.instances[instance_name]
762 self._config_data.cluster.serial_no += 1
765 @locking.ssynchronized(_config_lock)
766 def RenameInstance(self, old_name, new_name):
767 """Rename an instance.
769 This needs to be done in ConfigWriter and not by RemoveInstance
770 combined with AddInstance as only we can guarantee an atomic
774 if old_name not in self._config_data.instances:
775 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
776 inst = self._config_data.instances[old_name]
777 del self._config_data.instances[old_name]
780 for disk in inst.disks:
781 if disk.dev_type == constants.LD_FILE:
782 # rename the file paths in logical and physical id
783 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
784 disk.physical_id = disk.logical_id = (disk.logical_id[0],
785 os.path.join(file_storage_dir,
789 self._config_data.instances[inst.name] = inst
792 @locking.ssynchronized(_config_lock)
793 def MarkInstanceDown(self, instance_name):
794 """Mark the status of an instance to down in the configuration.
797 self._SetInstanceStatus(instance_name, False)
799 def _UnlockedGetInstanceList(self):
800 """Get the list of instances.
802 This function is for internal use, when the config lock is already held.
805 return self._config_data.instances.keys()
807 @locking.ssynchronized(_config_lock, shared=1)
808 def GetInstanceList(self):
809 """Get the list of instances.
811 @return: array of instances, ex. ['instance2.example.com',
812 'instance1.example.com']
815 return self._UnlockedGetInstanceList()
817 @locking.ssynchronized(_config_lock, shared=1)
818 def ExpandInstanceName(self, short_name):
819 """Attempt to expand an incomplete instance name.
822 return utils.MatchNameComponent(short_name,
823 self._config_data.instances.keys())
825 def _UnlockedGetInstanceInfo(self, instance_name):
826 """Returns information about an instance.
828 This function is for internal use, when the config lock is already held.
831 if instance_name not in self._config_data.instances:
834 return self._config_data.instances[instance_name]
836 @locking.ssynchronized(_config_lock, shared=1)
837 def GetInstanceInfo(self, instance_name):
838 """Returns information about an instance.
840 It takes the information from the configuration file. Other information of
841 an instance are taken from the live systems.
843 @param instance_name: name of the instance, e.g.
844 I{instance1.example.com}
846 @rtype: L{objects.Instance}
847 @return: the instance object
850 return self._UnlockedGetInstanceInfo(instance_name)
852 @locking.ssynchronized(_config_lock, shared=1)
853 def GetAllInstancesInfo(self):
854 """Get the configuration of all instances.
857 @return: dict of (instance, instance_info), where instance_info is what
858 would GetInstanceInfo return for the node
861 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
862 for instance in self._UnlockedGetInstanceList()])
865 @locking.ssynchronized(_config_lock)
866 def AddNode(self, node):
867 """Add a node to the configuration.
869 @type node: L{objects.Node}
870 @param node: a Node instance
873 logging.info("Adding node %s to configuration" % node.name)
876 self._config_data.nodes[node.name] = node
877 self._config_data.cluster.serial_no += 1
880 @locking.ssynchronized(_config_lock)
881 def RemoveNode(self, node_name):
882 """Remove a node from the configuration.
885 logging.info("Removing node %s from configuration" % node_name)
887 if node_name not in self._config_data.nodes:
888 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
890 del self._config_data.nodes[node_name]
891 self._config_data.cluster.serial_no += 1
894 @locking.ssynchronized(_config_lock, shared=1)
895 def ExpandNodeName(self, short_name):
896 """Attempt to expand an incomplete instance name.
899 return utils.MatchNameComponent(short_name,
900 self._config_data.nodes.keys())
902 def _UnlockedGetNodeInfo(self, node_name):
903 """Get the configuration of a node, as stored in the config.
905 This function is for internal use, when the config lock is already
908 @param node_name: the node name, e.g. I{node1.example.com}
910 @rtype: L{objects.Node}
911 @return: the node object
914 if node_name not in self._config_data.nodes:
917 return self._config_data.nodes[node_name]
920 @locking.ssynchronized(_config_lock, shared=1)
921 def GetNodeInfo(self, node_name):
922 """Get the configuration of a node, as stored in the config.
924 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
926 @param node_name: the node name, e.g. I{node1.example.com}
928 @rtype: L{objects.Node}
929 @return: the node object
932 return self._UnlockedGetNodeInfo(node_name)
934 def _UnlockedGetNodeList(self):
935 """Return the list of nodes which are in the configuration.
937 This function is for internal use, when the config lock is already
943 return self._config_data.nodes.keys()
946 @locking.ssynchronized(_config_lock, shared=1)
947 def GetNodeList(self):
948 """Return the list of nodes which are in the configuration.
951 return self._UnlockedGetNodeList()
953 @locking.ssynchronized(_config_lock, shared=1)
954 def GetOnlineNodeList(self):
955 """Return the list of nodes which are online.
958 all_nodes = [self._UnlockedGetNodeInfo(node)
959 for node in self._UnlockedGetNodeList()]
960 return [node.name for node in all_nodes if not node.offline]
962 @locking.ssynchronized(_config_lock, shared=1)
963 def GetAllNodesInfo(self):
964 """Get the configuration of all nodes.
967 @return: dict of (node, node_info), where node_info is what
968 would GetNodeInfo return for the node
971 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
972 for node in self._UnlockedGetNodeList()])
975 def _UnlockedGetMasterCandidateStats(self, exceptions=None):
976 """Get the number of current and maximum desired and possible candidates.
978 @type exceptions: list
979 @param exceptions: if passed, list of nodes that should be ignored
981 @return: tuple of (current, desired and possible)
985 for node in self._config_data.nodes.values():
986 if exceptions and node.name in exceptions:
988 if not (node.offline or node.drained):
990 if node.master_candidate:
992 mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
993 return (mc_now, mc_max)
995 @locking.ssynchronized(_config_lock, shared=1)
996 def GetMasterCandidateStats(self, exceptions=None):
997 """Get the number of current and maximum possible candidates.
999 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1001 @type exceptions: list
1002 @param exceptions: if passed, list of nodes that should be ignored
1004 @return: tuple of (current, max)
1007 return self._UnlockedGetMasterCandidateStats(exceptions)
1009 @locking.ssynchronized(_config_lock)
1010 def MaintainCandidatePool(self):
1011 """Try to grow the candidate pool to the desired size.
1014 @return: list with the adjusted nodes (L{objects.Node} instances)
1017 mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
1020 node_list = self._config_data.nodes.keys()
1021 random.shuffle(node_list)
1022 for name in node_list:
1023 if mc_now >= mc_max:
1025 node = self._config_data.nodes[name]
1026 if node.master_candidate or node.offline or node.drained:
1028 mod_list.append(node)
1029 node.master_candidate = True
1032 if mc_now != mc_max:
1033 # this should not happen
1034 logging.warning("Warning: MaintainCandidatePool didn't manage to"
1035 " fill the candidate pool (%d/%d)", mc_now, mc_max)
1037 self._config_data.cluster.serial_no += 1
1042 def _BumpSerialNo(self):
1043 """Bump up the serial number of the config.
1046 self._config_data.serial_no += 1
1048 def _OpenConfig(self):
1049 """Read the config data from disk.
1052 f = open(self._cfg_file, 'r')
1055 data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1056 except Exception, err:
1057 raise errors.ConfigurationError(err)
1061 # Make sure the configuration has the right version
1062 _ValidateConfig(data)
1064 if (not hasattr(data, 'cluster') or
1065 not hasattr(data.cluster, 'rsahostkeypub')):
1066 raise errors.ConfigurationError("Incomplete configuration"
1067 " (missing cluster.rsahostkeypub)")
1069 # Upgrade configuration if needed
1070 data.UpgradeConfig()
1072 self._config_data = data
1073 # reset the last serial as -1 so that the next write will cause
1075 self._last_cluster_serial = -1
1077 def _DistributeConfig(self):
1078 """Distribute the configuration to the other nodes.
1080 Currently, this only copies the configuration file. In the future,
1081 it could be used to encapsulate the 2/3-phase update mechanism.
1090 myhostname = self._my_hostname
1091 # we can skip checking whether _UnlockedGetNodeInfo returns None
1092 # since the node list comes from _UnlocketGetNodeList, and we are
1093 # called with the lock held, so no modifications should take place
1095 for node_name in self._UnlockedGetNodeList():
1096 if node_name == myhostname:
1098 node_info = self._UnlockedGetNodeInfo(node_name)
1099 if not node_info.master_candidate:
1101 node_list.append(node_info.name)
1102 addr_list.append(node_info.primary_ip)
1104 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1105 address_list=addr_list)
1106 for node in node_list:
1107 if not result[node]:
1108 logging.error("copy of file %s to node %s failed",
1109 self._cfg_file, node)
1113 def _WriteConfig(self, destination=None):
1114 """Write the configuration data to persistent storage.
1117 config_errors = self._UnlockedVerifyConfig()
1119 raise errors.ConfigurationError("Configuration data is not"
1121 (", ".join(config_errors)))
1122 if destination is None:
1123 destination = self._cfg_file
1124 self._BumpSerialNo()
1125 txt = serializer.Dump(self._config_data.ToDict())
1126 dir_name, file_name = os.path.split(destination)
1127 fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1128 f = os.fdopen(fd, 'w')
1131 os.fsync(f.fileno())
1134 # we don't need to do os.close(fd) as f.close() did it
1135 os.rename(name, destination)
1136 self.write_count += 1
1138 # and redistribute the config file to master candidates
1139 self._DistributeConfig()
1141 # Write ssconf files on all nodes (including locally)
1142 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1143 if not self._offline:
1144 rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1145 self._UnlockedGetSsconfValues())
1146 self._last_cluster_serial = self._config_data.cluster.serial_no
1148 def _UnlockedGetSsconfValues(self):
1149 """Return the values needed by ssconf.
1152 @return: a dictionary with keys the ssconf names and values their
1157 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1158 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1159 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1161 instance_data = fn(instance_names)
1162 off_data = fn(node.name for node in node_info if node.offline)
1163 on_data = fn(node.name for node in node_info if not node.offline)
1164 mc_data = fn(node.name for node in node_info if node.master_candidate)
1165 node_data = fn(node_names)
1167 cluster = self._config_data.cluster
1168 cluster_tags = fn(cluster.GetTags())
1170 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1171 constants.SS_CLUSTER_TAGS: cluster_tags,
1172 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1173 constants.SS_MASTER_CANDIDATES: mc_data,
1174 constants.SS_MASTER_IP: cluster.master_ip,
1175 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1176 constants.SS_MASTER_NODE: cluster.master_node,
1177 constants.SS_NODE_LIST: node_data,
1178 constants.SS_OFFLINE_NODES: off_data,
1179 constants.SS_ONLINE_NODES: on_data,
1180 constants.SS_INSTANCE_LIST: instance_data,
1181 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1184 @locking.ssynchronized(_config_lock, shared=1)
1185 def GetVGName(self):
1186 """Return the volume group name.
1189 return self._config_data.cluster.volume_group_name
1191 @locking.ssynchronized(_config_lock)
1192 def SetVGName(self, vg_name):
1193 """Set the volume group name.
1196 self._config_data.cluster.volume_group_name = vg_name
1197 self._config_data.cluster.serial_no += 1
1200 @locking.ssynchronized(_config_lock, shared=1)
1201 def GetDefBridge(self):
1202 """Return the default bridge.
1205 return self._config_data.cluster.default_bridge
1207 @locking.ssynchronized(_config_lock, shared=1)
1208 def GetMACPrefix(self):
1209 """Return the mac prefix.
1212 return self._config_data.cluster.mac_prefix
1214 @locking.ssynchronized(_config_lock, shared=1)
1215 def GetClusterInfo(self):
1216 """Returns information about the cluster
1218 @rtype: L{objects.Cluster}
1219 @return: the cluster object
1222 return self._config_data.cluster
1224 @locking.ssynchronized(_config_lock)
1225 def Update(self, target):
1226 """Notify function to be called after updates.
1228 This function must be called when an object (as returned by
1229 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1230 caller wants the modifications saved to the backing store. Note
1231 that all modified objects will be saved, but the target argument
1232 is the one the caller wants to ensure that it's saved.
1234 @param target: an instance of either L{objects.Cluster},
1235 L{objects.Node} or L{objects.Instance} which is existing in
1239 if self._config_data is None:
1240 raise errors.ProgrammerError("Configuration file not read,"
1242 update_serial = False
1243 if isinstance(target, objects.Cluster):
1244 test = target == self._config_data.cluster
1245 elif isinstance(target, objects.Node):
1246 test = target in self._config_data.nodes.values()
1247 update_serial = True
1248 elif isinstance(target, objects.Instance):
1249 test = target in self._config_data.instances.values()
1251 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1252 " ConfigWriter.Update" % type(target))
1254 raise errors.ConfigurationError("Configuration updated since object"
1255 " has been read or unknown object")
1256 target.serial_no += 1
1259 # for node updates, we need to increase the cluster serial too
1260 self._config_data.cluster.serial_no += 1
1262 if isinstance(target, objects.Instance):
1263 self._UnlockedReleaseDRBDMinors(target.name)
1264 for nic in target.nics:
1265 self._temporary_macs.discard(nic.mac)