4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consuming too
much memory.

"""
34 # pylint: disable-msg=R0904
35 # R0904: Too many public methods
import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime
# Module-level lock protecting all ConfigWriter operations; shared (read)
# acquisition is used by the getters, exclusive by the mutators.
_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    # mapping of execution context id -> set of reserved resources
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Tell whether the given resource is reserved by any context."""
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve a resource for the given execution context.

    @raise errors.ReservationError: if the resource is already reserved

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    """Drop all reservations held by the given execution context."""
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    """Return the set of all currently reserved resources."""
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    Calls generate_one_fn repeatedly (bounded number of attempts) until it
    produces a value not in `existing` and not already reserved, reserves it
    for ec_id and returns it.

    @raise errors.ConfigurationError: if no free resource could be generated

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
135 def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
136 accept_foreign=False):
138 self._lock = _config_lock
139 self._config_data = None
140 self._offline = offline
142 self._cfg_file = constants.CLUSTER_CONF_FILE
144 self._cfg_file = cfg_file
145 self._getents = _getents
146 self._temporary_ids = TemporaryReservationManager()
147 self._temporary_drbds = {}
148 self._temporary_macs = TemporaryReservationManager()
149 self._temporary_secrets = TemporaryReservationManager()
150 self._temporary_lvs = TemporaryReservationManager()
151 self._all_rms = [self._temporary_ids, self._temporary_macs,
152 self._temporary_secrets, self._temporary_lvs]
153 # Note: in order to prevent errors when resolving our name in
154 # _DistributeConfig, we compute it here once and reuse it; it's
155 # better to raise an error before starting to modify the config
156 # file than after it was modified
157 self._my_hostname = netutils.Hostname.GetSysName()
158 self._last_cluster_serial = -1
160 self._OpenConfig(accept_foreign)
162 # this method needs to be static, so that we can call it on the class
165 """Check if the cluster is configured.
168 return os.path.exists(constants.CLUSTER_CONF_FILE)
170 def _GenerateOneMAC(self):
171 """Generate one mac address
174 prefix = self._config_data.cluster.mac_prefix
175 byte1 = random.randrange(0, 256)
176 byte2 = random.randrange(0, 256)
177 byte3 = random.randrange(0, 256)
178 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)
193 @locking.ssynchronized(_config_lock, shared=1)
194 def GenerateMAC(self, ec_id):
195 """Generate a MAC for an instance.
197 This should check the current instances for duplicates.
200 existing = self._AllMACs()
201 return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
203 @locking.ssynchronized(_config_lock, shared=1)
204 def ReserveMAC(self, mac, ec_id):
205 """Reserve a MAC for an instance.
207 This only checks instances managed by this cluster, it does not
208 check for potential collisions elsewhere.
211 all_macs = self._AllMACs()
213 raise errors.ReservationError("mac already in use")
215 self._temporary_macs.Reserve(mac, ec_id)
217 @locking.ssynchronized(_config_lock, shared=1)
218 def ReserveLV(self, lv_name, ec_id):
219 """Reserve an VG/LV pair for an instance.
221 @type lv_name: string
222 @param lv_name: the logical volume name to reserve
225 all_lvs = self._AllLVs()
226 if lv_name in all_lvs:
227 raise errors.ReservationError("LV already in use")
229 self._temporary_lvs.Reserve(lv_name, ec_id)
231 @locking.ssynchronized(_config_lock, shared=1)
232 def GenerateDRBDSecret(self, ec_id):
233 """Generate a DRBD secret.
235 This checks the current disks for duplicates.
238 return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
239 utils.GenerateSecret,
243 """Compute the list of all LVs.
247 for instance in self._config_data.instances.values():
248 node_data = instance.MapLVsByNode()
249 for lv_list in node_data.values():
250 lvnames.update(lv_list)
253 def _AllIDs(self, include_temporary):
254 """Compute the list of all UUIDs and names we have.
256 @type include_temporary: boolean
257 @param include_temporary: whether to include the _temporary_ids set
259 @return: a set of IDs
263 if include_temporary:
264 existing.update(self._temporary_ids.GetReserved())
265 existing.update(self._AllLVs())
266 existing.update(self._config_data.instances.keys())
267 existing.update(self._config_data.nodes.keys())
268 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
  def _GenerateUniqueID(self, ec_id):
    """Generate an unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate an unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)
297 """Return all MACs present in the config.
300 @return: the list of all MACs
304 for instance in self._config_data.instances.values():
305 for nic in instance.nics:
306 result.append(nic.mac)
310 def _AllDRBDSecrets(self):
311 """Return all DRBD secrets present in the config.
314 @return: the list of all DRBD secrets
317 def helper(disk, result):
318 """Recursively gather secrets from this disk."""
319 if disk.dev_type == constants.DT_DRBD8:
320 result.append(disk.logical_id[5])
322 for child in disk.children:
323 helper(child, result)
326 for instance in self._config_data.instances.values():
327 for disk in instance.disks:
332 def _CheckDiskIDs(self, disk, l_ids, p_ids):
333 """Compute duplicate disk IDs
335 @type disk: L{objects.Disk}
336 @param disk: the disk at which to start searching
338 @param l_ids: list of current logical ids
340 @param p_ids: list of current physical ids
342 @return: a list of error messages
346 if disk.logical_id is not None:
347 if disk.logical_id in l_ids:
348 result.append("duplicate logical id %s" % str(disk.logical_id))
350 l_ids.append(disk.logical_id)
351 if disk.physical_id is not None:
352 if disk.physical_id in p_ids:
353 result.append("duplicate physical id %s" % str(disk.physical_id))
355 p_ids.append(disk.physical_id)
358 for child in disk.children:
359 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
362 def _UnlockedVerifyConfig(self):
366 @return: a list of error messages; a non-empty list signifies
370 # pylint: disable-msg=R0914
374 data = self._config_data
375 cluster = data.cluster
379 # global cluster checks
380 if not cluster.enabled_hypervisors:
381 result.append("enabled hypervisors list doesn't have any entries")
382 invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
384 result.append("enabled hypervisors contains invalid entries: %s" %
386 missing_hvp = (set(cluster.enabled_hypervisors) -
387 set(cluster.hvparams.keys()))
389 result.append("hypervisor parameters missing for the enabled"
390 " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
392 if cluster.master_node not in data.nodes:
393 result.append("cluster has invalid primary node '%s'" %
396 def _helper(owner, attr, value, template):
398 utils.ForceDictType(value, template)
399 except errors.GenericError, err:
400 result.append("%s has invalid %s: %s" % (owner, attr, err))
402 def _helper_nic(owner, params):
404 objects.NIC.CheckParameterSyntax(params)
405 except errors.ConfigurationError, err:
406 result.append("%s has invalid nicparams: %s" % (owner, err))
408 # check cluster parameters
409 _helper("cluster", "beparams", cluster.SimpleFillBE({}),
410 constants.BES_PARAMETER_TYPES)
411 _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
412 constants.NICS_PARAMETER_TYPES)
413 _helper_nic("cluster", cluster.SimpleFillNIC({}))
414 _helper("cluster", "ndparams", cluster.SimpleFillND({}),
415 constants.NDS_PARAMETER_TYPES)
417 # per-instance checks
418 for instance_name in data.instances:
419 instance = data.instances[instance_name]
420 if instance.name != instance_name:
421 result.append("instance '%s' is indexed by wrong name '%s'" %
422 (instance.name, instance_name))
423 if instance.primary_node not in data.nodes:
424 result.append("instance '%s' has invalid primary node '%s'" %
425 (instance_name, instance.primary_node))
426 for snode in instance.secondary_nodes:
427 if snode not in data.nodes:
428 result.append("instance '%s' has invalid secondary node '%s'" %
429 (instance_name, snode))
430 for idx, nic in enumerate(instance.nics):
431 if nic.mac in seen_macs:
432 result.append("instance '%s' has NIC %d mac %s duplicate" %
433 (instance_name, idx, nic.mac))
435 seen_macs.append(nic.mac)
437 filled = cluster.SimpleFillNIC(nic.nicparams)
438 owner = "instance %s nic %d" % (instance.name, idx)
439 _helper(owner, "nicparams",
440 filled, constants.NICS_PARAMETER_TYPES)
441 _helper_nic(owner, filled)
444 if instance.beparams:
445 _helper("instance %s" % instance.name, "beparams",
446 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)
448 # gather the drbd ports for duplicate checks
449 for dsk in instance.disks:
450 if dsk.dev_type in constants.LDS_DRBD:
451 tcp_port = dsk.logical_id[2]
452 if tcp_port not in ports:
454 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
455 # gather network port reservation
456 net_port = getattr(instance, "network_port", None)
457 if net_port is not None:
458 if net_port not in ports:
460 ports[net_port].append((instance.name, "network port"))
462 # instance disk verify
463 for idx, disk in enumerate(instance.disks):
464 result.extend(["instance '%s' disk %d error: %s" %
465 (instance.name, idx, msg) for msg in disk.Verify()])
466 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
468 # cluster-wide pool of free ports
469 for free_port in cluster.tcpudp_port_pool:
470 if free_port not in ports:
471 ports[free_port] = []
472 ports[free_port].append(("cluster", "port marked as free"))
474 # compute tcp/udp duplicate ports
480 txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
481 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
483 # highest used tcp port check
485 if keys[-1] > cluster.highest_used_port:
486 result.append("Highest used port mismatch, saved %s, computed %s" %
487 (cluster.highest_used_port, keys[-1]))
489 if not data.nodes[cluster.master_node].master_candidate:
490 result.append("Master node is not a master candidate")
492 # master candidate checks
493 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
495 result.append("Not enough master candidates: actual %d, target %d" %
499 for node_name, node in data.nodes.items():
500 if node.name != node_name:
501 result.append("Node '%s' is indexed by wrong name '%s'" %
502 (node.name, node_name))
503 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
504 result.append("Node %s state is invalid: master_candidate=%s,"
505 " drain=%s, offline=%s" %
506 (node.name, node.master_candidate, node.drained,
508 if node.group not in data.nodegroups:
509 result.append("Node '%s' has invalid group '%s'" %
510 (node.name, node.group))
512 _helper("node %s" % node.name, "ndparams",
513 cluster.FillND(node, data.nodegroups[node.group]),
514 constants.NDS_PARAMETER_TYPES)
517 nodegroups_names = set()
518 for nodegroup_uuid in data.nodegroups:
519 nodegroup = data.nodegroups[nodegroup_uuid]
520 if nodegroup.uuid != nodegroup_uuid:
521 result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
522 % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
523 if utils.UUID_RE.match(nodegroup.name.lower()):
524 result.append("node group '%s' (uuid: '%s') has uuid-like name" %
525 (nodegroup.name, nodegroup.uuid))
526 if nodegroup.name in nodegroups_names:
527 result.append("duplicate node group name '%s'" % nodegroup.name)
529 nodegroups_names.add(nodegroup.name)
530 if nodegroup.ndparams:
531 _helper("group %s" % nodegroup.name, "ndparams",
532 cluster.SimpleFillND(nodegroup.ndparams),
533 constants.NDS_PARAMETER_TYPES)
537 _, duplicates = self._UnlockedComputeDRBDMap()
538 for node, minor, instance_a, instance_b in duplicates:
539 result.append("DRBD minor %d on node %s is assigned twice to instances"
540 " %s and %s" % (minor, node, instance_a, instance_b))
543 default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
546 def _AddIpAddress(ip, name):
547 ips.setdefault(ip, []).append(name)
549 _AddIpAddress(cluster.master_ip, "cluster_ip")
551 for node in data.nodes.values():
552 _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
553 if node.secondary_ip != node.primary_ip:
554 _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
556 for instance in data.instances.values():
557 for idx, nic in enumerate(instance.nics):
561 nicparams = objects.FillDict(default_nicparams, nic.nicparams)
562 nic_mode = nicparams[constants.NIC_MODE]
563 nic_link = nicparams[constants.NIC_LINK]
565 if nic_mode == constants.NIC_MODE_BRIDGED:
566 link = "bridge:%s" % nic_link
567 elif nic_mode == constants.NIC_MODE_ROUTED:
568 link = "route:%s" % nic_link
570 raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
572 _AddIpAddress("%s/%s" % (link, nic.ip),
573 "instance:%s/nic:%d" % (instance.name, idx))
575 for ip, owners in ips.items():
577 result.append("IP address %s is used by multiple owners: %s" %
578 (ip, utils.CommaJoin(owners)))
  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()
595 def _UnlockedSetDiskID(self, disk, node_name):
596 """Convert the unique ID to the ID needed on the target nodes.
598 This is used only for drbd, which needs ip/port configuration.
600 The routine descends down and updates its children also, because
601 this helps when the only the top device is passed to the remote
604 This function is for internal use, when the config lock is already held.
608 for child in disk.children:
609 self._UnlockedSetDiskID(child, node_name)
611 if disk.logical_id is None and disk.physical_id is not None:
613 if disk.dev_type == constants.LD_DRBD8:
614 pnode, snode, port, pminor, sminor, secret = disk.logical_id
615 if node_name not in (pnode, snode):
616 raise errors.ConfigurationError("DRBD device not knowing node %s" %
618 pnode_info = self._UnlockedGetNodeInfo(pnode)
619 snode_info = self._UnlockedGetNodeInfo(snode)
620 if pnode_info is None or snode_info is None:
621 raise errors.ConfigurationError("Can't find primary or secondary node"
622 " for %s" % str(disk))
623 p_data = (pnode_info.secondary_ip, port)
624 s_data = (snode_info.secondary_ip, port)
625 if pnode == node_name:
626 disk.physical_id = p_data + s_data + (pminor, secret)
627 else: # it must be secondary, we tested above
628 disk.physical_id = s_data + p_data + (sminor, secret)
630 disk.physical_id = disk.logical_id
  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)
646 @locking.ssynchronized(_config_lock)
647 def AddTcpUdpPort(self, port):
648 """Adds a new port to the available port pool.
651 if not isinstance(port, int):
652 raise errors.ProgrammerError("Invalid type passed for port")
654 self._config_data.cluster.tcpudp_port_pool.add(port)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()
664 @locking.ssynchronized(_config_lock)
665 def AllocatePort(self):
668 The port will be taken from the available port pool or from the
669 default port range (and in this case we increase
673 # If there are TCP/IP ports configured, we use them first.
674 if self._config_data.cluster.tcpudp_port_pool:
675 port = self._config_data.cluster.tcpudp_port_pool.pop()
677 port = self._config_data.cluster.highest_used_port + 1
678 if port >= constants.LAST_DRBD_PORT:
679 raise errors.ConfigurationError("The highest used port is greater"
680 " than %s. Aborting." %
681 constants.LAST_DRBD_PORT)
682 self._config_data.cluster.highest_used_port = port
687 def _UnlockedComputeDRBDMap(self):
688 """Compute the used DRBD minor/nodes.
691 @return: dictionary of node_name: dict of minor: instance_name;
692 the returned dict will have all the nodes in it (even if with
693 an empty list), and a list of duplicates; if the duplicates
694 list is not empty, the configuration is corrupted and its caller
695 should raise an exception
698 def _AppendUsedPorts(instance_name, disk, used):
700 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
701 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
702 for node, port in ((node_a, minor_a), (node_b, minor_b)):
703 assert node in used, ("Node '%s' of instance '%s' not found"
704 " in node list" % (node, instance_name))
705 if port in used[node]:
706 duplicates.append((node, port, instance_name, used[node][port]))
708 used[node][port] = instance_name
710 for child in disk.children:
711 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
715 my_dict = dict((node, {}) for node in self._config_data.nodes)
716 for instance in self._config_data.instances.itervalues():
717 for disk in instance.disks:
718 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
719 for (node, minor), instance in self._temporary_drbds.iteritems():
720 if minor in my_dict[node] and my_dict[node][minor] != instance:
721 duplicates.append((node, minor, instance, my_dict[node][minor]))
723 my_dict[node][minor] = instance
724 return my_dict, duplicates
726 @locking.ssynchronized(_config_lock)
727 def ComputeDRBDMap(self):
728 """Compute the used DRBD minor/nodes.
730 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
732 @return: dictionary of node_name: dict of minor: instance_name;
733 the returned dict will have all the nodes in it (even if with
737 d_map, duplicates = self._UnlockedComputeDRBDMap()
739 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
743 @locking.ssynchronized(_config_lock)
744 def AllocateDRBDMinor(self, nodes, instance):
745 """Allocate a drbd minor.
747 The free minor will be automatically computed from the existing
748 devices. A node can be given multiple times in order to allocate
749 multiple minors. The result is the list of minors, in the same
750 order as the passed nodes.
752 @type instance: string
753 @param instance: the instance for which we allocate minors
756 assert isinstance(instance, basestring), \
757 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
759 d_map, duplicates = self._UnlockedComputeDRBDMap()
761 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
767 # no minors used, we can start at 0
770 self._temporary_drbds[(nname, 0)] = instance
774 ffree = utils.FirstFree(keys)
776 # return the next minor
777 # TODO: implement high-limit check
781 # double-check minor against current instances
782 assert minor not in d_map[nname], \
783 ("Attempt to reuse allocated DRBD minor %d on node %s,"
784 " already allocated to instance %s" %
785 (minor, nname, d_map[nname][minor]))
786 ndata[minor] = instance
787 # double-check minor against reservation
788 r_key = (nname, minor)
789 assert r_key not in self._temporary_drbds, \
790 ("Attempt to reuse reserved DRBD minor %d on node %s,"
791 " reserved for instance %s" %
792 (minor, nname, self._temporary_drbds[r_key]))
793 self._temporary_drbds[r_key] = instance
795 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
799 def _UnlockedReleaseDRBDMinors(self, instance):
800 """Release temporary drbd minors allocated for a given instance.
802 @type instance: string
803 @param instance: the instance for which temporary minors should be
807 assert isinstance(instance, basestring), \
808 "Invalid argument passed to ReleaseDRBDMinors"
809 for key, name in self._temporary_drbds.items():
811 del self._temporary_drbds[key]
  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev
  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    @return: the first entry of the enabled hypervisors list

    """
    return self._config_data.cluster.enabled_hypervisors[0]
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub
  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator
  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family
913 @locking.ssynchronized(_config_lock)
914 def AddNodeGroup(self, group, ec_id, check_uuid=True):
915 """Add a node group to the configuration.
917 This method calls group.UpgradeConfig() to fill any missing attributes
918 according to their default values.
920 @type group: L{objects.NodeGroup}
921 @param group: the NodeGroup object to add
923 @param ec_id: unique id for the job to use when creating a missing UUID
924 @type check_uuid: bool
925 @param check_uuid: add an UUID to the group if it doesn't have one or, if
926 it does, ensure that it does not exist in the
927 configuration already
930 self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
933 def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
934 """Add a node group to the configuration.
937 logging.info("Adding node group %s to configuration", group.name)
939 # Some code might need to add a node group with a pre-populated UUID
940 # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
941 # the "does this UUID" exist already check.
943 self._EnsureUUID(group, ec_id)
946 existing_uuid = self._UnlockedLookupNodeGroup(group.name)
947 except errors.OpPrereqError:
950 raise errors.OpPrereqError("Desired group name '%s' already exists as a"
951 " node group (UUID: %s)" %
952 (group.name, existing_uuid),
956 group.ctime = group.mtime = time.time()
957 group.UpgradeConfig()
959 self._config_data.nodegroups[group.uuid] = group
960 self._config_data.cluster.serial_no += 1
962 @locking.ssynchronized(_config_lock)
963 def RemoveNodeGroup(self, group_uuid):
964 """Remove a node group from the configuration.
966 @type group_uuid: string
967 @param group_uuid: the UUID of the node group to remove
970 logging.info("Removing node group %s from configuration", group_uuid)
972 if group_uuid not in self._config_data.nodegroups:
973 raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)
975 assert len(self._config_data.nodegroups) != 1, \
976 "Group '%s' is the only group, cannot be removed" % group_uuid
978 del self._config_data.nodegroups[group_uuid]
979 self._config_data.cluster.serial_no += 1
982 def _UnlockedLookupNodeGroup(self, target):
983 """Lookup a node group's UUID.
985 @type target: string or None
986 @param target: group name or UUID or None to look for the default
988 @return: nodegroup UUID
989 @raises errors.OpPrereqError: when the target group cannot be found
993 if len(self._config_data.nodegroups) != 1:
994 raise errors.OpPrereqError("More than one node group exists. Target"
995 " group must be specified explicitely.")
997 return self._config_data.nodegroups.keys()[0]
998 if target in self._config_data.nodegroups:
1000 for nodegroup in self._config_data.nodegroups.values():
1001 if nodegroup.name == target:
1002 return nodegroup.uuid
1003 raise errors.OpPrereqError("Node group '%s' not found" % target,
  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)
1020 def _UnlockedGetNodeGroup(self, uuid):
1021 """Lookup a node group.
1024 @param uuid: group UUID
1025 @rtype: L{objects.NodeGroup} or None
1026 @return: nodegroup object, or None if not found
1029 if uuid not in self._config_data.nodegroups:
1032 return self._config_data.nodegroups[uuid]
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()
1060 @locking.ssynchronized(_config_lock)
1061 def AddInstance(self, instance, ec_id):
1062 """Add an instance to the config.
1064 This should be used after creating a new instance.
1066 @type instance: L{objects.Instance}
1067 @param instance: the instance object
1070 if not isinstance(instance, objects.Instance):
1071 raise errors.ProgrammerError("Invalid type passed to AddInstance")
1073 if instance.disk_template != constants.DT_DISKLESS:
1074 all_lvs = instance.MapLVsByNode()
1075 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
1077 all_macs = self._AllMACs()
1078 for nic in instance.nics:
1079 if nic.mac in all_macs:
1080 raise errors.ConfigurationError("Cannot add instance %s:"
1081 " MAC address '%s' already in use." %
1082 (instance.name, nic.mac))
1084 self._EnsureUUID(instance, ec_id)
1086 instance.serial_no = 1
1087 instance.ctime = instance.mtime = time.time()
1088 self._config_data.instances[instance.name] = instance
1089 self._config_data.cluster.serial_no += 1
1090 self._UnlockedReleaseDRBDMinors(instance.name)
1093 def _EnsureUUID(self, item, ec_id):
1094 """Ensures a given object has a valid UUID.
1096 @param item: the instance or node to be checked
1097 @param ec_id: the execution context id for the uuid reservation
1101 item.uuid = self._GenerateUniqueID(ec_id)
1102 elif item.uuid in self._AllIDs(include_temporary=True):
1103 raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
1104 " in use" % (item.name, item.uuid))
1106 def _SetInstanceStatus(self, instance_name, status):
1107 """Set the instance's status to a given value.
1110 assert isinstance(status, bool), \
1111 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
1113 if instance_name not in self._config_data.instances:
1114 raise errors.ConfigurationError("Unknown instance '%s'" %
1116 instance = self._config_data.instances[instance_name]
1117 if instance.admin_up != status:
1118 instance.admin_up = status
1119 instance.serial_no += 1
1120 instance.mtime = time.time()
1123 @locking.ssynchronized(_config_lock)
1124 def MarkInstanceUp(self, instance_name):
1125 """Mark the instance status to up in the config.
1128 self._SetInstanceStatus(instance_name, True)
1130 @locking.ssynchronized(_config_lock)
1131 def RemoveInstance(self, instance_name):
1132 """Remove the instance from the configuration.
1135 if instance_name not in self._config_data.instances:
1136 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1137 del self._config_data.instances[instance_name]
1138 self._config_data.cluster.serial_no += 1
  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    @type old_name: string
    @param old_name: the current name of the instance
    @type new_name: string
    @param new_name: the new name for the instance
    @raise errors.ConfigurationError: if the instance is not known

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk_fname = "disk%s" % disk.iv_name.split("/")[1]
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk_fname))

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._config_data.instances[inst.name] = inst
1172 @locking.ssynchronized(_config_lock)
1173 def MarkInstanceDown(self, instance_name):
1174 """Mark the status of an instance to down in the configuration.
1177 self._SetInstanceStatus(instance_name, False)
1179 def _UnlockedGetInstanceList(self):
1180 """Get the list of instances.
1182 This function is for internal use, when the config lock is already held.
1185 return self._config_data.instances.keys()
1187 @locking.ssynchronized(_config_lock, shared=1)
1188 def GetInstanceList(self):
1189 """Get the list of instances.
1191 @return: array of instances, ex. ['instance2.example.com',
1192 'instance1.example.com']
1195 return self._UnlockedGetInstanceList()
1197 @locking.ssynchronized(_config_lock, shared=1)
1198 def ExpandInstanceName(self, short_name):
1199 """Attempt to expand an incomplete instance name.
1202 return utils.MatchNameComponent(short_name,
1203 self._config_data.instances.keys(),
1204 case_sensitive=False)
  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    @param instance_name: name of the instance to look up
    @return: the instance object, or C{None} if the name is not known

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]
1217 @locking.ssynchronized(_config_lock, shared=1)
1218 def GetInstanceInfo(self, instance_name):
1219 """Returns information about an instance.
1221 It takes the information from the configuration file. Other information of
1222 an instance are taken from the live systems.
1224 @param instance_name: name of the instance, e.g.
1225 I{instance1.example.com}
1227 @rtype: L{objects.Instance}
1228 @return: the instance object
1231 return self._UnlockedGetInstanceInfo(instance_name)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @return: dict of (instance, instance_info), where instance_info is what
        would GetInstanceInfo return for the node

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance
    @type ec_id: string
    @param ec_id: execution context id under which a UUID may be reserved

    """
    logging.info("Adding node %s to configuration", node.name)

    # give the node a UUID if it has none, or fail if its UUID is taken
    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.group)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
1265 @locking.ssynchronized(_config_lock)
1266 def RemoveNode(self, node_name):
1267 """Remove a node from the configuration.
1270 logging.info("Removing node %s from configuration", node_name)
1272 if node_name not in self._config_data.nodes:
1273 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1275 self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1276 del self._config_data.nodes[node_name]
1277 self._config_data.cluster.serial_no += 1
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)
  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object, or C{None} if the name is not known

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]
1306 @locking.ssynchronized(_config_lock, shared=1)
1307 def GetNodeInfo(self, node_name):
1308 """Get the configuration of a node, as stored in the config.
1310 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1312 @param node_name: the node name, e.g. I{node1.example.com}
1314 @rtype: L{objects.Node}
1315 @return: the node object
1318 return self._UnlockedGetNodeInfo(node_name)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      # an instance can appear in the secondary list even when primary here
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
1339 def _UnlockedGetNodeList(self):
1340 """Return the list of nodes which are in the configuration.
1342 This function is for internal use, when the config lock is already
1348 return self._config_data.nodes.keys()
1350 @locking.ssynchronized(_config_lock, shared=1)
1351 def GetNodeList(self):
1352 """Return the list of nodes which are in the configuration.
1355 return self._UnlockedGetNodeList()
1357 def _UnlockedGetOnlineNodeList(self):
1358 """Return the list of nodes which are online.
1361 all_nodes = [self._UnlockedGetNodeInfo(node)
1362 for node in self._UnlockedGetNodeList()]
1363 return [node.name for node in all_nodes if not node.offline]
1365 @locking.ssynchronized(_config_lock, shared=1)
1366 def GetOnlineNodeList(self):
1367 """Return the list of nodes which are online.
1370 return self._UnlockedGetOnlineNodeList()
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVmCapableNodeList(self):
    """Return the list of nodes which are vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if node.vm_capable]
1381 @locking.ssynchronized(_config_lock, shared=1)
1382 def GetNonVmCapableNodeList(self):
1383 """Return the list of nodes which are not vm capable.
1386 all_nodes = [self._UnlockedGetNodeInfo(node)
1387 for node in self._UnlockedGetNodeList()]
1388 return [node.name for node in all_nodes if not node.vm_capable]
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @return: dict of (node, node_info), where node_info is what
        would GetNodeInfo return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored

    @return: tuple of (current, desired and possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      # only master-capable nodes that are neither offline nor drained
      # can ever become candidates
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    # the desired count is capped by the configured pool size
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)
1423 @locking.ssynchronized(_config_lock, shared=1)
1424 def GetMasterCandidateStats(self, exceptions=None):
1425 """Get the number of current and maximum possible candidates.
1427 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1429 @type exceptions: list
1430 @param exceptions: if passed, list of nodes that should be ignored
1432 @return: tuple of (current, max)
1435 return self._UnlockedGetMasterCandidateStats(exceptions)
  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored

    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # NOTE: the middle element of the stats tuple is the *desired* pool
    # size (bounded by candidate_pool_size), bound here as mc_max
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      # shuffle so that repeated calls spread promotions over the nodes
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1

      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
    if mod_list:
      self._config_data.cluster.serial_no += 1
1473 def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1474 """Add a given node to the specified group.
1477 if nodegroup_uuid not in self._config_data.nodegroups:
1478 # This can happen if a node group gets deleted between its lookup and
1479 # when we're adding the first node to it, since we don't keep a lock in
1480 # the meantime. It's ok though, as we'll fail cleanly if the node group
1481 # is not found anymore.
1482 raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1483 if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1484 self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1486 def _UnlockedRemoveNodeFromGroup(self, node):
1487 """Remove a given node from its group.
1490 nodegroup = node.group
1491 if nodegroup not in self._config_data.nodegroups:
1492 logging.warning("Warning: node '%s' has unknown node group '%s'"
1493 " (while being removed from it)", node.name, nodegroup)
1494 nodegroup_obj = self._config_data.nodegroups[nodegroup]
1495 if node.name not in nodegroup_obj.members:
1496 logging.warning("Warning: node '%s' not a member of its node group '%s'"
1497 " (while being removed from it)", node.name, nodegroup)
1499 nodegroup_obj.members.remove(node.name)
1501 def _BumpSerialNo(self):
1502 """Bump up the serial number of the config.
1505 self._config_data.serial_no += 1
1506 self._config_data.mtime = time.time()
1508 def _AllUUIDObjects(self):
1509 """Returns all objects with uuid attributes.
1512 return (self._config_data.instances.values() +
1513 self._config_data.nodes.values() +
1514 self._config_data.nodegroups.values() +
1515 [self._config_data.cluster])
  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    @type accept_foreign: boolean
    @param accept_foreign: whether we should accept a configuration that
        names another node as master
    @raise errors.ConfigurationError: if the data cannot be parsed, is
        incomplete, has the wrong version, or denotes a foreign master
        while C{accept_foreign} is False

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    # remember the on-disk file identity to detect concurrent modifications
    self._cfg_id = utils.GetFileID(path=self._cfg_file)
  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    # assign UUIDs to any object that still lacks one
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    # pre-nodegroup configurations: create the initial default group
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)
  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    @param feedback_fn: Callable feedback function

    """
    # in offline mode there is nobody to distribute to
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      # the config is only pushed to master candidates
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    @param destination: target file path; defaults to the configured
        config file
    @param feedback_fn: Callable feedback function

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      # _cfg_id guards against concurrent modification of the file
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " write again")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no
  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    # ssconf files contain newline-separated lists
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
1760 @locking.ssynchronized(_config_lock, shared=1)
1761 def GetSsconfValues(self):
1762 """Wrapper using lock around _UnlockedGetSsconf().
1765 return self._UnlockedGetSsconfValues()
1767 @locking.ssynchronized(_config_lock, shared=1)
1768 def GetVGName(self):
1769 """Return the volume group name.
1772 return self._config_data.cluster.volume_group_name
1774 @locking.ssynchronized(_config_lock)
1775 def SetVGName(self, vg_name):
1776 """Set the volume group name.
1779 self._config_data.cluster.volume_group_name = vg_name
1780 self._config_data.cluster.serial_no += 1
1783 @locking.ssynchronized(_config_lock, shared=1)
1784 def GetDRBDHelper(self):
1785 """Return DRBD usermode helper.
1788 return self._config_data.cluster.drbd_usermode_helper
1790 @locking.ssynchronized(_config_lock)
1791 def SetDRBDHelper(self, drbd_helper):
1792 """Set DRBD usermode helper.
1795 self._config_data.cluster.drbd_usermode_helper = drbd_helper
1796 self._config_data.cluster.serial_no += 1
1799 @locking.ssynchronized(_config_lock, shared=1)
1800 def GetMACPrefix(self):
1801 """Return the mac prefix.
1804 return self._config_data.cluster.mac_prefix
1806 @locking.ssynchronized(_config_lock, shared=1)
1807 def GetClusterInfo(self):
1808 """Returns information about the cluster
1810 @rtype: L{objects.Cluster}
1811 @return: the cluster object
1814 return self._config_data.cluster
  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)
  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save")
    update_serial = False
    # verify the target is the very object held in the configuration
    # (identity, not just equality by name)
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      # node changes also bump the cluster serial number (see below)
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)
1871 @locking.ssynchronized(_config_lock)
1872 def DropECReservations(self, ec_id):
1873 """Drop per-execution-context reservations
1876 for rm in self._all_rms:
1877 rm.DropECReservations(ec_id)