4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
# Module-level lock serializing access to the configuration; taken shared
# for reads, exclusive for writes (see the ssynchronized decorators below).
_config_lock = locking.SharedLock()

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    # the first format argument (the on-disk version) was missing, which
    # would make the "%" formatting itself raise TypeError
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    # maps execution context id -> set of reserved resources
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Tell whether the given resource is reserved by any context.

    """
    # iterate over the reserved *sets*; the previous code iterated over
    # .items(), so membership was tested against (ec_id, set) tuples and
    # an actual reservation was never detected
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve the given resource on behalf of the given context.

    @raise errors.ReservationError: if the resource is already reserved

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    (resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    """Drop all reservations held by the given execution context.

    """
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    """Return the set of all currently reserved resources.

    """
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    @param existing: resources already in use (in addition to reservations)
    @param generate_one_fn: zero-argument callable producing one candidate
    @param ec_id: execution context id that will hold the reservation
    @return: the newly generated (and reserved) resource

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    # retry a bounded number of times in case the generator keeps
    # producing collisions
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
126 """The interface to the cluster configuration.
128 @ivar _temporary_lvs: reservation manager for temporary LVs
129 @ivar _all_rms: a list of all temporary reservation managers
132 def __init__(self, cfg_file=None, offline=False):
134 self._lock = _config_lock
135 self._config_data = None
136 self._offline = offline
138 self._cfg_file = constants.CLUSTER_CONF_FILE
140 self._cfg_file = cfg_file
141 self._temporary_ids = TemporaryReservationManager()
142 self._temporary_drbds = {}
143 self._temporary_macs = TemporaryReservationManager()
144 self._temporary_secrets = TemporaryReservationManager()
145 self._temporary_lvs = TemporaryReservationManager()
146 self._all_rms = [self._temporary_ids, self._temporary_macs,
147 self._temporary_secrets, self._temporary_lvs]
148 # Note: in order to prevent errors when resolving our name in
149 # _DistributeConfig, we compute it here once and reuse it; it's
150 # better to raise an error before starting to modify the config
151 # file than after it was modified
152 self._my_hostname = utils.HostInfo().name
153 self._last_cluster_serial = -1
156 # this method needs to be static, so that we can call it on the class
159 """Check if the cluster is configured.
162 return os.path.exists(constants.CLUSTER_CONF_FILE)
164 def _GenerateOneMAC(self):
165 """Generate one mac address
168 prefix = self._config_data.cluster.mac_prefix
169 byte1 = random.randrange(0, 256)
170 byte2 = random.randrange(0, 256)
171 byte3 = random.randrange(0, 256)
172 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
175 @locking.ssynchronized(_config_lock, shared=1)
176 def GenerateMAC(self, ec_id):
177 """Generate a MAC for an instance.
179 This should check the current instances for duplicates.
182 existing = self._AllMACs()
183 return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
185 @locking.ssynchronized(_config_lock, shared=1)
186 def ReserveMAC(self, mac, ec_id):
187 """Reserve a MAC for an instance.
189 This only checks instances managed by this cluster, it does not
190 check for potential collisions elsewhere.
193 all_macs = self._AllMACs()
195 raise errors.ReservationError("mac already in use")
197 self._temporary_macs.Reserve(mac, ec_id)
199 @locking.ssynchronized(_config_lock, shared=1)
200 def ReserveLV(self, lv_name, ec_id):
201 """Reserve an VG/LV pair for an instance.
203 @type lv_name: string
204 @param lv_name: the logical volume name to reserve
207 all_lvs = self._AllLVs()
208 if lv_name in all_lvs:
209 raise errors.ReservationError("LV already in use")
211 self._temporary_lvs.Reserve(lv_name, ec_id)
213 @locking.ssynchronized(_config_lock, shared=1)
214 def GenerateDRBDSecret(self, ec_id):
215 """Generate a DRBD secret.
217 This checks the current disks for duplicates.
220 return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
221 utils.GenerateSecret,
225 """Compute the list of all LVs.
229 for instance in self._config_data.instances.values():
230 node_data = instance.MapLVsByNode()
231 for lv_list in node_data.values():
232 lvnames.update(lv_list)
235 def _AllIDs(self, include_temporary):
236 """Compute the list of all UUIDs and names we have.
238 @type include_temporary: boolean
239 @param include_temporary: whether to include the _temporary_ids set
241 @return: a set of IDs
245 if include_temporary:
246 existing.update(self._temporary_ids.GetReserved())
247 existing.update(self._AllLVs())
248 existing.update(self._config_data.instances.keys())
249 existing.update(self._config_data.nodes.keys())
250 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
  def _GenerateUniqueID(self, ec_id):
    """Generate an unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @param ec_id: unique id for the job to reserve the id to
    @rtype: string
    @return: the unique id

    """
    # temporary ids are excluded here; the reservation manager already
    # checks its own reservations inside Generate()
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
266 @locking.ssynchronized(_config_lock, shared=1)
267 def GenerateUniqueID(self, ec_id):
268 """Generate an unique ID.
270 This is just a wrapper over the unlocked version.
273 @param ec_id: unique id for the job to reserve the id to
276 return self._GenerateUniqueID(ec_id)
279 """Return all MACs present in the config.
282 @return: the list of all MACs
286 for instance in self._config_data.instances.values():
287 for nic in instance.nics:
288 result.append(nic.mac)
292 def _AllDRBDSecrets(self):
293 """Return all DRBD secrets present in the config.
296 @return: the list of all DRBD secrets
299 def helper(disk, result):
300 """Recursively gather secrets from this disk."""
301 if disk.dev_type == constants.DT_DRBD8:
302 result.append(disk.logical_id[5])
304 for child in disk.children:
305 helper(child, result)
308 for instance in self._config_data.instances.values():
309 for disk in instance.disks:
314 def _CheckDiskIDs(self, disk, l_ids, p_ids):
315 """Compute duplicate disk IDs
317 @type disk: L{objects.Disk}
318 @param disk: the disk at which to start searching
320 @param l_ids: list of current logical ids
322 @param p_ids: list of current physical ids
324 @return: a list of error messages
328 if disk.logical_id is not None:
329 if disk.logical_id in l_ids:
330 result.append("duplicate logical id %s" % str(disk.logical_id))
332 l_ids.append(disk.logical_id)
333 if disk.physical_id is not None:
334 if disk.physical_id in p_ids:
335 result.append("duplicate physical id %s" % str(disk.physical_id))
337 p_ids.append(disk.physical_id)
340 for child in disk.children:
341 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
  def _UnlockedVerifyConfig(self):
    """Verify the configuration data for consistency.

    This function is for internal use, when the config lock is already held.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # NOTE(review): several accumulator initialisations (result, seen_macs,
    # ports, seen_lids, seen_pids, keys, ips) and a few guard/branch lines
    # appear to have been lost from this copy of the function -- verify
    # against upstream before relying on it.
    data = self._config_data

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
      # NOTE(review): the "if invalid_hvs:" guard and the format argument of
      # the following append look truncated here
      result.append("enabled hypervisors contains invalid entries: %s" %

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            # NOTE(review): the "ports[tcp_port] = []" initialisation seems
            # to be missing under this guard
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          # NOTE(review): same missing "ports[net_port] = []" here
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
      # NOTE(review): the loop over duplicated port entries that binds pnum
      # and pdata is missing in this copy
      txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
      result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
      # NOTE(review): the "if mc_now < mc_max:" guard and the (mc_now,
      # mc_max) format arguments of this append appear truncated
      result.append("Not enough master candidates: actual %d, target %d" %

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drain,
                      # NOTE(review): "node.drain" looks wrong -- the
                      # attribute tested above is node.drained; verify

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]

    def _AddIpAddress(ip, name):
      # records every owner of a given IP so duplicates can be reported
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        # NOTE(review): upstream presumably skips NICs without an IP here
        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
          # NOTE(review): an "else:" before this raise seems to be missing
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
        # NOTE(review): the "if len(owners) > 1:" guard appears truncated
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))
498 @locking.ssynchronized(_config_lock, shared=1)
499 def VerifyConfig(self):
502 This is just a wrapper over L{_UnlockedVerifyConfig}.
505 @return: a list of error messages; a non-empty list signifies
509 return self._UnlockedVerifyConfig()
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    # recurse into children first so the whole tree gets updated
    for child in disk.children:
      self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      # NOTE(review): an early "return" is presumably missing under this
      # guard -- verify against upstream
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        # NOTE(review): the node_name format argument of this raise is
        # truncated in this copy
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      # DRBD talks over the nodes' secondary (replication) IPs
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
      # NOTE(review): an "else:" is presumably missing before the next line;
      # as written it unconditionally overwrites the DRBD physical_id
    disk.physical_id = disk.logical_id
549 @locking.ssynchronized(_config_lock)
550 def SetDiskID(self, disk, node_name):
551 """Convert the unique ID to the ID needed on the target nodes.
553 This is used only for drbd, which needs ip/port configuration.
555 The routine descends down and updates its children also, because
556 this helps when the only the top device is passed to the remote
560 return self._UnlockedSetDiskID(disk, node_name)
  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    @type port: int
    @param port: the port to add

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    # NOTE(review): upstream presumably persists the change here (config
    # write) -- this copy looks truncated; verify
573 @locking.ssynchronized(_config_lock, shared=1)
574 def GetPortList(self):
575 """Returns a copy of the current port list.
578 return self._config_data.cluster.tcpudp_port_pool.copy()
  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port for DRBD usage.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
      # NOTE(review): an "else:" branch seems to have been lost here -- as
      # written the popped port is immediately overwritten below; verify
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port
      # NOTE(review): the final "return port" (and the config write) appears
      # to be missing from this copy
  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      # NOTE(review): this helper's local duplicates list initialisation,
      # its return statement, and an "else:"/"if disk.children:" pair appear
      # to have been lost from this copy; verify against upstream.
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        # a DRBD logical_id starts with (node_a, node_b, port, minor_a,
        # minor_b, ...); only the nodes and minors matter for the map
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          used[node][port] = instance_name
      for child in disk.children:
        duplicates.extend(_AppendUsedPorts(instance_name, child, used))

    # every node is present in the map, even with no minors in use
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    # temporary (not yet committed) reservations also count
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      my_dict[node][minor] = instance
    return my_dict, duplicates
642 @locking.ssynchronized(_config_lock)
643 def ComputeDRBDMap(self):
644 """Compute the used DRBD minor/nodes.
646 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
648 @return: dictionary of node_name: dict of minor: instance_name;
649 the returned dict will have all the nodes in it (even if with
653 d_map, duplicates = self._UnlockedComputeDRBDMap()
655 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
659 @locking.ssynchronized(_config_lock)
660 def AllocateDRBDMinor(self, nodes, instance):
661 """Allocate a drbd minor.
663 The free minor will be automatically computed from the existing
664 devices. A node can be given multiple times in order to allocate
665 multiple minors. The result is the list of minors, in the same
666 order as the passed nodes.
668 @type instance: string
669 @param instance: the instance for which we allocate minors
672 assert isinstance(instance, basestring), \
673 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
675 d_map, duplicates = self._UnlockedComputeDRBDMap()
677 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
683 # no minors used, we can start at 0
686 self._temporary_drbds[(nname, 0)] = instance
690 ffree = utils.FirstFree(keys)
692 # return the next minor
693 # TODO: implement high-limit check
697 # double-check minor against current instances
698 assert minor not in d_map[nname], \
699 ("Attempt to reuse allocated DRBD minor %d on node %s,"
700 " already allocated to instance %s" %
701 (minor, nname, d_map[nname][minor]))
702 ndata[minor] = instance
703 # double-check minor against reservation
704 r_key = (nname, minor)
705 assert r_key not in self._temporary_drbds, \
706 ("Attempt to reuse reserved DRBD minor %d on node %s,"
707 " reserved for instance %s" %
708 (minor, nname, self._temporary_drbds[r_key]))
709 self._temporary_drbds[r_key] = instance
711 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
715 def _UnlockedReleaseDRBDMinors(self, instance):
716 """Release temporary drbd minors allocated for a given instance.
718 @type instance: string
719 @param instance: the instance for which temporary minors should be
723 assert isinstance(instance, basestring), \
724 "Invalid argument passed to ReleaseDRBDMinors"
725 for key, name in self._temporary_drbds.items():
727 del self._temporary_drbds[key]
  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)
746 @locking.ssynchronized(_config_lock, shared=1)
747 def GetConfigVersion(self):
748 """Get the configuration version.
750 @return: Config version
753 return self._config_data.version
755 @locking.ssynchronized(_config_lock, shared=1)
756 def GetClusterName(self):
759 @return: Cluster name
762 return self._config_data.cluster.cluster_name
764 @locking.ssynchronized(_config_lock, shared=1)
765 def GetMasterNode(self):
766 """Get the hostname of the master node for this cluster.
768 @return: Master hostname
771 return self._config_data.cluster.master_node
773 @locking.ssynchronized(_config_lock, shared=1)
774 def GetMasterIP(self):
775 """Get the IP of the master node for this cluster.
780 return self._config_data.cluster.master_ip
782 @locking.ssynchronized(_config_lock, shared=1)
783 def GetMasterNetdev(self):
784 """Get the master network device for this cluster.
787 return self._config_data.cluster.master_netdev
789 @locking.ssynchronized(_config_lock, shared=1)
790 def GetFileStorageDir(self):
791 """Get the file storage dir for this cluster.
794 return self._config_data.cluster.file_storage_dir
796 @locking.ssynchronized(_config_lock, shared=1)
797 def GetHypervisorType(self):
798 """Get the hypervisor type for this cluster.
801 return self._config_data.cluster.enabled_hypervisors[0]
803 @locking.ssynchronized(_config_lock, shared=1)
804 def GetHostKey(self):
805 """Return the rsa hostkey from the config.
808 @return: the rsa hostkey
811 return self._config_data.cluster.rsahostkeypub
  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object
    @param ec_id: unique id for the job, used for the UUID reservation

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    # refuse to add an instance whose NICs collide with existing MACs
    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    # the temporary DRBD reservations are now committed into the config
    self._UnlockedReleaseDRBDMinors(instance.name)
    # NOTE(review): upstream presumably persists the new config here -- this
    # copy looks truncated; verify
846 def _EnsureUUID(self, item, ec_id):
847 """Ensures a given object has a valid UUID.
849 @param item: the instance or node to be checked
850 @param ec_id: the execution context id for the uuid reservation
854 item.uuid = self._GenerateUniqueID(ec_id)
855 elif item.uuid in self._AllIDs(include_temporary=True):
856 raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
857 " in use" % (item.name, item.uuid))
859 def _SetInstanceStatus(self, instance_name, status):
860 """Set the instance's status to a given value.
863 assert isinstance(status, bool), \
864 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
866 if instance_name not in self._config_data.instances:
867 raise errors.ConfigurationError("Unknown instance '%s'" %
869 instance = self._config_data.instances[instance_name]
870 if instance.admin_up != status:
871 instance.admin_up = status
872 instance.serial_no += 1
873 instance.mtime = time.time()
  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    @type instance_name: string
    @param instance_name: the name of the instance to mark as running

    """
    self._SetInstanceStatus(instance_name, True)
  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    @type instance_name: string
    @param instance_name: the name of the instance to remove

    @raise errors.ConfigurationError: if the instance is not known

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    # NOTE(review): upstream presumably persists the change here (config
    # write) -- this copy looks truncated; verify
  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    @raise errors.ConfigurationError: if the instance is not known

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    # NOTE(review): the "inst.name = new_name" assignment appears to be
    # missing from this copy -- without it the instance keeps its old name
    # and new_name is unused; verify against upstream
    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                              # NOTE(review): the remaining
                                              # tuple elements (new path
                                              # components) are truncated in
                                              # this copy; verify

    self._config_data.instances[inst.name] = inst
    # NOTE(review): upstream presumably persists the change here; verify
  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    @type instance_name: string
    @param instance_name: the name of the instance to mark as stopped

    """
    self._SetInstanceStatus(instance_name, False)
928 def _UnlockedGetInstanceList(self):
929 """Get the list of instances.
931 This function is for internal use, when the config lock is already held.
934 return self._config_data.instances.keys()
936 @locking.ssynchronized(_config_lock, shared=1)
937 def GetInstanceList(self):
938 """Get the list of instances.
940 @return: array of instances, ex. ['instance2.example.com',
941 'instance1.example.com']
944 return self._UnlockedGetInstanceList()
946 @locking.ssynchronized(_config_lock, shared=1)
947 def ExpandInstanceName(self, short_name):
948 """Attempt to expand an incomplete instance name.
951 return utils.MatchNameComponent(short_name,
952 self._config_data.instances.keys(),
953 case_sensitive=False)
955 def _UnlockedGetInstanceInfo(self, instance_name):
956 """Returns information about an instance.
958 This function is for internal use, when the config lock is already held.
961 if instance_name not in self._config_data.instances:
964 return self._config_data.instances[instance_name]
966 @locking.ssynchronized(_config_lock, shared=1)
967 def GetInstanceInfo(self, instance_name):
968 """Returns information about an instance.
970 It takes the information from the configuration file. Other information of
971 an instance are taken from the live systems.
973 @param instance_name: name of the instance, e.g.
974 I{instance1.example.com}
976 @rtype: L{objects.Instance}
977 @return: the instance object
980 return self._UnlockedGetInstanceInfo(instance_name)
982 @locking.ssynchronized(_config_lock, shared=1)
983 def GetAllInstancesInfo(self):
984 """Get the configuration of all instances.
987 @return: dict of (instance, instance_info), where instance_info is what
988 would GetInstanceInfo return for the node
991 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
992 for instance in self._UnlockedGetInstanceList()])
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance
    @param ec_id: unique id for the job, used for the UUID reservation

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    # NOTE(review): upstream presumably also initialises node.serial_no and
    # persists the new config here -- this copy looks truncated; verify
  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    @raise errors.ConfigurationError: if the node is not known

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    # NOTE(review): upstream presumably persists the change here (config
    # write) -- this copy looks truncated; verify
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)
1036 def _UnlockedGetNodeInfo(self, node_name):
1037 """Get the configuration of a node, as stored in the config.
1039 This function is for internal use, when the config lock is already
1042 @param node_name: the node name, e.g. I{node1.example.com}
1044 @rtype: L{objects.Node}
1045 @return: the node object
1048 if node_name not in self._config_data.nodes:
1051 return self._config_data.nodes[node_name]
1053 @locking.ssynchronized(_config_lock, shared=1)
1054 def GetNodeInfo(self, node_name):
1055 """Get the configuration of a node, as stored in the config.
1057 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1059 @param node_name: the node name, e.g. I{node1.example.com}
1061 @rtype: L{objects.Node}
1062 @return: the node object
1065 return self._UnlockedGetNodeInfo(node_name)
1067 def _UnlockedGetNodeList(self):
1068 """Return the list of nodes which are in the configuration.
1070 This function is for internal use, when the config lock is already
1076 return self._config_data.nodes.keys()
1078 @locking.ssynchronized(_config_lock, shared=1)
1079 def GetNodeList(self):
1080 """Return the list of nodes which are in the configuration.
1083 return self._UnlockedGetNodeList()
1085 def _UnlockedGetOnlineNodeList(self):
1086 """Return the list of nodes which are online.
1089 all_nodes = [self._UnlockedGetNodeInfo(node)
1090 for node in self._UnlockedGetNodeList()]
1091 return [node.name for node in all_nodes if not node.offline]
1093 @locking.ssynchronized(_config_lock, shared=1)
1094 def GetOnlineNodeList(self):
1095 """Return the list of nodes which are online.
1098 return self._UnlockedGetOnlineNodeList()
1100 @locking.ssynchronized(_config_lock, shared=1)
1101 def GetAllNodesInfo(self):
1102 """Get the configuration of all nodes.
1105 @return: dict of (node, node_info), where node_info is what
1106 would GetNodeInfo return for the node
1109 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1110 for node in self._UnlockedGetNodeList()])
1113 def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1114 """Get the number of current and maximum desired and possible candidates.
1116 @type exceptions: list
1117 @param exceptions: if passed, list of nodes that should be ignored
1119 @return: tuple of (current, desired and possible, possible)
1122 mc_now = mc_should = mc_max = 0
1123 for node in self._config_data.nodes.values():
1124 if exceptions and node.name in exceptions:
1126 if not (node.offline or node.drained):
1128 if node.master_candidate:
1130 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1131 return (mc_now, mc_should, mc_max)
1133 @locking.ssynchronized(_config_lock, shared=1)
1134 def GetMasterCandidateStats(self, exceptions=None):
1135 """Get the number of current and maximum possible candidates.
1137 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1139 @type exceptions: list
1140 @param exceptions: if passed, list of nodes that should be ignored
1142 @return: tuple of (current, max)
1145 return self._UnlockedGetMasterCandidateStats(exceptions)
  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    # NOTE(review): the initialisation of mod_list and the enclosing
    # "if mc_now < mc_max:" guard appear to have been lost from this copy;
    # verify against upstream.
    # shuffle so that promotion is spread over the nodes
    node_list = self._config_data.nodes.keys()
    random.shuffle(node_list)
    for name in node_list:
      if mc_now >= mc_max:
        # NOTE(review): a "break" is presumably missing here; verify
      node = self._config_data.nodes[name]
      if (node.master_candidate or node.offline or node.drained or
          node.name in exceptions):
        # NOTE(review): a "continue" is presumably missing here; verify
      mod_list.append(node)
      node.master_candidate = True
      # NOTE(review): upstream presumably also bumps node.serial_no and
      # mc_now here; verify

    if mc_now != mc_max:
      # this should not happen
      logging.warning("Warning: MaintainCandidatePool didn't manage to"
                      " fill the candidate pool (%d/%d)", mc_now, mc_max)

    self._config_data.cluster.serial_no += 1
    # NOTE(review): the final config write and "return mod_list" appear to
    # be missing from this copy; verify
1183 def _BumpSerialNo(self):
1184 """Bump up the serial number of the config.
1187 self._config_data.serial_no += 1
1188 self._config_data.mtime = time.time()
1190 def _AllUUIDObjects(self):
1191 """Returns all objects with uuid attributes.
1194 return (self._config_data.instances.values() +
1195 self._config_data.nodes.values() +
1196 [self._config_data.cluster])
1198 def _OpenConfig(self):
1199 """Read the config data from disk.
1202 raw_data = utils.ReadFile(self._cfg_file)
1205 data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1206 except Exception, err:
1207 raise errors.ConfigurationError(err)
1209 # Make sure the configuration has the right version
1210 _ValidateConfig(data)
1212 if (not hasattr(data, 'cluster') or
1213 not hasattr(data.cluster, 'rsahostkeypub')):
1214 raise errors.ConfigurationError("Incomplete configuration"
1215 " (missing cluster.rsahostkeypub)")
1217 # Upgrade configuration if needed
1218 data.UpgradeConfig()
1220 self._config_data = data
1221 # reset the last serial as -1 so that the next write will cause
1223 self._last_cluster_serial = -1
1225 # And finally run our (custom) config upgrade sequence
1226 self._UpgradeConfig()
1228 def _UpgradeConfig(self):
1229 """Run upgrade steps that cannot be done purely in the objects.
1231 This is because some data elements need uniqueness across the
1232 whole configuration, etc.
1234 @warning: this function will call L{_WriteConfig()}, so it needs
1235 to either be called with the lock held or from a safe place
1240 for item in self._AllUUIDObjects():
1241 if item.uuid is None:
1242 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1246 # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1247 # only called at config init time, without the lock held
1248 self.DropECReservations(_UPGRADE_CONFIG_JID)
1250 def _DistributeConfig(self, feedback_fn):
1251 """Distribute the configuration to the other nodes.
1253 Currently, this only copies the configuration file. In the future,
1254 it could be used to encapsulate the 2/3-phase update mechanism.
1264 myhostname = self._my_hostname
1265 # we can skip checking whether _UnlockedGetNodeInfo returns None
1266 # since the node list comes from _UnlocketGetNodeList, and we are
1267 # called with the lock held, so no modifications should take place
1269 for node_name in self._UnlockedGetNodeList():
1270 if node_name == myhostname:
1272 node_info = self._UnlockedGetNodeInfo(node_name)
1273 if not node_info.master_candidate:
1275 node_list.append(node_info.name)
1276 addr_list.append(node_info.primary_ip)
1278 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1279 address_list=addr_list)
1280 for to_node, to_result in result.items():
1281 msg = to_result.fail_msg
1283 msg = ("Copy of file %s to node %s failed: %s" %
1284 (self._cfg_file, to_node, msg))
1294 def _WriteConfig(self, destination=None, feedback_fn=None):
1295 """Write the configuration data to persistent storage.
1298 assert feedback_fn is None or callable(feedback_fn)
1300 # Warn on config errors, but don't abort the save - the
1301 # configuration has already been modified, and we can't revert;
1302 # the best we can do is to warn the user and save as is, leaving
1303 # recovery to the user
1304 config_errors = self._UnlockedVerifyConfig()
1306 errmsg = ("Configuration data is not consistent: %s" %
1307 (utils.CommaJoin(config_errors)))
1308 logging.critical(errmsg)
1312 if destination is None:
1313 destination = self._cfg_file
1314 self._BumpSerialNo()
1315 txt = serializer.Dump(self._config_data.ToDict())
1317 utils.WriteFile(destination, data=txt)
1319 self.write_count += 1
1321 # and redistribute the config file to master candidates
1322 self._DistributeConfig(feedback_fn)
1324 # Write ssconf files on all nodes (including locally)
1325 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1326 if not self._offline:
1327 result = rpc.RpcRunner.call_write_ssconf_files(
1328 self._UnlockedGetOnlineNodeList(),
1329 self._UnlockedGetSsconfValues())
1331 for nname, nresu in result.items():
1332 msg = nresu.fail_msg
1334 errmsg = ("Error while uploading ssconf files to"
1335 " node %s: %s" % (nname, msg))
1336 logging.warning(errmsg)
1341 self._last_cluster_serial = self._config_data.cluster.serial_no
1343 def _UnlockedGetSsconfValues(self):
1344 """Return the values needed by ssconf.
1347 @return: a dictionary with keys the ssconf names and values their
1352 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1353 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1354 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1355 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1356 for ninfo in node_info]
1357 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1358 for ninfo in node_info]
1360 instance_data = fn(instance_names)
1361 off_data = fn(node.name for node in node_info if node.offline)
1362 on_data = fn(node.name for node in node_info if not node.offline)
1363 mc_data = fn(node.name for node in node_info if node.master_candidate)
1364 mc_ips_data = fn(node.primary_ip for node in node_info
1365 if node.master_candidate)
1366 node_data = fn(node_names)
1367 node_pri_ips_data = fn(node_pri_ips)
1368 node_snd_ips_data = fn(node_snd_ips)
1370 cluster = self._config_data.cluster
1371 cluster_tags = fn(cluster.GetTags())
1373 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1374 constants.SS_CLUSTER_TAGS: cluster_tags,
1375 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1376 constants.SS_MASTER_CANDIDATES: mc_data,
1377 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1378 constants.SS_MASTER_IP: cluster.master_ip,
1379 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1380 constants.SS_MASTER_NODE: cluster.master_node,
1381 constants.SS_NODE_LIST: node_data,
1382 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1383 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1384 constants.SS_OFFLINE_NODES: off_data,
1385 constants.SS_ONLINE_NODES: on_data,
1386 constants.SS_INSTANCE_LIST: instance_data,
1387 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1390 @locking.ssynchronized(_config_lock, shared=1)
1391 def GetVGName(self):
1392 """Return the volume group name.
1395 return self._config_data.cluster.volume_group_name
1397 @locking.ssynchronized(_config_lock)
1398 def SetVGName(self, vg_name):
1399 """Set the volume group name.
1402 self._config_data.cluster.volume_group_name = vg_name
1403 self._config_data.cluster.serial_no += 1
1406 @locking.ssynchronized(_config_lock, shared=1)
1407 def GetMACPrefix(self):
1408 """Return the mac prefix.
1411 return self._config_data.cluster.mac_prefix
1413 @locking.ssynchronized(_config_lock, shared=1)
1414 def GetClusterInfo(self):
1415 """Returns information about the cluster
1417 @rtype: L{objects.Cluster}
1418 @return: the cluster object
1421 return self._config_data.cluster
1423 @locking.ssynchronized(_config_lock)
1424 def Update(self, target, feedback_fn):
1425 """Notify function to be called after updates.
1427 This function must be called when an object (as returned by
1428 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1429 caller wants the modifications saved to the backing store. Note
1430 that all modified objects will be saved, but the target argument
1431 is the one the caller wants to ensure that it's saved.
1433 @param target: an instance of either L{objects.Cluster},
1434 L{objects.Node} or L{objects.Instance} which is existing in
1436 @param feedback_fn: Callable feedback function
1439 if self._config_data is None:
1440 raise errors.ProgrammerError("Configuration file not read,"
1442 update_serial = False
1443 if isinstance(target, objects.Cluster):
1444 test = target == self._config_data.cluster
1445 elif isinstance(target, objects.Node):
1446 test = target in self._config_data.nodes.values()
1447 update_serial = True
1448 elif isinstance(target, objects.Instance):
1449 test = target in self._config_data.instances.values()
1451 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1452 " ConfigWriter.Update" % type(target))
1454 raise errors.ConfigurationError("Configuration updated since object"
1455 " has been read or unknown object")
1456 target.serial_no += 1
1457 target.mtime = now = time.time()
1460 # for node updates, we need to increase the cluster serial too
1461 self._config_data.cluster.serial_no += 1
1462 self._config_data.cluster.mtime = now
1464 if isinstance(target, objects.Instance):
1465 self._UnlockedReleaseDRBDMinors(target.name)
1467 self._WriteConfig(feedback_fn=feedback_fn)
1469 @locking.ssynchronized(_config_lock)
1470 def DropECReservations(self, ec_id):
1471 """Drop per-execution-context reservations
1474 for rm in self._all_rms:
1475 rm.DropECReservations(ec_id)