# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too
34 # pylint: disable=R0904
35 # R0904: Too many public methods

import os
import random
import logging
import time
import itertools

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    for _ in range(64):
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
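
# Illustrative usage sketch for TemporaryReservationManager (not part of the
# original module; the execution-context ids and the MAC below are made up):
#
#   >>> trm = TemporaryReservationManager()
#   >>> trm.Reserve("job-1", "aa:00:00:11:22:33")
#   >>> trm.Reserved("aa:00:00:11:22:33")
#   True
#   >>> trm.Reserve("job-2", "aa:00:00:11:22:33")  # raises ReservationError
#   >>> trm.DropECReservations("job-1")  # frees the MAC for other jobs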


def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  """
  return utils.MatchNameComponent(short_name, names, case_sensitive=False)
137 """The interface to the cluster configuration.
139 @ivar _temporary_lvs: reservation manager for temporary LVs
140 @ivar _all_rms: a list of all temporary reservation managers
143 def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
144 accept_foreign=False):
146 self._lock = _config_lock
147 self._config_data = None
148 self._offline = offline
150 self._cfg_file = constants.CLUSTER_CONF_FILE
152 self._cfg_file = cfg_file
153 self._getents = _getents
154 self._temporary_ids = TemporaryReservationManager()
155 self._temporary_drbds = {}
156 self._temporary_macs = TemporaryReservationManager()
157 self._temporary_secrets = TemporaryReservationManager()
158 self._temporary_lvs = TemporaryReservationManager()
159 self._all_rms = [self._temporary_ids, self._temporary_macs,
160 self._temporary_secrets, self._temporary_lvs]
161 # Note: in order to prevent errors when resolving our name in
162 # _DistributeConfig, we compute it here once and reuse it; it's
163 # better to raise an error before starting to modify the config
164 # file than after it was modified
165 self._my_hostname = netutils.Hostname.GetSysName()
166 self._last_cluster_serial = -1
169 self._OpenConfig(accept_foreign)

  def _GetRpc(self, address_list):
    """Returns RPC runner for configuration.

    """
    return rpc.ConfigRunner(self._context, address_list)

  def SetContext(self, context):
    """Sets Ganeti context.

    """
    self._context = context

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
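
  # Example (illustrative only): with cluster.mac_prefix "aa:00:00" the
  # method above can return e.g. "aa:00:00:3f:9a:c1". Uniqueness against
  # existing MACs is not checked here; callers go through
  # TemporaryReservationManager.Generate for that.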

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set

    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)
    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)
    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result
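
  # Example (illustrative): calling _CheckDiskIDs for two disks that share a
  # logical_id records the id in l_ids on the first call and yields a
  # "duplicate logical id ..." message on the second.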

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      if nodegroup.ndparams:
        _helper("group %s" % nodegroup.name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return
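
  # Illustrative example of the conversion above (values are made up): for a
  # DRBD8 disk with
  #   logical_id = ("node1", "node2", 11000, 0, 1, "secret")
  # seen from node1 (the primary), the physical_id becomes
  #   (node1_secondary_ip, 11000, node2_secondary_ip, 11000, 0, "secret")
  # i.e. (local_ip, port, remote_ip, port, local_minor, secret).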

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
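
  # Example (illustrative): with tcpudp_port_pool = set([11005]), the pooled
  # port is handed out first; once the pool is empty, allocation continues
  # from highest_used_port + 1 until constants.LAST_DRBD_PORT is hit.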

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
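
  # Illustrative shape of the values returned above (names are made up):
  #   my_dict = {"node1": {0: "inst1", 1: "inst2"}, "node2": {0: "inst1"}}
  #   duplicates = [("node1", 0, "inst3", "inst1")]  # minor assigned twice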

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetmask(self):
    """Get the netmask of the master node for this cluster.

    """
    return self._config_data.cluster.master_netmask

  @locking.ssynchronized(_config_lock, shared=1)
  def GetUseExternalMipScript(self):
    """Get flag representing whether to use the external master IP setup
    script.

    """
    return self._config_data.cluster.use_external_mip_script

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir for this cluster.

    """
    return self._config_data.cluster.shared_file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetworkParameters(self):
    """Get network parameters of the master node.

    @rtype: L{objects.MasterNetworkParameters}
    @return: network parameters of the master node

    """
    cluster = self._config_data.cluster
    result = objects.MasterNetworkParameters(name=cluster.master_node,
      ip=cluster.master_ip,
      netmask=cluster.master_netmask,
      netdev=cluster.master_netdev,
      ip_family=cluster.primary_ip_family)

    return result

  @locking.ssynchronized(_config_lock)
  def AddNodeGroup(self, group, ec_id, check_uuid=True):
    """Add a node group to the configuration.

    This method calls group.UpgradeConfig() to fill any missing attributes
    according to their default values.

    @type group: L{objects.NodeGroup}
    @param group: the NodeGroup object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID
    @type check_uuid: bool
    @param check_uuid: add an UUID to the group if it doesn't have one or, if
                       it does, ensure that it does not exist in the
                       configuration already

    """
    self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
    """Add a node group to the configuration.

    """
    logging.info("Adding node group %s to configuration", group.name)

    # Some code might need to add a node group with a pre-populated UUID
    # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
    # the "does this UUID exist already" check.
    if check_uuid:
      self._EnsureUUID(group, ec_id)

    try:
      existing_uuid = self._UnlockedLookupNodeGroup(group.name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (group.name, existing_uuid),
                                 errors.ECODE_EXISTS)

    group.serial_no = 1
    group.ctime = group.mtime = time.time()
    group.UpgradeConfig()

    self._config_data.nodegroups[group.uuid] = group
    self._config_data.cluster.serial_no += 1

  @locking.ssynchronized(_config_lock)
  def RemoveNodeGroup(self, group_uuid):
    """Remove a node group from the configuration.

    @type group_uuid: string
    @param group_uuid: the UUID of the node group to remove

    """
    logging.info("Removing node group %s from configuration", group_uuid)

    if group_uuid not in self._config_data.nodegroups:
      raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

    assert len(self._config_data.nodegroups) != 1, \
           "Group '%s' is the only group, cannot be removed" % group_uuid

    del self._config_data.nodegroups[group_uuid]
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

  def _UnlockedLookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one node group exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Node group '%s' not found" % target,
                               errors.ECODE_NOENT)

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)

  def _UnlockedGetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupMembersByNodes(self, nodes):
    """Get nodes which are member in the same nodegroups as the given nodes.

    """
    ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
    return frozenset(member_name
                     for node_name in nodes
                     for member_name in
                       self._UnlockedGetNodeGroup(ngfn(node_name)).members)

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert status in constants.ADMINST_ALL, \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_state != status:
      instance.admin_state = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_UP)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceOffline(self, instance_name):
    """Mark the instance status to offline in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    # If a network port has been allocated to the instance,
    # return it to the pool of free ports.
    inst = self._config_data.instances[instance_name]
    network_port = getattr(inst, "network_port", None)
    if network_port is not None:
      self._config_data.cluster.tcpudp_port_pool.add(network_port)

    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk_fname = "disk%s" % disk.iv_name.split("/")[1]
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk_fname))
    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    # Locking is done in L{ConfigWriter.GetInstanceList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    of an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNodeGroups(self, instance_name, primary_only=False):
    """Returns set of node group UUIDs for instance's nodes.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if instance is None:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    if primary_only:
      nodes = [instance.primary_node]
    else:
      nodes = instance.all_nodes

    return frozenset(self._UnlockedGetNodeInfo(node_name).group
                     for node_name in nodes)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiInstanceInfo(self, instances):
    """Get the configuration of multiple instances.

    @param instances: list of instance names
    @rtype: list
    @return: list of tuples (instance, instance_info), where
        instance_info is what would GetInstanceInfo return for the
        node, while keeping the original order

    """
    return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
        would GetInstanceInfo return for the node

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstancesInfoByFilter(self, filter_fn):
    """Get instance configuration with a filter.

    @type filter_fn: callable
    @param filter_fn: Filter function receiving instance object as parameter,
      returning boolean. Important: this function is called while the
      configuration lock is held. It must not do any complex work or call
      functions potentially leading to a deadlock. Ideally it doesn't call any
      other functions and just compares instance attributes.

    """
    return dict((name, inst)
                for (name, inst) in self._config_data.instances.items()
                if filter_fn(inst))

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.group)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # Locking is done in L{ConfigWriter.GetNodeList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
    return (pri, sec)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupInstances(self, uuid, primary_only=False):
    """Get the instances of a node group.

    @param uuid: Node group UUID
    @param primary_only: Whether to only consider primary nodes
    @rtype: frozenset
    @return: List of instance names in node group

    """
    if primary_only:
      nodes_fn = lambda inst: [inst.primary_node]
    else:
      nodes_fn = lambda inst: inst.all_nodes

    return frozenset(inst.name
                     for inst in self._config_data.instances.values()
                     for node_name in nodes_fn(inst)
                     if self._UnlockedGetNodeInfo(node_name).group == uuid)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVmCapableNodeList(self):
    """Return the list of nodes which are vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNonVmCapableNodeList(self):
    """Return the list of nodes which are not vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeInfo(self, nodes):
    """Get the configuration of multiple nodes.

    @param nodes: list of node names
    @rtype: list
    @return: list of tuples of (node, node_info), where node_info is
        what would GetNodeInfo return for the node, in the original
        order

    """
    return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        would GetNodeInfo return for the node

    """
    return self._UnlockedGetAllNodesInfo()

  def _UnlockedGetAllNodesInfo(self):
    """Gets configuration of all nodes.

    @note: See L{GetAllNodesInfo}

    """
    return dict([(node, self._UnlockedGetNodeInfo(node))
                 for node in self._UnlockedGetNodeList()])

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupsFromNodes(self, nodes):
    """Returns groups for a list of nodes.

    @type nodes: list of string
    @param nodes: List of node names

    """
    return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)
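
  # Example (illustrative): in a five-node cluster with one offline node and
  # candidate_pool_size = 10, the method above returns (3, 4, 4) when three
  # nodes currently hold the master candidate role: four nodes could hold it
  # (mc_max), and the pool size caps the desired number at four (mc_should).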

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1

      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)

    if mod_list:
      self._config_data.cluster.serial_no += 1
      self._WriteConfig()

    return mod_list

  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.group
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Warning: node '%s' has unknown node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Warning: node '%s' not a member of its node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  @locking.ssynchronized(_config_lock)
  def AssignGroupNodes(self, mods):
    """Changes the group of a number of nodes.

    @type mods: list of tuples; (node name, new group UUID)
    @param mods: Node membership modifications

    """
    groups = self._config_data.nodegroups
    nodes = self._config_data.nodes

    resmod = []

    # Try to resolve names/UUIDs first
    for (node_name, new_group_uuid) in mods:
      try:
        node = nodes[node_name]
      except KeyError:
        raise errors.ConfigurationError("Unable to find node '%s'" % node_name)

      if node.group == new_group_uuid:
        # Node is being assigned to its current group
        logging.debug("Node '%s' was assigned to its current group (%s)",
                      node_name, node.group)
        continue

      # Try to find current group of node
      try:
        old_group = groups[node.group]
      except KeyError:
        raise errors.ConfigurationError("Unable to find old group '%s'" %
                                        node.group)

      # Try to find new group for node
      try:
        new_group = groups[new_group_uuid]
      except KeyError:
        raise errors.ConfigurationError("Unable to find new group '%s'" %
                                        new_group_uuid)

      assert node.name in old_group.members, \
        ("Inconsistent configuration: node '%s' not listed in members for its"
         " old group '%s'" % (node.name, old_group.uuid))
      assert node.name not in new_group.members, \
        ("Inconsistent configuration: node '%s' already listed in members for"
         " its new group '%s'" % (node.name, new_group.uuid))

      resmod.append((node, old_group, new_group))

    # Apply changes
    for (node, old_group, new_group) in resmod:
      assert node.uuid != new_group.uuid and old_group.uuid != new_group.uuid, \
        "Assigning to current group is not possible"

      node.group = new_group.uuid

      # Update members of involved groups
      if node.name in old_group.members:
        old_group.members.remove(node.name)
      if node.name not in new_group.members:
        new_group.members.append(node.name)

    # Update timestamps and serials (only once per node/group object)
    now = time.time()
    for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
      obj.serial_no += 1
      obj.mtime = now

    # Force ssconf update
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()
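
  # Example (illustrative) of the mods format expected above, with made-up
  # names:
  #   cfg.AssignGroupNodes([("node1.example.com", "uuid-of-group2"),
  #                         ("node2.example.com", "uuid-of-group2")])
  # Both nodes move to the new group and the change is written out once.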

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, "cluster") or
        not hasattr(data.cluster, "rsahostkeypub")):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    self._cfg_id = utils.GetFileID(path=self._cfg_file)

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    # TODO: Use dedicated resolver talking to config writer for name resolution
    result = \
      self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = self._GetRpc(None).call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    ssconf_values = {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }
    bad_values = [(k, v) for k, v in ssconf_values.items()
                  if not isinstance(v, (str, basestring))]
    if bad_values:
      err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
      raise errors.ConfigurationError("Some ssconf key(s) have non-string"
                                      " values: %s" % err)
    return ssconf_values
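
  # Illustrative shape of the returned dictionary (values are made up);
  # every value must be a string, multi-valued entries are newline-joined:
  #   {constants.SS_CLUSTER_NAME: "cluster.example.com",
  #    constants.SS_NODE_LIST: "node1.example.com\nnode2.example.com",
  #    ...}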

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around _UnlockedGetSsconfValues().

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)