4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
Currently, the data storage format is JSON. YAML was slow and consuming too
much memory.
import os
import random
import logging
import tempfile

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
48 _config_lock = locking.SharedLock()
def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    # the format string needs both the found and the expected version
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version, constants.CONFIG_VERSION))
68 """The interface to the cluster configuration.
71 def __init__(self, cfg_file=None, offline=False):
73 self._lock = _config_lock
74 self._config_data = None
75 self._offline = offline
77 self._cfg_file = constants.CLUSTER_CONF_FILE
79 self._cfg_file = cfg_file
80 self._temporary_ids = set()
81 self._temporary_drbds = {}
82 # Note: in order to prevent errors when resolving our name in
83 # _DistributeConfig, we compute it here once and reuse it; it's
84 # better to raise an error before starting to modify the config
85 # file than after it was modified
86 self._my_hostname = utils.HostInfo().name
87 self._last_cluster_serial = -1
90 # this method needs to be static, so that we can call it on the class
93 """Check if the cluster is configured.
96 return os.path.exists(constants.CLUSTER_CONF_FILE)
98 @locking.ssynchronized(_config_lock, shared=1)
99 def GenerateMAC(self):
100 """Generate a MAC for an instance.
102 This should check the current instances for duplicates.
105 prefix = self._config_data.cluster.mac_prefix
106 all_macs = self._AllMACs()
109 byte1 = random.randrange(0, 256)
110 byte2 = random.randrange(0, 256)
111 byte3 = random.randrange(0, 256)
112 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
113 if mac not in all_macs:
117 raise errors.ConfigurationError("Can't generate unique MAC")
120 @locking.ssynchronized(_config_lock, shared=1)
121 def IsMacInUse(self, mac):
122 """Predicate: check if the specified MAC is in use in the Ganeti cluster.
124 This only checks instances managed by this cluster, it does not
125 check for potential collisions elsewhere.
128 all_macs = self._AllMACs()
129 return mac in all_macs
131 @locking.ssynchronized(_config_lock, shared=1)
132 def GenerateDRBDSecret(self):
133 """Generate a DRBD secret.
135 This checks the current disks for duplicates.
138 all_secrets = self._AllDRBDSecrets()
141 secret = utils.GenerateSecret()
142 if secret not in all_secrets:
146 raise errors.ConfigurationError("Can't generate unique DRBD secret")
149 def _ComputeAllLVs(self):
150 """Compute the list of all LVs.
154 for instance in self._config_data.instances.values():
155 node_data = instance.MapLVsByNode()
156 for lv_list in node_data.values():
157 lvnames.update(lv_list)
160 @locking.ssynchronized(_config_lock, shared=1)
161 def GenerateUniqueID(self, exceptions=None):
162 """Generate an unique disk name.
164 This checks the current node, instances and disk names for
167 @param exceptions: a list with some other names which should be checked
168 for uniqueness (used for example when you want to get
169 more than one id at one time without adding each one in
170 turn to the config file)
173 @return: the unique id
177 existing.update(self._temporary_ids)
178 existing.update(self._ComputeAllLVs())
179 existing.update(self._config_data.instances.keys())
180 existing.update(self._config_data.nodes.keys())
181 if exceptions is not None:
182 existing.update(exceptions)
185 unique_id = utils.NewUUID()
186 if unique_id not in existing and unique_id is not None:
189 raise errors.ConfigurationError("Not able generate an unique ID"
190 " (last tried ID: %s" % unique_id)
191 self._temporary_ids.add(unique_id)
195 """Return all MACs present in the config.
198 @return: the list of all MACs
202 for instance in self._config_data.instances.values():
203 for nic in instance.nics:
204 result.append(nic.mac)
208 def _AllDRBDSecrets(self):
209 """Return all DRBD secrets present in the config.
212 @return: the list of all DRBD secrets
215 def helper(disk, result):
216 """Recursively gather secrets from this disk."""
217 if disk.dev_type == constants.DT_DRBD8:
218 result.append(disk.logical_id[5])
220 for child in disk.children:
221 helper(child, result)
224 for instance in self._config_data.instances.values():
225 for disk in instance.disks:
230 def _CheckDiskIDs(self, disk, l_ids, p_ids):
231 """Compute duplicate disk IDs
233 @type disk: L{objects.Disk}
234 @param disk: the disk at which to start searching
236 @param l_ids: list of current logical ids
238 @param p_ids: list of current physical ids
240 @return: a list of error messages
244 if disk.logical_id in l_ids:
245 result.append("duplicate logical id %s" % str(disk.logical_id))
247 l_ids.append(disk.logical_id)
248 if disk.physical_id in p_ids:
249 result.append("duplicate physical id %s" % str(disk.physical_id))
251 p_ids.append(disk.physical_id)
254 for child in disk.children:
255 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
258 def _UnlockedVerifyConfig(self):
262 @return: a list of error messages; a non-empty list signifies
269 data = self._config_data
272 for instance_name in data.instances:
273 instance = data.instances[instance_name]
274 if instance.primary_node not in data.nodes:
275 result.append("instance '%s' has invalid primary node '%s'" %
276 (instance_name, instance.primary_node))
277 for snode in instance.secondary_nodes:
278 if snode not in data.nodes:
279 result.append("instance '%s' has invalid secondary node '%s'" %
280 (instance_name, snode))
281 for idx, nic in enumerate(instance.nics):
282 if nic.mac in seen_macs:
283 result.append("instance '%s' has NIC %d mac %s duplicate" %
284 (instance_name, idx, nic.mac))
286 seen_macs.append(nic.mac)
288 # gather the drbd ports for duplicate checks
289 for dsk in instance.disks:
290 if dsk.dev_type in constants.LDS_DRBD:
291 tcp_port = dsk.logical_id[2]
292 if tcp_port not in ports:
294 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
295 # gather network port reservation
296 net_port = getattr(instance, "network_port", None)
297 if net_port is not None:
298 if net_port not in ports:
300 ports[net_port].append((instance.name, "network port"))
302 # instance disk verify
303 for idx, disk in enumerate(instance.disks):
304 result.extend(["instance '%s' disk %d error: %s" %
305 (instance.name, idx, msg) for msg in disk.Verify()])
306 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
308 # cluster-wide pool of free ports
309 for free_port in data.cluster.tcpudp_port_pool:
310 if free_port not in ports:
311 ports[free_port] = []
312 ports[free_port].append(("cluster", "port marked as free"))
314 # compute tcp/udp duplicate ports
320 txt = ", ".join(["%s/%s" % val for val in pdata])
321 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
323 # highest used tcp port check
325 if keys[-1] > data.cluster.highest_used_port:
326 result.append("Highest used port mismatch, saved %s, computed %s" %
327 (data.cluster.highest_used_port, keys[-1]))
329 if not data.nodes[data.cluster.master_node].master_candidate:
330 result.append("Master node is not a master candidate")
332 # master candidate checks
333 mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
335 result.append("Not enough master candidates: actual %d, target %d" %
339 for node in data.nodes.values():
340 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
341 result.append("Node %s state is invalid: master_candidate=%s,"
342 " drain=%s, offline=%s" %
343 (node.name, node.master_candidate, node.drain,
347 d_map, duplicates = self._UnlockedComputeDRBDMap()
348 for node, minor, instance_a, instance_b in duplicates:
349 result.append("DRBD minor %d on node %s is assigned twice to instances"
350 " %s and %s" % (minor, node, instance_a, instance_b))
354 @locking.ssynchronized(_config_lock, shared=1)
355 def VerifyConfig(self):
358 This is just a wrapper over L{_UnlockedVerifyConfig}.
361 @return: a list of error messages; a non-empty list signifies
365 return self._UnlockedVerifyConfig()
367 def _UnlockedSetDiskID(self, disk, node_name):
368 """Convert the unique ID to the ID needed on the target nodes.
370 This is used only for drbd, which needs ip/port configuration.
372 The routine descends down and updates its children also, because
373 this helps when the only the top device is passed to the remote
376 This function is for internal use, when the config lock is already held.
380 for child in disk.children:
381 self._UnlockedSetDiskID(child, node_name)
383 if disk.logical_id is None and disk.physical_id is not None:
385 if disk.dev_type == constants.LD_DRBD8:
386 pnode, snode, port, pminor, sminor, secret = disk.logical_id
387 if node_name not in (pnode, snode):
388 raise errors.ConfigurationError("DRBD device not knowing node %s" %
390 pnode_info = self._UnlockedGetNodeInfo(pnode)
391 snode_info = self._UnlockedGetNodeInfo(snode)
392 if pnode_info is None or snode_info is None:
393 raise errors.ConfigurationError("Can't find primary or secondary node"
394 " for %s" % str(disk))
395 p_data = (pnode_info.secondary_ip, port)
396 s_data = (snode_info.secondary_ip, port)
397 if pnode == node_name:
398 disk.physical_id = p_data + s_data + (pminor, secret)
399 else: # it must be secondary, we tested above
400 disk.physical_id = s_data + p_data + (sminor, secret)
402 disk.physical_id = disk.logical_id
405 @locking.ssynchronized(_config_lock)
406 def SetDiskID(self, disk, node_name):
407 """Convert the unique ID to the ID needed on the target nodes.
409 This is used only for drbd, which needs ip/port configuration.
411 The routine descends down and updates its children also, because
412 this helps when the only the top device is passed to the remote
416 return self._UnlockedSetDiskID(disk, node_name)
418 @locking.ssynchronized(_config_lock)
419 def AddTcpUdpPort(self, port):
420 """Adds a new port to the available port pool.
423 if not isinstance(port, int):
424 raise errors.ProgrammerError("Invalid type passed for port")
426 self._config_data.cluster.tcpudp_port_pool.add(port)
429 @locking.ssynchronized(_config_lock, shared=1)
430 def GetPortList(self):
431 """Returns a copy of the current port list.
434 return self._config_data.cluster.tcpudp_port_pool.copy()
436 @locking.ssynchronized(_config_lock)
437 def AllocatePort(self):
440 The port will be taken from the available port pool or from the
441 default port range (and in this case we increase
445 # If there are TCP/IP ports configured, we use them first.
446 if self._config_data.cluster.tcpudp_port_pool:
447 port = self._config_data.cluster.tcpudp_port_pool.pop()
449 port = self._config_data.cluster.highest_used_port + 1
450 if port >= constants.LAST_DRBD_PORT:
451 raise errors.ConfigurationError("The highest used port is greater"
452 " than %s. Aborting." %
453 constants.LAST_DRBD_PORT)
454 self._config_data.cluster.highest_used_port = port
459 def _UnlockedComputeDRBDMap(self):
460 """Compute the used DRBD minor/nodes.
463 @return: dictionary of node_name: dict of minor: instance_name;
464 the returned dict will have all the nodes in it (even if with
465 an empty list), and a list of duplicates; if the duplicates
466 list is not empty, the configuration is corrupted and its caller
467 should raise an exception
470 def _AppendUsedPorts(instance_name, disk, used):
472 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
473 nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
474 for node, port in ((nodeA, minorA), (nodeB, minorB)):
475 assert node in used, ("Node '%s' of instance '%s' not found"
476 " in node list" % (node, instance_name))
477 if port in used[node]:
478 duplicates.append((node, port, instance_name, used[node][port]))
480 used[node][port] = instance_name
482 for child in disk.children:
483 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
487 my_dict = dict((node, {}) for node in self._config_data.nodes)
488 for instance in self._config_data.instances.itervalues():
489 for disk in instance.disks:
490 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
491 for (node, minor), instance in self._temporary_drbds.iteritems():
492 if minor in my_dict[node] and my_dict[node][minor] != instance:
493 duplicates.append((node, minor, instance, my_dict[node][minor]))
495 my_dict[node][minor] = instance
496 return my_dict, duplicates
498 @locking.ssynchronized(_config_lock)
499 def ComputeDRBDMap(self):
500 """Compute the used DRBD minor/nodes.
502 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
504 @return: dictionary of node_name: dict of minor: instance_name;
505 the returned dict will have all the nodes in it (even if with
509 d_map, duplicates = self._UnlockedComputeDRBDMap()
511 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
515 @locking.ssynchronized(_config_lock)
516 def AllocateDRBDMinor(self, nodes, instance):
517 """Allocate a drbd minor.
519 The free minor will be automatically computed from the existing
520 devices. A node can be given multiple times in order to allocate
521 multiple minors. The result is the list of minors, in the same
522 order as the passed nodes.
524 @type instance: string
525 @param instance: the instance for which we allocate minors
528 assert isinstance(instance, basestring), \
529 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
531 d_map, duplicates = self._UnlockedComputeDRBDMap()
533 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
539 # no minors used, we can start at 0
542 self._temporary_drbds[(nname, 0)] = instance
546 ffree = utils.FirstFree(keys)
548 # return the next minor
549 # TODO: implement high-limit check
553 # double-check minor against current instances
554 assert minor not in d_map[nname], \
555 ("Attempt to reuse allocated DRBD minor %d on node %s,"
556 " already allocated to instance %s" %
557 (minor, nname, d_map[nname][minor]))
558 ndata[minor] = instance
559 # double-check minor against reservation
560 r_key = (nname, minor)
561 assert r_key not in self._temporary_drbds, \
562 ("Attempt to reuse reserved DRBD minor %d on node %s,"
563 " reserved for instance %s" %
564 (minor, nname, self._temporary_drbds[r_key]))
565 self._temporary_drbds[r_key] = instance
567 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
571 def _UnlockedReleaseDRBDMinors(self, instance):
572 """Release temporary drbd minors allocated for a given instance.
574 @type instance: string
575 @param instance: the instance for which temporary minors should be
579 assert isinstance(instance, basestring), \
580 "Invalid argument passed to ReleaseDRBDMinors"
581 for key, name in self._temporary_drbds.items():
583 del self._temporary_drbds[key]
585 @locking.ssynchronized(_config_lock)
586 def ReleaseDRBDMinors(self, instance):
587 """Release temporary drbd minors allocated for a given instance.
589 This should be called on the error paths, on the success paths
590 it's automatically called by the ConfigWriter add and update
593 This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
595 @type instance: string
596 @param instance: the instance for which temporary minors should be
600 self._UnlockedReleaseDRBDMinors(instance)
602 @locking.ssynchronized(_config_lock, shared=1)
603 def GetConfigVersion(self):
604 """Get the configuration version.
606 @return: Config version
609 return self._config_data.version
611 @locking.ssynchronized(_config_lock, shared=1)
612 def GetClusterName(self):
615 @return: Cluster name
618 return self._config_data.cluster.cluster_name
620 @locking.ssynchronized(_config_lock, shared=1)
621 def GetMasterNode(self):
622 """Get the hostname of the master node for this cluster.
624 @return: Master hostname
627 return self._config_data.cluster.master_node
629 @locking.ssynchronized(_config_lock, shared=1)
630 def GetMasterIP(self):
631 """Get the IP of the master node for this cluster.
636 return self._config_data.cluster.master_ip
638 @locking.ssynchronized(_config_lock, shared=1)
639 def GetMasterNetdev(self):
640 """Get the master network device for this cluster.
643 return self._config_data.cluster.master_netdev
645 @locking.ssynchronized(_config_lock, shared=1)
646 def GetFileStorageDir(self):
647 """Get the file storage dir for this cluster.
650 return self._config_data.cluster.file_storage_dir
652 @locking.ssynchronized(_config_lock, shared=1)
653 def GetHypervisorType(self):
654 """Get the hypervisor type for this cluster.
657 return self._config_data.cluster.default_hypervisor
659 @locking.ssynchronized(_config_lock, shared=1)
660 def GetHostKey(self):
661 """Return the rsa hostkey from the config.
664 @return: the rsa hostkey
667 return self._config_data.cluster.rsahostkeypub
669 @locking.ssynchronized(_config_lock)
670 def AddInstance(self, instance):
671 """Add an instance to the config.
673 This should be used after creating a new instance.
675 @type instance: L{objects.Instance}
676 @param instance: the instance object
679 if not isinstance(instance, objects.Instance):
680 raise errors.ProgrammerError("Invalid type passed to AddInstance")
682 if instance.disk_template != constants.DT_DISKLESS:
683 all_lvs = instance.MapLVsByNode()
684 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
686 instance.serial_no = 1
687 self._config_data.instances[instance.name] = instance
688 self._config_data.cluster.serial_no += 1
689 self._UnlockedReleaseDRBDMinors(instance.name)
692 def _SetInstanceStatus(self, instance_name, status):
693 """Set the instance's status to a given value.
696 assert isinstance(status, bool), \
697 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
699 if instance_name not in self._config_data.instances:
700 raise errors.ConfigurationError("Unknown instance '%s'" %
702 instance = self._config_data.instances[instance_name]
703 if instance.admin_up != status:
704 instance.admin_up = status
705 instance.serial_no += 1
708 @locking.ssynchronized(_config_lock)
709 def MarkInstanceUp(self, instance_name):
710 """Mark the instance status to up in the config.
713 self._SetInstanceStatus(instance_name, True)
715 @locking.ssynchronized(_config_lock)
716 def RemoveInstance(self, instance_name):
717 """Remove the instance from the configuration.
720 if instance_name not in self._config_data.instances:
721 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
722 del self._config_data.instances[instance_name]
723 self._config_data.cluster.serial_no += 1
726 @locking.ssynchronized(_config_lock)
727 def RenameInstance(self, old_name, new_name):
728 """Rename an instance.
730 This needs to be done in ConfigWriter and not by RemoveInstance
731 combined with AddInstance as only we can guarantee an atomic
735 if old_name not in self._config_data.instances:
736 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
737 inst = self._config_data.instances[old_name]
738 del self._config_data.instances[old_name]
741 for disk in inst.disks:
742 if disk.dev_type == constants.LD_FILE:
743 # rename the file paths in logical and physical id
744 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
745 disk.physical_id = disk.logical_id = (disk.logical_id[0],
746 os.path.join(file_storage_dir,
750 self._config_data.instances[inst.name] = inst
753 @locking.ssynchronized(_config_lock)
754 def MarkInstanceDown(self, instance_name):
755 """Mark the status of an instance to down in the configuration.
758 self._SetInstanceStatus(instance_name, False)
760 def _UnlockedGetInstanceList(self):
761 """Get the list of instances.
763 This function is for internal use, when the config lock is already held.
766 return self._config_data.instances.keys()
768 @locking.ssynchronized(_config_lock, shared=1)
769 def GetInstanceList(self):
770 """Get the list of instances.
772 @return: array of instances, ex. ['instance2.example.com',
773 'instance1.example.com']
776 return self._UnlockedGetInstanceList()
778 @locking.ssynchronized(_config_lock, shared=1)
779 def ExpandInstanceName(self, short_name):
780 """Attempt to expand an incomplete instance name.
783 return utils.MatchNameComponent(short_name,
784 self._config_data.instances.keys())
786 def _UnlockedGetInstanceInfo(self, instance_name):
787 """Returns informations about an instance.
789 This function is for internal use, when the config lock is already held.
792 if instance_name not in self._config_data.instances:
795 return self._config_data.instances[instance_name]
797 @locking.ssynchronized(_config_lock, shared=1)
798 def GetInstanceInfo(self, instance_name):
799 """Returns informations about an instance.
801 It takes the information from the configuration file. Other informations of
802 an instance are taken from the live systems.
804 @param instance_name: name of the instance, e.g.
805 I{instance1.example.com}
807 @rtype: L{objects.Instance}
808 @return: the instance object
811 return self._UnlockedGetInstanceInfo(instance_name)
813 @locking.ssynchronized(_config_lock, shared=1)
814 def GetAllInstancesInfo(self):
815 """Get the configuration of all instances.
818 @returns: dict of (instance, instance_info), where instance_info is what
819 would GetInstanceInfo return for the node
822 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
823 for instance in self._UnlockedGetInstanceList()])
826 @locking.ssynchronized(_config_lock)
827 def AddNode(self, node):
828 """Add a node to the configuration.
830 @type node: L{objects.Node}
831 @param node: a Node instance
834 logging.info("Adding node %s to configuration" % node.name)
837 self._config_data.nodes[node.name] = node
838 self._config_data.cluster.serial_no += 1
841 @locking.ssynchronized(_config_lock)
842 def RemoveNode(self, node_name):
843 """Remove a node from the configuration.
846 logging.info("Removing node %s from configuration" % node_name)
848 if node_name not in self._config_data.nodes:
849 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
851 del self._config_data.nodes[node_name]
852 self._config_data.cluster.serial_no += 1
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys())
863 def _UnlockedGetNodeInfo(self, node_name):
864 """Get the configuration of a node, as stored in the config.
866 This function is for internal use, when the config lock is already
869 @param node_name: the node name, e.g. I{node1.example.com}
871 @rtype: L{objects.Node}
872 @return: the node object
875 if node_name not in self._config_data.nodes:
878 return self._config_data.nodes[node_name]
881 @locking.ssynchronized(_config_lock, shared=1)
882 def GetNodeInfo(self, node_name):
883 """Get the configuration of a node, as stored in the config.
885 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
887 @param node_name: the node name, e.g. I{node1.example.com}
889 @rtype: L{objects.Node}
890 @return: the node object
893 return self._UnlockedGetNodeInfo(node_name)
895 def _UnlockedGetNodeList(self):
896 """Return the list of nodes which are in the configuration.
898 This function is for internal use, when the config lock is already
904 return self._config_data.nodes.keys()
907 @locking.ssynchronized(_config_lock, shared=1)
908 def GetNodeList(self):
909 """Return the list of nodes which are in the configuration.
912 return self._UnlockedGetNodeList()
914 @locking.ssynchronized(_config_lock, shared=1)
915 def GetOnlineNodeList(self):
916 """Return the list of nodes which are online.
919 all_nodes = [self._UnlockedGetNodeInfo(node)
920 for node in self._UnlockedGetNodeList()]
921 return [node.name for node in all_nodes if not node.offline]
923 @locking.ssynchronized(_config_lock, shared=1)
924 def GetAllNodesInfo(self):
925 """Get the configuration of all nodes.
928 @return: dict of (node, node_info), where node_info is what
929 would GetNodeInfo return for the node
932 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
933 for node in self._UnlockedGetNodeList()])
936 def _UnlockedGetMasterCandidateStats(self):
937 """Get the number of current and maximum desired and possible candidates.
940 @return: tuple of (current, desired and possible)
944 for node in self._config_data.nodes.itervalues():
945 if not (node.offline or node.drained):
947 if node.master_candidate:
949 mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
950 return (mc_now, mc_max)
952 @locking.ssynchronized(_config_lock, shared=1)
953 def GetMasterCandidateStats(self):
954 """Get the number of current and maximum possible candidates.
956 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
959 @return: tuple of (current, max)
962 return self._UnlockedGetMasterCandidateStats()
964 @locking.ssynchronized(_config_lock)
965 def MaintainCandidatePool(self):
966 """Try to grow the candidate pool to the desired size.
969 @return: list with the adjusted nodes (L{objects.Node} instances)
972 mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
975 node_list = self._config_data.nodes.keys()
976 random.shuffle(node_list)
977 for name in node_list:
980 node = self._config_data.nodes[name]
981 if node.master_candidate or node.offline or node.drained:
983 mod_list.append(node)
984 node.master_candidate = True
988 # this should not happen
989 logging.warning("Warning: MaintainCandidatePool didn't manage to"
990 " fill the candidate pool (%d/%d)", mc_now, mc_max)
992 self._config_data.cluster.serial_no += 1
997 def _BumpSerialNo(self):
998 """Bump up the serial number of the config.
1001 self._config_data.serial_no += 1
1003 def _OpenConfig(self):
1004 """Read the config data from disk.
1007 f = open(self._cfg_file, 'r')
1010 data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1011 except Exception, err:
1012 raise errors.ConfigurationError(err)
1016 # Make sure the configuration has the right version
1017 _ValidateConfig(data)
1019 if (not hasattr(data, 'cluster') or
1020 not hasattr(data.cluster, 'rsahostkeypub')):
1021 raise errors.ConfigurationError("Incomplete configuration"
1022 " (missing cluster.rsahostkeypub)")
1023 self._config_data = data
1024 # reset the last serial as -1 so that the next write will cause
1026 self._last_cluster_serial = -1
1028 def _DistributeConfig(self):
1029 """Distribute the configuration to the other nodes.
1031 Currently, this only copies the configuration file. In the future,
1032 it could be used to encapsulate the 2/3-phase update mechanism.
1041 myhostname = self._my_hostname
1042 # we can skip checking whether _UnlockedGetNodeInfo returns None
1043 # since the node list comes from _UnlocketGetNodeList, and we are
1044 # called with the lock held, so no modifications should take place
1046 for node_name in self._UnlockedGetNodeList():
1047 if node_name == myhostname:
1049 node_info = self._UnlockedGetNodeInfo(node_name)
1050 if not node_info.master_candidate:
1052 node_list.append(node_info.name)
1053 addr_list.append(node_info.primary_ip)
1055 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1056 address_list=addr_list)
1057 for node in node_list:
1058 if not result[node]:
1059 logging.error("copy of file %s to node %s failed",
1060 self._cfg_file, node)
1064 def _WriteConfig(self, destination=None):
1065 """Write the configuration data to persistent storage.
1068 config_errors = self._UnlockedVerifyConfig()
1070 raise errors.ConfigurationError("Configuration data is not"
1072 (", ".join(config_errors)))
1073 if destination is None:
1074 destination = self._cfg_file
1075 self._BumpSerialNo()
1076 txt = serializer.Dump(self._config_data.ToDict())
1077 dir_name, file_name = os.path.split(destination)
1078 fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1079 f = os.fdopen(fd, 'w')
1082 os.fsync(f.fileno())
1085 # we don't need to do os.close(fd) as f.close() did it
1086 os.rename(name, destination)
1087 self.write_count += 1
1089 # and redistribute the config file to master candidates
1090 self._DistributeConfig()
1092 # Write ssconf files on all nodes (including locally)
1093 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1094 if not self._offline:
1095 rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1096 self._UnlockedGetSsconfValues())
1097 self._last_cluster_serial = self._config_data.cluster.serial_no
1099 def _UnlockedGetSsconfValues(self):
1100 """Return the values needed by ssconf.
1103 @return: a dictionary with keys the ssconf names and values their
1108 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1109 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1110 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1112 instance_data = fn(instance_names)
1113 off_data = fn(node.name for node in node_info if node.offline)
1114 on_data = fn(node.name for node in node_info if not node.offline)
1115 mc_data = fn(node.name for node in node_info if node.master_candidate)
1116 node_data = fn(node_names)
1118 cluster = self._config_data.cluster
1120 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1121 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1122 constants.SS_MASTER_CANDIDATES: mc_data,
1123 constants.SS_MASTER_IP: cluster.master_ip,
1124 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1125 constants.SS_MASTER_NODE: cluster.master_node,
1126 constants.SS_NODE_LIST: node_data,
1127 constants.SS_OFFLINE_NODES: off_data,
1128 constants.SS_ONLINE_NODES: on_data,
1129 constants.SS_INSTANCE_LIST: instance_data,
1130 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1133 @locking.ssynchronized(_config_lock)
1134 def InitConfig(self, version, cluster_config, master_node_config):
1135 """Create the initial cluster configuration.
1137 It will contain the current node, which will also be the master
1138 node, and no instances.
1141 @param version: Configuration version
1142 @type cluster_config: objects.Cluster
1143 @param cluster_config: Cluster configuration
1144 @type master_node_config: objects.Node
1145 @param master_node_config: Master node configuration
1149 master_node_config.name: master_node_config,
1152 self._config_data = objects.ConfigData(version=version,
1153 cluster=cluster_config,
1159 @locking.ssynchronized(_config_lock, shared=1)
1160 def GetVGName(self):
1161 """Return the volume group name.
1164 return self._config_data.cluster.volume_group_name
1166 @locking.ssynchronized(_config_lock)
1167 def SetVGName(self, vg_name):
1168 """Set the volume group name.
1171 self._config_data.cluster.volume_group_name = vg_name
1172 self._config_data.cluster.serial_no += 1
1175 @locking.ssynchronized(_config_lock, shared=1)
1176 def GetDefBridge(self):
1177 """Return the default bridge.
1180 return self._config_data.cluster.default_bridge
1182 @locking.ssynchronized(_config_lock, shared=1)
1183 def GetMACPrefix(self):
1184 """Return the mac prefix.
1187 return self._config_data.cluster.mac_prefix
1189 @locking.ssynchronized(_config_lock, shared=1)
1190 def GetClusterInfo(self):
1191 """Returns informations about the cluster
1193 @rtype: L{objects.Cluster}
1194 @return: the cluster object
1197 return self._config_data.cluster
1199 @locking.ssynchronized(_config_lock)
1200 def Update(self, target):
1201 """Notify function to be called after updates.
1203 This function must be called when an object (as returned by
1204 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1205 caller wants the modifications saved to the backing store. Note
1206 that all modified objects will be saved, but the target argument
1207 is the one the caller wants to ensure that it's saved.
1209 @param target: an instance of either L{objects.Cluster},
1210 L{objects.Node} or L{objects.Instance} which is existing in
1214 if self._config_data is None:
1215 raise errors.ProgrammerError("Configuration file not read,"
1217 update_serial = False
1218 if isinstance(target, objects.Cluster):
1219 test = target == self._config_data.cluster
1220 elif isinstance(target, objects.Node):
1221 test = target in self._config_data.nodes.values()
1222 update_serial = True
1223 elif isinstance(target, objects.Instance):
1224 test = target in self._config_data.instances.values()
1226 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1227 " ConfigWriter.Update" % type(target))
1229 raise errors.ConfigurationError("Configuration updated since object"
1230 " has been read or unknown object")
1231 target.serial_no += 1
1234 # for node updates, we need to increase the cluster serial too
1235 self._config_data.cluster.serial_no += 1
1237 if isinstance(target, objects.Instance):
1238 self._UnlockedReleaseDRBDMinors(target.name)