4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
Currently, the data storage format is JSON. YAML was slow and consuming too
much memory.
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
48 _config_lock = locking.SharedLock()
50 # job id used for resource management at config upgrade time
51 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    # The format string needs both the found and the expected version;
    # the argument tuple was truncated and missed data.version.
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    # maps execution-context id -> set of reserved resources
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Tell whether the given resource is reserved by any context.

    """
    # Iterate the reserved *sets*; iterating .items() would test
    # membership against (ec_id, set) tuples and never match.
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve a resource on behalf of the given execution context.

    @raise errors.ReservationError: if the resource is already reserved

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    (resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    """Drop all reservations held by the given execution context.

    """
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    """Return the set of all currently reserved resources.

    """
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    @param existing: resources already in use, to be avoided
    @param generate_one_fn: callable producing one candidate resource
    @param ec_id: execution context id reserving the new resource
    @return: the newly generated, now-reserved resource

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    # Bounded retry: give up after a fixed number of attempts instead
    # of looping forever on a generator that keeps colliding.
    for _ in range(64):
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
126 """The interface to the cluster configuration.
129 def __init__(self, cfg_file=None, offline=False):
131 self._lock = _config_lock
132 self._config_data = None
133 self._offline = offline
135 self._cfg_file = constants.CLUSTER_CONF_FILE
137 self._cfg_file = cfg_file
138 self._temporary_ids = TemporaryReservationManager()
139 self._temporary_drbds = {}
140 self._temporary_macs = TemporaryReservationManager()
141 self._temporary_secrets = TemporaryReservationManager()
142 # Note: in order to prevent errors when resolving our name in
143 # _DistributeConfig, we compute it here once and reuse it; it's
144 # better to raise an error before starting to modify the config
145 # file than after it was modified
146 self._my_hostname = utils.HostInfo().name
147 self._last_cluster_serial = -1
150 # this method needs to be static, so that we can call it on the class
153 """Check if the cluster is configured.
156 return os.path.exists(constants.CLUSTER_CONF_FILE)
158 def _GenerateOneMAC(self):
159 """Generate one mac address
162 prefix = self._config_data.cluster.mac_prefix
163 byte1 = random.randrange(0, 256)
164 byte2 = random.randrange(0, 256)
165 byte3 = random.randrange(0, 256)
166 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
169 @locking.ssynchronized(_config_lock, shared=1)
170 def GenerateMAC(self, ec_id):
171 """Generate a MAC for an instance.
173 This should check the current instances for duplicates.
176 existing = self._AllMACs()
177 return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
179 @locking.ssynchronized(_config_lock, shared=1)
180 def ReserveMAC(self, mac, ec_id):
181 """Reserve a MAC for an instance.
183 This only checks instances managed by this cluster, it does not
184 check for potential collisions elsewhere.
187 all_macs = self._AllMACs()
189 raise errors.ReservationError("mac already in use")
191 self._temporary_macs.Reserve(mac, ec_id)
193 @locking.ssynchronized(_config_lock, shared=1)
194 def GenerateDRBDSecret(self, ec_id):
195 """Generate a DRBD secret.
197 This checks the current disks for duplicates.
200 return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
201 utils.GenerateSecret,
205 """Compute the list of all LVs.
209 for instance in self._config_data.instances.values():
210 node_data = instance.MapLVsByNode()
211 for lv_list in node_data.values():
212 lvnames.update(lv_list)
215 def _AllIDs(self, include_temporary):
216 """Compute the list of all UUIDs and names we have.
218 @type include_temporary: boolean
219 @param include_temporary: whether to include the _temporary_ids set
221 @return: a set of IDs
225 if include_temporary:
226 existing.update(self._temporary_ids.GetReserved())
227 existing.update(self._AllLVs())
228 existing.update(self._config_data.instances.keys())
229 existing.update(self._config_data.nodes.keys())
230 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
233 def _GenerateUniqueID(self, ec_id):
234 """Generate an unique UUID.
236 This checks the current node, instances and disk names for
240 @return: the unique id
243 existing = self._AllIDs(include_temporary=False)
244 return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
246 @locking.ssynchronized(_config_lock, shared=1)
247 def GenerateUniqueID(self, ec_id):
248 """Generate an unique ID.
250 This is just a wrapper over the unlocked version.
253 @param ec_id: unique id for the job to reserve the id to
256 return self._GenerateUniqueID(ec_id)
259 """Return all MACs present in the config.
262 @return: the list of all MACs
266 for instance in self._config_data.instances.values():
267 for nic in instance.nics:
268 result.append(nic.mac)
272 def _AllDRBDSecrets(self):
273 """Return all DRBD secrets present in the config.
276 @return: the list of all DRBD secrets
279 def helper(disk, result):
280 """Recursively gather secrets from this disk."""
281 if disk.dev_type == constants.DT_DRBD8:
282 result.append(disk.logical_id[5])
284 for child in disk.children:
285 helper(child, result)
288 for instance in self._config_data.instances.values():
289 for disk in instance.disks:
294 def _CheckDiskIDs(self, disk, l_ids, p_ids):
295 """Compute duplicate disk IDs
297 @type disk: L{objects.Disk}
298 @param disk: the disk at which to start searching
300 @param l_ids: list of current logical ids
302 @param p_ids: list of current physical ids
304 @return: a list of error messages
308 if disk.logical_id is not None:
309 if disk.logical_id in l_ids:
310 result.append("duplicate logical id %s" % str(disk.logical_id))
312 l_ids.append(disk.logical_id)
313 if disk.physical_id is not None:
314 if disk.physical_id in p_ids:
315 result.append("duplicate physical id %s" % str(disk.physical_id))
317 p_ids.append(disk.physical_id)
320 for child in disk.children:
321 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
324 def _UnlockedVerifyConfig(self):
328 @return: a list of error messages; a non-empty list signifies
335 data = self._config_data
339 # global cluster checks
340 if not data.cluster.enabled_hypervisors:
341 result.append("enabled hypervisors list doesn't have any entries")
342 invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
344 result.append("enabled hypervisors contains invalid entries: %s" %
347 if data.cluster.master_node not in data.nodes:
348 result.append("cluster has invalid primary node '%s'" %
349 data.cluster.master_node)
351 # per-instance checks
352 for instance_name in data.instances:
353 instance = data.instances[instance_name]
354 if instance.name != instance_name:
355 result.append("instance '%s' is indexed by wrong name '%s'" %
356 (instance.name, instance_name))
357 if instance.primary_node not in data.nodes:
358 result.append("instance '%s' has invalid primary node '%s'" %
359 (instance_name, instance.primary_node))
360 for snode in instance.secondary_nodes:
361 if snode not in data.nodes:
362 result.append("instance '%s' has invalid secondary node '%s'" %
363 (instance_name, snode))
364 for idx, nic in enumerate(instance.nics):
365 if nic.mac in seen_macs:
366 result.append("instance '%s' has NIC %d mac %s duplicate" %
367 (instance_name, idx, nic.mac))
369 seen_macs.append(nic.mac)
371 # gather the drbd ports for duplicate checks
372 for dsk in instance.disks:
373 if dsk.dev_type in constants.LDS_DRBD:
374 tcp_port = dsk.logical_id[2]
375 if tcp_port not in ports:
377 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
378 # gather network port reservation
379 net_port = getattr(instance, "network_port", None)
380 if net_port is not None:
381 if net_port not in ports:
383 ports[net_port].append((instance.name, "network port"))
385 # instance disk verify
386 for idx, disk in enumerate(instance.disks):
387 result.extend(["instance '%s' disk %d error: %s" %
388 (instance.name, idx, msg) for msg in disk.Verify()])
389 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
391 # cluster-wide pool of free ports
392 for free_port in data.cluster.tcpudp_port_pool:
393 if free_port not in ports:
394 ports[free_port] = []
395 ports[free_port].append(("cluster", "port marked as free"))
397 # compute tcp/udp duplicate ports
403 txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
404 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
406 # highest used tcp port check
408 if keys[-1] > data.cluster.highest_used_port:
409 result.append("Highest used port mismatch, saved %s, computed %s" %
410 (data.cluster.highest_used_port, keys[-1]))
412 if not data.nodes[data.cluster.master_node].master_candidate:
413 result.append("Master node is not a master candidate")
415 # master candidate checks
416 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
418 result.append("Not enough master candidates: actual %d, target %d" %
422 for node_name, node in data.nodes.items():
423 if node.name != node_name:
424 result.append("Node '%s' is indexed by wrong name '%s'" %
425 (node.name, node_name))
426 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
427 result.append("Node %s state is invalid: master_candidate=%s,"
428 " drain=%s, offline=%s" %
429 (node.name, node.master_candidate, node.drain,
433 _, duplicates = self._UnlockedComputeDRBDMap()
434 for node, minor, instance_a, instance_b in duplicates:
435 result.append("DRBD minor %d on node %s is assigned twice to instances"
436 " %s and %s" % (minor, node, instance_a, instance_b))
439 default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
442 def _AddIpAddress(ip, name):
443 ips.setdefault(ip, []).append(name)
445 _AddIpAddress(data.cluster.master_ip, "cluster_ip")
447 for node in data.nodes.values():
448 _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
449 if node.secondary_ip != node.primary_ip:
450 _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
452 for instance in data.instances.values():
453 for idx, nic in enumerate(instance.nics):
457 nicparams = objects.FillDict(default_nicparams, nic.nicparams)
458 nic_mode = nicparams[constants.NIC_MODE]
459 nic_link = nicparams[constants.NIC_LINK]
461 if nic_mode == constants.NIC_MODE_BRIDGED:
462 link = "bridge:%s" % nic_link
463 elif nic_mode == constants.NIC_MODE_ROUTED:
464 link = "route:%s" % nic_link
466 raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
468 _AddIpAddress("%s/%s" % (link, nic.ip),
469 "instance:%s/nic:%d" % (instance.name, idx))
471 for ip, owners in ips.items():
473 result.append("IP address %s is used by multiple owners: %s" %
474 (ip, utils.CommaJoin(owners)))
478 @locking.ssynchronized(_config_lock, shared=1)
479 def VerifyConfig(self):
482 This is just a wrapper over L{_UnlockedVerifyConfig}.
485 @return: a list of error messages; a non-empty list signifies
489 return self._UnlockedVerifyConfig()
491 def _UnlockedSetDiskID(self, disk, node_name):
492 """Convert the unique ID to the ID needed on the target nodes.
494 This is used only for drbd, which needs ip/port configuration.
496 The routine descends down and updates its children also, because
497 this helps when the only the top device is passed to the remote
500 This function is for internal use, when the config lock is already held.
504 for child in disk.children:
505 self._UnlockedSetDiskID(child, node_name)
507 if disk.logical_id is None and disk.physical_id is not None:
509 if disk.dev_type == constants.LD_DRBD8:
510 pnode, snode, port, pminor, sminor, secret = disk.logical_id
511 if node_name not in (pnode, snode):
512 raise errors.ConfigurationError("DRBD device not knowing node %s" %
514 pnode_info = self._UnlockedGetNodeInfo(pnode)
515 snode_info = self._UnlockedGetNodeInfo(snode)
516 if pnode_info is None or snode_info is None:
517 raise errors.ConfigurationError("Can't find primary or secondary node"
518 " for %s" % str(disk))
519 p_data = (pnode_info.secondary_ip, port)
520 s_data = (snode_info.secondary_ip, port)
521 if pnode == node_name:
522 disk.physical_id = p_data + s_data + (pminor, secret)
523 else: # it must be secondary, we tested above
524 disk.physical_id = s_data + p_data + (sminor, secret)
526 disk.physical_id = disk.logical_id
529 @locking.ssynchronized(_config_lock)
530 def SetDiskID(self, disk, node_name):
531 """Convert the unique ID to the ID needed on the target nodes.
533 This is used only for drbd, which needs ip/port configuration.
535 The routine descends down and updates its children also, because
536 this helps when the only the top device is passed to the remote
540 return self._UnlockedSetDiskID(disk, node_name)
542 @locking.ssynchronized(_config_lock)
543 def AddTcpUdpPort(self, port):
544 """Adds a new port to the available port pool.
547 if not isinstance(port, int):
548 raise errors.ProgrammerError("Invalid type passed for port")
550 self._config_data.cluster.tcpudp_port_pool.add(port)
553 @locking.ssynchronized(_config_lock, shared=1)
554 def GetPortList(self):
555 """Returns a copy of the current port list.
558 return self._config_data.cluster.tcpudp_port_pool.copy()
560 @locking.ssynchronized(_config_lock)
561 def AllocatePort(self):
564 The port will be taken from the available port pool or from the
565 default port range (and in this case we increase
569 # If there are TCP/IP ports configured, we use them first.
570 if self._config_data.cluster.tcpudp_port_pool:
571 port = self._config_data.cluster.tcpudp_port_pool.pop()
573 port = self._config_data.cluster.highest_used_port + 1
574 if port >= constants.LAST_DRBD_PORT:
575 raise errors.ConfigurationError("The highest used port is greater"
576 " than %s. Aborting." %
577 constants.LAST_DRBD_PORT)
578 self._config_data.cluster.highest_used_port = port
583 def _UnlockedComputeDRBDMap(self):
584 """Compute the used DRBD minor/nodes.
587 @return: dictionary of node_name: dict of minor: instance_name;
588 the returned dict will have all the nodes in it (even if with
589 an empty list), and a list of duplicates; if the duplicates
590 list is not empty, the configuration is corrupted and its caller
591 should raise an exception
594 def _AppendUsedPorts(instance_name, disk, used):
596 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
597 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
598 for node, port in ((node_a, minor_a), (node_b, minor_b)):
599 assert node in used, ("Node '%s' of instance '%s' not found"
600 " in node list" % (node, instance_name))
601 if port in used[node]:
602 duplicates.append((node, port, instance_name, used[node][port]))
604 used[node][port] = instance_name
606 for child in disk.children:
607 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
611 my_dict = dict((node, {}) for node in self._config_data.nodes)
612 for instance in self._config_data.instances.itervalues():
613 for disk in instance.disks:
614 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
615 for (node, minor), instance in self._temporary_drbds.iteritems():
616 if minor in my_dict[node] and my_dict[node][minor] != instance:
617 duplicates.append((node, minor, instance, my_dict[node][minor]))
619 my_dict[node][minor] = instance
620 return my_dict, duplicates
622 @locking.ssynchronized(_config_lock)
623 def ComputeDRBDMap(self):
624 """Compute the used DRBD minor/nodes.
626 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
628 @return: dictionary of node_name: dict of minor: instance_name;
629 the returned dict will have all the nodes in it (even if with
633 d_map, duplicates = self._UnlockedComputeDRBDMap()
635 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
639 @locking.ssynchronized(_config_lock)
640 def AllocateDRBDMinor(self, nodes, instance):
641 """Allocate a drbd minor.
643 The free minor will be automatically computed from the existing
644 devices. A node can be given multiple times in order to allocate
645 multiple minors. The result is the list of minors, in the same
646 order as the passed nodes.
648 @type instance: string
649 @param instance: the instance for which we allocate minors
652 assert isinstance(instance, basestring), \
653 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
655 d_map, duplicates = self._UnlockedComputeDRBDMap()
657 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
663 # no minors used, we can start at 0
666 self._temporary_drbds[(nname, 0)] = instance
670 ffree = utils.FirstFree(keys)
672 # return the next minor
673 # TODO: implement high-limit check
677 # double-check minor against current instances
678 assert minor not in d_map[nname], \
679 ("Attempt to reuse allocated DRBD minor %d on node %s,"
680 " already allocated to instance %s" %
681 (minor, nname, d_map[nname][minor]))
682 ndata[minor] = instance
683 # double-check minor against reservation
684 r_key = (nname, minor)
685 assert r_key not in self._temporary_drbds, \
686 ("Attempt to reuse reserved DRBD minor %d on node %s,"
687 " reserved for instance %s" %
688 (minor, nname, self._temporary_drbds[r_key]))
689 self._temporary_drbds[r_key] = instance
691 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
695 def _UnlockedReleaseDRBDMinors(self, instance):
696 """Release temporary drbd minors allocated for a given instance.
698 @type instance: string
699 @param instance: the instance for which temporary minors should be
703 assert isinstance(instance, basestring), \
704 "Invalid argument passed to ReleaseDRBDMinors"
705 for key, name in self._temporary_drbds.items():
707 del self._temporary_drbds[key]
709 @locking.ssynchronized(_config_lock)
710 def ReleaseDRBDMinors(self, instance):
711 """Release temporary drbd minors allocated for a given instance.
713 This should be called on the error paths, on the success paths
714 it's automatically called by the ConfigWriter add and update
717 This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
719 @type instance: string
720 @param instance: the instance for which temporary minors should be
724 self._UnlockedReleaseDRBDMinors(instance)
726 @locking.ssynchronized(_config_lock, shared=1)
727 def GetConfigVersion(self):
728 """Get the configuration version.
730 @return: Config version
733 return self._config_data.version
735 @locking.ssynchronized(_config_lock, shared=1)
736 def GetClusterName(self):
739 @return: Cluster name
742 return self._config_data.cluster.cluster_name
744 @locking.ssynchronized(_config_lock, shared=1)
745 def GetMasterNode(self):
746 """Get the hostname of the master node for this cluster.
748 @return: Master hostname
751 return self._config_data.cluster.master_node
753 @locking.ssynchronized(_config_lock, shared=1)
754 def GetMasterIP(self):
755 """Get the IP of the master node for this cluster.
760 return self._config_data.cluster.master_ip
762 @locking.ssynchronized(_config_lock, shared=1)
763 def GetMasterNetdev(self):
764 """Get the master network device for this cluster.
767 return self._config_data.cluster.master_netdev
769 @locking.ssynchronized(_config_lock, shared=1)
770 def GetFileStorageDir(self):
771 """Get the file storage dir for this cluster.
774 return self._config_data.cluster.file_storage_dir
776 @locking.ssynchronized(_config_lock, shared=1)
777 def GetHypervisorType(self):
778 """Get the hypervisor type for this cluster.
781 return self._config_data.cluster.enabled_hypervisors[0]
783 @locking.ssynchronized(_config_lock, shared=1)
784 def GetHostKey(self):
785 """Return the rsa hostkey from the config.
788 @return: the rsa hostkey
791 return self._config_data.cluster.rsahostkeypub
793 @locking.ssynchronized(_config_lock)
794 def AddInstance(self, instance, ec_id):
795 """Add an instance to the config.
797 This should be used after creating a new instance.
799 @type instance: L{objects.Instance}
800 @param instance: the instance object
803 if not isinstance(instance, objects.Instance):
804 raise errors.ProgrammerError("Invalid type passed to AddInstance")
806 if instance.disk_template != constants.DT_DISKLESS:
807 all_lvs = instance.MapLVsByNode()
808 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
810 all_macs = self._AllMACs()
811 for nic in instance.nics:
812 if nic.mac in all_macs:
813 raise errors.ConfigurationError("Cannot add instance %s:"
814 " MAC address '%s' already in use." %
815 (instance.name, nic.mac))
817 self._EnsureUUID(instance, ec_id)
819 instance.serial_no = 1
820 instance.ctime = instance.mtime = time.time()
821 self._config_data.instances[instance.name] = instance
822 self._config_data.cluster.serial_no += 1
823 self._UnlockedReleaseDRBDMinors(instance.name)
826 def _EnsureUUID(self, item, ec_id):
827 """Ensures a given object has a valid UUID.
829 @param item: the instance or node to be checked
830 @param ec_id: the execution context id for the uuid reservation
834 item.uuid = self._GenerateUniqueID(ec_id)
835 elif item.uuid in self._AllIDs(include_temporary=True):
836 raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
837 " in use" % (item.name, item.uuid))
839 def _SetInstanceStatus(self, instance_name, status):
840 """Set the instance's status to a given value.
843 assert isinstance(status, bool), \
844 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
846 if instance_name not in self._config_data.instances:
847 raise errors.ConfigurationError("Unknown instance '%s'" %
849 instance = self._config_data.instances[instance_name]
850 if instance.admin_up != status:
851 instance.admin_up = status
852 instance.serial_no += 1
853 instance.mtime = time.time()
856 @locking.ssynchronized(_config_lock)
857 def MarkInstanceUp(self, instance_name):
858 """Mark the instance status to up in the config.
861 self._SetInstanceStatus(instance_name, True)
863 @locking.ssynchronized(_config_lock)
864 def RemoveInstance(self, instance_name):
865 """Remove the instance from the configuration.
868 if instance_name not in self._config_data.instances:
869 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
870 del self._config_data.instances[instance_name]
871 self._config_data.cluster.serial_no += 1
874 @locking.ssynchronized(_config_lock)
875 def RenameInstance(self, old_name, new_name):
876 """Rename an instance.
878 This needs to be done in ConfigWriter and not by RemoveInstance
879 combined with AddInstance as only we can guarantee an atomic
883 if old_name not in self._config_data.instances:
884 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
885 inst = self._config_data.instances[old_name]
886 del self._config_data.instances[old_name]
889 for disk in inst.disks:
890 if disk.dev_type == constants.LD_FILE:
891 # rename the file paths in logical and physical id
892 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
893 disk.physical_id = disk.logical_id = (disk.logical_id[0],
894 utils.PathJoin(file_storage_dir,
898 self._config_data.instances[inst.name] = inst
901 @locking.ssynchronized(_config_lock)
902 def MarkInstanceDown(self, instance_name):
903 """Mark the status of an instance to down in the configuration.
906 self._SetInstanceStatus(instance_name, False)
908 def _UnlockedGetInstanceList(self):
909 """Get the list of instances.
911 This function is for internal use, when the config lock is already held.
914 return self._config_data.instances.keys()
916 @locking.ssynchronized(_config_lock, shared=1)
917 def GetInstanceList(self):
918 """Get the list of instances.
920 @return: array of instances, ex. ['instance2.example.com',
921 'instance1.example.com']
924 return self._UnlockedGetInstanceList()
926 @locking.ssynchronized(_config_lock, shared=1)
927 def ExpandInstanceName(self, short_name):
928 """Attempt to expand an incomplete instance name.
931 return utils.MatchNameComponent(short_name,
932 self._config_data.instances.keys(),
933 case_sensitive=False)
935 def _UnlockedGetInstanceInfo(self, instance_name):
936 """Returns information about an instance.
938 This function is for internal use, when the config lock is already held.
941 if instance_name not in self._config_data.instances:
944 return self._config_data.instances[instance_name]
946 @locking.ssynchronized(_config_lock, shared=1)
947 def GetInstanceInfo(self, instance_name):
948 """Returns information about an instance.
950 It takes the information from the configuration file. Other information of
951 an instance are taken from the live systems.
953 @param instance_name: name of the instance, e.g.
954 I{instance1.example.com}
956 @rtype: L{objects.Instance}
957 @return: the instance object
960 return self._UnlockedGetInstanceInfo(instance_name)
962 @locking.ssynchronized(_config_lock, shared=1)
963 def GetAllInstancesInfo(self):
964 """Get the configuration of all instances.
967 @return: dict of (instance, instance_info), where instance_info is what
968 would GetInstanceInfo return for the node
971 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
972 for instance in self._UnlockedGetInstanceList()])
975 @locking.ssynchronized(_config_lock)
976 def AddNode(self, node, ec_id):
977 """Add a node to the configuration.
979 @type node: L{objects.Node}
980 @param node: a Node instance
983 logging.info("Adding node %s to configuration", node.name)
985 self._EnsureUUID(node, ec_id)
988 node.ctime = node.mtime = time.time()
989 self._config_data.nodes[node.name] = node
990 self._config_data.cluster.serial_no += 1
993 @locking.ssynchronized(_config_lock)
994 def RemoveNode(self, node_name):
995 """Remove a node from the configuration.
998 logging.info("Removing node %s from configuration", node_name)
1000 if node_name not in self._config_data.nodes:
1001 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1003 del self._config_data.nodes[node_name]
1004 self._config_data.cluster.serial_no += 1
1007 @locking.ssynchronized(_config_lock, shared=1)
1008 def ExpandNodeName(self, short_name):
1009 """Attempt to expand an incomplete instance name.
1012 return utils.MatchNameComponent(short_name,
1013 self._config_data.nodes.keys(),
1014 case_sensitive=False)
1016 def _UnlockedGetNodeInfo(self, node_name):
1017 """Get the configuration of a node, as stored in the config.
1019 This function is for internal use, when the config lock is already
1022 @param node_name: the node name, e.g. I{node1.example.com}
1024 @rtype: L{objects.Node}
1025 @return: the node object
1028 if node_name not in self._config_data.nodes:
1031 return self._config_data.nodes[node_name]
1033 @locking.ssynchronized(_config_lock, shared=1)
1034 def GetNodeInfo(self, node_name):
1035 """Get the configuration of a node, as stored in the config.
1037 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1039 @param node_name: the node name, e.g. I{node1.example.com}
1041 @rtype: L{objects.Node}
1042 @return: the node object
1045 return self._UnlockedGetNodeInfo(node_name)
1047 def _UnlockedGetNodeList(self):
1048 """Return the list of nodes which are in the configuration.
1050 This function is for internal use, when the config lock is already
1056 return self._config_data.nodes.keys()
1058 @locking.ssynchronized(_config_lock, shared=1)
1059 def GetNodeList(self):
1060 """Return the list of nodes which are in the configuration.
1063 return self._UnlockedGetNodeList()
1065 def _UnlockedGetOnlineNodeList(self):
1066 """Return the list of nodes which are online.
1069 all_nodes = [self._UnlockedGetNodeInfo(node)
1070 for node in self._UnlockedGetNodeList()]
1071 return [node.name for node in all_nodes if not node.offline]
1073 @locking.ssynchronized(_config_lock, shared=1)
1074 def GetOnlineNodeList(self):
1075 """Return the list of nodes which are online.
1078 return self._UnlockedGetOnlineNodeList()
1080 @locking.ssynchronized(_config_lock, shared=1)
1081 def GetAllNodesInfo(self):
1082 """Get the configuration of all nodes.
1085 @return: dict of (node, node_info), where node_info is what
1086 would GetNodeInfo return for the node
1089 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1090 for node in self._UnlockedGetNodeList()])
1093 def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1094 """Get the number of current and maximum desired and possible candidates.
1096 @type exceptions: list
1097 @param exceptions: if passed, list of nodes that should be ignored
1099 @return: tuple of (current, desired and possible, possible)
1102 mc_now = mc_should = mc_max = 0
1103 for node in self._config_data.nodes.values():
1104 if exceptions and node.name in exceptions:
1106 if not (node.offline or node.drained):
1108 if node.master_candidate:
1110 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1111 return (mc_now, mc_should, mc_max)
1113 @locking.ssynchronized(_config_lock, shared=1)
1114 def GetMasterCandidateStats(self, exceptions=None):
1115 """Get the number of current and maximum possible candidates.
1117 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1119 @type exceptions: list
1120 @param exceptions: if passed, list of nodes that should be ignored
1122 @return: tuple of (current, max)
1125 return self._UnlockedGetMasterCandidateStats(exceptions)
1127 @locking.ssynchronized(_config_lock)
1128 def MaintainCandidatePool(self, exceptions):
1129 """Try to grow the candidate pool to the desired size.
1131 @type exceptions: list
1132 @param exceptions: if passed, list of nodes that should be ignored
1134 @return: list with the adjusted nodes (L{objects.Node} instances)
1137 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1140 node_list = self._config_data.nodes.keys()
1141 random.shuffle(node_list)
1142 for name in node_list:
1143 if mc_now >= mc_max:
1145 node = self._config_data.nodes[name]
1146 if (node.master_candidate or node.offline or node.drained or
1147 node.name in exceptions):
1149 mod_list.append(node)
1150 node.master_candidate = True
1153 if mc_now != mc_max:
1154 # this should not happen
1155 logging.warning("Warning: MaintainCandidatePool didn't manage to"
1156 " fill the candidate pool (%d/%d)", mc_now, mc_max)
1158 self._config_data.cluster.serial_no += 1
1163 def _BumpSerialNo(self):
1164 """Bump up the serial number of the config.
1167 self._config_data.serial_no += 1
1168 self._config_data.mtime = time.time()
1170 def _AllUUIDObjects(self):
1171 """Returns all objects with uuid attributes.
1174 return (self._config_data.instances.values() +
1175 self._config_data.nodes.values() +
1176 [self._config_data.cluster])
1178 def _OpenConfig(self):
1179 """Read the config data from disk.
1182 raw_data = utils.ReadFile(self._cfg_file)
1185 data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1186 except Exception, err:
1187 raise errors.ConfigurationError(err)
1189 # Make sure the configuration has the right version
1190 _ValidateConfig(data)
1192 if (not hasattr(data, 'cluster') or
1193 not hasattr(data.cluster, 'rsahostkeypub')):
1194 raise errors.ConfigurationError("Incomplete configuration"
1195 " (missing cluster.rsahostkeypub)")
1197 # Upgrade configuration if needed
1198 data.UpgradeConfig()
1200 self._config_data = data
1201 # reset the last serial as -1 so that the next write will cause
1203 self._last_cluster_serial = -1
1205 # And finally run our (custom) config upgrade sequence
1206 self._UpgradeConfig()
1208 def _UpgradeConfig(self):
1209 """Run upgrade steps that cannot be done purely in the objects.
1211 This is because some data elements need uniqueness across the
1212 whole configuration, etc.
1214 @warning: this function will call L{_WriteConfig()}, so it needs
1215 to either be called with the lock held or from a safe place
1220 for item in self._AllUUIDObjects():
1221 if item.uuid is None:
1222 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1226 # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1227 # only called at config init time, without the lock held
1228 self.DropECReservations(_UPGRADE_CONFIG_JID)
1230 def _DistributeConfig(self, feedback_fn):
1231 """Distribute the configuration to the other nodes.
1233 Currently, this only copies the configuration file. In the future,
1234 it could be used to encapsulate the 2/3-phase update mechanism.
1244 myhostname = self._my_hostname
1245 # we can skip checking whether _UnlockedGetNodeInfo returns None
1246 # since the node list comes from _UnlocketGetNodeList, and we are
1247 # called with the lock held, so no modifications should take place
1249 for node_name in self._UnlockedGetNodeList():
1250 if node_name == myhostname:
1252 node_info = self._UnlockedGetNodeInfo(node_name)
1253 if not node_info.master_candidate:
1255 node_list.append(node_info.name)
1256 addr_list.append(node_info.primary_ip)
1258 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1259 address_list=addr_list)
1260 for to_node, to_result in result.items():
1261 msg = to_result.fail_msg
1263 msg = ("Copy of file %s to node %s failed: %s" %
1264 (self._cfg_file, to_node, msg))
1274 def _WriteConfig(self, destination=None, feedback_fn=None):
1275 """Write the configuration data to persistent storage.
1278 assert feedback_fn is None or callable(feedback_fn)
1280 # Warn on config errors, but don't abort the save - the
1281 # configuration has already been modified, and we can't revert;
1282 # the best we can do is to warn the user and save as is, leaving
1283 # recovery to the user
1284 config_errors = self._UnlockedVerifyConfig()
1286 errmsg = ("Configuration data is not consistent: %s" %
1287 (utils.CommaJoin(config_errors)))
1288 logging.critical(errmsg)
1292 if destination is None:
1293 destination = self._cfg_file
1294 self._BumpSerialNo()
1295 txt = serializer.Dump(self._config_data.ToDict())
1297 utils.WriteFile(destination, data=txt)
1299 self.write_count += 1
1301 # and redistribute the config file to master candidates
1302 self._DistributeConfig(feedback_fn)
1304 # Write ssconf files on all nodes (including locally)
1305 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1306 if not self._offline:
1307 result = rpc.RpcRunner.call_write_ssconf_files(
1308 self._UnlockedGetOnlineNodeList(),
1309 self._UnlockedGetSsconfValues())
1311 for nname, nresu in result.items():
1312 msg = nresu.fail_msg
1314 errmsg = ("Error while uploading ssconf files to"
1315 " node %s: %s" % (nname, msg))
1316 logging.warning(errmsg)
1321 self._last_cluster_serial = self._config_data.cluster.serial_no
1323 def _UnlockedGetSsconfValues(self):
1324 """Return the values needed by ssconf.
1327 @return: a dictionary with keys the ssconf names and values their
1332 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1333 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1334 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1335 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1336 for ninfo in node_info]
1337 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1338 for ninfo in node_info]
1340 instance_data = fn(instance_names)
1341 off_data = fn(node.name for node in node_info if node.offline)
1342 on_data = fn(node.name for node in node_info if not node.offline)
1343 mc_data = fn(node.name for node in node_info if node.master_candidate)
1344 mc_ips_data = fn(node.primary_ip for node in node_info
1345 if node.master_candidate)
1346 node_data = fn(node_names)
1347 node_pri_ips_data = fn(node_pri_ips)
1348 node_snd_ips_data = fn(node_snd_ips)
1350 cluster = self._config_data.cluster
1351 cluster_tags = fn(cluster.GetTags())
1353 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1354 constants.SS_CLUSTER_TAGS: cluster_tags,
1355 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1356 constants.SS_MASTER_CANDIDATES: mc_data,
1357 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1358 constants.SS_MASTER_IP: cluster.master_ip,
1359 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1360 constants.SS_MASTER_NODE: cluster.master_node,
1361 constants.SS_NODE_LIST: node_data,
1362 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1363 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1364 constants.SS_OFFLINE_NODES: off_data,
1365 constants.SS_ONLINE_NODES: on_data,
1366 constants.SS_INSTANCE_LIST: instance_data,
1367 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1370 @locking.ssynchronized(_config_lock, shared=1)
1371 def GetVGName(self):
1372 """Return the volume group name.
1375 return self._config_data.cluster.volume_group_name
1377 @locking.ssynchronized(_config_lock)
1378 def SetVGName(self, vg_name):
1379 """Set the volume group name.
1382 self._config_data.cluster.volume_group_name = vg_name
1383 self._config_data.cluster.serial_no += 1
1386 @locking.ssynchronized(_config_lock, shared=1)
1387 def GetMACPrefix(self):
1388 """Return the mac prefix.
1391 return self._config_data.cluster.mac_prefix
1393 @locking.ssynchronized(_config_lock, shared=1)
1394 def GetClusterInfo(self):
1395 """Returns information about the cluster
1397 @rtype: L{objects.Cluster}
1398 @return: the cluster object
1401 return self._config_data.cluster
1403 @locking.ssynchronized(_config_lock)
1404 def Update(self, target, feedback_fn):
1405 """Notify function to be called after updates.
1407 This function must be called when an object (as returned by
1408 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1409 caller wants the modifications saved to the backing store. Note
1410 that all modified objects will be saved, but the target argument
1411 is the one the caller wants to ensure that it's saved.
1413 @param target: an instance of either L{objects.Cluster},
1414 L{objects.Node} or L{objects.Instance} which is existing in
1416 @param feedback_fn: Callable feedback function
1419 if self._config_data is None:
1420 raise errors.ProgrammerError("Configuration file not read,"
1422 update_serial = False
1423 if isinstance(target, objects.Cluster):
1424 test = target == self._config_data.cluster
1425 elif isinstance(target, objects.Node):
1426 test = target in self._config_data.nodes.values()
1427 update_serial = True
1428 elif isinstance(target, objects.Instance):
1429 test = target in self._config_data.instances.values()
1431 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1432 " ConfigWriter.Update" % type(target))
1434 raise errors.ConfigurationError("Configuration updated since object"
1435 " has been read or unknown object")
1436 target.serial_no += 1
1437 target.mtime = now = time.time()
1440 # for node updates, we need to increase the cluster serial too
1441 self._config_data.cluster.serial_no += 1
1442 self._config_data.cluster.mtime = now
1444 if isinstance(target, objects.Instance):
1445 self._UnlockedReleaseDRBDMinors(target.name)
1447 self._WriteConfig(feedback_fn=feedback_fn)
1449 @locking.ssynchronized(_config_lock)
1450 def DropECReservations(self, ec_id):
1451 """Drop per-execution-context reservations
1454 self._temporary_ids.DropECReservations(ec_id)
1455 self._temporary_macs.DropECReservations(ec_id)
1456 self._temporary_secrets.DropECReservations(ec_id)