4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Configuration management for Ganeti
24 This module provides the interface to the Ganeti cluster configuration.
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
29 Currently, the data storage format is JSON. YAML was slow and consuming too
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46 from ganeti import uidpool
49 _config_lock = locking.SharedLock()
51 # job id used for resource management at config upgrade time
52 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
55 def _ValidateConfig(data):
56 """Verifies that a configuration objects looks valid.
58 This only verifies the version of the configuration.
60 @raise errors.ConfigurationError: if the version differs from what
64 if data.version != constants.CONFIG_VERSION:
65 raise errors.ConfigurationError("Cluster configuration version"
66 " mismatch, got %s instead of %s" %
68 constants.CONFIG_VERSION))
71 class TemporaryReservationManager:
72 """A temporary resource reservation manager.
74 This is used to reserve resources in a job, before using them, making sure
75 other jobs cannot get them in the meantime.
79 self._ec_reserved = {}
81 def Reserved(self, resource):
82 for holder_reserved in self._ec_reserved.items():
83 if resource in holder_reserved:
87 def Reserve(self, ec_id, resource):
88 if self.Reserved(resource):
89 raise errors.ReservationError("Duplicate reservation for resource: %s." %
91 if ec_id not in self._ec_reserved:
92 self._ec_reserved[ec_id] = set([resource])
94 self._ec_reserved[ec_id].add(resource)
96 def DropECReservations(self, ec_id):
97 if ec_id in self._ec_reserved:
98 del self._ec_reserved[ec_id]
100 def GetReserved(self):
102 for holder_reserved in self._ec_reserved.values():
103 all_reserved.update(holder_reserved)
106 def Generate(self, existing, generate_one_fn, ec_id):
107 """Generate a new resource of this type
110 assert callable(generate_one_fn)
112 all_elems = self.GetReserved()
113 all_elems.update(existing)
116 new_resource = generate_one_fn()
117 if new_resource is not None and new_resource not in all_elems:
120 raise errors.ConfigurationError("Not able generate new resource"
121 " (last tried: %s)" % new_resource)
122 self.Reserve(ec_id, new_resource)
127 """The interface to the cluster configuration.
129 @ivar _temporary_lvs: reservation manager for temporary LVs
130 @ivar _all_rms: a list of all temporary reservation managers
133 def __init__(self, cfg_file=None, offline=False):
135 self._lock = _config_lock
136 self._config_data = None
137 self._offline = offline
139 self._cfg_file = constants.CLUSTER_CONF_FILE
141 self._cfg_file = cfg_file
142 self._temporary_ids = TemporaryReservationManager()
143 self._temporary_drbds = {}
144 self._temporary_macs = TemporaryReservationManager()
145 self._temporary_secrets = TemporaryReservationManager()
146 self._temporary_lvs = TemporaryReservationManager()
147 self._all_rms = [self._temporary_ids, self._temporary_macs,
148 self._temporary_secrets, self._temporary_lvs]
149 # Note: in order to prevent errors when resolving our name in
150 # _DistributeConfig, we compute it here once and reuse it; it's
151 # better to raise an error before starting to modify the config
152 # file than after it was modified
153 self._my_hostname = utils.HostInfo().name
154 self._last_cluster_serial = -1
157 # this method needs to be static, so that we can call it on the class
160 """Check if the cluster is configured.
163 return os.path.exists(constants.CLUSTER_CONF_FILE)
165 def _GenerateOneMAC(self):
166 """Generate one mac address
169 prefix = self._config_data.cluster.mac_prefix
170 byte1 = random.randrange(0, 256)
171 byte2 = random.randrange(0, 256)
172 byte3 = random.randrange(0, 256)
173 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
176 @locking.ssynchronized(_config_lock, shared=1)
177 def GenerateMAC(self, ec_id):
178 """Generate a MAC for an instance.
180 This should check the current instances for duplicates.
183 existing = self._AllMACs()
184 return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
186 @locking.ssynchronized(_config_lock, shared=1)
187 def ReserveMAC(self, mac, ec_id):
188 """Reserve a MAC for an instance.
190 This only checks instances managed by this cluster, it does not
191 check for potential collisions elsewhere.
194 all_macs = self._AllMACs()
196 raise errors.ReservationError("mac already in use")
198 self._temporary_macs.Reserve(mac, ec_id)
200 @locking.ssynchronized(_config_lock, shared=1)
201 def ReserveLV(self, lv_name, ec_id):
202 """Reserve an VG/LV pair for an instance.
204 @type lv_name: string
205 @param lv_name: the logical volume name to reserve
208 all_lvs = self._AllLVs()
209 if lv_name in all_lvs:
210 raise errors.ReservationError("LV already in use")
212 self._temporary_lvs.Reserve(lv_name, ec_id)
214 @locking.ssynchronized(_config_lock, shared=1)
215 def GenerateDRBDSecret(self, ec_id):
216 """Generate a DRBD secret.
218 This checks the current disks for duplicates.
221 return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
222 utils.GenerateSecret,
226 """Compute the list of all LVs.
230 for instance in self._config_data.instances.values():
231 node_data = instance.MapLVsByNode()
232 for lv_list in node_data.values():
233 lvnames.update(lv_list)
236 def _AllIDs(self, include_temporary):
237 """Compute the list of all UUIDs and names we have.
239 @type include_temporary: boolean
240 @param include_temporary: whether to include the _temporary_ids set
242 @return: a set of IDs
246 if include_temporary:
247 existing.update(self._temporary_ids.GetReserved())
248 existing.update(self._AllLVs())
249 existing.update(self._config_data.instances.keys())
250 existing.update(self._config_data.nodes.keys())
251 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
254 def _GenerateUniqueID(self, ec_id):
255 """Generate an unique UUID.
257 This checks the current node, instances and disk names for
261 @return: the unique id
264 existing = self._AllIDs(include_temporary=False)
265 return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
267 @locking.ssynchronized(_config_lock, shared=1)
268 def GenerateUniqueID(self, ec_id):
269 """Generate an unique ID.
271 This is just a wrapper over the unlocked version.
274 @param ec_id: unique id for the job to reserve the id to
277 return self._GenerateUniqueID(ec_id)
280 """Return all MACs present in the config.
283 @return: the list of all MACs
287 for instance in self._config_data.instances.values():
288 for nic in instance.nics:
289 result.append(nic.mac)
293 def _AllDRBDSecrets(self):
294 """Return all DRBD secrets present in the config.
297 @return: the list of all DRBD secrets
300 def helper(disk, result):
301 """Recursively gather secrets from this disk."""
302 if disk.dev_type == constants.DT_DRBD8:
303 result.append(disk.logical_id[5])
305 for child in disk.children:
306 helper(child, result)
309 for instance in self._config_data.instances.values():
310 for disk in instance.disks:
315 def _CheckDiskIDs(self, disk, l_ids, p_ids):
316 """Compute duplicate disk IDs
318 @type disk: L{objects.Disk}
319 @param disk: the disk at which to start searching
321 @param l_ids: list of current logical ids
323 @param p_ids: list of current physical ids
325 @return: a list of error messages
329 if disk.logical_id is not None:
330 if disk.logical_id in l_ids:
331 result.append("duplicate logical id %s" % str(disk.logical_id))
333 l_ids.append(disk.logical_id)
334 if disk.physical_id is not None:
335 if disk.physical_id in p_ids:
336 result.append("duplicate physical id %s" % str(disk.physical_id))
338 p_ids.append(disk.physical_id)
341 for child in disk.children:
342 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
345 def _UnlockedVerifyConfig(self):
349 @return: a list of error messages; a non-empty list signifies
356 data = self._config_data
360 # global cluster checks
361 if not data.cluster.enabled_hypervisors:
362 result.append("enabled hypervisors list doesn't have any entries")
363 invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
365 result.append("enabled hypervisors contains invalid entries: %s" %
367 missing_hvp = (set(data.cluster.enabled_hypervisors) -
368 set(data.cluster.hvparams.keys()))
370 result.append("hypervisor parameters missing for the enabled"
371 " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
373 if data.cluster.master_node not in data.nodes:
374 result.append("cluster has invalid primary node '%s'" %
375 data.cluster.master_node)
377 # per-instance checks
378 for instance_name in data.instances:
379 instance = data.instances[instance_name]
380 if instance.name != instance_name:
381 result.append("instance '%s' is indexed by wrong name '%s'" %
382 (instance.name, instance_name))
383 if instance.primary_node not in data.nodes:
384 result.append("instance '%s' has invalid primary node '%s'" %
385 (instance_name, instance.primary_node))
386 for snode in instance.secondary_nodes:
387 if snode not in data.nodes:
388 result.append("instance '%s' has invalid secondary node '%s'" %
389 (instance_name, snode))
390 for idx, nic in enumerate(instance.nics):
391 if nic.mac in seen_macs:
392 result.append("instance '%s' has NIC %d mac %s duplicate" %
393 (instance_name, idx, nic.mac))
395 seen_macs.append(nic.mac)
397 # gather the drbd ports for duplicate checks
398 for dsk in instance.disks:
399 if dsk.dev_type in constants.LDS_DRBD:
400 tcp_port = dsk.logical_id[2]
401 if tcp_port not in ports:
403 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
404 # gather network port reservation
405 net_port = getattr(instance, "network_port", None)
406 if net_port is not None:
407 if net_port not in ports:
409 ports[net_port].append((instance.name, "network port"))
411 # instance disk verify
412 for idx, disk in enumerate(instance.disks):
413 result.extend(["instance '%s' disk %d error: %s" %
414 (instance.name, idx, msg) for msg in disk.Verify()])
415 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
417 # cluster-wide pool of free ports
418 for free_port in data.cluster.tcpudp_port_pool:
419 if free_port not in ports:
420 ports[free_port] = []
421 ports[free_port].append(("cluster", "port marked as free"))
423 # compute tcp/udp duplicate ports
429 txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
430 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
432 # highest used tcp port check
434 if keys[-1] > data.cluster.highest_used_port:
435 result.append("Highest used port mismatch, saved %s, computed %s" %
436 (data.cluster.highest_used_port, keys[-1]))
438 if not data.nodes[data.cluster.master_node].master_candidate:
439 result.append("Master node is not a master candidate")
441 # master candidate checks
442 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
444 result.append("Not enough master candidates: actual %d, target %d" %
448 for node_name, node in data.nodes.items():
449 if node.name != node_name:
450 result.append("Node '%s' is indexed by wrong name '%s'" %
451 (node.name, node_name))
452 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
453 result.append("Node %s state is invalid: master_candidate=%s,"
454 " drain=%s, offline=%s" %
455 (node.name, node.master_candidate, node.drain,
459 _, duplicates = self._UnlockedComputeDRBDMap()
460 for node, minor, instance_a, instance_b in duplicates:
461 result.append("DRBD minor %d on node %s is assigned twice to instances"
462 " %s and %s" % (minor, node, instance_a, instance_b))
465 default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
468 def _AddIpAddress(ip, name):
469 ips.setdefault(ip, []).append(name)
471 _AddIpAddress(data.cluster.master_ip, "cluster_ip")
473 for node in data.nodes.values():
474 _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
475 if node.secondary_ip != node.primary_ip:
476 _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
478 for instance in data.instances.values():
479 for idx, nic in enumerate(instance.nics):
483 nicparams = objects.FillDict(default_nicparams, nic.nicparams)
484 nic_mode = nicparams[constants.NIC_MODE]
485 nic_link = nicparams[constants.NIC_LINK]
487 if nic_mode == constants.NIC_MODE_BRIDGED:
488 link = "bridge:%s" % nic_link
489 elif nic_mode == constants.NIC_MODE_ROUTED:
490 link = "route:%s" % nic_link
492 raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
494 _AddIpAddress("%s/%s" % (link, nic.ip),
495 "instance:%s/nic:%d" % (instance.name, idx))
497 for ip, owners in ips.items():
499 result.append("IP address %s is used by multiple owners: %s" %
500 (ip, utils.CommaJoin(owners)))
504 @locking.ssynchronized(_config_lock, shared=1)
505 def VerifyConfig(self):
508 This is just a wrapper over L{_UnlockedVerifyConfig}.
511 @return: a list of error messages; a non-empty list signifies
515 return self._UnlockedVerifyConfig()
517 def _UnlockedSetDiskID(self, disk, node_name):
518 """Convert the unique ID to the ID needed on the target nodes.
520 This is used only for drbd, which needs ip/port configuration.
522 The routine descends down and updates its children also, because
523 this helps when the only the top device is passed to the remote
526 This function is for internal use, when the config lock is already held.
530 for child in disk.children:
531 self._UnlockedSetDiskID(child, node_name)
533 if disk.logical_id is None and disk.physical_id is not None:
535 if disk.dev_type == constants.LD_DRBD8:
536 pnode, snode, port, pminor, sminor, secret = disk.logical_id
537 if node_name not in (pnode, snode):
538 raise errors.ConfigurationError("DRBD device not knowing node %s" %
540 pnode_info = self._UnlockedGetNodeInfo(pnode)
541 snode_info = self._UnlockedGetNodeInfo(snode)
542 if pnode_info is None or snode_info is None:
543 raise errors.ConfigurationError("Can't find primary or secondary node"
544 " for %s" % str(disk))
545 p_data = (pnode_info.secondary_ip, port)
546 s_data = (snode_info.secondary_ip, port)
547 if pnode == node_name:
548 disk.physical_id = p_data + s_data + (pminor, secret)
549 else: # it must be secondary, we tested above
550 disk.physical_id = s_data + p_data + (sminor, secret)
552 disk.physical_id = disk.logical_id
555 @locking.ssynchronized(_config_lock)
556 def SetDiskID(self, disk, node_name):
557 """Convert the unique ID to the ID needed on the target nodes.
559 This is used only for drbd, which needs ip/port configuration.
561 The routine descends down and updates its children also, because
562 this helps when the only the top device is passed to the remote
566 return self._UnlockedSetDiskID(disk, node_name)
568 @locking.ssynchronized(_config_lock)
569 def AddTcpUdpPort(self, port):
570 """Adds a new port to the available port pool.
573 if not isinstance(port, int):
574 raise errors.ProgrammerError("Invalid type passed for port")
576 self._config_data.cluster.tcpudp_port_pool.add(port)
579 @locking.ssynchronized(_config_lock, shared=1)
580 def GetPortList(self):
581 """Returns a copy of the current port list.
584 return self._config_data.cluster.tcpudp_port_pool.copy()
586 @locking.ssynchronized(_config_lock)
587 def AllocatePort(self):
590 The port will be taken from the available port pool or from the
591 default port range (and in this case we increase
595 # If there are TCP/IP ports configured, we use them first.
596 if self._config_data.cluster.tcpudp_port_pool:
597 port = self._config_data.cluster.tcpudp_port_pool.pop()
599 port = self._config_data.cluster.highest_used_port + 1
600 if port >= constants.LAST_DRBD_PORT:
601 raise errors.ConfigurationError("The highest used port is greater"
602 " than %s. Aborting." %
603 constants.LAST_DRBD_PORT)
604 self._config_data.cluster.highest_used_port = port
609 def _UnlockedComputeDRBDMap(self):
610 """Compute the used DRBD minor/nodes.
613 @return: dictionary of node_name: dict of minor: instance_name;
614 the returned dict will have all the nodes in it (even if with
615 an empty list), and a list of duplicates; if the duplicates
616 list is not empty, the configuration is corrupted and its caller
617 should raise an exception
620 def _AppendUsedPorts(instance_name, disk, used):
622 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
623 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
624 for node, port in ((node_a, minor_a), (node_b, minor_b)):
625 assert node in used, ("Node '%s' of instance '%s' not found"
626 " in node list" % (node, instance_name))
627 if port in used[node]:
628 duplicates.append((node, port, instance_name, used[node][port]))
630 used[node][port] = instance_name
632 for child in disk.children:
633 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
637 my_dict = dict((node, {}) for node in self._config_data.nodes)
638 for instance in self._config_data.instances.itervalues():
639 for disk in instance.disks:
640 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
641 for (node, minor), instance in self._temporary_drbds.iteritems():
642 if minor in my_dict[node] and my_dict[node][minor] != instance:
643 duplicates.append((node, minor, instance, my_dict[node][minor]))
645 my_dict[node][minor] = instance
646 return my_dict, duplicates
648 @locking.ssynchronized(_config_lock)
649 def ComputeDRBDMap(self):
650 """Compute the used DRBD minor/nodes.
652 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
654 @return: dictionary of node_name: dict of minor: instance_name;
655 the returned dict will have all the nodes in it (even if with
659 d_map, duplicates = self._UnlockedComputeDRBDMap()
661 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
665 @locking.ssynchronized(_config_lock)
666 def AllocateDRBDMinor(self, nodes, instance):
667 """Allocate a drbd minor.
669 The free minor will be automatically computed from the existing
670 devices. A node can be given multiple times in order to allocate
671 multiple minors. The result is the list of minors, in the same
672 order as the passed nodes.
674 @type instance: string
675 @param instance: the instance for which we allocate minors
678 assert isinstance(instance, basestring), \
679 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
681 d_map, duplicates = self._UnlockedComputeDRBDMap()
683 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
689 # no minors used, we can start at 0
692 self._temporary_drbds[(nname, 0)] = instance
696 ffree = utils.FirstFree(keys)
698 # return the next minor
699 # TODO: implement high-limit check
703 # double-check minor against current instances
704 assert minor not in d_map[nname], \
705 ("Attempt to reuse allocated DRBD minor %d on node %s,"
706 " already allocated to instance %s" %
707 (minor, nname, d_map[nname][minor]))
708 ndata[minor] = instance
709 # double-check minor against reservation
710 r_key = (nname, minor)
711 assert r_key not in self._temporary_drbds, \
712 ("Attempt to reuse reserved DRBD minor %d on node %s,"
713 " reserved for instance %s" %
714 (minor, nname, self._temporary_drbds[r_key]))
715 self._temporary_drbds[r_key] = instance
717 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
721 def _UnlockedReleaseDRBDMinors(self, instance):
722 """Release temporary drbd minors allocated for a given instance.
724 @type instance: string
725 @param instance: the instance for which temporary minors should be
729 assert isinstance(instance, basestring), \
730 "Invalid argument passed to ReleaseDRBDMinors"
731 for key, name in self._temporary_drbds.items():
733 del self._temporary_drbds[key]
735 @locking.ssynchronized(_config_lock)
736 def ReleaseDRBDMinors(self, instance):
737 """Release temporary drbd minors allocated for a given instance.
739 This should be called on the error paths, on the success paths
740 it's automatically called by the ConfigWriter add and update
743 This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
745 @type instance: string
746 @param instance: the instance for which temporary minors should be
750 self._UnlockedReleaseDRBDMinors(instance)
752 @locking.ssynchronized(_config_lock, shared=1)
753 def GetConfigVersion(self):
754 """Get the configuration version.
756 @return: Config version
759 return self._config_data.version
761 @locking.ssynchronized(_config_lock, shared=1)
762 def GetClusterName(self):
765 @return: Cluster name
768 return self._config_data.cluster.cluster_name
770 @locking.ssynchronized(_config_lock, shared=1)
771 def GetMasterNode(self):
772 """Get the hostname of the master node for this cluster.
774 @return: Master hostname
777 return self._config_data.cluster.master_node
779 @locking.ssynchronized(_config_lock, shared=1)
780 def GetMasterIP(self):
781 """Get the IP of the master node for this cluster.
786 return self._config_data.cluster.master_ip
788 @locking.ssynchronized(_config_lock, shared=1)
789 def GetMasterNetdev(self):
790 """Get the master network device for this cluster.
793 return self._config_data.cluster.master_netdev
795 @locking.ssynchronized(_config_lock, shared=1)
796 def GetFileStorageDir(self):
797 """Get the file storage dir for this cluster.
800 return self._config_data.cluster.file_storage_dir
802 @locking.ssynchronized(_config_lock, shared=1)
803 def GetHypervisorType(self):
804 """Get the hypervisor type for this cluster.
807 return self._config_data.cluster.enabled_hypervisors[0]
809 @locking.ssynchronized(_config_lock, shared=1)
810 def GetHostKey(self):
811 """Return the rsa hostkey from the config.
814 @return: the rsa hostkey
817 return self._config_data.cluster.rsahostkeypub
819 @locking.ssynchronized(_config_lock)
820 def AddInstance(self, instance, ec_id):
821 """Add an instance to the config.
823 This should be used after creating a new instance.
825 @type instance: L{objects.Instance}
826 @param instance: the instance object
829 if not isinstance(instance, objects.Instance):
830 raise errors.ProgrammerError("Invalid type passed to AddInstance")
832 if instance.disk_template != constants.DT_DISKLESS:
833 all_lvs = instance.MapLVsByNode()
834 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
836 all_macs = self._AllMACs()
837 for nic in instance.nics:
838 if nic.mac in all_macs:
839 raise errors.ConfigurationError("Cannot add instance %s:"
840 " MAC address '%s' already in use." %
841 (instance.name, nic.mac))
843 self._EnsureUUID(instance, ec_id)
845 instance.serial_no = 1
846 instance.ctime = instance.mtime = time.time()
847 self._config_data.instances[instance.name] = instance
848 self._config_data.cluster.serial_no += 1
849 self._UnlockedReleaseDRBDMinors(instance.name)
852 def _EnsureUUID(self, item, ec_id):
853 """Ensures a given object has a valid UUID.
855 @param item: the instance or node to be checked
856 @param ec_id: the execution context id for the uuid reservation
860 item.uuid = self._GenerateUniqueID(ec_id)
861 elif item.uuid in self._AllIDs(include_temporary=True):
862 raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
863 " in use" % (item.name, item.uuid))
865 def _SetInstanceStatus(self, instance_name, status):
866 """Set the instance's status to a given value.
869 assert isinstance(status, bool), \
870 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
872 if instance_name not in self._config_data.instances:
873 raise errors.ConfigurationError("Unknown instance '%s'" %
875 instance = self._config_data.instances[instance_name]
876 if instance.admin_up != status:
877 instance.admin_up = status
878 instance.serial_no += 1
879 instance.mtime = time.time()
882 @locking.ssynchronized(_config_lock)
883 def MarkInstanceUp(self, instance_name):
884 """Mark the instance status to up in the config.
887 self._SetInstanceStatus(instance_name, True)
889 @locking.ssynchronized(_config_lock)
890 def RemoveInstance(self, instance_name):
891 """Remove the instance from the configuration.
894 if instance_name not in self._config_data.instances:
895 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
896 del self._config_data.instances[instance_name]
897 self._config_data.cluster.serial_no += 1
900 @locking.ssynchronized(_config_lock)
901 def RenameInstance(self, old_name, new_name):
902 """Rename an instance.
904 This needs to be done in ConfigWriter and not by RemoveInstance
905 combined with AddInstance as only we can guarantee an atomic
909 if old_name not in self._config_data.instances:
910 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
911 inst = self._config_data.instances[old_name]
912 del self._config_data.instances[old_name]
915 for disk in inst.disks:
916 if disk.dev_type == constants.LD_FILE:
917 # rename the file paths in logical and physical id
918 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
919 disk.physical_id = disk.logical_id = (disk.logical_id[0],
920 utils.PathJoin(file_storage_dir,
924 self._config_data.instances[inst.name] = inst
927 @locking.ssynchronized(_config_lock)
928 def MarkInstanceDown(self, instance_name):
929 """Mark the status of an instance to down in the configuration.
932 self._SetInstanceStatus(instance_name, False)
934 def _UnlockedGetInstanceList(self):
935 """Get the list of instances.
937 This function is for internal use, when the config lock is already held.
940 return self._config_data.instances.keys()
942 @locking.ssynchronized(_config_lock, shared=1)
943 def GetInstanceList(self):
944 """Get the list of instances.
946 @return: array of instances, ex. ['instance2.example.com',
947 'instance1.example.com']
950 return self._UnlockedGetInstanceList()
952 @locking.ssynchronized(_config_lock, shared=1)
953 def ExpandInstanceName(self, short_name):
954 """Attempt to expand an incomplete instance name.
957 return utils.MatchNameComponent(short_name,
958 self._config_data.instances.keys(),
959 case_sensitive=False)
961 def _UnlockedGetInstanceInfo(self, instance_name):
962 """Returns information about an instance.
964 This function is for internal use, when the config lock is already held.
967 if instance_name not in self._config_data.instances:
970 return self._config_data.instances[instance_name]
972 @locking.ssynchronized(_config_lock, shared=1)
973 def GetInstanceInfo(self, instance_name):
974 """Returns information about an instance.
976 It takes the information from the configuration file. Other information of
977 an instance are taken from the live systems.
979 @param instance_name: name of the instance, e.g.
980 I{instance1.example.com}
982 @rtype: L{objects.Instance}
983 @return: the instance object
986 return self._UnlockedGetInstanceInfo(instance_name)
988 @locking.ssynchronized(_config_lock, shared=1)
989 def GetAllInstancesInfo(self):
990 """Get the configuration of all instances.
993 @return: dict of (instance, instance_info), where instance_info is what
994 would GetInstanceInfo return for the node
997 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
998 for instance in self._UnlockedGetInstanceList()])
1001 @locking.ssynchronized(_config_lock)
1002 def AddNode(self, node, ec_id):
1003 """Add a node to the configuration.
1005 @type node: L{objects.Node}
1006 @param node: a Node instance
1009 logging.info("Adding node %s to configuration", node.name)
1011 self._EnsureUUID(node, ec_id)
1014 node.ctime = node.mtime = time.time()
1015 self._config_data.nodes[node.name] = node
1016 self._config_data.cluster.serial_no += 1
1019 @locking.ssynchronized(_config_lock)
1020 def RemoveNode(self, node_name):
1021 """Remove a node from the configuration.
1024 logging.info("Removing node %s from configuration", node_name)
1026 if node_name not in self._config_data.nodes:
1027 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1029 del self._config_data.nodes[node_name]
1030 self._config_data.cluster.serial_no += 1
1033 @locking.ssynchronized(_config_lock, shared=1)
1034 def ExpandNodeName(self, short_name):
1035 """Attempt to expand an incomplete instance name.
1038 return utils.MatchNameComponent(short_name,
1039 self._config_data.nodes.keys(),
1040 case_sensitive=False)
1042 def _UnlockedGetNodeInfo(self, node_name):
1043 """Get the configuration of a node, as stored in the config.
1045 This function is for internal use, when the config lock is already
1048 @param node_name: the node name, e.g. I{node1.example.com}
1050 @rtype: L{objects.Node}
1051 @return: the node object
1054 if node_name not in self._config_data.nodes:
1057 return self._config_data.nodes[node_name]
1059 @locking.ssynchronized(_config_lock, shared=1)
1060 def GetNodeInfo(self, node_name):
1061 """Get the configuration of a node, as stored in the config.
1063 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1065 @param node_name: the node name, e.g. I{node1.example.com}
1067 @rtype: L{objects.Node}
1068 @return: the node object
1071 return self._UnlockedGetNodeInfo(node_name)
1073 def _UnlockedGetNodeList(self):
1074 """Return the list of nodes which are in the configuration.
1076 This function is for internal use, when the config lock is already
1082 return self._config_data.nodes.keys()
1084 @locking.ssynchronized(_config_lock, shared=1)
1085 def GetNodeList(self):
1086 """Return the list of nodes which are in the configuration.
1089 return self._UnlockedGetNodeList()
1091 def _UnlockedGetOnlineNodeList(self):
1092 """Return the list of nodes which are online.
1095 all_nodes = [self._UnlockedGetNodeInfo(node)
1096 for node in self._UnlockedGetNodeList()]
1097 return [node.name for node in all_nodes if not node.offline]
1099 @locking.ssynchronized(_config_lock, shared=1)
1100 def GetOnlineNodeList(self):
1101 """Return the list of nodes which are online.
1104 return self._UnlockedGetOnlineNodeList()
1106 @locking.ssynchronized(_config_lock, shared=1)
1107 def GetAllNodesInfo(self):
1108 """Get the configuration of all nodes.
1111 @return: dict of (node, node_info), where node_info is what
1112 would GetNodeInfo return for the node
1115 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1116 for node in self._UnlockedGetNodeList()])
1119 def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1120 """Get the number of current and maximum desired and possible candidates.
1122 @type exceptions: list
1123 @param exceptions: if passed, list of nodes that should be ignored
1125 @return: tuple of (current, desired and possible, possible)
1128 mc_now = mc_should = mc_max = 0
1129 for node in self._config_data.nodes.values():
1130 if exceptions and node.name in exceptions:
1132 if not (node.offline or node.drained):
1134 if node.master_candidate:
1136 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1137 return (mc_now, mc_should, mc_max)
1139 @locking.ssynchronized(_config_lock, shared=1)
1140 def GetMasterCandidateStats(self, exceptions=None):
1141 """Get the number of current and maximum possible candidates.
1143 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1145 @type exceptions: list
1146 @param exceptions: if passed, list of nodes that should be ignored
1148 @return: tuple of (current, max)
1151 return self._UnlockedGetMasterCandidateStats(exceptions)
1153 @locking.ssynchronized(_config_lock)
1154 def MaintainCandidatePool(self, exceptions):
1155 """Try to grow the candidate pool to the desired size.
1157 @type exceptions: list
1158 @param exceptions: if passed, list of nodes that should be ignored
1160 @return: list with the adjusted nodes (L{objects.Node} instances)
1163 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1166 node_list = self._config_data.nodes.keys()
1167 random.shuffle(node_list)
1168 for name in node_list:
1169 if mc_now >= mc_max:
1171 node = self._config_data.nodes[name]
1172 if (node.master_candidate or node.offline or node.drained or
1173 node.name in exceptions):
1175 mod_list.append(node)
1176 node.master_candidate = True
1179 if mc_now != mc_max:
1180 # this should not happen
1181 logging.warning("Warning: MaintainCandidatePool didn't manage to"
1182 " fill the candidate pool (%d/%d)", mc_now, mc_max)
1184 self._config_data.cluster.serial_no += 1
1189 def _BumpSerialNo(self):
1190 """Bump up the serial number of the config.
1193 self._config_data.serial_no += 1
1194 self._config_data.mtime = time.time()
1196 def _AllUUIDObjects(self):
1197 """Returns all objects with uuid attributes.
1200 return (self._config_data.instances.values() +
1201 self._config_data.nodes.values() +
1202 [self._config_data.cluster])
1204 def _OpenConfig(self):
1205 """Read the config data from disk.
1208 raw_data = utils.ReadFile(self._cfg_file)
1211 data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1212 except Exception, err:
1213 raise errors.ConfigurationError(err)
1215 # Make sure the configuration has the right version
1216 _ValidateConfig(data)
1218 if (not hasattr(data, 'cluster') or
1219 not hasattr(data.cluster, 'rsahostkeypub')):
1220 raise errors.ConfigurationError("Incomplete configuration"
1221 " (missing cluster.rsahostkeypub)")
1223 # Upgrade configuration if needed
1224 data.UpgradeConfig()
1226 self._config_data = data
1227 # reset the last serial as -1 so that the next write will cause
1229 self._last_cluster_serial = -1
1231 # And finally run our (custom) config upgrade sequence
1232 self._UpgradeConfig()
1234 def _UpgradeConfig(self):
1235 """Run upgrade steps that cannot be done purely in the objects.
1237 This is because some data elements need uniqueness across the
1238 whole configuration, etc.
1240 @warning: this function will call L{_WriteConfig()}, so it needs
1241 to either be called with the lock held or from a safe place
1246 for item in self._AllUUIDObjects():
1247 if item.uuid is None:
1248 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1252 # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1253 # only called at config init time, without the lock held
1254 self.DropECReservations(_UPGRADE_CONFIG_JID)
1256 def _DistributeConfig(self, feedback_fn):
1257 """Distribute the configuration to the other nodes.
1259 Currently, this only copies the configuration file. In the future,
1260 it could be used to encapsulate the 2/3-phase update mechanism.
1270 myhostname = self._my_hostname
1271 # we can skip checking whether _UnlockedGetNodeInfo returns None
1272 # since the node list comes from _UnlocketGetNodeList, and we are
1273 # called with the lock held, so no modifications should take place
1275 for node_name in self._UnlockedGetNodeList():
1276 if node_name == myhostname:
1278 node_info = self._UnlockedGetNodeInfo(node_name)
1279 if not node_info.master_candidate:
1281 node_list.append(node_info.name)
1282 addr_list.append(node_info.primary_ip)
1284 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1285 address_list=addr_list)
1286 for to_node, to_result in result.items():
1287 msg = to_result.fail_msg
1289 msg = ("Copy of file %s to node %s failed: %s" %
1290 (self._cfg_file, to_node, msg))
1300 def _WriteConfig(self, destination=None, feedback_fn=None):
1301 """Write the configuration data to persistent storage.
1304 assert feedback_fn is None or callable(feedback_fn)
1306 # Warn on config errors, but don't abort the save - the
1307 # configuration has already been modified, and we can't revert;
1308 # the best we can do is to warn the user and save as is, leaving
1309 # recovery to the user
1310 config_errors = self._UnlockedVerifyConfig()
1312 errmsg = ("Configuration data is not consistent: %s" %
1313 (utils.CommaJoin(config_errors)))
1314 logging.critical(errmsg)
1318 if destination is None:
1319 destination = self._cfg_file
1320 self._BumpSerialNo()
1321 txt = serializer.Dump(self._config_data.ToDict())
1323 utils.WriteFile(destination, data=txt)
1325 self.write_count += 1
1327 # and redistribute the config file to master candidates
1328 self._DistributeConfig(feedback_fn)
1330 # Write ssconf files on all nodes (including locally)
1331 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1332 if not self._offline:
1333 result = rpc.RpcRunner.call_write_ssconf_files(
1334 self._UnlockedGetOnlineNodeList(),
1335 self._UnlockedGetSsconfValues())
1337 for nname, nresu in result.items():
1338 msg = nresu.fail_msg
1340 errmsg = ("Error while uploading ssconf files to"
1341 " node %s: %s" % (nname, msg))
1342 logging.warning(errmsg)
1347 self._last_cluster_serial = self._config_data.cluster.serial_no
1349 def _UnlockedGetSsconfValues(self):
1350 """Return the values needed by ssconf.
1353 @return: a dictionary with keys the ssconf names and values their
1358 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1359 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1360 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1361 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1362 for ninfo in node_info]
1363 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1364 for ninfo in node_info]
1366 instance_data = fn(instance_names)
1367 off_data = fn(node.name for node in node_info if node.offline)
1368 on_data = fn(node.name for node in node_info if not node.offline)
1369 mc_data = fn(node.name for node in node_info if node.master_candidate)
1370 mc_ips_data = fn(node.primary_ip for node in node_info
1371 if node.master_candidate)
1372 node_data = fn(node_names)
1373 node_pri_ips_data = fn(node_pri_ips)
1374 node_snd_ips_data = fn(node_snd_ips)
1376 cluster = self._config_data.cluster
1377 cluster_tags = fn(cluster.GetTags())
1379 hypervisor_list = fn(cluster.enabled_hypervisors)
1381 uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1384 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1385 constants.SS_CLUSTER_TAGS: cluster_tags,
1386 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1387 constants.SS_MASTER_CANDIDATES: mc_data,
1388 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1389 constants.SS_MASTER_IP: cluster.master_ip,
1390 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1391 constants.SS_MASTER_NODE: cluster.master_node,
1392 constants.SS_NODE_LIST: node_data,
1393 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1394 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1395 constants.SS_OFFLINE_NODES: off_data,
1396 constants.SS_ONLINE_NODES: on_data,
1397 constants.SS_INSTANCE_LIST: instance_data,
1398 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1399 constants.SS_HYPERVISOR_LIST: hypervisor_list,
1400 constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1401 constants.SS_UID_POOL: uid_pool,
1404 @locking.ssynchronized(_config_lock, shared=1)
1405 def GetVGName(self):
1406 """Return the volume group name.
1409 return self._config_data.cluster.volume_group_name
1411 @locking.ssynchronized(_config_lock)
1412 def SetVGName(self, vg_name):
1413 """Set the volume group name.
1416 self._config_data.cluster.volume_group_name = vg_name
1417 self._config_data.cluster.serial_no += 1
1420 @locking.ssynchronized(_config_lock, shared=1)
1421 def GetMACPrefix(self):
1422 """Return the mac prefix.
1425 return self._config_data.cluster.mac_prefix
1427 @locking.ssynchronized(_config_lock, shared=1)
1428 def GetClusterInfo(self):
1429 """Returns information about the cluster
1431 @rtype: L{objects.Cluster}
1432 @return: the cluster object
1435 return self._config_data.cluster
1437 @locking.ssynchronized(_config_lock)
1438 def Update(self, target, feedback_fn):
1439 """Notify function to be called after updates.
1441 This function must be called when an object (as returned by
1442 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1443 caller wants the modifications saved to the backing store. Note
1444 that all modified objects will be saved, but the target argument
1445 is the one the caller wants to ensure that it's saved.
1447 @param target: an instance of either L{objects.Cluster},
1448 L{objects.Node} or L{objects.Instance} which is existing in
1450 @param feedback_fn: Callable feedback function
1453 if self._config_data is None:
1454 raise errors.ProgrammerError("Configuration file not read,"
1456 update_serial = False
1457 if isinstance(target, objects.Cluster):
1458 test = target == self._config_data.cluster
1459 elif isinstance(target, objects.Node):
1460 test = target in self._config_data.nodes.values()
1461 update_serial = True
1462 elif isinstance(target, objects.Instance):
1463 test = target in self._config_data.instances.values()
1465 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1466 " ConfigWriter.Update" % type(target))
1468 raise errors.ConfigurationError("Configuration updated since object"
1469 " has been read or unknown object")
1470 target.serial_no += 1
1471 target.mtime = now = time.time()
1474 # for node updates, we need to increase the cluster serial too
1475 self._config_data.cluster.serial_no += 1
1476 self._config_data.cluster.mtime = now
1478 if isinstance(target, objects.Instance):
1479 self._UnlockedReleaseDRBDMinors(target.name)
1481 self._WriteConfig(feedback_fn=feedback_fn)
1483 @locking.ssynchronized(_config_lock)
1484 def DropECReservations(self, ec_id):
1485 """Drop per-execution-context reservations
1488 for rm in self._all_rms:
1489 rm.DropECReservations(ec_id)