4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too
import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
# Module-level lock serializing access to the configuration; readers take it
# in shared mode, modifications in exclusive mode.
_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    # restore the dropped "(data.version," continuation: without it the
    # format tuple for the error message is incomplete
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    # map of execution-context id -> set of resources reserved by it
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Tell whether a resource is currently reserved by any context."""
    # BUG FIX: iterate the reserved sets (values); iterating items() yields
    # (ec_id, set) tuples, so the membership test could never match
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve a resource on behalf of the given execution context.

    @raise errors.ReservationError: if the resource is already reserved

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    """Drop all reservations held by an execution context."""
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    """Return the set of all currently reserved resources."""
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    @param existing: container of resources already in use
    @param generate_one_fn: callable producing one candidate resource
    @param ec_id: execution context id that will hold the reservation
    @return: the newly generated, now-reserved resource
    @raise errors.ConfigurationError: if no free resource could be generated

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    new_resource = None
    # bounded retry loop so a constantly-colliding generator cannot hang us
    for _ in range(64):
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
128 """The interface to the cluster configuration.
130 @ivar _temporary_lvs: reservation manager for temporary LVs
131 @ivar _all_rms: a list of all temporary reservation managers
def __init__(self, cfg_file=None, offline=False):
  """Initialize the configuration writer.

  @param cfg_file: path to the configuration file, or None to use
      constants.CLUSTER_CONF_FILE
  @param offline: if True, do not distribute the configuration to
      the other nodes after writing

  """
  self.write_count = 0
  self._lock = _config_lock
  self._config_data = None
  self._offline = offline
  if cfg_file is None:
    self._cfg_file = constants.CLUSTER_CONF_FILE
  else:
    self._cfg_file = cfg_file
  self._temporary_ids = TemporaryReservationManager()
  self._temporary_drbds = {}
  self._temporary_macs = TemporaryReservationManager()
  self._temporary_secrets = TemporaryReservationManager()
  self._temporary_lvs = TemporaryReservationManager()
  self._all_rms = [self._temporary_ids, self._temporary_macs,
                   self._temporary_secrets, self._temporary_lvs]
  # Note: in order to prevent errors when resolving our name in
  # _DistributeConfig, we compute it here once and reuse it; it's
  # better to raise an error before starting to modify the config
  # file than after it was modified
  self._my_hostname = netutils.HostInfo().name
  self._last_cluster_serial = -1
  self._OpenConfig()
# this method needs to be static, so that we can call it on the class
@staticmethod
def IsCluster():
  """Check if the cluster is configured.

  @rtype: boolean
  @return: True if the configuration file exists

  """
  return os.path.exists(constants.CLUSTER_CONF_FILE)
166 def _GenerateOneMAC(self):
167 """Generate one mac address
170 prefix = self._config_data.cluster.mac_prefix
171 byte1 = random.randrange(0, 256)
172 byte2 = random.randrange(0, 256)
173 byte3 = random.randrange(0, 256)
174 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
@locking.ssynchronized(_config_lock, shared=1)
def GenerateMAC(self, ec_id):
  """Generate a MAC for an instance.

  This should check the current instances for duplicates.

  """
  existing = self._AllMACs()
  # BUG FIX: generate/reserve via the MAC reservation manager, not the
  # unique-ids one, so that ReserveMAC sees previously generated MACs
  return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)

@locking.ssynchronized(_config_lock, shared=1)
def ReserveMAC(self, mac, ec_id):
  """Reserve a MAC for an instance.

  This only checks instances managed by this cluster, it does not
  check for potential collisions elsewhere.

  """
  all_macs = self._AllMACs()
  if mac in all_macs:
    raise errors.ReservationError("mac already in use")
  else:
    # BUG FIX: Reserve() takes (ec_id, resource); the arguments were swapped
    self._temporary_macs.Reserve(ec_id, mac)

@locking.ssynchronized(_config_lock, shared=1)
def ReserveLV(self, lv_name, ec_id):
  """Reserve an VG/LV pair for an instance.

  @type lv_name: string
  @param lv_name: the logical volume name to reserve

  """
  all_lvs = self._AllLVs()
  if lv_name in all_lvs:
    raise errors.ReservationError("LV already in use")
  else:
    # BUG FIX: Reserve() takes (ec_id, resource); the arguments were swapped
    self._temporary_lvs.Reserve(ec_id, lv_name)

@locking.ssynchronized(_config_lock, shared=1)
def GenerateDRBDSecret(self, ec_id):
  """Generate a DRBD secret.

  This checks the current disks for duplicates.

  """
  return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                          utils.GenerateSecret,
                                          ec_id)
227 """Compute the list of all LVs.
231 for instance in self._config_data.instances.values():
232 node_data = instance.MapLVsByNode()
233 for lv_list in node_data.values():
234 lvnames.update(lv_list)
237 def _AllIDs(self, include_temporary):
238 """Compute the list of all UUIDs and names we have.
240 @type include_temporary: boolean
241 @param include_temporary: whether to include the _temporary_ids set
243 @return: a set of IDs
247 if include_temporary:
248 existing.update(self._temporary_ids.GetReserved())
249 existing.update(self._AllLVs())
250 existing.update(self._config_data.instances.keys())
251 existing.update(self._config_data.nodes.keys())
252 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
def _GenerateUniqueID(self, ec_id):
  """Generate an unique UUID.

  This checks the current node, instances and disk names for
  duplicates.

  @rtype: string
  @return: the unique id

  """
  existing = self._AllIDs(include_temporary=False)
  return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

@locking.ssynchronized(_config_lock, shared=1)
def GenerateUniqueID(self, ec_id):
  """Generate an unique ID.

  This is just a wrapper over the unlocked version.

  @type ec_id: string
  @param ec_id: unique id for the job to reserve the id to

  """
  return self._GenerateUniqueID(ec_id)
281 """Return all MACs present in the config.
284 @return: the list of all MACs
288 for instance in self._config_data.instances.values():
289 for nic in instance.nics:
290 result.append(nic.mac)
def _AllDRBDSecrets(self):
  """Return all DRBD secrets present in the config.

  @rtype: list
  @return: the list of all DRBD secrets

  """
  def helper(disk, result):
    """Recursively gather secrets from this disk."""
    if disk.dev_type == constants.DT_DRBD8:
      # the shared secret is the sixth element of a DRBD logical_id
      result.append(disk.logical_id[5])
    if disk.children:
      for child in disk.children:
        helper(child, result)

  result = []
  for instance in self._config_data.instances.values():
    for disk in instance.disks:
      helper(disk, result)
  return result
316 def _CheckDiskIDs(self, disk, l_ids, p_ids):
317 """Compute duplicate disk IDs
319 @type disk: L{objects.Disk}
320 @param disk: the disk at which to start searching
322 @param l_ids: list of current logical ids
324 @param p_ids: list of current physical ids
326 @return: a list of error messages
330 if disk.logical_id is not None:
331 if disk.logical_id in l_ids:
332 result.append("duplicate logical id %s" % str(disk.logical_id))
334 l_ids.append(disk.logical_id)
335 if disk.physical_id is not None:
336 if disk.physical_id in p_ids:
337 result.append("duplicate physical id %s" % str(disk.physical_id))
339 p_ids.append(disk.physical_id)
342 for child in disk.children:
343 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
def _UnlockedVerifyConfig(self):
  """Verify function.

  @rtype: list
  @return: a list of error messages; a non-empty list signifies
      configuration errors

  """
  result = []
  seen_macs = []
  ports = {}
  data = self._config_data
  seen_lids = []
  seen_pids = []

  # global cluster checks
  if not data.cluster.enabled_hypervisors:
    result.append("enabled hypervisors list doesn't have any entries")
  invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
  if invalid_hvs:
    result.append("enabled hypervisors contains invalid entries: %s" %
                  invalid_hvs)
  missing_hvp = (set(data.cluster.enabled_hypervisors) -
                 set(data.cluster.hvparams.keys()))
  if missing_hvp:
    result.append("hypervisor parameters missing for the enabled"
                  " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

  if data.cluster.master_node not in data.nodes:
    result.append("cluster has invalid primary node '%s'" %
                  data.cluster.master_node)

  # per-instance checks
  for instance_name in data.instances:
    instance = data.instances[instance_name]
    if instance.name != instance_name:
      result.append("instance '%s' is indexed by wrong name '%s'" %
                    (instance.name, instance_name))
    if instance.primary_node not in data.nodes:
      result.append("instance '%s' has invalid primary node '%s'" %
                    (instance_name, instance.primary_node))
    for snode in instance.secondary_nodes:
      if snode not in data.nodes:
        result.append("instance '%s' has invalid secondary node '%s'" %
                      (instance_name, snode))
    for idx, nic in enumerate(instance.nics):
      if nic.mac in seen_macs:
        result.append("instance '%s' has NIC %d mac %s duplicate" %
                      (instance_name, idx, nic.mac))
      else:
        seen_macs.append(nic.mac)

    # gather the drbd ports for duplicate checks
    for dsk in instance.disks:
      if dsk.dev_type in constants.LDS_DRBD:
        tcp_port = dsk.logical_id[2]
        if tcp_port not in ports:
          ports[tcp_port] = []
        ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
    # gather network port reservation
    net_port = getattr(instance, "network_port", None)
    if net_port is not None:
      if net_port not in ports:
        ports[net_port] = []
      ports[net_port].append((instance.name, "network port"))

    # instance disk verify
    for idx, disk in enumerate(instance.disks):
      result.extend(["instance '%s' disk %d error: %s" %
                     (instance.name, idx, msg) for msg in disk.Verify()])
      result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

  # cluster-wide pool of free ports
  for free_port in data.cluster.tcpudp_port_pool:
    if free_port not in ports:
      ports[free_port] = []
    ports[free_port].append(("cluster", "port marked as free"))

  # compute tcp/udp duplicate ports
  keys = sorted(ports.keys())
  for pnum in keys:
    pdata = ports[pnum]
    if len(pdata) > 1:
      txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
      result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

  # highest used tcp port check
  if keys:
    if keys[-1] > data.cluster.highest_used_port:
      result.append("Highest used port mismatch, saved %s, computed %s" %
                    (data.cluster.highest_used_port, keys[-1]))

  if not data.nodes[data.cluster.master_node].master_candidate:
    result.append("Master node is not a master candidate")

  # master candidate checks
  mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
  if mc_now < mc_max:
    result.append("Not enough master candidates: actual %d, target %d" %
                  (mc_now, mc_max))

  # node checks
  for node_name, node in data.nodes.items():
    if node.name != node_name:
      result.append("Node '%s' is indexed by wrong name '%s'" %
                    (node.name, node_name))
    if [node.master_candidate, node.drained, node.offline].count(True) > 1:
      # BUG FIX: the attribute is "drained"; "node.drain" raised
      # AttributeError whenever this error message was built
      result.append("Node %s state is invalid: master_candidate=%s,"
                    " drain=%s, offline=%s" %
                    (node.name, node.master_candidate, node.drained,
                     node.offline))

  # drbd minors check
  _, duplicates = self._UnlockedComputeDRBDMap()
  for node, minor, instance_a, instance_b in duplicates:
    result.append("DRBD minor %d on node %s is assigned twice to instances"
                  " %s and %s" % (minor, node, instance_a, instance_b))

  # IP checks
  default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
  ips = {}

  def _AddIpAddress(ip, name):
    ips.setdefault(ip, []).append(name)

  _AddIpAddress(data.cluster.master_ip, "cluster_ip")

  for node in data.nodes.values():
    _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
    if node.secondary_ip != node.primary_ip:
      _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

  for instance in data.instances.values():
    for idx, nic in enumerate(instance.nics):
      if nic.ip is None:
        continue

      nicparams = objects.FillDict(default_nicparams, nic.nicparams)
      nic_mode = nicparams[constants.NIC_MODE]
      nic_link = nicparams[constants.NIC_LINK]

      if nic_mode == constants.NIC_MODE_BRIDGED:
        link = "bridge:%s" % nic_link
      elif nic_mode == constants.NIC_MODE_ROUTED:
        link = "route:%s" % nic_link
      else:
        raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

      _AddIpAddress("%s/%s" % (link, nic.ip),
                    "instance:%s/nic:%d" % (instance.name, idx))

  for ip, owners in ips.items():
    if len(owners) > 1:
      result.append("IP address %s is used by multiple owners: %s" %
                    (ip, utils.CommaJoin(owners)))

  return result
@locking.ssynchronized(_config_lock, shared=1)
def VerifyConfig(self):
  """Verify function.

  This is just a wrapper over L{_UnlockedVerifyConfig}.

  @rtype: list
  @return: a list of error messages; a non-empty list signifies
      configuration errors

  """
  return self._UnlockedVerifyConfig()

def _UnlockedSetDiskID(self, disk, node_name):
  """Convert the unique ID to the ID needed on the target nodes.

  This is used only for drbd, which needs ip/port configuration.

  The routine descends down and updates its children also, because
  this helps when the only the top device is passed to the remote
  node.

  This function is for internal use, when the config lock is already held.

  """
  if disk.children:
    for child in disk.children:
      self._UnlockedSetDiskID(child, node_name)

  if disk.logical_id is None and disk.physical_id is not None:
    return
  if disk.dev_type == constants.LD_DRBD8:
    pnode, snode, port, pminor, sminor, secret = disk.logical_id
    if node_name not in (pnode, snode):
      raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                      node_name)
    pnode_info = self._UnlockedGetNodeInfo(pnode)
    snode_info = self._UnlockedGetNodeInfo(snode)
    if pnode_info is None or snode_info is None:
      raise errors.ConfigurationError("Can't find primary or secondary node"
                                      " for %s" % str(disk))
    p_data = (pnode_info.secondary_ip, port)
    s_data = (snode_info.secondary_ip, port)
    if pnode == node_name:
      disk.physical_id = p_data + s_data + (pminor, secret)
    else: # it must be secondary, we tested above
      disk.physical_id = s_data + p_data + (sminor, secret)
  else:
    disk.physical_id = disk.logical_id
  return

@locking.ssynchronized(_config_lock)
def SetDiskID(self, disk, node_name):
  """Convert the unique ID to the ID needed on the target nodes.

  This is used only for drbd, which needs ip/port configuration.

  The routine descends down and updates its children also, because
  this helps when the only the top device is passed to the remote
  node.

  """
  return self._UnlockedSetDiskID(disk, node_name)
@locking.ssynchronized(_config_lock)
def AddTcpUdpPort(self, port):
  """Adds a new port to the available port pool.

  """
  if not isinstance(port, int):
    raise errors.ProgrammerError("Invalid type passed for port")

  self._config_data.cluster.tcpudp_port_pool.add(port)
  # persist the updated pool
  self._WriteConfig()

@locking.ssynchronized(_config_lock, shared=1)
def GetPortList(self):
  """Returns a copy of the current port list.

  """
  return self._config_data.cluster.tcpudp_port_pool.copy()

@locking.ssynchronized(_config_lock)
def AllocatePort(self):
  """Allocate a port.

  The port will be taken from the available port pool or from the
  default port range (and in this case we increase
  highest_used_port).

  """
  # If there are TCP/IP ports configured, we use them first.
  if self._config_data.cluster.tcpudp_port_pool:
    port = self._config_data.cluster.tcpudp_port_pool.pop()
  else:
    port = self._config_data.cluster.highest_used_port + 1
    if port >= constants.LAST_DRBD_PORT:
      raise errors.ConfigurationError("The highest used port is greater"
                                      " than %s. Aborting." %
                                      constants.LAST_DRBD_PORT)
    self._config_data.cluster.highest_used_port = port

  self._WriteConfig()
  return port
def _UnlockedComputeDRBDMap(self):
  """Compute the used DRBD minor/nodes.

  @rtype: (dict, list)
  @return: dictionary of node_name: dict of minor: instance_name;
      the returned dict will have all the nodes in it (even if with
      an empty list), and a list of duplicates; if the duplicates
      list is not empty, the configuration is corrupted and its caller
      should raise an exception

  """
  def _AppendUsedPorts(instance_name, disk, used):
    """Collect the used minors of one disk tree into C{used}."""
    duplicates = []
    if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
      node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
      for node, port in ((node_a, minor_a), (node_b, minor_b)):
        assert node in used, ("Node '%s' of instance '%s' not found"
                              " in node list" % (node, instance_name))
        if port in used[node]:
          duplicates.append((node, port, instance_name, used[node][port]))
        else:
          used[node][port] = instance_name
    if disk.children:
      for child in disk.children:
        duplicates.extend(_AppendUsedPorts(instance_name, child, used))
    return duplicates

  duplicates = []
  my_dict = dict((node, {}) for node in self._config_data.nodes)
  for instance in self._config_data.instances.values():
    for disk in instance.disks:
      duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
  for (node, minor), instance in self._temporary_drbds.items():
    if minor in my_dict[node] and my_dict[node][minor] != instance:
      duplicates.append((node, minor, instance, my_dict[node][minor]))
    else:
      my_dict[node][minor] = instance
  return my_dict, duplicates
@locking.ssynchronized(_config_lock)
def ComputeDRBDMap(self):
  """Compute the used DRBD minor/nodes.

  This is just a wrapper over L{_UnlockedComputeDRBDMap}.

  @return: dictionary of node_name: dict of minor: instance_name;
      the returned dict will have all the nodes in it (even if with
      an empty list).

  """
  d_map, duplicates = self._UnlockedComputeDRBDMap()
  if duplicates:
    raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                    str(duplicates))
  return d_map

@locking.ssynchronized(_config_lock)
def AllocateDRBDMinor(self, nodes, instance):
  """Allocate a drbd minor.

  The free minor will be automatically computed from the existing
  devices. A node can be given multiple times in order to allocate
  multiple minors. The result is the list of minors, in the same
  order as the passed nodes.

  @type instance: string
  @param instance: the instance for which we allocate minors

  """
  assert isinstance(instance, str), \
    "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

  d_map, duplicates = self._UnlockedComputeDRBDMap()
  if duplicates:
    raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                    str(duplicates))
  result = []
  for nname in nodes:
    ndata = d_map[nname]
    if not ndata:
      # no minors used, we can start at 0
      result.append(0)
      ndata[0] = instance
      self._temporary_drbds[(nname, 0)] = instance
      continue
    keys = sorted(ndata.keys())
    ffree = utils.FirstFree(keys)
    if ffree is None:
      # return the next minor
      # TODO: implement high-limit check
      minor = keys[-1] + 1
    else:
      minor = ffree
    # double-check minor against current instances
    assert minor not in d_map[nname], \
      ("Attempt to reuse allocated DRBD minor %d on node %s,"
       " already allocated to instance %s" %
       (minor, nname, d_map[nname][minor]))
    ndata[minor] = instance
    # double-check minor against reservation
    r_key = (nname, minor)
    assert r_key not in self._temporary_drbds, \
      ("Attempt to reuse reserved DRBD minor %d on node %s,"
       " reserved for instance %s" %
       (minor, nname, self._temporary_drbds[r_key]))
    self._temporary_drbds[r_key] = instance
    result.append(minor)
  logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                nodes, result)
  return result
722 def _UnlockedReleaseDRBDMinors(self, instance):
723 """Release temporary drbd minors allocated for a given instance.
725 @type instance: string
726 @param instance: the instance for which temporary minors should be
730 assert isinstance(instance, basestring), \
731 "Invalid argument passed to ReleaseDRBDMinors"
732 for key, name in self._temporary_drbds.items():
734 del self._temporary_drbds[key]
@locking.ssynchronized(_config_lock)
def ReleaseDRBDMinors(self, instance):
  """Release temporary drbd minors allocated for a given instance.

  This should be called on the error paths, on the success paths
  it's automatically called by the ConfigWriter add and update
  functions.

  This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

  @type instance: string
  @param instance: the instance for which temporary minors should be
      released

  """
  self._UnlockedReleaseDRBDMinors(instance)
@locking.ssynchronized(_config_lock, shared=1)
def GetConfigVersion(self):
  """Get the configuration version.

  @return: Config version

  """
  return self._config_data.version

@locking.ssynchronized(_config_lock, shared=1)
def GetClusterName(self):
  """Get cluster name.

  @return: Cluster name

  """
  return self._config_data.cluster.cluster_name

@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNode(self):
  """Get the hostname of the master node for this cluster.

  @return: Master hostname

  """
  return self._config_data.cluster.master_node

@locking.ssynchronized(_config_lock, shared=1)
def GetMasterIP(self):
  """Get the IP of the master node for this cluster.

  @return: Master IP

  """
  return self._config_data.cluster.master_ip

@locking.ssynchronized(_config_lock, shared=1)
def GetMasterNetdev(self):
  """Get the master network device for this cluster.

  """
  return self._config_data.cluster.master_netdev

@locking.ssynchronized(_config_lock, shared=1)
def GetFileStorageDir(self):
  """Get the file storage dir for this cluster.

  """
  return self._config_data.cluster.file_storage_dir

@locking.ssynchronized(_config_lock, shared=1)
def GetHypervisorType(self):
  """Get the hypervisor type for this cluster.

  """
  return self._config_data.cluster.enabled_hypervisors[0]

@locking.ssynchronized(_config_lock, shared=1)
def GetHostKey(self):
  """Return the rsa hostkey from the config.

  @rtype: string
  @return: the rsa hostkey

  """
  return self._config_data.cluster.rsahostkeypub

@locking.ssynchronized(_config_lock, shared=1)
def GetDefaultIAllocator(self):
  """Get the default instance allocator for this cluster.

  """
  return self._config_data.cluster.default_iallocator
@locking.ssynchronized(_config_lock)
def AddInstance(self, instance, ec_id):
  """Add an instance to the config.

  This should be used after creating a new instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object

  """
  if not isinstance(instance, objects.Instance):
    raise errors.ProgrammerError("Invalid type passed to AddInstance")

  if instance.disk_template != constants.DT_DISKLESS:
    all_lvs = instance.MapLVsByNode()
    logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

  all_macs = self._AllMACs()
  for nic in instance.nics:
    if nic.mac in all_macs:
      raise errors.ConfigurationError("Cannot add instance %s:"
                                      " MAC address '%s' already in use." %
                                      (instance.name, nic.mac))

  self._EnsureUUID(instance, ec_id)

  instance.serial_no = 1
  instance.ctime = instance.mtime = time.time()
  self._config_data.instances[instance.name] = instance
  self._config_data.cluster.serial_no += 1
  # drop any temporary DRBD reservations now that the config owns them
  self._UnlockedReleaseDRBDMinors(instance.name)
  self._WriteConfig()
860 def _EnsureUUID(self, item, ec_id):
861 """Ensures a given object has a valid UUID.
863 @param item: the instance or node to be checked
864 @param ec_id: the execution context id for the uuid reservation
868 item.uuid = self._GenerateUniqueID(ec_id)
869 elif item.uuid in self._AllIDs(include_temporary=True):
870 raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
871 " in use" % (item.name, item.uuid))
873 def _SetInstanceStatus(self, instance_name, status):
874 """Set the instance's status to a given value.
877 assert isinstance(status, bool), \
878 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
880 if instance_name not in self._config_data.instances:
881 raise errors.ConfigurationError("Unknown instance '%s'" %
883 instance = self._config_data.instances[instance_name]
884 if instance.admin_up != status:
885 instance.admin_up = status
886 instance.serial_no += 1
887 instance.mtime = time.time()
@locking.ssynchronized(_config_lock)
def MarkInstanceUp(self, instance_name):
  """Mark the instance status to up in the config.

  """
  self._SetInstanceStatus(instance_name, True)

@locking.ssynchronized(_config_lock)
def RemoveInstance(self, instance_name):
  """Remove the instance from the configuration.

  """
  if instance_name not in self._config_data.instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
  del self._config_data.instances[instance_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()

@locking.ssynchronized(_config_lock)
def RenameInstance(self, old_name, new_name):
  """Rename an instance.

  This needs to be done in ConfigWriter and not by RemoveInstance
  combined with AddInstance as only we can guarantee an atomic
  rename.

  """
  if old_name not in self._config_data.instances:
    raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
  inst = self._config_data.instances[old_name]
  del self._config_data.instances[old_name]
  inst.name = new_name

  for disk in inst.disks:
    if disk.dev_type == constants.LD_FILE:
      # rename the file paths in logical and physical id
      # NOTE(review): reconstructed path layout <storage_dir>/<instance>/
      # <iv_name> — confirm against the file-storage backend
      file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
      disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                            utils.PathJoin(file_storage_dir,
                                                           inst.name,
                                                           disk.iv_name))

  self._config_data.instances[inst.name] = inst
  self._WriteConfig()

@locking.ssynchronized(_config_lock)
def MarkInstanceDown(self, instance_name):
  """Mark the status of an instance to down in the configuration.

  """
  self._SetInstanceStatus(instance_name, False)
def _UnlockedGetInstanceList(self):
  """Get the list of instances.

  This function is for internal use, when the config lock is already held.

  """
  return self._config_data.instances.keys()

@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceList(self):
  """Get the list of instances.

  @return: array of instances, ex. ['instance2.example.com',
      'instance1.example.com']

  """
  return self._UnlockedGetInstanceList()

@locking.ssynchronized(_config_lock, shared=1)
def ExpandInstanceName(self, short_name):
  """Attempt to expand an incomplete instance name.

  """
  return utils.MatchNameComponent(short_name,
                                  self._config_data.instances.keys(),
                                  case_sensitive=False)
969 def _UnlockedGetInstanceInfo(self, instance_name):
970 """Returns information about an instance.
972 This function is for internal use, when the config lock is already held.
975 if instance_name not in self._config_data.instances:
978 return self._config_data.instances[instance_name]
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceInfo(self, instance_name):
  """Returns information about an instance.

  It takes the information from the configuration file. Other information of
  an instance are taken from the live systems.

  @param instance_name: name of the instance, e.g.
      I{instance1.example.com}

  @rtype: L{objects.Instance}
  @return: the instance object

  """
  return self._UnlockedGetInstanceInfo(instance_name)

@locking.ssynchronized(_config_lock, shared=1)
def GetAllInstancesInfo(self):
  """Get the configuration of all instances.

  @rtype: dict
  @return: dict of (instance, instance_info), where instance_info is what
      would GetInstanceInfo return for the node

  """
  my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                  for instance in self._UnlockedGetInstanceList()])
  return my_dict
@locking.ssynchronized(_config_lock)
def AddNode(self, node, ec_id):
  """Add a node to the configuration.

  @type node: L{objects.Node}
  @param node: a Node instance

  """
  logging.info("Adding node %s to configuration", node.name)

  self._EnsureUUID(node, ec_id)

  node.serial_no = 1
  node.ctime = node.mtime = time.time()
  self._config_data.nodes[node.name] = node
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()

@locking.ssynchronized(_config_lock)
def RemoveNode(self, node_name):
  """Remove a node from the configuration.

  """
  logging.info("Removing node %s from configuration", node_name)

  if node_name not in self._config_data.nodes:
    raise errors.ConfigurationError("Unknown node '%s'" % node_name)

  del self._config_data.nodes[node_name]
  self._config_data.cluster.serial_no += 1
  self._WriteConfig()

@locking.ssynchronized(_config_lock, shared=1)
def ExpandNodeName(self, short_name):
  """Attempt to expand an incomplete node name.

  """
  # docstring fix: this expands node names (was a copy-paste of the
  # instance variant)
  return utils.MatchNameComponent(short_name,
                                  self._config_data.nodes.keys(),
                                  case_sensitive=False)
1050 def _UnlockedGetNodeInfo(self, node_name):
1051 """Get the configuration of a node, as stored in the config.
1053 This function is for internal use, when the config lock is already
1056 @param node_name: the node name, e.g. I{node1.example.com}
1058 @rtype: L{objects.Node}
1059 @return: the node object
1062 if node_name not in self._config_data.nodes:
1065 return self._config_data.nodes[node_name]
@locking.ssynchronized(_config_lock, shared=1)
def GetNodeInfo(self, node_name):
  """Get the configuration of a node, as stored in the config.

  This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

  @param node_name: the node name, e.g. I{node1.example.com}

  @rtype: L{objects.Node}
  @return: the node object

  """
  return self._UnlockedGetNodeInfo(node_name)

def _UnlockedGetNodeList(self):
  """Return the list of nodes which are in the configuration.

  This function is for internal use, when the config lock is already
  held.

  @rtype: list

  """
  return self._config_data.nodes.keys()

@locking.ssynchronized(_config_lock, shared=1)
def GetNodeList(self):
  """Return the list of nodes which are in the configuration.

  """
  return self._UnlockedGetNodeList()

def _UnlockedGetOnlineNodeList(self):
  """Return the list of nodes which are online.

  """
  all_nodes = [self._UnlockedGetNodeInfo(node)
               for node in self._UnlockedGetNodeList()]
  return [node.name for node in all_nodes if not node.offline]

@locking.ssynchronized(_config_lock, shared=1)
def GetOnlineNodeList(self):
  """Return the list of nodes which are online.

  """
  return self._UnlockedGetOnlineNodeList()

@locking.ssynchronized(_config_lock, shared=1)
def GetAllNodesInfo(self):
  """Get the configuration of all nodes.

  @rtype: dict
  @return: dict of (node, node_info), where node_info is what
      would GetNodeInfo return for the node

  """
  my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                  for node in self._UnlockedGetNodeList()])
  return my_dict
1127 def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1128 """Get the number of current and maximum desired and possible candidates.
1130 @type exceptions: list
1131 @param exceptions: if passed, list of nodes that should be ignored
1133 @return: tuple of (current, desired and possible, possible)
1136 mc_now = mc_should = mc_max = 0
1137 for node in self._config_data.nodes.values():
1138 if exceptions and node.name in exceptions:
1140 if not (node.offline or node.drained):
1142 if node.master_candidate:
1144 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1145 return (mc_now, mc_should, mc_max)
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored

    @rtype: tuple
    @return: tuple of (current, max)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)
1161 @locking.ssynchronized(_config_lock)
1162 def MaintainCandidatePool(self, exceptions):
1163 """Try to grow the candidate pool to the desired size.
1165 @type exceptions: list
1166 @param exceptions: if passed, list of nodes that should be ignored
1168 @return: list with the adjusted nodes (L{objects.Node} instances)
1171 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1174 node_list = self._config_data.nodes.keys()
1175 random.shuffle(node_list)
1176 for name in node_list:
1177 if mc_now >= mc_max:
1179 node = self._config_data.nodes[name]
1180 if (node.master_candidate or node.offline or node.drained or
1181 node.name in exceptions):
1183 mod_list.append(node)
1184 node.master_candidate = True
1187 if mc_now != mc_max:
1188 # this should not happen
1189 logging.warning("Warning: MaintainCandidatePool didn't manage to"
1190 " fill the candidate pool (%d/%d)", mc_now, mc_max)
1192 self._config_data.cluster.serial_no += 1
1197 def _BumpSerialNo(self):
1198 """Bump up the serial number of the config.
1201 self._config_data.serial_no += 1
1202 self._config_data.mtime = time.time()
  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    @rtype: list
    @return: the instances, the nodes and the cluster object, i.e. all
        configuration objects carrying a C{uuid} attribute

    """
    # NOTE: relies on Python 2 semantics where dict.values() returns a
    # list, so the three parts can be concatenated with "+"
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            [self._config_data.cluster])
1212 def _OpenConfig(self):
1213 """Read the config data from disk.
1216 raw_data = utils.ReadFile(self._cfg_file)
1219 data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1220 except Exception, err:
1221 raise errors.ConfigurationError(err)
1223 # Make sure the configuration has the right version
1224 _ValidateConfig(data)
1226 if (not hasattr(data, 'cluster') or
1227 not hasattr(data.cluster, 'rsahostkeypub')):
1228 raise errors.ConfigurationError("Incomplete configuration"
1229 " (missing cluster.rsahostkeypub)")
1231 # Upgrade configuration if needed
1232 data.UpgradeConfig()
1234 self._config_data = data
1235 # reset the last serial as -1 so that the next write will cause
1237 self._last_cluster_serial = -1
1239 # And finally run our (custom) config upgrade sequence
1240 self._UpgradeConfig()
1242 def _UpgradeConfig(self):
1243 """Run upgrade steps that cannot be done purely in the objects.
1245 This is because some data elements need uniqueness across the
1246 whole configuration, etc.
1248 @warning: this function will call L{_WriteConfig()}, but also
1249 L{DropECReservations} so it needs to be called only from a
1250 "safe" place (the constructor). If one wanted to call it with
1251 the lock held, a DropECReservationUnlocked would need to be
1252 created first, to avoid causing deadlock.
1256 for item in self._AllUUIDObjects():
1257 if item.uuid is None:
1258 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1262 # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1263 # only called at config init time, without the lock held
1264 self.DropECReservations(_UPGRADE_CONFIG_JID)
1266 def _DistributeConfig(self, feedback_fn):
1267 """Distribute the configuration to the other nodes.
1269 Currently, this only copies the configuration file. In the future,
1270 it could be used to encapsulate the 2/3-phase update mechanism.
1280 myhostname = self._my_hostname
1281 # we can skip checking whether _UnlockedGetNodeInfo returns None
1282 # since the node list comes from _UnlocketGetNodeList, and we are
1283 # called with the lock held, so no modifications should take place
1285 for node_name in self._UnlockedGetNodeList():
1286 if node_name == myhostname:
1288 node_info = self._UnlockedGetNodeInfo(node_name)
1289 if not node_info.master_candidate:
1291 node_list.append(node_info.name)
1292 addr_list.append(node_info.primary_ip)
1294 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1295 address_list=addr_list)
1296 for to_node, to_result in result.items():
1297 msg = to_result.fail_msg
1299 msg = ("Copy of file %s to node %s failed: %s" %
1300 (self._cfg_file, to_node, msg))
1310 def _WriteConfig(self, destination=None, feedback_fn=None):
1311 """Write the configuration data to persistent storage.
1314 assert feedback_fn is None or callable(feedback_fn)
1316 # Warn on config errors, but don't abort the save - the
1317 # configuration has already been modified, and we can't revert;
1318 # the best we can do is to warn the user and save as is, leaving
1319 # recovery to the user
1320 config_errors = self._UnlockedVerifyConfig()
1322 errmsg = ("Configuration data is not consistent: %s" %
1323 (utils.CommaJoin(config_errors)))
1324 logging.critical(errmsg)
1328 if destination is None:
1329 destination = self._cfg_file
1330 self._BumpSerialNo()
1331 txt = serializer.Dump(self._config_data.ToDict())
1333 utils.WriteFile(destination, data=txt)
1335 self.write_count += 1
1337 # and redistribute the config file to master candidates
1338 self._DistributeConfig(feedback_fn)
1340 # Write ssconf files on all nodes (including locally)
1341 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1342 if not self._offline:
1343 result = rpc.RpcRunner.call_write_ssconf_files(
1344 self._UnlockedGetOnlineNodeList(),
1345 self._UnlockedGetSsconfValues())
1347 for nname, nresu in result.items():
1348 msg = nresu.fail_msg
1350 errmsg = ("Error while uploading ssconf files to"
1351 " node %s: %s" % (nname, msg))
1352 logging.warning(errmsg)
1357 self._last_cluster_serial = self._config_data.cluster.serial_no
1359 def _UnlockedGetSsconfValues(self):
1360 """Return the values needed by ssconf.
1363 @return: a dictionary with keys the ssconf names and values their
1368 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1369 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1370 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1371 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1372 for ninfo in node_info]
1373 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1374 for ninfo in node_info]
1376 instance_data = fn(instance_names)
1377 off_data = fn(node.name for node in node_info if node.offline)
1378 on_data = fn(node.name for node in node_info if not node.offline)
1379 mc_data = fn(node.name for node in node_info if node.master_candidate)
1380 mc_ips_data = fn(node.primary_ip for node in node_info
1381 if node.master_candidate)
1382 node_data = fn(node_names)
1383 node_pri_ips_data = fn(node_pri_ips)
1384 node_snd_ips_data = fn(node_snd_ips)
1386 cluster = self._config_data.cluster
1387 cluster_tags = fn(cluster.GetTags())
1389 hypervisor_list = fn(cluster.enabled_hypervisors)
1391 uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1394 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1395 constants.SS_CLUSTER_TAGS: cluster_tags,
1396 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1397 constants.SS_MASTER_CANDIDATES: mc_data,
1398 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1399 constants.SS_MASTER_IP: cluster.master_ip,
1400 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1401 constants.SS_MASTER_NODE: cluster.master_node,
1402 constants.SS_NODE_LIST: node_data,
1403 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1404 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1405 constants.SS_OFFLINE_NODES: off_data,
1406 constants.SS_ONLINE_NODES: on_data,
1407 constants.SS_INSTANCE_LIST: instance_data,
1408 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1409 constants.SS_HYPERVISOR_LIST: hypervisor_list,
1410 constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1411 constants.SS_UID_POOL: uid_pool,
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    @rtype: string
    @return: the cluster-wide configured LVM volume group name

    """
    return self._config_data.cluster.volume_group_name
1421 @locking.ssynchronized(_config_lock)
1422 def SetVGName(self, vg_name):
1423 """Set the volume group name.
1426 self._config_data.cluster.volume_group_name = vg_name
1427 self._config_data.cluster.serial_no += 1
  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    @rtype: string
    @return: the cluster-wide configured DRBD usermode helper

    """
    return self._config_data.cluster.drbd_usermode_helper
1437 @locking.ssynchronized(_config_lock)
1438 def SetDRBDHelper(self, drbd_helper):
1439 """Set DRBD usermode helper.
1442 self._config_data.cluster.drbd_usermode_helper = drbd_helper
1443 self._config_data.cluster.serial_no += 1
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    @rtype: string
    @return: the cluster-wide configured MAC address prefix

    """
    return self._config_data.cluster.mac_prefix
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster
  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    @param dev_type: the disk template type to look for
    @rtype: boolean
    @return: True if a disk of the given type was found

    """
    return self._config_data.HasAnyDiskOfType(dev_type)
1470 @locking.ssynchronized(_config_lock)
1471 def Update(self, target, feedback_fn):
1472 """Notify function to be called after updates.
1474 This function must be called when an object (as returned by
1475 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1476 caller wants the modifications saved to the backing store. Note
1477 that all modified objects will be saved, but the target argument
1478 is the one the caller wants to ensure that it's saved.
1480 @param target: an instance of either L{objects.Cluster},
1481 L{objects.Node} or L{objects.Instance} which is existing in
1483 @param feedback_fn: Callable feedback function
1486 if self._config_data is None:
1487 raise errors.ProgrammerError("Configuration file not read,"
1489 update_serial = False
1490 if isinstance(target, objects.Cluster):
1491 test = target == self._config_data.cluster
1492 elif isinstance(target, objects.Node):
1493 test = target in self._config_data.nodes.values()
1494 update_serial = True
1495 elif isinstance(target, objects.Instance):
1496 test = target in self._config_data.instances.values()
1498 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1499 " ConfigWriter.Update" % type(target))
1501 raise errors.ConfigurationError("Configuration updated since object"
1502 " has been read or unknown object")
1503 target.serial_no += 1
1504 target.mtime = now = time.time()
1507 # for node updates, we need to increase the cluster serial too
1508 self._config_data.cluster.serial_no += 1
1509 self._config_data.cluster.mtime = now
1511 if isinstance(target, objects.Instance):
1512 self._UnlockedReleaseDRBDMinors(target.name)
1514 self._WriteConfig(feedback_fn=feedback_fn)
1516 @locking.ssynchronized(_config_lock)
1517 def DropECReservations(self, ec_id):
1518 """Drop per-execution-context reservations
1521 for rm in self._all_rms:
1522 rm.DropECReservations(ec_id)