4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too
34 # pylint: disable-msg=R0904
35 # R0904: Too many public methods
import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime
54 _config_lock = locking.SharedLock("ConfigWriter")
56 # job id used for resource management at config upgrade time
57 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    # maps execution context ids to the set of resources each one holds
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Tell whether the given resource is reserved by any context."""
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve a resource on behalf of the given execution context.

    @raise errors.ReservationError: if the resource is already reserved

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    """Drop all reservations held by the given execution context."""
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    """Return the set of all currently reserved resources."""
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    @param existing: container of resources that are already taken
    @param generate_one_fn: callable producing one candidate resource
    @param ec_id: execution context id that will hold the reservation
    @return: the newly generated (and reserved) resource

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    # bound the number of attempts so a degenerate generator (always
    # returning None or colliding values) cannot loop forever
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
129 """The interface to the cluster configuration.
131 @ivar _temporary_lvs: reservation manager for temporary LVs
132 @ivar _all_rms: a list of all temporary reservation managers
135 def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
136 accept_foreign=False):
138 self._lock = _config_lock
139 self._config_data = None
140 self._offline = offline
142 self._cfg_file = constants.CLUSTER_CONF_FILE
144 self._cfg_file = cfg_file
145 self._getents = _getents
146 self._temporary_ids = TemporaryReservationManager()
147 self._temporary_drbds = {}
148 self._temporary_macs = TemporaryReservationManager()
149 self._temporary_secrets = TemporaryReservationManager()
150 self._temporary_lvs = TemporaryReservationManager()
151 self._all_rms = [self._temporary_ids, self._temporary_macs,
152 self._temporary_secrets, self._temporary_lvs]
153 # Note: in order to prevent errors when resolving our name in
154 # _DistributeConfig, we compute it here once and reuse it; it's
155 # better to raise an error before starting to modify the config
156 # file than after it was modified
157 self._my_hostname = netutils.Hostname.GetSysName()
158 self._last_cluster_serial = -1
160 self._OpenConfig(accept_foreign)
162 # this method needs to be static, so that we can call it on the class
165 """Check if the cluster is configured.
168 return os.path.exists(constants.CLUSTER_CONF_FILE)
170 def _GenerateOneMAC(self):
171 """Generate one mac address
174 prefix = self._config_data.cluster.mac_prefix
175 byte1 = random.randrange(0, 256)
176 byte2 = random.randrange(0, 256)
177 byte3 = random.randrange(0, 256)
178 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
181 @locking.ssynchronized(_config_lock, shared=1)
182 def GetNdParams(self, node):
183 """Get the node params populated with cluster defaults.
185 @type node: L{object.Node}
186 @param node: The node we want to know the params for
187 @return: A dict with the filled in node params
190 nodegroup = self._UnlockedGetNodeGroup(node.group)
191 return self._config_data.cluster.FillND(node, nodegroup)
193 @locking.ssynchronized(_config_lock, shared=1)
194 def GenerateMAC(self, ec_id):
195 """Generate a MAC for an instance.
197 This should check the current instances for duplicates.
200 existing = self._AllMACs()
201 return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
203 @locking.ssynchronized(_config_lock, shared=1)
204 def ReserveMAC(self, mac, ec_id):
205 """Reserve a MAC for an instance.
207 This only checks instances managed by this cluster, it does not
208 check for potential collisions elsewhere.
211 all_macs = self._AllMACs()
213 raise errors.ReservationError("mac already in use")
215 self._temporary_macs.Reserve(mac, ec_id)
217 @locking.ssynchronized(_config_lock, shared=1)
218 def ReserveLV(self, lv_name, ec_id):
219 """Reserve an VG/LV pair for an instance.
221 @type lv_name: string
222 @param lv_name: the logical volume name to reserve
225 all_lvs = self._AllLVs()
226 if lv_name in all_lvs:
227 raise errors.ReservationError("LV already in use")
229 self._temporary_lvs.Reserve(lv_name, ec_id)
231 @locking.ssynchronized(_config_lock, shared=1)
232 def GenerateDRBDSecret(self, ec_id):
233 """Generate a DRBD secret.
235 This checks the current disks for duplicates.
238 return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
239 utils.GenerateSecret,
243 """Compute the list of all LVs.
247 for instance in self._config_data.instances.values():
248 node_data = instance.MapLVsByNode()
249 for lv_list in node_data.values():
250 lvnames.update(lv_list)
253 def _AllIDs(self, include_temporary):
254 """Compute the list of all UUIDs and names we have.
256 @type include_temporary: boolean
257 @param include_temporary: whether to include the _temporary_ids set
259 @return: a set of IDs
263 if include_temporary:
264 existing.update(self._temporary_ids.GetReserved())
265 existing.update(self._AllLVs())
266 existing.update(self._config_data.instances.keys())
267 existing.update(self._config_data.nodes.keys())
268 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
271 def _GenerateUniqueID(self, ec_id):
272 """Generate an unique UUID.
274 This checks the current node, instances and disk names for
278 @return: the unique id
281 existing = self._AllIDs(include_temporary=False)
282 return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
284 @locking.ssynchronized(_config_lock, shared=1)
285 def GenerateUniqueID(self, ec_id):
286 """Generate an unique ID.
288 This is just a wrapper over the unlocked version.
291 @param ec_id: unique id for the job to reserve the id to
294 return self._GenerateUniqueID(ec_id)
297 """Return all MACs present in the config.
300 @return: the list of all MACs
304 for instance in self._config_data.instances.values():
305 for nic in instance.nics:
306 result.append(nic.mac)
310 def _AllDRBDSecrets(self):
311 """Return all DRBD secrets present in the config.
314 @return: the list of all DRBD secrets
317 def helper(disk, result):
318 """Recursively gather secrets from this disk."""
319 if disk.dev_type == constants.DT_DRBD8:
320 result.append(disk.logical_id[5])
322 for child in disk.children:
323 helper(child, result)
326 for instance in self._config_data.instances.values():
327 for disk in instance.disks:
332 def _CheckDiskIDs(self, disk, l_ids, p_ids):
333 """Compute duplicate disk IDs
335 @type disk: L{objects.Disk}
336 @param disk: the disk at which to start searching
338 @param l_ids: list of current logical ids
340 @param p_ids: list of current physical ids
342 @return: a list of error messages
346 if disk.logical_id is not None:
347 if disk.logical_id in l_ids:
348 result.append("duplicate logical id %s" % str(disk.logical_id))
350 l_ids.append(disk.logical_id)
351 if disk.physical_id is not None:
352 if disk.physical_id in p_ids:
353 result.append("duplicate physical id %s" % str(disk.physical_id))
355 p_ids.append(disk.physical_id)
358 for child in disk.children:
359 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
362 def _UnlockedVerifyConfig(self):
366 @return: a list of error messages; a non-empty list signifies
370 # pylint: disable-msg=R0914
374 data = self._config_data
375 cluster = data.cluster
379 # global cluster checks
380 if not cluster.enabled_hypervisors:
381 result.append("enabled hypervisors list doesn't have any entries")
382 invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
384 result.append("enabled hypervisors contains invalid entries: %s" %
386 missing_hvp = (set(cluster.enabled_hypervisors) -
387 set(cluster.hvparams.keys()))
389 result.append("hypervisor parameters missing for the enabled"
390 " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
392 if cluster.master_node not in data.nodes:
393 result.append("cluster has invalid primary node '%s'" %
396 def _helper(owner, attr, value, template):
398 utils.ForceDictType(value, template)
399 except errors.GenericError, err:
400 result.append("%s has invalid %s: %s" % (owner, attr, err))
402 def _helper_nic(owner, params):
404 objects.NIC.CheckParameterSyntax(params)
405 except errors.ConfigurationError, err:
406 result.append("%s has invalid nicparams: %s" % (owner, err))
408 # check cluster parameters
409 _helper("cluster", "beparams", cluster.SimpleFillBE({}),
410 constants.BES_PARAMETER_TYPES)
411 _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
412 constants.NICS_PARAMETER_TYPES)
413 _helper_nic("cluster", cluster.SimpleFillNIC({}))
414 _helper("cluster", "ndparams", cluster.SimpleFillND({}),
415 constants.NDS_PARAMETER_TYPES)
417 # per-instance checks
418 for instance_name in data.instances:
419 instance = data.instances[instance_name]
420 if instance.name != instance_name:
421 result.append("instance '%s' is indexed by wrong name '%s'" %
422 (instance.name, instance_name))
423 if instance.primary_node not in data.nodes:
424 result.append("instance '%s' has invalid primary node '%s'" %
425 (instance_name, instance.primary_node))
426 for snode in instance.secondary_nodes:
427 if snode not in data.nodes:
428 result.append("instance '%s' has invalid secondary node '%s'" %
429 (instance_name, snode))
430 for idx, nic in enumerate(instance.nics):
431 if nic.mac in seen_macs:
432 result.append("instance '%s' has NIC %d mac %s duplicate" %
433 (instance_name, idx, nic.mac))
435 seen_macs.append(nic.mac)
437 filled = cluster.SimpleFillNIC(nic.nicparams)
438 owner = "instance %s nic %d" % (instance.name, idx)
439 _helper(owner, "nicparams",
440 filled, constants.NICS_PARAMETER_TYPES)
441 _helper_nic(owner, filled)
444 if instance.beparams:
445 _helper("instance %s" % instance.name, "beparams",
446 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)
448 # gather the drbd ports for duplicate checks
449 for dsk in instance.disks:
450 if dsk.dev_type in constants.LDS_DRBD:
451 tcp_port = dsk.logical_id[2]
452 if tcp_port not in ports:
454 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
455 # gather network port reservation
456 net_port = getattr(instance, "network_port", None)
457 if net_port is not None:
458 if net_port not in ports:
460 ports[net_port].append((instance.name, "network port"))
462 # instance disk verify
463 for idx, disk in enumerate(instance.disks):
464 result.extend(["instance '%s' disk %d error: %s" %
465 (instance.name, idx, msg) for msg in disk.Verify()])
466 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
468 # cluster-wide pool of free ports
469 for free_port in cluster.tcpudp_port_pool:
470 if free_port not in ports:
471 ports[free_port] = []
472 ports[free_port].append(("cluster", "port marked as free"))
474 # compute tcp/udp duplicate ports
480 txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
481 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
483 # highest used tcp port check
485 if keys[-1] > cluster.highest_used_port:
486 result.append("Highest used port mismatch, saved %s, computed %s" %
487 (cluster.highest_used_port, keys[-1]))
489 if not data.nodes[cluster.master_node].master_candidate:
490 result.append("Master node is not a master candidate")
492 # master candidate checks
493 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
495 result.append("Not enough master candidates: actual %d, target %d" %
499 for node_name, node in data.nodes.items():
500 if node.name != node_name:
501 result.append("Node '%s' is indexed by wrong name '%s'" %
502 (node.name, node_name))
503 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
504 result.append("Node %s state is invalid: master_candidate=%s,"
505 " drain=%s, offline=%s" %
506 (node.name, node.master_candidate, node.drained,
508 if node.group not in data.nodegroups:
509 result.append("Node '%s' has invalid group '%s'" %
510 (node.name, node.group))
512 _helper("node %s" % node.name, "ndparams",
513 cluster.FillND(node, data.nodegroups[node.group]),
514 constants.NDS_PARAMETER_TYPES)
517 nodegroups_names = set()
518 for nodegroup_uuid in data.nodegroups:
519 nodegroup = data.nodegroups[nodegroup_uuid]
520 if nodegroup.uuid != nodegroup_uuid:
521 result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
522 % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
523 if utils.UUID_RE.match(nodegroup.name.lower()):
524 result.append("node group '%s' (uuid: '%s') has uuid-like name" %
525 (nodegroup.name, nodegroup.uuid))
526 if nodegroup.name in nodegroups_names:
527 result.append("duplicate node group name '%s'" % nodegroup.name)
529 nodegroups_names.add(nodegroup.name)
530 if nodegroup.ndparams:
531 _helper("group %s" % nodegroup.name, "ndparams",
532 cluster.SimpleFillND(nodegroup.ndparams),
533 constants.NDS_PARAMETER_TYPES)
537 _, duplicates = self._UnlockedComputeDRBDMap()
538 for node, minor, instance_a, instance_b in duplicates:
539 result.append("DRBD minor %d on node %s is assigned twice to instances"
540 " %s and %s" % (minor, node, instance_a, instance_b))
543 default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
546 def _AddIpAddress(ip, name):
547 ips.setdefault(ip, []).append(name)
549 _AddIpAddress(cluster.master_ip, "cluster_ip")
551 for node in data.nodes.values():
552 _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
553 if node.secondary_ip != node.primary_ip:
554 _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
556 for instance in data.instances.values():
557 for idx, nic in enumerate(instance.nics):
561 nicparams = objects.FillDict(default_nicparams, nic.nicparams)
562 nic_mode = nicparams[constants.NIC_MODE]
563 nic_link = nicparams[constants.NIC_LINK]
565 if nic_mode == constants.NIC_MODE_BRIDGED:
566 link = "bridge:%s" % nic_link
567 elif nic_mode == constants.NIC_MODE_ROUTED:
568 link = "route:%s" % nic_link
570 raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
572 _AddIpAddress("%s/%s" % (link, nic.ip),
573 "instance:%s/nic:%d" % (instance.name, idx))
575 for ip, owners in ips.items():
577 result.append("IP address %s is used by multiple owners: %s" %
578 (ip, utils.CommaJoin(owners)))
582 @locking.ssynchronized(_config_lock, shared=1)
583 def VerifyConfig(self):
586 This is just a wrapper over L{_UnlockedVerifyConfig}.
589 @return: a list of error messages; a non-empty list signifies
593 return self._UnlockedVerifyConfig()
595 def _UnlockedSetDiskID(self, disk, node_name):
596 """Convert the unique ID to the ID needed on the target nodes.
598 This is used only for drbd, which needs ip/port configuration.
600 The routine descends down and updates its children also, because
601 this helps when the only the top device is passed to the remote
604 This function is for internal use, when the config lock is already held.
608 for child in disk.children:
609 self._UnlockedSetDiskID(child, node_name)
611 if disk.logical_id is None and disk.physical_id is not None:
613 if disk.dev_type == constants.LD_DRBD8:
614 pnode, snode, port, pminor, sminor, secret = disk.logical_id
615 if node_name not in (pnode, snode):
616 raise errors.ConfigurationError("DRBD device not knowing node %s" %
618 pnode_info = self._UnlockedGetNodeInfo(pnode)
619 snode_info = self._UnlockedGetNodeInfo(snode)
620 if pnode_info is None or snode_info is None:
621 raise errors.ConfigurationError("Can't find primary or secondary node"
622 " for %s" % str(disk))
623 p_data = (pnode_info.secondary_ip, port)
624 s_data = (snode_info.secondary_ip, port)
625 if pnode == node_name:
626 disk.physical_id = p_data + s_data + (pminor, secret)
627 else: # it must be secondary, we tested above
628 disk.physical_id = s_data + p_data + (sminor, secret)
630 disk.physical_id = disk.logical_id
633 @locking.ssynchronized(_config_lock)
634 def SetDiskID(self, disk, node_name):
635 """Convert the unique ID to the ID needed on the target nodes.
637 This is used only for drbd, which needs ip/port configuration.
639 The routine descends down and updates its children also, because
640 this helps when the only the top device is passed to the remote
644 return self._UnlockedSetDiskID(disk, node_name)
646 @locking.ssynchronized(_config_lock)
647 def AddTcpUdpPort(self, port):
648 """Adds a new port to the available port pool.
651 if not isinstance(port, int):
652 raise errors.ProgrammerError("Invalid type passed for port")
654 self._config_data.cluster.tcpudp_port_pool.add(port)
657 @locking.ssynchronized(_config_lock, shared=1)
658 def GetPortList(self):
659 """Returns a copy of the current port list.
662 return self._config_data.cluster.tcpudp_port_pool.copy()
664 @locking.ssynchronized(_config_lock)
665 def AllocatePort(self):
668 The port will be taken from the available port pool or from the
669 default port range (and in this case we increase
673 # If there are TCP/IP ports configured, we use them first.
674 if self._config_data.cluster.tcpudp_port_pool:
675 port = self._config_data.cluster.tcpudp_port_pool.pop()
677 port = self._config_data.cluster.highest_used_port + 1
678 if port >= constants.LAST_DRBD_PORT:
679 raise errors.ConfigurationError("The highest used port is greater"
680 " than %s. Aborting." %
681 constants.LAST_DRBD_PORT)
682 self._config_data.cluster.highest_used_port = port
687 def _UnlockedComputeDRBDMap(self):
688 """Compute the used DRBD minor/nodes.
691 @return: dictionary of node_name: dict of minor: instance_name;
692 the returned dict will have all the nodes in it (even if with
693 an empty list), and a list of duplicates; if the duplicates
694 list is not empty, the configuration is corrupted and its caller
695 should raise an exception
698 def _AppendUsedPorts(instance_name, disk, used):
700 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
701 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
702 for node, port in ((node_a, minor_a), (node_b, minor_b)):
703 assert node in used, ("Node '%s' of instance '%s' not found"
704 " in node list" % (node, instance_name))
705 if port in used[node]:
706 duplicates.append((node, port, instance_name, used[node][port]))
708 used[node][port] = instance_name
710 for child in disk.children:
711 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
715 my_dict = dict((node, {}) for node in self._config_data.nodes)
716 for instance in self._config_data.instances.itervalues():
717 for disk in instance.disks:
718 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
719 for (node, minor), instance in self._temporary_drbds.iteritems():
720 if minor in my_dict[node] and my_dict[node][minor] != instance:
721 duplicates.append((node, minor, instance, my_dict[node][minor]))
723 my_dict[node][minor] = instance
724 return my_dict, duplicates
726 @locking.ssynchronized(_config_lock)
727 def ComputeDRBDMap(self):
728 """Compute the used DRBD minor/nodes.
730 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
732 @return: dictionary of node_name: dict of minor: instance_name;
733 the returned dict will have all the nodes in it (even if with
737 d_map, duplicates = self._UnlockedComputeDRBDMap()
739 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
743 @locking.ssynchronized(_config_lock)
744 def AllocateDRBDMinor(self, nodes, instance):
745 """Allocate a drbd minor.
747 The free minor will be automatically computed from the existing
748 devices. A node can be given multiple times in order to allocate
749 multiple minors. The result is the list of minors, in the same
750 order as the passed nodes.
752 @type instance: string
753 @param instance: the instance for which we allocate minors
756 assert isinstance(instance, basestring), \
757 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
759 d_map, duplicates = self._UnlockedComputeDRBDMap()
761 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
767 # no minors used, we can start at 0
770 self._temporary_drbds[(nname, 0)] = instance
774 ffree = utils.FirstFree(keys)
776 # return the next minor
777 # TODO: implement high-limit check
781 # double-check minor against current instances
782 assert minor not in d_map[nname], \
783 ("Attempt to reuse allocated DRBD minor %d on node %s,"
784 " already allocated to instance %s" %
785 (minor, nname, d_map[nname][minor]))
786 ndata[minor] = instance
787 # double-check minor against reservation
788 r_key = (nname, minor)
789 assert r_key not in self._temporary_drbds, \
790 ("Attempt to reuse reserved DRBD minor %d on node %s,"
791 " reserved for instance %s" %
792 (minor, nname, self._temporary_drbds[r_key]))
793 self._temporary_drbds[r_key] = instance
795 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
799 def _UnlockedReleaseDRBDMinors(self, instance):
800 """Release temporary drbd minors allocated for a given instance.
802 @type instance: string
803 @param instance: the instance for which temporary minors should be
807 assert isinstance(instance, basestring), \
808 "Invalid argument passed to ReleaseDRBDMinors"
809 for key, name in self._temporary_drbds.items():
811 del self._temporary_drbds[key]
813 @locking.ssynchronized(_config_lock)
814 def ReleaseDRBDMinors(self, instance):
815 """Release temporary drbd minors allocated for a given instance.
817 This should be called on the error paths, on the success paths
818 it's automatically called by the ConfigWriter add and update
821 This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
823 @type instance: string
824 @param instance: the instance for which temporary minors should be
828 self._UnlockedReleaseDRBDMinors(instance)
830 @locking.ssynchronized(_config_lock, shared=1)
831 def GetConfigVersion(self):
832 """Get the configuration version.
834 @return: Config version
837 return self._config_data.version
839 @locking.ssynchronized(_config_lock, shared=1)
840 def GetClusterName(self):
843 @return: Cluster name
846 return self._config_data.cluster.cluster_name
848 @locking.ssynchronized(_config_lock, shared=1)
849 def GetMasterNode(self):
850 """Get the hostname of the master node for this cluster.
852 @return: Master hostname
855 return self._config_data.cluster.master_node
857 @locking.ssynchronized(_config_lock, shared=1)
858 def GetMasterIP(self):
859 """Get the IP of the master node for this cluster.
864 return self._config_data.cluster.master_ip
866 @locking.ssynchronized(_config_lock, shared=1)
867 def GetMasterNetdev(self):
868 """Get the master network device for this cluster.
871 return self._config_data.cluster.master_netdev
873 @locking.ssynchronized(_config_lock, shared=1)
874 def GetFileStorageDir(self):
875 """Get the file storage dir for this cluster.
878 return self._config_data.cluster.file_storage_dir
880 @locking.ssynchronized(_config_lock, shared=1)
881 def GetHypervisorType(self):
882 """Get the hypervisor type for this cluster.
885 return self._config_data.cluster.enabled_hypervisors[0]
887 @locking.ssynchronized(_config_lock, shared=1)
888 def GetHostKey(self):
889 """Return the rsa hostkey from the config.
892 @return: the rsa hostkey
895 return self._config_data.cluster.rsahostkeypub
897 @locking.ssynchronized(_config_lock, shared=1)
898 def GetDefaultIAllocator(self):
899 """Get the default instance allocator for this cluster.
902 return self._config_data.cluster.default_iallocator
904 @locking.ssynchronized(_config_lock, shared=1)
905 def GetPrimaryIPFamily(self):
906 """Get cluster primary ip family.
908 @return: primary ip family
911 return self._config_data.cluster.primary_ip_family
913 @locking.ssynchronized(_config_lock)
914 def AddNodeGroup(self, group, ec_id, check_uuid=True):
915 """Add a node group to the configuration.
917 This method calls group.UpgradeConfig() to fill any missing attributes
918 according to their default values.
920 @type group: L{objects.NodeGroup}
921 @param group: the NodeGroup object to add
923 @param ec_id: unique id for the job to use when creating a missing UUID
924 @type check_uuid: bool
925 @param check_uuid: add an UUID to the group if it doesn't have one or, if
926 it does, ensure that it does not exist in the
927 configuration already
930 self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
933 def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
934 """Add a node group to the configuration.
937 logging.info("Adding node group %s to configuration", group.name)
939 # Some code might need to add a node group with a pre-populated UUID
940 # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
941 # the "does this UUID" exist already check.
943 self._EnsureUUID(group, ec_id)
946 group.ctime = group.mtime = time.time()
947 group.UpgradeConfig()
949 self._config_data.nodegroups[group.uuid] = group
950 self._config_data.cluster.serial_no += 1
952 @locking.ssynchronized(_config_lock)
953 def RemoveNodeGroup(self, group_uuid):
954 """Remove a node group from the configuration.
956 @type group_uuid: string
957 @param group_uuid: the UUID of the node group to remove
960 logging.info("Removing node group %s from configuration", group_uuid)
962 if group_uuid not in self._config_data.nodegroups:
963 raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)
965 assert len(self._config_data.nodegroups) != 1, \
966 "Group '%s' is the only group, cannot be removed" % group_uuid
968 del self._config_data.nodegroups[group_uuid]
969 self._config_data.cluster.serial_no += 1
972 @locking.ssynchronized(_config_lock, shared=1)
973 def LookupNodeGroup(self, target):
974 """Lookup a node group's UUID.
976 @type target: string or None
977 @param target: group name or UUID or None to look for the default
979 @return: nodegroup UUID
980 @raises errors.OpPrereqError: when the target group cannot be found
984 if len(self._config_data.nodegroups) != 1:
985 raise errors.OpPrereqError("More than one node group exists. Target"
986 " group must be specified explicitely.")
988 return self._config_data.nodegroups.keys()[0]
989 if target in self._config_data.nodegroups:
991 for nodegroup in self._config_data.nodegroups.values():
992 if nodegroup.name == target:
993 return nodegroup.uuid
994 raise errors.OpPrereqError("Node group '%s' not found" % target,
997 def _UnlockedGetNodeGroup(self, uuid):
998 """Lookup a node group.
1001 @param uuid: group UUID
1002 @rtype: L{objects.NodeGroup} or None
1003 @return: nodegroup object, or None if not found
1006 if uuid not in self._config_data.nodegroups:
1009 return self._config_data.nodegroups[uuid]
1011 @locking.ssynchronized(_config_lock, shared=1)
1012 def GetNodeGroup(self, uuid):
1013 """Lookup a node group.
1016 @param uuid: group UUID
1017 @rtype: L{objects.NodeGroup} or None
1018 @return: nodegroup object, or None if not found
1021 return self._UnlockedGetNodeGroup(uuid)
1023 @locking.ssynchronized(_config_lock, shared=1)
1024 def GetAllNodeGroupsInfo(self):
1025 """Get the configuration of all node groups.
1028 return dict(self._config_data.nodegroups)
1030 @locking.ssynchronized(_config_lock, shared=1)
1031 def GetNodeGroupList(self):
1032 """Get a list of node groups.
1035 return self._config_data.nodegroups.keys()
1037 @locking.ssynchronized(_config_lock)
1038 def AddInstance(self, instance, ec_id):
1039 """Add an instance to the config.
1041 This should be used after creating a new instance.
1043 @type instance: L{objects.Instance}
1044 @param instance: the instance object
1047 if not isinstance(instance, objects.Instance):
1048 raise errors.ProgrammerError("Invalid type passed to AddInstance")
1050 if instance.disk_template != constants.DT_DISKLESS:
1051 all_lvs = instance.MapLVsByNode()
1052 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
1054 all_macs = self._AllMACs()
1055 for nic in instance.nics:
1056 if nic.mac in all_macs:
1057 raise errors.ConfigurationError("Cannot add instance %s:"
1058 " MAC address '%s' already in use." %
1059 (instance.name, nic.mac))
1061 self._EnsureUUID(instance, ec_id)
1063 instance.serial_no = 1
1064 instance.ctime = instance.mtime = time.time()
1065 self._config_data.instances[instance.name] = instance
1066 self._config_data.cluster.serial_no += 1
1067 self._UnlockedReleaseDRBDMinors(instance.name)
1070 def _EnsureUUID(self, item, ec_id):
1071 """Ensures a given object has a valid UUID.
1073 @param item: the instance or node to be checked
1074 @param ec_id: the execution context id for the uuid reservation
1078 item.uuid = self._GenerateUniqueID(ec_id)
1079 elif item.uuid in self._AllIDs(include_temporary=True):
1080 raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
1081 " in use" % (item.name, item.uuid))
1083 def _SetInstanceStatus(self, instance_name, status):
1084 """Set the instance's status to a given value.
1087 assert isinstance(status, bool), \
1088 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
1090 if instance_name not in self._config_data.instances:
1091 raise errors.ConfigurationError("Unknown instance '%s'" %
1093 instance = self._config_data.instances[instance_name]
1094 if instance.admin_up != status:
1095 instance.admin_up = status
1096 instance.serial_no += 1
1097 instance.mtime = time.time()
1100 @locking.ssynchronized(_config_lock)
1101 def MarkInstanceUp(self, instance_name):
1102 """Mark the instance status to up in the config.
1105 self._SetInstanceStatus(instance_name, True)
1107 @locking.ssynchronized(_config_lock)
1108 def RemoveInstance(self, instance_name):
1109 """Remove the instance from the configuration.
1112 if instance_name not in self._config_data.instances:
1113 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1114 del self._config_data.instances[instance_name]
1115 self._config_data.cluster.serial_no += 1
1118 @locking.ssynchronized(_config_lock)
1119 def RenameInstance(self, old_name, new_name):
1120 """Rename an instance.
1122 This needs to be done in ConfigWriter and not by RemoveInstance
1123 combined with AddInstance as only we can guarantee an atomic
1127 if old_name not in self._config_data.instances:
1128 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
1129 inst = self._config_data.instances[old_name]
1130 del self._config_data.instances[old_name]
1131 inst.name = new_name
1133 for disk in inst.disks:
1134 if disk.dev_type == constants.LD_FILE:
1135 # rename the file paths in logical and physical id
1136 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
1137 disk_fname = "disk%s" % disk.iv_name.split("/")[1]
1138 disk.physical_id = disk.logical_id = (disk.logical_id[0],
1139 utils.PathJoin(file_storage_dir,
1143 # Force update of ssconf files
1144 self._config_data.cluster.serial_no += 1
1146 self._config_data.instances[inst.name] = inst
1149 @locking.ssynchronized(_config_lock)
1150 def MarkInstanceDown(self, instance_name):
1151 """Mark the status of an instance to down in the configuration.
1154 self._SetInstanceStatus(instance_name, False)
1156 def _UnlockedGetInstanceList(self):
1157 """Get the list of instances.
1159 This function is for internal use, when the config lock is already held.
1162 return self._config_data.instances.keys()
1164 @locking.ssynchronized(_config_lock, shared=1)
1165 def GetInstanceList(self):
1166 """Get the list of instances.
1168 @return: array of instances, ex. ['instance2.example.com',
1169 'instance1.example.com']
1172 return self._UnlockedGetInstanceList()
1174 @locking.ssynchronized(_config_lock, shared=1)
1175 def ExpandInstanceName(self, short_name):
1176 """Attempt to expand an incomplete instance name.
1179 return utils.MatchNameComponent(short_name,
1180 self._config_data.instances.keys(),
1181 case_sensitive=False)
1183 def _UnlockedGetInstanceInfo(self, instance_name):
1184 """Returns information about an instance.
1186 This function is for internal use, when the config lock is already held.
1189 if instance_name not in self._config_data.instances:
1192 return self._config_data.instances[instance_name]
1194 @locking.ssynchronized(_config_lock, shared=1)
1195 def GetInstanceInfo(self, instance_name):
1196 """Returns information about an instance.
1198 It takes the information from the configuration file. Other information of
1199 an instance are taken from the live systems.
1201 @param instance_name: name of the instance, e.g.
1202 I{instance1.example.com}
1204 @rtype: L{objects.Instance}
1205 @return: the instance object
1208 return self._UnlockedGetInstanceInfo(instance_name)
1210 @locking.ssynchronized(_config_lock, shared=1)
1211 def GetAllInstancesInfo(self):
1212 """Get the configuration of all instances.
1215 @return: dict of (instance, instance_info), where instance_info is what
1216 would GetInstanceInfo return for the node
1219 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1220 for instance in self._UnlockedGetInstanceList()])
1223 @locking.ssynchronized(_config_lock)
1224 def AddNode(self, node, ec_id):
1225 """Add a node to the configuration.
1227 @type node: L{objects.Node}
1228 @param node: a Node instance
1231 logging.info("Adding node %s to configuration", node.name)
1233 self._EnsureUUID(node, ec_id)
1236 node.ctime = node.mtime = time.time()
1237 self._UnlockedAddNodeToGroup(node.name, node.group)
1238 self._config_data.nodes[node.name] = node
1239 self._config_data.cluster.serial_no += 1
1242 @locking.ssynchronized(_config_lock)
1243 def RemoveNode(self, node_name):
1244 """Remove a node from the configuration.
1247 logging.info("Removing node %s from configuration", node_name)
1249 if node_name not in self._config_data.nodes:
1250 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1252 self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1253 del self._config_data.nodes[node_name]
1254 self._config_data.cluster.serial_no += 1
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    The match against the known node names is done case-insensitively
    via L{utils.MatchNameComponent}.

    @type short_name: string
    @param short_name: the (possibly incomplete) node name to expand

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)
1266 def _UnlockedGetNodeInfo(self, node_name):
1267 """Get the configuration of a node, as stored in the config.
1269 This function is for internal use, when the config lock is already
1272 @param node_name: the node name, e.g. I{node1.example.com}
1274 @rtype: L{objects.Node}
1275 @return: the node object
1278 if node_name not in self._config_data.nodes:
1281 return self._config_data.nodes[node_name]
1283 @locking.ssynchronized(_config_lock, shared=1)
1284 def GetNodeInfo(self, node_name):
1285 """Get the configuration of a node, as stored in the config.
1287 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1289 @param node_name: the node name, e.g. I{node1.example.com}
1291 @rtype: L{objects.Node}
1292 @return: the node object
1295 return self._UnlockedGetNodeInfo(node_name)
1297 @locking.ssynchronized(_config_lock, shared=1)
1298 def GetNodeInstances(self, node_name):
1299 """Get the instances of a node, as stored in the config.
1301 @param node_name: the node name, e.g. I{node1.example.com}
1303 @rtype: (list, list)
1304 @return: a tuple with two lists: the primary and the secondary instances
1309 for inst in self._config_data.instances.values():
1310 if inst.primary_node == node_name:
1311 pri.append(inst.name)
1312 if node_name in inst.secondary_nodes:
1313 sec.append(inst.name)
1316 def _UnlockedGetNodeList(self):
1317 """Return the list of nodes which are in the configuration.
1319 This function is for internal use, when the config lock is already
1325 return self._config_data.nodes.keys()
1327 @locking.ssynchronized(_config_lock, shared=1)
1328 def GetNodeList(self):
1329 """Return the list of nodes which are in the configuration.
1332 return self._UnlockedGetNodeList()
1334 def _UnlockedGetOnlineNodeList(self):
1335 """Return the list of nodes which are online.
1338 all_nodes = [self._UnlockedGetNodeInfo(node)
1339 for node in self._UnlockedGetNodeList()]
1340 return [node.name for node in all_nodes if not node.offline]
1342 @locking.ssynchronized(_config_lock, shared=1)
1343 def GetOnlineNodeList(self):
1344 """Return the list of nodes which are online.
1347 return self._UnlockedGetOnlineNodeList()
1349 @locking.ssynchronized(_config_lock, shared=1)
1350 def GetVmCapableNodeList(self):
1351 """Return the list of nodes which are not vm capable.
1354 all_nodes = [self._UnlockedGetNodeInfo(node)
1355 for node in self._UnlockedGetNodeList()]
1356 return [node.name for node in all_nodes if node.vm_capable]
1358 @locking.ssynchronized(_config_lock, shared=1)
1359 def GetNonVmCapableNodeList(self):
1360 """Return the list of nodes which are not vm capable.
1363 all_nodes = [self._UnlockedGetNodeInfo(node)
1364 for node in self._UnlockedGetNodeList()]
1365 return [node.name for node in all_nodes if not node.vm_capable]
1367 @locking.ssynchronized(_config_lock, shared=1)
1368 def GetAllNodesInfo(self):
1369 """Get the configuration of all nodes.
1372 @return: dict of (node, node_info), where node_info is what
1373 would GetNodeInfo return for the node
1376 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1377 for node in self._UnlockedGetNodeList()])
1380 def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1381 """Get the number of current and maximum desired and possible candidates.
1383 @type exceptions: list
1384 @param exceptions: if passed, list of nodes that should be ignored
1386 @return: tuple of (current, desired and possible, possible)
1389 mc_now = mc_should = mc_max = 0
1390 for node in self._config_data.nodes.values():
1391 if exceptions and node.name in exceptions:
1393 if not (node.offline or node.drained) and node.master_capable:
1395 if node.master_candidate:
1397 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1398 return (mc_now, mc_should, mc_max)
1400 @locking.ssynchronized(_config_lock, shared=1)
1401 def GetMasterCandidateStats(self, exceptions=None):
1402 """Get the number of current and maximum possible candidates.
1404 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1406 @type exceptions: list
1407 @param exceptions: if passed, list of nodes that should be ignored
1409 @return: tuple of (current, max)
1412 return self._UnlockedGetMasterCandidateStats(exceptions)
1414 @locking.ssynchronized(_config_lock)
1415 def MaintainCandidatePool(self, exceptions):
1416 """Try to grow the candidate pool to the desired size.
1418 @type exceptions: list
1419 @param exceptions: if passed, list of nodes that should be ignored
1421 @return: list with the adjusted nodes (L{objects.Node} instances)
1424 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1427 node_list = self._config_data.nodes.keys()
1428 random.shuffle(node_list)
1429 for name in node_list:
1430 if mc_now >= mc_max:
1432 node = self._config_data.nodes[name]
1433 if (node.master_candidate or node.offline or node.drained or
1434 node.name in exceptions or not node.master_capable):
1436 mod_list.append(node)
1437 node.master_candidate = True
1440 if mc_now != mc_max:
1441 # this should not happen
1442 logging.warning("Warning: MaintainCandidatePool didn't manage to"
1443 " fill the candidate pool (%d/%d)", mc_now, mc_max)
1445 self._config_data.cluster.serial_no += 1
1450 def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1451 """Add a given node to the specified group.
1454 if nodegroup_uuid not in self._config_data.nodegroups:
1455 # This can happen if a node group gets deleted between its lookup and
1456 # when we're adding the first node to it, since we don't keep a lock in
1457 # the meantime. It's ok though, as we'll fail cleanly if the node group
1458 # is not found anymore.
1459 raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1460 if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1461 self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1463 def _UnlockedRemoveNodeFromGroup(self, node):
1464 """Remove a given node from its group.
1467 nodegroup = node.group
1468 if nodegroup not in self._config_data.nodegroups:
1469 logging.warning("Warning: node '%s' has unknown node group '%s'"
1470 " (while being removed from it)", node.name, nodegroup)
1471 nodegroup_obj = self._config_data.nodegroups[nodegroup]
1472 if node.name not in nodegroup_obj.members:
1473 logging.warning("Warning: node '%s' not a member of its node group '%s'"
1474 " (while being removed from it)", node.name, nodegroup)
1476 nodegroup_obj.members.remove(node.name)
1478 def _BumpSerialNo(self):
1479 """Bump up the serial number of the config.
1482 self._config_data.serial_no += 1
1483 self._config_data.mtime = time.time()
1485 def _AllUUIDObjects(self):
1486 """Returns all objects with uuid attributes.
1489 return (self._config_data.instances.values() +
1490 self._config_data.nodes.values() +
1491 self._config_data.nodegroups.values() +
1492 [self._config_data.cluster])
1494 def _OpenConfig(self, accept_foreign):
1495 """Read the config data from disk.
1498 raw_data = utils.ReadFile(self._cfg_file)
1501 data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1502 except Exception, err:
1503 raise errors.ConfigurationError(err)
1505 # Make sure the configuration has the right version
1506 _ValidateConfig(data)
1508 if (not hasattr(data, 'cluster') or
1509 not hasattr(data.cluster, 'rsahostkeypub')):
1510 raise errors.ConfigurationError("Incomplete configuration"
1511 " (missing cluster.rsahostkeypub)")
1513 if data.cluster.master_node != self._my_hostname and not accept_foreign:
1514 msg = ("The configuration denotes node %s as master, while my"
1515 " hostname is %s; opening a foreign configuration is only"
1516 " possible in accept_foreign mode" %
1517 (data.cluster.master_node, self._my_hostname))
1518 raise errors.ConfigurationError(msg)
1520 # Upgrade configuration if needed
1521 data.UpgradeConfig()
1523 self._config_data = data
1524 # reset the last serial as -1 so that the next write will cause
1526 self._last_cluster_serial = -1
1528 # And finally run our (custom) config upgrade sequence
1529 self._UpgradeConfig()
1531 self._cfg_id = utils.GetFileID(path=self._cfg_file)
1533 def _UpgradeConfig(self):
1534 """Run upgrade steps that cannot be done purely in the objects.
1536 This is because some data elements need uniqueness across the
1537 whole configuration, etc.
1539 @warning: this function will call L{_WriteConfig()}, but also
1540 L{DropECReservations} so it needs to be called only from a
1541 "safe" place (the constructor). If one wanted to call it with
1542 the lock held, a DropECReservationUnlocked would need to be
1543 created first, to avoid causing deadlock.
1547 for item in self._AllUUIDObjects():
1548 if item.uuid is None:
1549 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1551 if not self._config_data.nodegroups:
1552 default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
1553 default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
1555 self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
1557 for node in self._config_data.nodes.values():
1559 node.group = self.LookupNodeGroup(None)
1561 # This is technically *not* an upgrade, but needs to be done both when
1562 # nodegroups are being added, and upon normally loading the config,
1563 # because the members list of a node group is discarded upon
1564 # serializing/deserializing the object.
1565 self._UnlockedAddNodeToGroup(node.name, node.group)
1568 # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1569 # only called at config init time, without the lock held
1570 self.DropECReservations(_UPGRADE_CONFIG_JID)
1572 def _DistributeConfig(self, feedback_fn):
1573 """Distribute the configuration to the other nodes.
1575 Currently, this only copies the configuration file. In the future,
1576 it could be used to encapsulate the 2/3-phase update mechanism.
1586 myhostname = self._my_hostname
1587 # we can skip checking whether _UnlockedGetNodeInfo returns None
1588 # since the node list comes from _UnlocketGetNodeList, and we are
1589 # called with the lock held, so no modifications should take place
1591 for node_name in self._UnlockedGetNodeList():
1592 if node_name == myhostname:
1594 node_info = self._UnlockedGetNodeInfo(node_name)
1595 if not node_info.master_candidate:
1597 node_list.append(node_info.name)
1598 addr_list.append(node_info.primary_ip)
1600 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1601 address_list=addr_list)
1602 for to_node, to_result in result.items():
1603 msg = to_result.fail_msg
1605 msg = ("Copy of file %s to node %s failed: %s" %
1606 (self._cfg_file, to_node, msg))
1616 def _WriteConfig(self, destination=None, feedback_fn=None):
1617 """Write the configuration data to persistent storage.
1620 assert feedback_fn is None or callable(feedback_fn)
1622 # Warn on config errors, but don't abort the save - the
1623 # configuration has already been modified, and we can't revert;
1624 # the best we can do is to warn the user and save as is, leaving
1625 # recovery to the user
1626 config_errors = self._UnlockedVerifyConfig()
1628 errmsg = ("Configuration data is not consistent: %s" %
1629 (utils.CommaJoin(config_errors)))
1630 logging.critical(errmsg)
1634 if destination is None:
1635 destination = self._cfg_file
1636 self._BumpSerialNo()
1637 txt = serializer.Dump(self._config_data.ToDict())
1639 getents = self._getents()
1641 fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
1642 close=False, gid=getents.confd_gid, mode=0640)
1643 except errors.LockError:
1644 raise errors.ConfigurationError("The configuration file has been"
1645 " modified since the last write, cannot"
1648 self._cfg_id = utils.GetFileID(fd=fd)
1652 self.write_count += 1
1654 # and redistribute the config file to master candidates
1655 self._DistributeConfig(feedback_fn)
1657 # Write ssconf files on all nodes (including locally)
1658 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1659 if not self._offline:
1660 result = rpc.RpcRunner.call_write_ssconf_files(
1661 self._UnlockedGetOnlineNodeList(),
1662 self._UnlockedGetSsconfValues())
1664 for nname, nresu in result.items():
1665 msg = nresu.fail_msg
1667 errmsg = ("Error while uploading ssconf files to"
1668 " node %s: %s" % (nname, msg))
1669 logging.warning(errmsg)
1674 self._last_cluster_serial = self._config_data.cluster.serial_no
1676 def _UnlockedGetSsconfValues(self):
1677 """Return the values needed by ssconf.
1680 @return: a dictionary with keys the ssconf names and values their
1685 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1686 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1687 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1688 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1689 for ninfo in node_info]
1690 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1691 for ninfo in node_info]
1693 instance_data = fn(instance_names)
1694 off_data = fn(node.name for node in node_info if node.offline)
1695 on_data = fn(node.name for node in node_info if not node.offline)
1696 mc_data = fn(node.name for node in node_info if node.master_candidate)
1697 mc_ips_data = fn(node.primary_ip for node in node_info
1698 if node.master_candidate)
1699 node_data = fn(node_names)
1700 node_pri_ips_data = fn(node_pri_ips)
1701 node_snd_ips_data = fn(node_snd_ips)
1703 cluster = self._config_data.cluster
1704 cluster_tags = fn(cluster.GetTags())
1706 hypervisor_list = fn(cluster.enabled_hypervisors)
1708 uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1710 nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1711 self._config_data.nodegroups.values()]
1712 nodegroups_data = fn(utils.NiceSort(nodegroups))
1715 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1716 constants.SS_CLUSTER_TAGS: cluster_tags,
1717 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1718 constants.SS_MASTER_CANDIDATES: mc_data,
1719 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1720 constants.SS_MASTER_IP: cluster.master_ip,
1721 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1722 constants.SS_MASTER_NODE: cluster.master_node,
1723 constants.SS_NODE_LIST: node_data,
1724 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1725 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1726 constants.SS_OFFLINE_NODES: off_data,
1727 constants.SS_ONLINE_NODES: on_data,
1728 constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1729 constants.SS_INSTANCE_LIST: instance_data,
1730 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1731 constants.SS_HYPERVISOR_LIST: hypervisor_list,
1732 constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1733 constants.SS_UID_POOL: uid_pool,
1734 constants.SS_NODEGROUPS: nodegroups_data,
1737 @locking.ssynchronized(_config_lock, shared=1)
1738 def GetSsconfValues(self):
1739 """Wrapper using lock around _UnlockedGetSsconf().
1742 return self._UnlockedGetSsconfValues()
1744 @locking.ssynchronized(_config_lock, shared=1)
1745 def GetVGName(self):
1746 """Return the volume group name.
1749 return self._config_data.cluster.volume_group_name
1751 @locking.ssynchronized(_config_lock)
1752 def SetVGName(self, vg_name):
1753 """Set the volume group name.
1756 self._config_data.cluster.volume_group_name = vg_name
1757 self._config_data.cluster.serial_no += 1
1760 @locking.ssynchronized(_config_lock, shared=1)
1761 def GetDRBDHelper(self):
1762 """Return DRBD usermode helper.
1765 return self._config_data.cluster.drbd_usermode_helper
1767 @locking.ssynchronized(_config_lock)
1768 def SetDRBDHelper(self, drbd_helper):
1769 """Set DRBD usermode helper.
1772 self._config_data.cluster.drbd_usermode_helper = drbd_helper
1773 self._config_data.cluster.serial_no += 1
1776 @locking.ssynchronized(_config_lock, shared=1)
1777 def GetMACPrefix(self):
1778 """Return the mac prefix.
1781 return self._config_data.cluster.mac_prefix
1783 @locking.ssynchronized(_config_lock, shared=1)
1784 def GetClusterInfo(self):
1785 """Returns information about the cluster
1787 @rtype: L{objects.Cluster}
1788 @return: the cluster object
1791 return self._config_data.cluster
  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check whether there is at least one disk of the given type in the
    configuration.

    @param dev_type: the disk type to look for
    @rtype: boolean
    @return: whatever the underlying L{self._config_data.HasAnyDiskOfType}
        returns for dev_type

    """
    return self._config_data.HasAnyDiskOfType(dev_type)
1800 @locking.ssynchronized(_config_lock)
1801 def Update(self, target, feedback_fn):
1802 """Notify function to be called after updates.
1804 This function must be called when an object (as returned by
1805 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1806 caller wants the modifications saved to the backing store. Note
1807 that all modified objects will be saved, but the target argument
1808 is the one the caller wants to ensure that it's saved.
1810 @param target: an instance of either L{objects.Cluster},
1811 L{objects.Node} or L{objects.Instance} which is existing in
1813 @param feedback_fn: Callable feedback function
1816 if self._config_data is None:
1817 raise errors.ProgrammerError("Configuration file not read,"
1819 update_serial = False
1820 if isinstance(target, objects.Cluster):
1821 test = target == self._config_data.cluster
1822 elif isinstance(target, objects.Node):
1823 test = target in self._config_data.nodes.values()
1824 update_serial = True
1825 elif isinstance(target, objects.Instance):
1826 test = target in self._config_data.instances.values()
1827 elif isinstance(target, objects.NodeGroup):
1828 test = target in self._config_data.nodegroups.values()
1830 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1831 " ConfigWriter.Update" % type(target))
1833 raise errors.ConfigurationError("Configuration updated since object"
1834 " has been read or unknown object")
1835 target.serial_no += 1
1836 target.mtime = now = time.time()
1839 # for node updates, we need to increase the cluster serial too
1840 self._config_data.cluster.serial_no += 1
1841 self._config_data.cluster.mtime = now
1843 if isinstance(target, objects.Instance):
1844 self._UnlockedReleaseDRBDMinors(target.name)
1846 self._WriteConfig(feedback_fn=feedback_fn)
1848 @locking.ssynchronized(_config_lock)
1849 def DropECReservations(self, ec_id):
1850 """Drop per-execution-context reservations
1853 for rm in self._all_rms:
1854 rm.DropECReservations(ec_id)