# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consuming too
much memory.

"""

# pylint: disable-msg=R0904
# R0904: Too many public methods
import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime
# Module-wide lock protecting all ConfigWriter operations; readers take it
# shared, writers exclusive.
_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @param data: the configuration data (a L{objects.ConfigData}-like
      object with a C{version} attribute)
  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))
class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    # map of execution-context id -> set of reserved resources
    self._ec_reserved = {}

  def Reserved(self, resource):
    """Tell whether a resource is currently reserved by any context."""
    # iterate over the reserved *sets*, not the (ec_id, set) items, so
    # membership is checked against the resources themselves
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    """Reserve a resource for the given execution context.

    @raise errors.ReservationError: if the resource is already reserved

    """
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    (resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    """Drop all reservations held by the given execution context."""
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    """Return the set of all currently reserved resources."""
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    @param existing: collection of already-existing resources to avoid
    @param generate_one_fn: no-argument callable producing one candidate
    @param ec_id: execution context under which to reserve the result
    @return: the newly generated and reserved resource
    @raise errors.ConfigurationError: if no free resource was found

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    # bounded number of attempts so a saturated resource space cannot
    # make us loop forever
    for _ in range(64):
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
    else:
      raise errors.ConfigurationError("Not able generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    # (node, minor) -> instance reservations, handled separately from the
    # generic reservation managers
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    # NOTE(review): the damaged original appears to end with loading the
    # configuration here — confirm against upstream
    self._OpenConfig()
163 # this method needs to be static, so that we can call it on the class
166 """Check if the cluster is configured.
169 return os.path.exists(constants.CLUSTER_CONF_FILE)
171 def _GenerateOneMAC(self):
172 """Generate one mac address
175 prefix = self._config_data.cluster.mac_prefix
176 byte1 = random.randrange(0, 256)
177 byte2 = random.randrange(0, 256)
178 byte3 = random.randrange(0, 256)
179 mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
182 @locking.ssynchronized(_config_lock, shared=1)
183 def GenerateMAC(self, ec_id):
184 """Generate a MAC for an instance.
186 This should check the current instances for duplicates.
189 existing = self._AllMACs()
190 return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
192 @locking.ssynchronized(_config_lock, shared=1)
193 def ReserveMAC(self, mac, ec_id):
194 """Reserve a MAC for an instance.
196 This only checks instances managed by this cluster, it does not
197 check for potential collisions elsewhere.
200 all_macs = self._AllMACs()
202 raise errors.ReservationError("mac already in use")
204 self._temporary_macs.Reserve(mac, ec_id)
206 @locking.ssynchronized(_config_lock, shared=1)
207 def ReserveLV(self, lv_name, ec_id):
208 """Reserve an VG/LV pair for an instance.
210 @type lv_name: string
211 @param lv_name: the logical volume name to reserve
214 all_lvs = self._AllLVs()
215 if lv_name in all_lvs:
216 raise errors.ReservationError("LV already in use")
218 self._temporary_lvs.Reserve(lv_name, ec_id)
220 @locking.ssynchronized(_config_lock, shared=1)
221 def GenerateDRBDSecret(self, ec_id):
222 """Generate a DRBD secret.
224 This checks the current disks for duplicates.
227 return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
228 utils.GenerateSecret,
232 """Compute the list of all LVs.
236 for instance in self._config_data.instances.values():
237 node_data = instance.MapLVsByNode()
238 for lv_list in node_data.values():
239 lvnames.update(lv_list)
242 def _AllIDs(self, include_temporary):
243 """Compute the list of all UUIDs and names we have.
245 @type include_temporary: boolean
246 @param include_temporary: whether to include the _temporary_ids set
248 @return: a set of IDs
252 if include_temporary:
253 existing.update(self._temporary_ids.GetReserved())
254 existing.update(self._AllLVs())
255 existing.update(self._config_data.instances.keys())
256 existing.update(self._config_data.nodes.keys())
257 existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
260 def _GenerateUniqueID(self, ec_id):
261 """Generate an unique UUID.
263 This checks the current node, instances and disk names for
267 @return: the unique id
270 existing = self._AllIDs(include_temporary=False)
271 return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
273 @locking.ssynchronized(_config_lock, shared=1)
274 def GenerateUniqueID(self, ec_id):
275 """Generate an unique ID.
277 This is just a wrapper over the unlocked version.
280 @param ec_id: unique id for the job to reserve the id to
283 return self._GenerateUniqueID(ec_id)
286 """Return all MACs present in the config.
289 @return: the list of all MACs
293 for instance in self._config_data.instances.values():
294 for nic in instance.nics:
295 result.append(nic.mac)
299 def _AllDRBDSecrets(self):
300 """Return all DRBD secrets present in the config.
303 @return: the list of all DRBD secrets
306 def helper(disk, result):
307 """Recursively gather secrets from this disk."""
308 if disk.dev_type == constants.DT_DRBD8:
309 result.append(disk.logical_id[5])
311 for child in disk.children:
312 helper(child, result)
315 for instance in self._config_data.instances.values():
316 for disk in instance.disks:
321 def _CheckDiskIDs(self, disk, l_ids, p_ids):
322 """Compute duplicate disk IDs
324 @type disk: L{objects.Disk}
325 @param disk: the disk at which to start searching
327 @param l_ids: list of current logical ids
329 @param p_ids: list of current physical ids
331 @return: a list of error messages
335 if disk.logical_id is not None:
336 if disk.logical_id in l_ids:
337 result.append("duplicate logical id %s" % str(disk.logical_id))
339 l_ids.append(disk.logical_id)
340 if disk.physical_id is not None:
341 if disk.physical_id in p_ids:
342 result.append("duplicate physical id %s" % str(disk.physical_id))
344 p_ids.append(disk.physical_id)
347 for child in disk.children:
348 result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
351 def _UnlockedVerifyConfig(self):
355 @return: a list of error messages; a non-empty list signifies
362 data = self._config_data
366 # global cluster checks
367 if not data.cluster.enabled_hypervisors:
368 result.append("enabled hypervisors list doesn't have any entries")
369 invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
371 result.append("enabled hypervisors contains invalid entries: %s" %
373 missing_hvp = (set(data.cluster.enabled_hypervisors) -
374 set(data.cluster.hvparams.keys()))
376 result.append("hypervisor parameters missing for the enabled"
377 " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
379 if data.cluster.master_node not in data.nodes:
380 result.append("cluster has invalid primary node '%s'" %
381 data.cluster.master_node)
383 # per-instance checks
384 for instance_name in data.instances:
385 instance = data.instances[instance_name]
386 if instance.name != instance_name:
387 result.append("instance '%s' is indexed by wrong name '%s'" %
388 (instance.name, instance_name))
389 if instance.primary_node not in data.nodes:
390 result.append("instance '%s' has invalid primary node '%s'" %
391 (instance_name, instance.primary_node))
392 for snode in instance.secondary_nodes:
393 if snode not in data.nodes:
394 result.append("instance '%s' has invalid secondary node '%s'" %
395 (instance_name, snode))
396 for idx, nic in enumerate(instance.nics):
397 if nic.mac in seen_macs:
398 result.append("instance '%s' has NIC %d mac %s duplicate" %
399 (instance_name, idx, nic.mac))
401 seen_macs.append(nic.mac)
403 # gather the drbd ports for duplicate checks
404 for dsk in instance.disks:
405 if dsk.dev_type in constants.LDS_DRBD:
406 tcp_port = dsk.logical_id[2]
407 if tcp_port not in ports:
409 ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
410 # gather network port reservation
411 net_port = getattr(instance, "network_port", None)
412 if net_port is not None:
413 if net_port not in ports:
415 ports[net_port].append((instance.name, "network port"))
417 # instance disk verify
418 for idx, disk in enumerate(instance.disks):
419 result.extend(["instance '%s' disk %d error: %s" %
420 (instance.name, idx, msg) for msg in disk.Verify()])
421 result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
423 # cluster-wide pool of free ports
424 for free_port in data.cluster.tcpudp_port_pool:
425 if free_port not in ports:
426 ports[free_port] = []
427 ports[free_port].append(("cluster", "port marked as free"))
429 # compute tcp/udp duplicate ports
435 txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
436 result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
438 # highest used tcp port check
440 if keys[-1] > data.cluster.highest_used_port:
441 result.append("Highest used port mismatch, saved %s, computed %s" %
442 (data.cluster.highest_used_port, keys[-1]))
444 if not data.nodes[data.cluster.master_node].master_candidate:
445 result.append("Master node is not a master candidate")
447 # master candidate checks
448 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
450 result.append("Not enough master candidates: actual %d, target %d" %
454 for node_name, node in data.nodes.items():
455 if node.name != node_name:
456 result.append("Node '%s' is indexed by wrong name '%s'" %
457 (node.name, node_name))
458 if [node.master_candidate, node.drained, node.offline].count(True) > 1:
459 result.append("Node %s state is invalid: master_candidate=%s,"
460 " drain=%s, offline=%s" %
461 (node.name, node.master_candidate, node.drain,
465 nodegroups_names = set()
466 for nodegroup_uuid in data.nodegroups:
467 nodegroup = data.nodegroups[nodegroup_uuid]
468 if nodegroup.uuid != nodegroup_uuid:
469 result.append("nodegroup '%s' (uuid: '%s') indexed by wrong uuid '%s'"
470 % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
471 if utils.UUID_RE.match(nodegroup.name.lower()):
472 result.append("nodegroup '%s' (uuid: '%s') has uuid-like name" %
473 (nodegroup.name, nodegroup.uuid))
474 if nodegroup.name in nodegroups_names:
475 result.append("duplicate nodegroup name '%s'" % nodegroup.name)
477 nodegroups_names.add(nodegroup.name)
480 _, duplicates = self._UnlockedComputeDRBDMap()
481 for node, minor, instance_a, instance_b in duplicates:
482 result.append("DRBD minor %d on node %s is assigned twice to instances"
483 " %s and %s" % (minor, node, instance_a, instance_b))
486 default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
489 def _AddIpAddress(ip, name):
490 ips.setdefault(ip, []).append(name)
492 _AddIpAddress(data.cluster.master_ip, "cluster_ip")
494 for node in data.nodes.values():
495 _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
496 if node.secondary_ip != node.primary_ip:
497 _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
499 for instance in data.instances.values():
500 for idx, nic in enumerate(instance.nics):
504 nicparams = objects.FillDict(default_nicparams, nic.nicparams)
505 nic_mode = nicparams[constants.NIC_MODE]
506 nic_link = nicparams[constants.NIC_LINK]
508 if nic_mode == constants.NIC_MODE_BRIDGED:
509 link = "bridge:%s" % nic_link
510 elif nic_mode == constants.NIC_MODE_ROUTED:
511 link = "route:%s" % nic_link
513 raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
515 _AddIpAddress("%s/%s" % (link, nic.ip),
516 "instance:%s/nic:%d" % (instance.name, idx))
518 for ip, owners in ips.items():
520 result.append("IP address %s is used by multiple owners: %s" %
521 (ip, utils.CommaJoin(owners)))
525 @locking.ssynchronized(_config_lock, shared=1)
526 def VerifyConfig(self):
529 This is just a wrapper over L{_UnlockedVerifyConfig}.
532 @return: a list of error messages; a non-empty list signifies
536 return self._UnlockedVerifyConfig()
538 def _UnlockedSetDiskID(self, disk, node_name):
539 """Convert the unique ID to the ID needed on the target nodes.
541 This is used only for drbd, which needs ip/port configuration.
543 The routine descends down and updates its children also, because
544 this helps when the only the top device is passed to the remote
547 This function is for internal use, when the config lock is already held.
551 for child in disk.children:
552 self._UnlockedSetDiskID(child, node_name)
554 if disk.logical_id is None and disk.physical_id is not None:
556 if disk.dev_type == constants.LD_DRBD8:
557 pnode, snode, port, pminor, sminor, secret = disk.logical_id
558 if node_name not in (pnode, snode):
559 raise errors.ConfigurationError("DRBD device not knowing node %s" %
561 pnode_info = self._UnlockedGetNodeInfo(pnode)
562 snode_info = self._UnlockedGetNodeInfo(snode)
563 if pnode_info is None or snode_info is None:
564 raise errors.ConfigurationError("Can't find primary or secondary node"
565 " for %s" % str(disk))
566 p_data = (pnode_info.secondary_ip, port)
567 s_data = (snode_info.secondary_ip, port)
568 if pnode == node_name:
569 disk.physical_id = p_data + s_data + (pminor, secret)
570 else: # it must be secondary, we tested above
571 disk.physical_id = s_data + p_data + (sminor, secret)
573 disk.physical_id = disk.logical_id
576 @locking.ssynchronized(_config_lock)
577 def SetDiskID(self, disk, node_name):
578 """Convert the unique ID to the ID needed on the target nodes.
580 This is used only for drbd, which needs ip/port configuration.
582 The routine descends down and updates its children also, because
583 this helps when the only the top device is passed to the remote
587 return self._UnlockedSetDiskID(disk, node_name)
589 @locking.ssynchronized(_config_lock)
590 def AddTcpUdpPort(self, port):
591 """Adds a new port to the available port pool.
594 if not isinstance(port, int):
595 raise errors.ProgrammerError("Invalid type passed for port")
597 self._config_data.cluster.tcpudp_port_pool.add(port)
600 @locking.ssynchronized(_config_lock, shared=1)
601 def GetPortList(self):
602 """Returns a copy of the current port list.
605 return self._config_data.cluster.tcpudp_port_pool.copy()
607 @locking.ssynchronized(_config_lock)
608 def AllocatePort(self):
611 The port will be taken from the available port pool or from the
612 default port range (and in this case we increase
616 # If there are TCP/IP ports configured, we use them first.
617 if self._config_data.cluster.tcpudp_port_pool:
618 port = self._config_data.cluster.tcpudp_port_pool.pop()
620 port = self._config_data.cluster.highest_used_port + 1
621 if port >= constants.LAST_DRBD_PORT:
622 raise errors.ConfigurationError("The highest used port is greater"
623 " than %s. Aborting." %
624 constants.LAST_DRBD_PORT)
625 self._config_data.cluster.highest_used_port = port
630 def _UnlockedComputeDRBDMap(self):
631 """Compute the used DRBD minor/nodes.
634 @return: dictionary of node_name: dict of minor: instance_name;
635 the returned dict will have all the nodes in it (even if with
636 an empty list), and a list of duplicates; if the duplicates
637 list is not empty, the configuration is corrupted and its caller
638 should raise an exception
641 def _AppendUsedPorts(instance_name, disk, used):
643 if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
644 node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
645 for node, port in ((node_a, minor_a), (node_b, minor_b)):
646 assert node in used, ("Node '%s' of instance '%s' not found"
647 " in node list" % (node, instance_name))
648 if port in used[node]:
649 duplicates.append((node, port, instance_name, used[node][port]))
651 used[node][port] = instance_name
653 for child in disk.children:
654 duplicates.extend(_AppendUsedPorts(instance_name, child, used))
658 my_dict = dict((node, {}) for node in self._config_data.nodes)
659 for instance in self._config_data.instances.itervalues():
660 for disk in instance.disks:
661 duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
662 for (node, minor), instance in self._temporary_drbds.iteritems():
663 if minor in my_dict[node] and my_dict[node][minor] != instance:
664 duplicates.append((node, minor, instance, my_dict[node][minor]))
666 my_dict[node][minor] = instance
667 return my_dict, duplicates
669 @locking.ssynchronized(_config_lock)
670 def ComputeDRBDMap(self):
671 """Compute the used DRBD minor/nodes.
673 This is just a wrapper over L{_UnlockedComputeDRBDMap}.
675 @return: dictionary of node_name: dict of minor: instance_name;
676 the returned dict will have all the nodes in it (even if with
680 d_map, duplicates = self._UnlockedComputeDRBDMap()
682 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
686 @locking.ssynchronized(_config_lock)
687 def AllocateDRBDMinor(self, nodes, instance):
688 """Allocate a drbd minor.
690 The free minor will be automatically computed from the existing
691 devices. A node can be given multiple times in order to allocate
692 multiple minors. The result is the list of minors, in the same
693 order as the passed nodes.
695 @type instance: string
696 @param instance: the instance for which we allocate minors
699 assert isinstance(instance, basestring), \
700 "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
702 d_map, duplicates = self._UnlockedComputeDRBDMap()
704 raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
710 # no minors used, we can start at 0
713 self._temporary_drbds[(nname, 0)] = instance
717 ffree = utils.FirstFree(keys)
719 # return the next minor
720 # TODO: implement high-limit check
724 # double-check minor against current instances
725 assert minor not in d_map[nname], \
726 ("Attempt to reuse allocated DRBD minor %d on node %s,"
727 " already allocated to instance %s" %
728 (minor, nname, d_map[nname][minor]))
729 ndata[minor] = instance
730 # double-check minor against reservation
731 r_key = (nname, minor)
732 assert r_key not in self._temporary_drbds, \
733 ("Attempt to reuse reserved DRBD minor %d on node %s,"
734 " reserved for instance %s" %
735 (minor, nname, self._temporary_drbds[r_key]))
736 self._temporary_drbds[r_key] = instance
738 logging.debug("Request to allocate drbd minors, input: %s, returning %s",
742 def _UnlockedReleaseDRBDMinors(self, instance):
743 """Release temporary drbd minors allocated for a given instance.
745 @type instance: string
746 @param instance: the instance for which temporary minors should be
750 assert isinstance(instance, basestring), \
751 "Invalid argument passed to ReleaseDRBDMinors"
752 for key, name in self._temporary_drbds.items():
754 del self._temporary_drbds[key]
756 @locking.ssynchronized(_config_lock)
757 def ReleaseDRBDMinors(self, instance):
758 """Release temporary drbd minors allocated for a given instance.
760 This should be called on the error paths, on the success paths
761 it's automatically called by the ConfigWriter add and update
764 This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
766 @type instance: string
767 @param instance: the instance for which temporary minors should be
771 self._UnlockedReleaseDRBDMinors(instance)
773 @locking.ssynchronized(_config_lock, shared=1)
774 def GetConfigVersion(self):
775 """Get the configuration version.
777 @return: Config version
780 return self._config_data.version
782 @locking.ssynchronized(_config_lock, shared=1)
783 def GetClusterName(self):
786 @return: Cluster name
789 return self._config_data.cluster.cluster_name
791 @locking.ssynchronized(_config_lock, shared=1)
792 def GetMasterNode(self):
793 """Get the hostname of the master node for this cluster.
795 @return: Master hostname
798 return self._config_data.cluster.master_node
800 @locking.ssynchronized(_config_lock, shared=1)
801 def GetMasterIP(self):
802 """Get the IP of the master node for this cluster.
807 return self._config_data.cluster.master_ip
809 @locking.ssynchronized(_config_lock, shared=1)
810 def GetMasterNetdev(self):
811 """Get the master network device for this cluster.
814 return self._config_data.cluster.master_netdev
816 @locking.ssynchronized(_config_lock, shared=1)
817 def GetFileStorageDir(self):
818 """Get the file storage dir for this cluster.
821 return self._config_data.cluster.file_storage_dir
823 @locking.ssynchronized(_config_lock, shared=1)
824 def GetHypervisorType(self):
825 """Get the hypervisor type for this cluster.
828 return self._config_data.cluster.enabled_hypervisors[0]
830 @locking.ssynchronized(_config_lock, shared=1)
831 def GetHostKey(self):
832 """Return the rsa hostkey from the config.
835 @return: the rsa hostkey
838 return self._config_data.cluster.rsahostkeypub
840 @locking.ssynchronized(_config_lock, shared=1)
841 def GetDefaultIAllocator(self):
842 """Get the default instance allocator for this cluster.
845 return self._config_data.cluster.default_iallocator
847 @locking.ssynchronized(_config_lock, shared=1)
848 def GetPrimaryIPFamily(self):
849 """Get cluster primary ip family.
851 @return: primary ip family
854 return self._config_data.cluster.primary_ip_family
856 @locking.ssynchronized(_config_lock, shared=1)
857 def LookupNodeGroup(self, target):
858 """Lookup a node group.
860 @type target: string or None
861 @param target: group name or uuid or None to look for the default
863 @return: nodegroup uuid
864 @raises errors.OpPrereqError: when the target group cannot be found
868 if len(self._config_data.nodegroups) != 1:
869 raise errors.OpPrereqError("More than one nodegroup exists. Target"
870 " group must be specified explicitely.")
872 return self._config_data.nodegroups.keys()[0]
873 if target in self._config_data.nodegroups:
875 for nodegroup in self._config_data.nodegroups.values():
876 if nodegroup.name == target:
877 return nodegroup.uuid
878 raise errors.OpPrereqError("Nodegroup '%s' not found", target)
880 @locking.ssynchronized(_config_lock)
881 def AddInstance(self, instance, ec_id):
882 """Add an instance to the config.
884 This should be used after creating a new instance.
886 @type instance: L{objects.Instance}
887 @param instance: the instance object
890 if not isinstance(instance, objects.Instance):
891 raise errors.ProgrammerError("Invalid type passed to AddInstance")
893 if instance.disk_template != constants.DT_DISKLESS:
894 all_lvs = instance.MapLVsByNode()
895 logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
897 all_macs = self._AllMACs()
898 for nic in instance.nics:
899 if nic.mac in all_macs:
900 raise errors.ConfigurationError("Cannot add instance %s:"
901 " MAC address '%s' already in use." %
902 (instance.name, nic.mac))
904 self._EnsureUUID(instance, ec_id)
906 instance.serial_no = 1
907 instance.ctime = instance.mtime = time.time()
908 self._config_data.instances[instance.name] = instance
909 self._config_data.cluster.serial_no += 1
910 self._UnlockedReleaseDRBDMinors(instance.name)
913 def _EnsureUUID(self, item, ec_id):
914 """Ensures a given object has a valid UUID.
916 @param item: the instance or node to be checked
917 @param ec_id: the execution context id for the uuid reservation
921 item.uuid = self._GenerateUniqueID(ec_id)
922 elif item.uuid in self._AllIDs(include_temporary=True):
923 raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
924 " in use" % (item.name, item.uuid))
926 def _SetInstanceStatus(self, instance_name, status):
927 """Set the instance's status to a given value.
930 assert isinstance(status, bool), \
931 "Invalid status '%s' passed to SetInstanceStatus" % (status,)
933 if instance_name not in self._config_data.instances:
934 raise errors.ConfigurationError("Unknown instance '%s'" %
936 instance = self._config_data.instances[instance_name]
937 if instance.admin_up != status:
938 instance.admin_up = status
939 instance.serial_no += 1
940 instance.mtime = time.time()
943 @locking.ssynchronized(_config_lock)
944 def MarkInstanceUp(self, instance_name):
945 """Mark the instance status to up in the config.
948 self._SetInstanceStatus(instance_name, True)
950 @locking.ssynchronized(_config_lock)
951 def RemoveInstance(self, instance_name):
952 """Remove the instance from the configuration.
955 if instance_name not in self._config_data.instances:
956 raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
957 del self._config_data.instances[instance_name]
958 self._config_data.cluster.serial_no += 1
961 @locking.ssynchronized(_config_lock)
962 def RenameInstance(self, old_name, new_name):
963 """Rename an instance.
965 This needs to be done in ConfigWriter and not by RemoveInstance
966 combined with AddInstance as only we can guarantee an atomic
970 if old_name not in self._config_data.instances:
971 raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
972 inst = self._config_data.instances[old_name]
973 del self._config_data.instances[old_name]
976 for disk in inst.disks:
977 if disk.dev_type == constants.LD_FILE:
978 # rename the file paths in logical and physical id
979 file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
980 disk.physical_id = disk.logical_id = (disk.logical_id[0],
981 utils.PathJoin(file_storage_dir,
985 self._config_data.instances[inst.name] = inst
988 @locking.ssynchronized(_config_lock)
989 def MarkInstanceDown(self, instance_name):
990 """Mark the status of an instance to down in the configuration.
993 self._SetInstanceStatus(instance_name, False)
995 def _UnlockedGetInstanceList(self):
996 """Get the list of instances.
998 This function is for internal use, when the config lock is already held.
1001 return self._config_data.instances.keys()
1003 @locking.ssynchronized(_config_lock, shared=1)
1004 def GetInstanceList(self):
1005 """Get the list of instances.
1007 @return: array of instances, ex. ['instance2.example.com',
1008 'instance1.example.com']
1011 return self._UnlockedGetInstanceList()
1013 @locking.ssynchronized(_config_lock, shared=1)
1014 def ExpandInstanceName(self, short_name):
1015 """Attempt to expand an incomplete instance name.
1018 return utils.MatchNameComponent(short_name,
1019 self._config_data.instances.keys(),
1020 case_sensitive=False)
1022 def _UnlockedGetInstanceInfo(self, instance_name):
1023 """Returns information about an instance.
1025 This function is for internal use, when the config lock is already held.
1028 if instance_name not in self._config_data.instances:
1031 return self._config_data.instances[instance_name]
1033 @locking.ssynchronized(_config_lock, shared=1)
1034 def GetInstanceInfo(self, instance_name):
1035 """Returns information about an instance.
1037 It takes the information from the configuration file. Other information of
1038 an instance are taken from the live systems.
1040 @param instance_name: name of the instance, e.g.
1041 I{instance1.example.com}
1043 @rtype: L{objects.Instance}
1044 @return: the instance object
1047 return self._UnlockedGetInstanceInfo(instance_name)
1049 @locking.ssynchronized(_config_lock, shared=1)
1050 def GetAllInstancesInfo(self):
1051 """Get the configuration of all instances.
1054 @return: dict of (instance, instance_info), where instance_info is what
1055 would GetInstanceInfo return for the node
1058 my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1059 for instance in self._UnlockedGetInstanceList()])
1062 @locking.ssynchronized(_config_lock)
1063 def AddNode(self, node, ec_id):
1064 """Add a node to the configuration.
1066 @type node: L{objects.Node}
1067 @param node: a Node instance
1070 logging.info("Adding node %s to configuration", node.name)
1072 self._EnsureUUID(node, ec_id)
1075 node.ctime = node.mtime = time.time()
1076 self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
1077 self._config_data.nodes[node.name] = node
1078 self._config_data.cluster.serial_no += 1
1081 @locking.ssynchronized(_config_lock)
1082 def RemoveNode(self, node_name):
1083 """Remove a node from the configuration.
1086 logging.info("Removing node %s from configuration", node_name)
1088 if node_name not in self._config_data.nodes:
1089 raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1091 self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1092 del self._config_data.nodes[node_name]
1093 self._config_data.cluster.serial_no += 1
1096 @locking.ssynchronized(_config_lock, shared=1)
1097 def ExpandNodeName(self, short_name):
1098 """Attempt to expand an incomplete instance name.
1101 return utils.MatchNameComponent(short_name,
1102 self._config_data.nodes.keys(),
1103 case_sensitive=False)
1105 def _UnlockedGetNodeInfo(self, node_name):
1106 """Get the configuration of a node, as stored in the config.
1108 This function is for internal use, when the config lock is already
1111 @param node_name: the node name, e.g. I{node1.example.com}
1113 @rtype: L{objects.Node}
1114 @return: the node object
1117 if node_name not in self._config_data.nodes:
1120 return self._config_data.nodes[node_name]
1122 @locking.ssynchronized(_config_lock, shared=1)
1123 def GetNodeInfo(self, node_name):
1124 """Get the configuration of a node, as stored in the config.
1126 This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1128 @param node_name: the node name, e.g. I{node1.example.com}
1130 @rtype: L{objects.Node}
1131 @return: the node object
1134 return self._UnlockedGetNodeInfo(node_name)
1136 def _UnlockedGetNodeList(self):
1137 """Return the list of nodes which are in the configuration.
1139 This function is for internal use, when the config lock is already
1145 return self._config_data.nodes.keys()
1147 @locking.ssynchronized(_config_lock, shared=1)
1148 def GetNodeList(self):
1149 """Return the list of nodes which are in the configuration.
1152 return self._UnlockedGetNodeList()
1154 def _UnlockedGetOnlineNodeList(self):
1155 """Return the list of nodes which are online.
1158 all_nodes = [self._UnlockedGetNodeInfo(node)
1159 for node in self._UnlockedGetNodeList()]
1160 return [node.name for node in all_nodes if not node.offline]
1162 @locking.ssynchronized(_config_lock, shared=1)
1163 def GetOnlineNodeList(self):
1164 """Return the list of nodes which are online.
1167 return self._UnlockedGetOnlineNodeList()
1169 @locking.ssynchronized(_config_lock, shared=1)
1170 def GetAllNodesInfo(self):
1171 """Get the configuration of all nodes.
1174 @return: dict of (node, node_info), where node_info is what
1175 would GetNodeInfo return for the node
1178 my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1179 for node in self._UnlockedGetNodeList()])
1182 def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1183 """Get the number of current and maximum desired and possible candidates.
1185 @type exceptions: list
1186 @param exceptions: if passed, list of nodes that should be ignored
1188 @return: tuple of (current, desired and possible, possible)
1191 mc_now = mc_should = mc_max = 0
1192 for node in self._config_data.nodes.values():
1193 if exceptions and node.name in exceptions:
1195 if not (node.offline or node.drained):
1197 if node.master_candidate:
1199 mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1200 return (mc_now, mc_should, mc_max)
1202 @locking.ssynchronized(_config_lock, shared=1)
1203 def GetMasterCandidateStats(self, exceptions=None):
1204 """Get the number of current and maximum possible candidates.
1206 This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1208 @type exceptions: list
1209 @param exceptions: if passed, list of nodes that should be ignored
1211 @return: tuple of (current, max)
1214 return self._UnlockedGetMasterCandidateStats(exceptions)
1216 @locking.ssynchronized(_config_lock)
1217 def MaintainCandidatePool(self, exceptions):
1218 """Try to grow the candidate pool to the desired size.
1220 @type exceptions: list
1221 @param exceptions: if passed, list of nodes that should be ignored
1223 @return: list with the adjusted nodes (L{objects.Node} instances)
1226 mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1229 node_list = self._config_data.nodes.keys()
1230 random.shuffle(node_list)
1231 for name in node_list:
1232 if mc_now >= mc_max:
1234 node = self._config_data.nodes[name]
1235 if (node.master_candidate or node.offline or node.drained or
1236 node.name in exceptions):
1238 mod_list.append(node)
1239 node.master_candidate = True
1242 if mc_now != mc_max:
1243 # this should not happen
1244 logging.warning("Warning: MaintainCandidatePool didn't manage to"
1245 " fill the candidate pool (%d/%d)", mc_now, mc_max)
1247 self._config_data.cluster.serial_no += 1
1252 def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1253 """Add a given node to the specified group.
1256 if nodegroup_uuid not in self._config_data.nodegroups:
1257 # This can happen if a node group gets deleted between its lookup and
1258 # when we're adding the first node to it, since we don't keep a lock in
1259 # the meantime. It's ok though, as we'll fail cleanly if the node group
1260 # is not found anymore.
1261 raise errors.OpExecError("Unknown nodegroup: %s" % nodegroup_uuid)
1262 if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1263 self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1265 def _UnlockedRemoveNodeFromGroup(self, node):
1266 """Remove a given node from its group.
1269 nodegroup = node.nodegroup
1270 if nodegroup not in self._config_data.nodegroups:
1271 logging.warning("Warning: node '%s' has a non-existing nodegroup '%s'"
1272 " (while being removed from it)", node.name, nodegroup)
1273 nodegroup_obj = self._config_data.nodegroups[nodegroup]
1274 if node.name not in nodegroup_obj.members:
1275 logging.warning("Warning: node '%s' not a member of its nodegroup '%s'"
1276 " (while being removed from it)", node.name, nodegroup)
1278 nodegroup_obj.members.remove(node.name)
1280 def _BumpSerialNo(self):
1281 """Bump up the serial number of the config.
1284 self._config_data.serial_no += 1
1285 self._config_data.mtime = time.time()
1287 def _AllUUIDObjects(self):
1288 """Returns all objects with uuid attributes.
1291 return (self._config_data.instances.values() +
1292 self._config_data.nodes.values() +
1293 self._config_data.nodegroups.values() +
1294 [self._config_data.cluster])
1296 def _OpenConfig(self):
1297 """Read the config data from disk.
1300 raw_data = utils.ReadFile(self._cfg_file)
1303 data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1304 except Exception, err:
1305 raise errors.ConfigurationError(err)
1307 # Make sure the configuration has the right version
1308 _ValidateConfig(data)
1310 if (not hasattr(data, 'cluster') or
1311 not hasattr(data.cluster, 'rsahostkeypub')):
1312 raise errors.ConfigurationError("Incomplete configuration"
1313 " (missing cluster.rsahostkeypub)")
1315 # Upgrade configuration if needed
1316 data.UpgradeConfig()
1318 self._config_data = data
1319 # reset the last serial as -1 so that the next write will cause
1321 self._last_cluster_serial = -1
1323 # And finally run our (custom) config upgrade sequence
1324 self._UpgradeConfig()
1326 def _UpgradeConfig(self):
1327 """Run upgrade steps that cannot be done purely in the objects.
1329 This is because some data elements need uniqueness across the
1330 whole configuration, etc.
1332 @warning: this function will call L{_WriteConfig()}, but also
1333 L{DropECReservations} so it needs to be called only from a
1334 "safe" place (the constructor). If one wanted to call it with
1335 the lock held, a DropECReservationUnlocked would need to be
1336 created first, to avoid causing deadlock.
1340 for item in self._AllUUIDObjects():
1341 if item.uuid is None:
1342 item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1344 if not self._config_data.nodegroups:
1345 default_nodegroup_uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1346 default_nodegroup = objects.NodeGroup(
1347 uuid=default_nodegroup_uuid,
1351 self._config_data.nodegroups[default_nodegroup_uuid] = default_nodegroup
1353 for node in self._config_data.nodes.values():
1354 if not node.nodegroup:
1355 node.nodegroup = self.LookupNodeGroup(None)
1357 # This is technically *not* an upgrade, but needs to be done both when
1358 # nodegroups are being added, and upon normally loading the config,
1359 # because the members list of a node group is discarded upon
1360 # serializing/deserializing the object.
1361 self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
1364 # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1365 # only called at config init time, without the lock held
1366 self.DropECReservations(_UPGRADE_CONFIG_JID)
1368 def _DistributeConfig(self, feedback_fn):
1369 """Distribute the configuration to the other nodes.
1371 Currently, this only copies the configuration file. In the future,
1372 it could be used to encapsulate the 2/3-phase update mechanism.
1382 myhostname = self._my_hostname
1383 # we can skip checking whether _UnlockedGetNodeInfo returns None
1384 # since the node list comes from _UnlocketGetNodeList, and we are
1385 # called with the lock held, so no modifications should take place
1387 for node_name in self._UnlockedGetNodeList():
1388 if node_name == myhostname:
1390 node_info = self._UnlockedGetNodeInfo(node_name)
1391 if not node_info.master_candidate:
1393 node_list.append(node_info.name)
1394 addr_list.append(node_info.primary_ip)
1396 result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1397 address_list=addr_list)
1398 for to_node, to_result in result.items():
1399 msg = to_result.fail_msg
1401 msg = ("Copy of file %s to node %s failed: %s" %
1402 (self._cfg_file, to_node, msg))
1412 def _WriteConfig(self, destination=None, feedback_fn=None):
1413 """Write the configuration data to persistent storage.
1416 assert feedback_fn is None or callable(feedback_fn)
1418 # Warn on config errors, but don't abort the save - the
1419 # configuration has already been modified, and we can't revert;
1420 # the best we can do is to warn the user and save as is, leaving
1421 # recovery to the user
1422 config_errors = self._UnlockedVerifyConfig()
1424 errmsg = ("Configuration data is not consistent: %s" %
1425 (utils.CommaJoin(config_errors)))
1426 logging.critical(errmsg)
1430 if destination is None:
1431 destination = self._cfg_file
1432 self._BumpSerialNo()
1433 txt = serializer.Dump(self._config_data.ToDict())
1435 getents = self._getents()
1436 utils.WriteFile(destination, data=txt, gid=getents.confd_gid, mode=0640)
1438 self.write_count += 1
1440 # and redistribute the config file to master candidates
1441 self._DistributeConfig(feedback_fn)
1443 # Write ssconf files on all nodes (including locally)
1444 if self._last_cluster_serial < self._config_data.cluster.serial_no:
1445 if not self._offline:
1446 result = rpc.RpcRunner.call_write_ssconf_files(
1447 self._UnlockedGetOnlineNodeList(),
1448 self._UnlockedGetSsconfValues())
1450 for nname, nresu in result.items():
1451 msg = nresu.fail_msg
1453 errmsg = ("Error while uploading ssconf files to"
1454 " node %s: %s" % (nname, msg))
1455 logging.warning(errmsg)
1460 self._last_cluster_serial = self._config_data.cluster.serial_no
1462 def _UnlockedGetSsconfValues(self):
1463 """Return the values needed by ssconf.
1466 @return: a dictionary with keys the ssconf names and values their
1471 instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1472 node_names = utils.NiceSort(self._UnlockedGetNodeList())
1473 node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1474 node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1475 for ninfo in node_info]
1476 node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1477 for ninfo in node_info]
1479 instance_data = fn(instance_names)
1480 off_data = fn(node.name for node in node_info if node.offline)
1481 on_data = fn(node.name for node in node_info if not node.offline)
1482 mc_data = fn(node.name for node in node_info if node.master_candidate)
1483 mc_ips_data = fn(node.primary_ip for node in node_info
1484 if node.master_candidate)
1485 node_data = fn(node_names)
1486 node_pri_ips_data = fn(node_pri_ips)
1487 node_snd_ips_data = fn(node_snd_ips)
1489 cluster = self._config_data.cluster
1490 cluster_tags = fn(cluster.GetTags())
1492 hypervisor_list = fn(cluster.enabled_hypervisors)
1494 uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1496 nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1497 self._config_data.nodegroups.values()]
1498 nodegroups_data = fn(utils.NiceSort(nodegroups))
1501 constants.SS_CLUSTER_NAME: cluster.cluster_name,
1502 constants.SS_CLUSTER_TAGS: cluster_tags,
1503 constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1504 constants.SS_MASTER_CANDIDATES: mc_data,
1505 constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1506 constants.SS_MASTER_IP: cluster.master_ip,
1507 constants.SS_MASTER_NETDEV: cluster.master_netdev,
1508 constants.SS_MASTER_NODE: cluster.master_node,
1509 constants.SS_NODE_LIST: node_data,
1510 constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1511 constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1512 constants.SS_OFFLINE_NODES: off_data,
1513 constants.SS_ONLINE_NODES: on_data,
1514 constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1515 constants.SS_INSTANCE_LIST: instance_data,
1516 constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1517 constants.SS_HYPERVISOR_LIST: hypervisor_list,
1518 constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1519 constants.SS_UID_POOL: uid_pool,
1520 constants.SS_NODEGROUPS: nodegroups_data,
1523 @locking.ssynchronized(_config_lock, shared=1)
1524 def GetSsconfValues(self):
1525 """Wrapper using lock around _UnlockedGetSsconf().
1528 return self._UnlockedGetSsconfValues()
1530 @locking.ssynchronized(_config_lock, shared=1)
1531 def GetVGName(self):
1532 """Return the volume group name.
1535 return self._config_data.cluster.volume_group_name
1537 @locking.ssynchronized(_config_lock)
1538 def SetVGName(self, vg_name):
1539 """Set the volume group name.
1542 self._config_data.cluster.volume_group_name = vg_name
1543 self._config_data.cluster.serial_no += 1
1546 @locking.ssynchronized(_config_lock, shared=1)
1547 def GetDRBDHelper(self):
1548 """Return DRBD usermode helper.
1551 return self._config_data.cluster.drbd_usermode_helper
1553 @locking.ssynchronized(_config_lock)
1554 def SetDRBDHelper(self, drbd_helper):
1555 """Set DRBD usermode helper.
1558 self._config_data.cluster.drbd_usermode_helper = drbd_helper
1559 self._config_data.cluster.serial_no += 1
1562 @locking.ssynchronized(_config_lock, shared=1)
1563 def GetMACPrefix(self):
1564 """Return the mac prefix.
1567 return self._config_data.cluster.mac_prefix
1569 @locking.ssynchronized(_config_lock, shared=1)
1570 def GetClusterInfo(self):
1571 """Returns information about the cluster
1573 @rtype: L{objects.Cluster}
1574 @return: the cluster object
1577 return self._config_data.cluster
1579 @locking.ssynchronized(_config_lock, shared=1)
1580 def HasAnyDiskOfType(self, dev_type):
1581 """Check if in there is at disk of the given type in the configuration.
1584 return self._config_data.HasAnyDiskOfType(dev_type)
1586 @locking.ssynchronized(_config_lock)
1587 def Update(self, target, feedback_fn):
1588 """Notify function to be called after updates.
1590 This function must be called when an object (as returned by
1591 GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1592 caller wants the modifications saved to the backing store. Note
1593 that all modified objects will be saved, but the target argument
1594 is the one the caller wants to ensure that it's saved.
1596 @param target: an instance of either L{objects.Cluster},
1597 L{objects.Node} or L{objects.Instance} which is existing in
1599 @param feedback_fn: Callable feedback function
1602 if self._config_data is None:
1603 raise errors.ProgrammerError("Configuration file not read,"
1605 update_serial = False
1606 if isinstance(target, objects.Cluster):
1607 test = target == self._config_data.cluster
1608 elif isinstance(target, objects.Node):
1609 test = target in self._config_data.nodes.values()
1610 update_serial = True
1611 elif isinstance(target, objects.Instance):
1612 test = target in self._config_data.instances.values()
1614 raise errors.ProgrammerError("Invalid object type (%s) passed to"
1615 " ConfigWriter.Update" % type(target))
1617 raise errors.ConfigurationError("Configuration updated since object"
1618 " has been read or unknown object")
1619 target.serial_no += 1
1620 target.mtime = now = time.time()
1623 # for node updates, we need to increase the cluster serial too
1624 self._config_data.cluster.serial_no += 1
1625 self._config_data.cluster.mtime = now
1627 if isinstance(target, objects.Instance):
1628 self._UnlockedReleaseDRBDMinors(target.name)
1630 self._WriteConfig(feedback_fn=feedback_fn)
1632 @locking.ssynchronized(_config_lock)
1633 def DropECReservations(self, ec_id):
1634 """Drop per-execution-context reservations
1637 for rm in self._all_rms:
1638 rm.DropECReservations(ec_id)