Fix broken commit 9e302a8
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import random
36 import logging
37 import time
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46 from ganeti import uidpool
47
48
49 _config_lock = locking.SharedLock()
50
51 # job id used for resource management at config upgrade time
52 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
53
54
55 def _ValidateConfig(data):
56   """Verifies that a configuration objects looks valid.
57
58   This only verifies the version of the configuration.
59
60   @raise errors.ConfigurationError: if the version differs from what
61       we expect
62
63   """
64   if data.version != constants.CONFIG_VERSION:
65     raise errors.ConfigurationError("Cluster configuration version"
66                                     " mismatch, got %s instead of %s" %
67                                     (data.version,
68                                      constants.CONFIG_VERSION))
69
70
71 class TemporaryReservationManager:
72   """A temporary resource reservation manager.
73
74   This is used to reserve resources in a job, before using them, making sure
75   other jobs cannot get them in the meantime.
76
77   """
78   def __init__(self):
79     self._ec_reserved = {}
80
81   def Reserved(self, resource):
82     for holder_reserved in self._ec_reserved.items():
83       if resource in holder_reserved:
84         return True
85     return False
86
87   def Reserve(self, ec_id, resource):
88     if self.Reserved(resource):
89       raise errors.ReservationError("Duplicate reservation for resource: %s." %
90                                     (resource))
91     if ec_id not in self._ec_reserved:
92       self._ec_reserved[ec_id] = set([resource])
93     else:
94       self._ec_reserved[ec_id].add(resource)
95
96   def DropECReservations(self, ec_id):
97     if ec_id in self._ec_reserved:
98       del self._ec_reserved[ec_id]
99
100   def GetReserved(self):
101     all_reserved = set()
102     for holder_reserved in self._ec_reserved.values():
103       all_reserved.update(holder_reserved)
104     return all_reserved
105
106   def Generate(self, existing, generate_one_fn, ec_id):
107     """Generate a new resource of this type
108
109     """
110     assert callable(generate_one_fn)
111
112     all_elems = self.GetReserved()
113     all_elems.update(existing)
114     retries = 64
115     while retries > 0:
116       new_resource = generate_one_fn()
117       if new_resource is not None and new_resource not in all_elems:
118         break
119     else:
120       raise errors.ConfigurationError("Not able generate new resource"
121                                       " (last tried: %s)" % new_resource)
122     self.Reserve(ec_id, new_resource)
123     return new_resource
124
125
126 class ConfigWriter:
127   """The interface to the cluster configuration.
128
129   @ivar _temporary_lvs: reservation manager for temporary LVs
130   @ivar _all_rms: a list of all temporary reservation managers
131
132   """
133   def __init__(self, cfg_file=None, offline=False):
134     self.write_count = 0
135     self._lock = _config_lock
136     self._config_data = None
137     self._offline = offline
138     if cfg_file is None:
139       self._cfg_file = constants.CLUSTER_CONF_FILE
140     else:
141       self._cfg_file = cfg_file
142     self._temporary_ids = TemporaryReservationManager()
143     self._temporary_drbds = {}
144     self._temporary_macs = TemporaryReservationManager()
145     self._temporary_secrets = TemporaryReservationManager()
146     self._temporary_lvs = TemporaryReservationManager()
147     self._all_rms = [self._temporary_ids, self._temporary_macs,
148                      self._temporary_secrets, self._temporary_lvs]
149     # Note: in order to prevent errors when resolving our name in
150     # _DistributeConfig, we compute it here once and reuse it; it's
151     # better to raise an error before starting to modify the config
152     # file than after it was modified
153     self._my_hostname = utils.HostInfo().name
154     self._last_cluster_serial = -1
155     self._OpenConfig()
156
157   # this method needs to be static, so that we can call it on the class
158   @staticmethod
159   def IsCluster():
160     """Check if the cluster is configured.
161
162     """
163     return os.path.exists(constants.CLUSTER_CONF_FILE)
164
165   def _GenerateOneMAC(self):
166     """Generate one mac address
167
168     """
169     prefix = self._config_data.cluster.mac_prefix
170     byte1 = random.randrange(0, 256)
171     byte2 = random.randrange(0, 256)
172     byte3 = random.randrange(0, 256)
173     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
174     return mac
175
176   @locking.ssynchronized(_config_lock, shared=1)
177   def GenerateMAC(self, ec_id):
178     """Generate a MAC for an instance.
179
180     This should check the current instances for duplicates.
181
182     """
183     existing = self._AllMACs()
184     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
185
186   @locking.ssynchronized(_config_lock, shared=1)
187   def ReserveMAC(self, mac, ec_id):
188     """Reserve a MAC for an instance.
189
190     This only checks instances managed by this cluster, it does not
191     check for potential collisions elsewhere.
192
193     """
194     all_macs = self._AllMACs()
195     if mac in all_macs:
196       raise errors.ReservationError("mac already in use")
197     else:
198       self._temporary_macs.Reserve(mac, ec_id)
199
200   @locking.ssynchronized(_config_lock, shared=1)
201   def ReserveLV(self, lv_name, ec_id):
202     """Reserve an VG/LV pair for an instance.
203
204     @type lv_name: string
205     @param lv_name: the logical volume name to reserve
206
207     """
208     all_lvs = self._AllLVs()
209     if lv_name in all_lvs:
210       raise errors.ReservationError("LV already in use")
211     else:
212       self._temporary_lvs.Reserve(lv_name, ec_id)
213
214   @locking.ssynchronized(_config_lock, shared=1)
215   def GenerateDRBDSecret(self, ec_id):
216     """Generate a DRBD secret.
217
218     This checks the current disks for duplicates.
219
220     """
221     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
222                                             utils.GenerateSecret,
223                                             ec_id)
224
225   def _AllLVs(self):
226     """Compute the list of all LVs.
227
228     """
229     lvnames = set()
230     for instance in self._config_data.instances.values():
231       node_data = instance.MapLVsByNode()
232       for lv_list in node_data.values():
233         lvnames.update(lv_list)
234     return lvnames
235
236   def _AllIDs(self, include_temporary):
237     """Compute the list of all UUIDs and names we have.
238
239     @type include_temporary: boolean
240     @param include_temporary: whether to include the _temporary_ids set
241     @rtype: set
242     @return: a set of IDs
243
244     """
245     existing = set()
246     if include_temporary:
247       existing.update(self._temporary_ids.GetReserved())
248     existing.update(self._AllLVs())
249     existing.update(self._config_data.instances.keys())
250     existing.update(self._config_data.nodes.keys())
251     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
252     return existing
253
254   def _GenerateUniqueID(self, ec_id):
255     """Generate an unique UUID.
256
257     This checks the current node, instances and disk names for
258     duplicates.
259
260     @rtype: string
261     @return: the unique id
262
263     """
264     existing = self._AllIDs(include_temporary=False)
265     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
266
267   @locking.ssynchronized(_config_lock, shared=1)
268   def GenerateUniqueID(self, ec_id):
269     """Generate an unique ID.
270
271     This is just a wrapper over the unlocked version.
272
273     @type ec_id: string
274     @param ec_id: unique id for the job to reserve the id to
275
276     """
277     return self._GenerateUniqueID(ec_id)
278
279   def _AllMACs(self):
280     """Return all MACs present in the config.
281
282     @rtype: list
283     @return: the list of all MACs
284
285     """
286     result = []
287     for instance in self._config_data.instances.values():
288       for nic in instance.nics:
289         result.append(nic.mac)
290
291     return result
292
293   def _AllDRBDSecrets(self):
294     """Return all DRBD secrets present in the config.
295
296     @rtype: list
297     @return: the list of all DRBD secrets
298
299     """
300     def helper(disk, result):
301       """Recursively gather secrets from this disk."""
302       if disk.dev_type == constants.DT_DRBD8:
303         result.append(disk.logical_id[5])
304       if disk.children:
305         for child in disk.children:
306           helper(child, result)
307
308     result = []
309     for instance in self._config_data.instances.values():
310       for disk in instance.disks:
311         helper(disk, result)
312
313     return result
314
315   def _CheckDiskIDs(self, disk, l_ids, p_ids):
316     """Compute duplicate disk IDs
317
318     @type disk: L{objects.Disk}
319     @param disk: the disk at which to start searching
320     @type l_ids: list
321     @param l_ids: list of current logical ids
322     @type p_ids: list
323     @param p_ids: list of current physical ids
324     @rtype: list
325     @return: a list of error messages
326
327     """
328     result = []
329     if disk.logical_id is not None:
330       if disk.logical_id in l_ids:
331         result.append("duplicate logical id %s" % str(disk.logical_id))
332       else:
333         l_ids.append(disk.logical_id)
334     if disk.physical_id is not None:
335       if disk.physical_id in p_ids:
336         result.append("duplicate physical id %s" % str(disk.physical_id))
337       else:
338         p_ids.append(disk.physical_id)
339
340     if disk.children:
341       for child in disk.children:
342         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
343     return result
344
345   def _UnlockedVerifyConfig(self):
346     """Verify function.
347
348     @rtype: list
349     @return: a list of error messages; a non-empty list signifies
350         configuration errors
351
352     """
353     result = []
354     seen_macs = []
355     ports = {}
356     data = self._config_data
357     seen_lids = []
358     seen_pids = []
359
360     # global cluster checks
361     if not data.cluster.enabled_hypervisors:
362       result.append("enabled hypervisors list doesn't have any entries")
363     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
364     if invalid_hvs:
365       result.append("enabled hypervisors contains invalid entries: %s" %
366                     invalid_hvs)
367     missing_hvp = (set(data.cluster.enabled_hypervisors) -
368                    set(data.cluster.hvparams.keys()))
369     if missing_hvp:
370       result.append("hypervisor parameters missing for the enabled"
371                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
372
373     if data.cluster.master_node not in data.nodes:
374       result.append("cluster has invalid primary node '%s'" %
375                     data.cluster.master_node)
376
377     # per-instance checks
378     for instance_name in data.instances:
379       instance = data.instances[instance_name]
380       if instance.name != instance_name:
381         result.append("instance '%s' is indexed by wrong name '%s'" %
382                       (instance.name, instance_name))
383       if instance.primary_node not in data.nodes:
384         result.append("instance '%s' has invalid primary node '%s'" %
385                       (instance_name, instance.primary_node))
386       for snode in instance.secondary_nodes:
387         if snode not in data.nodes:
388           result.append("instance '%s' has invalid secondary node '%s'" %
389                         (instance_name, snode))
390       for idx, nic in enumerate(instance.nics):
391         if nic.mac in seen_macs:
392           result.append("instance '%s' has NIC %d mac %s duplicate" %
393                         (instance_name, idx, nic.mac))
394         else:
395           seen_macs.append(nic.mac)
396
397       # gather the drbd ports for duplicate checks
398       for dsk in instance.disks:
399         if dsk.dev_type in constants.LDS_DRBD:
400           tcp_port = dsk.logical_id[2]
401           if tcp_port not in ports:
402             ports[tcp_port] = []
403           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
404       # gather network port reservation
405       net_port = getattr(instance, "network_port", None)
406       if net_port is not None:
407         if net_port not in ports:
408           ports[net_port] = []
409         ports[net_port].append((instance.name, "network port"))
410
411       # instance disk verify
412       for idx, disk in enumerate(instance.disks):
413         result.extend(["instance '%s' disk %d error: %s" %
414                        (instance.name, idx, msg) for msg in disk.Verify()])
415         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
416
417     # cluster-wide pool of free ports
418     for free_port in data.cluster.tcpudp_port_pool:
419       if free_port not in ports:
420         ports[free_port] = []
421       ports[free_port].append(("cluster", "port marked as free"))
422
423     # compute tcp/udp duplicate ports
424     keys = ports.keys()
425     keys.sort()
426     for pnum in keys:
427       pdata = ports[pnum]
428       if len(pdata) > 1:
429         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
430         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
431
432     # highest used tcp port check
433     if keys:
434       if keys[-1] > data.cluster.highest_used_port:
435         result.append("Highest used port mismatch, saved %s, computed %s" %
436                       (data.cluster.highest_used_port, keys[-1]))
437
438     if not data.nodes[data.cluster.master_node].master_candidate:
439       result.append("Master node is not a master candidate")
440
441     # master candidate checks
442     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
443     if mc_now < mc_max:
444       result.append("Not enough master candidates: actual %d, target %d" %
445                     (mc_now, mc_max))
446
447     # node checks
448     for node_name, node in data.nodes.items():
449       if node.name != node_name:
450         result.append("Node '%s' is indexed by wrong name '%s'" %
451                       (node.name, node_name))
452       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
453         result.append("Node %s state is invalid: master_candidate=%s,"
454                       " drain=%s, offline=%s" %
455                       (node.name, node.master_candidate, node.drain,
456                        node.offline))
457
458     # drbd minors check
459     _, duplicates = self._UnlockedComputeDRBDMap()
460     for node, minor, instance_a, instance_b in duplicates:
461       result.append("DRBD minor %d on node %s is assigned twice to instances"
462                     " %s and %s" % (minor, node, instance_a, instance_b))
463
464     # IP checks
465     default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
466     ips = {}
467
468     def _AddIpAddress(ip, name):
469       ips.setdefault(ip, []).append(name)
470
471     _AddIpAddress(data.cluster.master_ip, "cluster_ip")
472
473     for node in data.nodes.values():
474       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
475       if node.secondary_ip != node.primary_ip:
476         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
477
478     for instance in data.instances.values():
479       for idx, nic in enumerate(instance.nics):
480         if nic.ip is None:
481           continue
482
483         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
484         nic_mode = nicparams[constants.NIC_MODE]
485         nic_link = nicparams[constants.NIC_LINK]
486
487         if nic_mode == constants.NIC_MODE_BRIDGED:
488           link = "bridge:%s" % nic_link
489         elif nic_mode == constants.NIC_MODE_ROUTED:
490           link = "route:%s" % nic_link
491         else:
492           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
493
494         _AddIpAddress("%s/%s" % (link, nic.ip),
495                       "instance:%s/nic:%d" % (instance.name, idx))
496
497     for ip, owners in ips.items():
498       if len(owners) > 1:
499         result.append("IP address %s is used by multiple owners: %s" %
500                       (ip, utils.CommaJoin(owners)))
501
502     return result
503
504   @locking.ssynchronized(_config_lock, shared=1)
505   def VerifyConfig(self):
506     """Verify function.
507
508     This is just a wrapper over L{_UnlockedVerifyConfig}.
509
510     @rtype: list
511     @return: a list of error messages; a non-empty list signifies
512         configuration errors
513
514     """
515     return self._UnlockedVerifyConfig()
516
517   def _UnlockedSetDiskID(self, disk, node_name):
518     """Convert the unique ID to the ID needed on the target nodes.
519
520     This is used only for drbd, which needs ip/port configuration.
521
522     The routine descends down and updates its children also, because
523     this helps when the only the top device is passed to the remote
524     node.
525
526     This function is for internal use, when the config lock is already held.
527
528     """
529     if disk.children:
530       for child in disk.children:
531         self._UnlockedSetDiskID(child, node_name)
532
533     if disk.logical_id is None and disk.physical_id is not None:
534       return
535     if disk.dev_type == constants.LD_DRBD8:
536       pnode, snode, port, pminor, sminor, secret = disk.logical_id
537       if node_name not in (pnode, snode):
538         raise errors.ConfigurationError("DRBD device not knowing node %s" %
539                                         node_name)
540       pnode_info = self._UnlockedGetNodeInfo(pnode)
541       snode_info = self._UnlockedGetNodeInfo(snode)
542       if pnode_info is None or snode_info is None:
543         raise errors.ConfigurationError("Can't find primary or secondary node"
544                                         " for %s" % str(disk))
545       p_data = (pnode_info.secondary_ip, port)
546       s_data = (snode_info.secondary_ip, port)
547       if pnode == node_name:
548         disk.physical_id = p_data + s_data + (pminor, secret)
549       else: # it must be secondary, we tested above
550         disk.physical_id = s_data + p_data + (sminor, secret)
551     else:
552       disk.physical_id = disk.logical_id
553     return
554
555   @locking.ssynchronized(_config_lock)
556   def SetDiskID(self, disk, node_name):
557     """Convert the unique ID to the ID needed on the target nodes.
558
559     This is used only for drbd, which needs ip/port configuration.
560
561     The routine descends down and updates its children also, because
562     this helps when the only the top device is passed to the remote
563     node.
564
565     """
566     return self._UnlockedSetDiskID(disk, node_name)
567
568   @locking.ssynchronized(_config_lock)
569   def AddTcpUdpPort(self, port):
570     """Adds a new port to the available port pool.
571
572     """
573     if not isinstance(port, int):
574       raise errors.ProgrammerError("Invalid type passed for port")
575
576     self._config_data.cluster.tcpudp_port_pool.add(port)
577     self._WriteConfig()
578
579   @locking.ssynchronized(_config_lock, shared=1)
580   def GetPortList(self):
581     """Returns a copy of the current port list.
582
583     """
584     return self._config_data.cluster.tcpudp_port_pool.copy()
585
586   @locking.ssynchronized(_config_lock)
587   def AllocatePort(self):
588     """Allocate a port.
589
590     The port will be taken from the available port pool or from the
591     default port range (and in this case we increase
592     highest_used_port).
593
594     """
595     # If there are TCP/IP ports configured, we use them first.
596     if self._config_data.cluster.tcpudp_port_pool:
597       port = self._config_data.cluster.tcpudp_port_pool.pop()
598     else:
599       port = self._config_data.cluster.highest_used_port + 1
600       if port >= constants.LAST_DRBD_PORT:
601         raise errors.ConfigurationError("The highest used port is greater"
602                                         " than %s. Aborting." %
603                                         constants.LAST_DRBD_PORT)
604       self._config_data.cluster.highest_used_port = port
605
606     self._WriteConfig()
607     return port
608
609   def _UnlockedComputeDRBDMap(self):
610     """Compute the used DRBD minor/nodes.
611
612     @rtype: (dict, list)
613     @return: dictionary of node_name: dict of minor: instance_name;
614         the returned dict will have all the nodes in it (even if with
615         an empty list), and a list of duplicates; if the duplicates
616         list is not empty, the configuration is corrupted and its caller
617         should raise an exception
618
619     """
620     def _AppendUsedPorts(instance_name, disk, used):
621       duplicates = []
622       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
623         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
624         for node, port in ((node_a, minor_a), (node_b, minor_b)):
625           assert node in used, ("Node '%s' of instance '%s' not found"
626                                 " in node list" % (node, instance_name))
627           if port in used[node]:
628             duplicates.append((node, port, instance_name, used[node][port]))
629           else:
630             used[node][port] = instance_name
631       if disk.children:
632         for child in disk.children:
633           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
634       return duplicates
635
636     duplicates = []
637     my_dict = dict((node, {}) for node in self._config_data.nodes)
638     for instance in self._config_data.instances.itervalues():
639       for disk in instance.disks:
640         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
641     for (node, minor), instance in self._temporary_drbds.iteritems():
642       if minor in my_dict[node] and my_dict[node][minor] != instance:
643         duplicates.append((node, minor, instance, my_dict[node][minor]))
644       else:
645         my_dict[node][minor] = instance
646     return my_dict, duplicates
647
648   @locking.ssynchronized(_config_lock)
649   def ComputeDRBDMap(self):
650     """Compute the used DRBD minor/nodes.
651
652     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
653
654     @return: dictionary of node_name: dict of minor: instance_name;
655         the returned dict will have all the nodes in it (even if with
656         an empty list).
657
658     """
659     d_map, duplicates = self._UnlockedComputeDRBDMap()
660     if duplicates:
661       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
662                                       str(duplicates))
663     return d_map
664
665   @locking.ssynchronized(_config_lock)
666   def AllocateDRBDMinor(self, nodes, instance):
667     """Allocate a drbd minor.
668
669     The free minor will be automatically computed from the existing
670     devices. A node can be given multiple times in order to allocate
671     multiple minors. The result is the list of minors, in the same
672     order as the passed nodes.
673
674     @type instance: string
675     @param instance: the instance for which we allocate minors
676
677     """
678     assert isinstance(instance, basestring), \
679            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
680
681     d_map, duplicates = self._UnlockedComputeDRBDMap()
682     if duplicates:
683       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
684                                       str(duplicates))
685     result = []
686     for nname in nodes:
687       ndata = d_map[nname]
688       if not ndata:
689         # no minors used, we can start at 0
690         result.append(0)
691         ndata[0] = instance
692         self._temporary_drbds[(nname, 0)] = instance
693         continue
694       keys = ndata.keys()
695       keys.sort()
696       ffree = utils.FirstFree(keys)
697       if ffree is None:
698         # return the next minor
699         # TODO: implement high-limit check
700         minor = keys[-1] + 1
701       else:
702         minor = ffree
703       # double-check minor against current instances
704       assert minor not in d_map[nname], \
705              ("Attempt to reuse allocated DRBD minor %d on node %s,"
706               " already allocated to instance %s" %
707               (minor, nname, d_map[nname][minor]))
708       ndata[minor] = instance
709       # double-check minor against reservation
710       r_key = (nname, minor)
711       assert r_key not in self._temporary_drbds, \
712              ("Attempt to reuse reserved DRBD minor %d on node %s,"
713               " reserved for instance %s" %
714               (minor, nname, self._temporary_drbds[r_key]))
715       self._temporary_drbds[r_key] = instance
716       result.append(minor)
717     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
718                   nodes, result)
719     return result
720
721   def _UnlockedReleaseDRBDMinors(self, instance):
722     """Release temporary drbd minors allocated for a given instance.
723
724     @type instance: string
725     @param instance: the instance for which temporary minors should be
726                      released
727
728     """
729     assert isinstance(instance, basestring), \
730            "Invalid argument passed to ReleaseDRBDMinors"
731     for key, name in self._temporary_drbds.items():
732       if name == instance:
733         del self._temporary_drbds[key]
734
735   @locking.ssynchronized(_config_lock)
736   def ReleaseDRBDMinors(self, instance):
737     """Release temporary drbd minors allocated for a given instance.
738
739     This should be called on the error paths, on the success paths
740     it's automatically called by the ConfigWriter add and update
741     functions.
742
743     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
744
745     @type instance: string
746     @param instance: the instance for which temporary minors should be
747                      released
748
749     """
750     self._UnlockedReleaseDRBDMinors(instance)
751
752   @locking.ssynchronized(_config_lock, shared=1)
753   def GetConfigVersion(self):
754     """Get the configuration version.
755
756     @return: Config version
757
758     """
759     return self._config_data.version
760
761   @locking.ssynchronized(_config_lock, shared=1)
762   def GetClusterName(self):
763     """Get cluster name.
764
765     @return: Cluster name
766
767     """
768     return self._config_data.cluster.cluster_name
769
770   @locking.ssynchronized(_config_lock, shared=1)
771   def GetMasterNode(self):
772     """Get the hostname of the master node for this cluster.
773
774     @return: Master hostname
775
776     """
777     return self._config_data.cluster.master_node
778
779   @locking.ssynchronized(_config_lock, shared=1)
780   def GetMasterIP(self):
781     """Get the IP of the master node for this cluster.
782
783     @return: Master IP
784
785     """
786     return self._config_data.cluster.master_ip
787
788   @locking.ssynchronized(_config_lock, shared=1)
789   def GetMasterNetdev(self):
790     """Get the master network device for this cluster.
791
792     """
793     return self._config_data.cluster.master_netdev
794
795   @locking.ssynchronized(_config_lock, shared=1)
796   def GetFileStorageDir(self):
797     """Get the file storage dir for this cluster.
798
799     """
800     return self._config_data.cluster.file_storage_dir
801
802   @locking.ssynchronized(_config_lock, shared=1)
803   def GetHypervisorType(self):
804     """Get the hypervisor type for this cluster.
805
806     """
807     return self._config_data.cluster.enabled_hypervisors[0]
808
809   @locking.ssynchronized(_config_lock, shared=1)
810   def GetHostKey(self):
811     """Return the rsa hostkey from the config.
812
813     @rtype: string
814     @return: the rsa hostkey
815
816     """
817     return self._config_data.cluster.rsahostkeypub
818
819   @locking.ssynchronized(_config_lock)
820   def AddInstance(self, instance, ec_id):
821     """Add an instance to the config.
822
823     This should be used after creating a new instance.
824
825     @type instance: L{objects.Instance}
826     @param instance: the instance object
827
828     """
829     if not isinstance(instance, objects.Instance):
830       raise errors.ProgrammerError("Invalid type passed to AddInstance")
831
832     if instance.disk_template != constants.DT_DISKLESS:
833       all_lvs = instance.MapLVsByNode()
834       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
835
836     all_macs = self._AllMACs()
837     for nic in instance.nics:
838       if nic.mac in all_macs:
839         raise errors.ConfigurationError("Cannot add instance %s:"
840                                         " MAC address '%s' already in use." %
841                                         (instance.name, nic.mac))
842
843     self._EnsureUUID(instance, ec_id)
844
845     instance.serial_no = 1
846     instance.ctime = instance.mtime = time.time()
847     self._config_data.instances[instance.name] = instance
848     self._config_data.cluster.serial_no += 1
849     self._UnlockedReleaseDRBDMinors(instance.name)
850     self._WriteConfig()
851
852   def _EnsureUUID(self, item, ec_id):
853     """Ensures a given object has a valid UUID.
854
855     @param item: the instance or node to be checked
856     @param ec_id: the execution context id for the uuid reservation
857
858     """
859     if not item.uuid:
860       item.uuid = self._GenerateUniqueID(ec_id)
861     elif item.uuid in self._AllIDs(include_temporary=True):
862       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
863                                       " in use" % (item.name, item.uuid))
864
865   def _SetInstanceStatus(self, instance_name, status):
866     """Set the instance's status to a given value.
867
868     """
869     assert isinstance(status, bool), \
870            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
871
872     if instance_name not in self._config_data.instances:
873       raise errors.ConfigurationError("Unknown instance '%s'" %
874                                       instance_name)
875     instance = self._config_data.instances[instance_name]
876     if instance.admin_up != status:
877       instance.admin_up = status
878       instance.serial_no += 1
879       instance.mtime = time.time()
880       self._WriteConfig()
881
882   @locking.ssynchronized(_config_lock)
883   def MarkInstanceUp(self, instance_name):
884     """Mark the instance status to up in the config.
885
886     """
887     self._SetInstanceStatus(instance_name, True)
888
889   @locking.ssynchronized(_config_lock)
890   def RemoveInstance(self, instance_name):
891     """Remove the instance from the configuration.
892
893     """
894     if instance_name not in self._config_data.instances:
895       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
896     del self._config_data.instances[instance_name]
897     self._config_data.cluster.serial_no += 1
898     self._WriteConfig()
899
900   @locking.ssynchronized(_config_lock)
901   def RenameInstance(self, old_name, new_name):
902     """Rename an instance.
903
904     This needs to be done in ConfigWriter and not by RemoveInstance
905     combined with AddInstance as only we can guarantee an atomic
906     rename.
907
908     """
909     if old_name not in self._config_data.instances:
910       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
911     inst = self._config_data.instances[old_name]
912     del self._config_data.instances[old_name]
913     inst.name = new_name
914
915     for disk in inst.disks:
916       if disk.dev_type == constants.LD_FILE:
917         # rename the file paths in logical and physical id
918         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
919         disk.physical_id = disk.logical_id = (disk.logical_id[0],
920                                               utils.PathJoin(file_storage_dir,
921                                                              inst.name,
922                                                              disk.iv_name))
923
924     self._config_data.instances[inst.name] = inst
925     self._WriteConfig()
926
927   @locking.ssynchronized(_config_lock)
928   def MarkInstanceDown(self, instance_name):
929     """Mark the status of an instance to down in the configuration.
930
931     """
932     self._SetInstanceStatus(instance_name, False)
933
934   def _UnlockedGetInstanceList(self):
935     """Get the list of instances.
936
937     This function is for internal use, when the config lock is already held.
938
939     """
940     return self._config_data.instances.keys()
941
942   @locking.ssynchronized(_config_lock, shared=1)
943   def GetInstanceList(self):
944     """Get the list of instances.
945
946     @return: array of instances, ex. ['instance2.example.com',
947         'instance1.example.com']
948
949     """
950     return self._UnlockedGetInstanceList()
951
952   @locking.ssynchronized(_config_lock, shared=1)
953   def ExpandInstanceName(self, short_name):
954     """Attempt to expand an incomplete instance name.
955
956     """
957     return utils.MatchNameComponent(short_name,
958                                     self._config_data.instances.keys(),
959                                     case_sensitive=False)
960
961   def _UnlockedGetInstanceInfo(self, instance_name):
962     """Returns information about an instance.
963
964     This function is for internal use, when the config lock is already held.
965
966     """
967     if instance_name not in self._config_data.instances:
968       return None
969
970     return self._config_data.instances[instance_name]
971
972   @locking.ssynchronized(_config_lock, shared=1)
973   def GetInstanceInfo(self, instance_name):
974     """Returns information about an instance.
975
976     It takes the information from the configuration file. Other information of
977     an instance are taken from the live systems.
978
979     @param instance_name: name of the instance, e.g.
980         I{instance1.example.com}
981
982     @rtype: L{objects.Instance}
983     @return: the instance object
984
985     """
986     return self._UnlockedGetInstanceInfo(instance_name)
987
988   @locking.ssynchronized(_config_lock, shared=1)
989   def GetAllInstancesInfo(self):
990     """Get the configuration of all instances.
991
992     @rtype: dict
993     @return: dict of (instance, instance_info), where instance_info is what
994               would GetInstanceInfo return for the node
995
996     """
997     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
998                     for instance in self._UnlockedGetInstanceList()])
999     return my_dict
1000
1001   @locking.ssynchronized(_config_lock)
1002   def AddNode(self, node, ec_id):
1003     """Add a node to the configuration.
1004
1005     @type node: L{objects.Node}
1006     @param node: a Node instance
1007
1008     """
1009     logging.info("Adding node %s to configuration", node.name)
1010
1011     self._EnsureUUID(node, ec_id)
1012
1013     node.serial_no = 1
1014     node.ctime = node.mtime = time.time()
1015     self._config_data.nodes[node.name] = node
1016     self._config_data.cluster.serial_no += 1
1017     self._WriteConfig()
1018
1019   @locking.ssynchronized(_config_lock)
1020   def RemoveNode(self, node_name):
1021     """Remove a node from the configuration.
1022
1023     """
1024     logging.info("Removing node %s from configuration", node_name)
1025
1026     if node_name not in self._config_data.nodes:
1027       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1028
1029     del self._config_data.nodes[node_name]
1030     self._config_data.cluster.serial_no += 1
1031     self._WriteConfig()
1032
1033   @locking.ssynchronized(_config_lock, shared=1)
1034   def ExpandNodeName(self, short_name):
1035     """Attempt to expand an incomplete instance name.
1036
1037     """
1038     return utils.MatchNameComponent(short_name,
1039                                     self._config_data.nodes.keys(),
1040                                     case_sensitive=False)
1041
1042   def _UnlockedGetNodeInfo(self, node_name):
1043     """Get the configuration of a node, as stored in the config.
1044
1045     This function is for internal use, when the config lock is already
1046     held.
1047
1048     @param node_name: the node name, e.g. I{node1.example.com}
1049
1050     @rtype: L{objects.Node}
1051     @return: the node object
1052
1053     """
1054     if node_name not in self._config_data.nodes:
1055       return None
1056
1057     return self._config_data.nodes[node_name]
1058
1059   @locking.ssynchronized(_config_lock, shared=1)
1060   def GetNodeInfo(self, node_name):
1061     """Get the configuration of a node, as stored in the config.
1062
1063     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1064
1065     @param node_name: the node name, e.g. I{node1.example.com}
1066
1067     @rtype: L{objects.Node}
1068     @return: the node object
1069
1070     """
1071     return self._UnlockedGetNodeInfo(node_name)
1072
1073   def _UnlockedGetNodeList(self):
1074     """Return the list of nodes which are in the configuration.
1075
1076     This function is for internal use, when the config lock is already
1077     held.
1078
1079     @rtype: list
1080
1081     """
1082     return self._config_data.nodes.keys()
1083
1084   @locking.ssynchronized(_config_lock, shared=1)
1085   def GetNodeList(self):
1086     """Return the list of nodes which are in the configuration.
1087
1088     """
1089     return self._UnlockedGetNodeList()
1090
1091   def _UnlockedGetOnlineNodeList(self):
1092     """Return the list of nodes which are online.
1093
1094     """
1095     all_nodes = [self._UnlockedGetNodeInfo(node)
1096                  for node in self._UnlockedGetNodeList()]
1097     return [node.name for node in all_nodes if not node.offline]
1098
1099   @locking.ssynchronized(_config_lock, shared=1)
1100   def GetOnlineNodeList(self):
1101     """Return the list of nodes which are online.
1102
1103     """
1104     return self._UnlockedGetOnlineNodeList()
1105
1106   @locking.ssynchronized(_config_lock, shared=1)
1107   def GetAllNodesInfo(self):
1108     """Get the configuration of all nodes.
1109
1110     @rtype: dict
1111     @return: dict of (node, node_info), where node_info is what
1112               would GetNodeInfo return for the node
1113
1114     """
1115     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1116                     for node in self._UnlockedGetNodeList()])
1117     return my_dict
1118
1119   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1120     """Get the number of current and maximum desired and possible candidates.
1121
1122     @type exceptions: list
1123     @param exceptions: if passed, list of nodes that should be ignored
1124     @rtype: tuple
1125     @return: tuple of (current, desired and possible, possible)
1126
1127     """
1128     mc_now = mc_should = mc_max = 0
1129     for node in self._config_data.nodes.values():
1130       if exceptions and node.name in exceptions:
1131         continue
1132       if not (node.offline or node.drained):
1133         mc_max += 1
1134       if node.master_candidate:
1135         mc_now += 1
1136     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1137     return (mc_now, mc_should, mc_max)
1138
1139   @locking.ssynchronized(_config_lock, shared=1)
1140   def GetMasterCandidateStats(self, exceptions=None):
1141     """Get the number of current and maximum possible candidates.
1142
1143     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1144
1145     @type exceptions: list
1146     @param exceptions: if passed, list of nodes that should be ignored
1147     @rtype: tuple
1148     @return: tuple of (current, max)
1149
1150     """
1151     return self._UnlockedGetMasterCandidateStats(exceptions)
1152
1153   @locking.ssynchronized(_config_lock)
1154   def MaintainCandidatePool(self, exceptions):
1155     """Try to grow the candidate pool to the desired size.
1156
1157     @type exceptions: list
1158     @param exceptions: if passed, list of nodes that should be ignored
1159     @rtype: list
1160     @return: list with the adjusted nodes (L{objects.Node} instances)
1161
1162     """
1163     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1164     mod_list = []
1165     if mc_now < mc_max:
1166       node_list = self._config_data.nodes.keys()
1167       random.shuffle(node_list)
1168       for name in node_list:
1169         if mc_now >= mc_max:
1170           break
1171         node = self._config_data.nodes[name]
1172         if (node.master_candidate or node.offline or node.drained or
1173             node.name in exceptions):
1174           continue
1175         mod_list.append(node)
1176         node.master_candidate = True
1177         node.serial_no += 1
1178         mc_now += 1
1179       if mc_now != mc_max:
1180         # this should not happen
1181         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1182                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1183       if mod_list:
1184         self._config_data.cluster.serial_no += 1
1185         self._WriteConfig()
1186
1187     return mod_list
1188
1189   def _BumpSerialNo(self):
1190     """Bump up the serial number of the config.
1191
1192     """
1193     self._config_data.serial_no += 1
1194     self._config_data.mtime = time.time()
1195
1196   def _AllUUIDObjects(self):
1197     """Returns all objects with uuid attributes.
1198
1199     """
1200     return (self._config_data.instances.values() +
1201             self._config_data.nodes.values() +
1202             [self._config_data.cluster])
1203
1204   def _OpenConfig(self):
1205     """Read the config data from disk.
1206
1207     """
1208     raw_data = utils.ReadFile(self._cfg_file)
1209
1210     try:
1211       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1212     except Exception, err:
1213       raise errors.ConfigurationError(err)
1214
1215     # Make sure the configuration has the right version
1216     _ValidateConfig(data)
1217
1218     if (not hasattr(data, 'cluster') or
1219         not hasattr(data.cluster, 'rsahostkeypub')):
1220       raise errors.ConfigurationError("Incomplete configuration"
1221                                       " (missing cluster.rsahostkeypub)")
1222
1223     # Upgrade configuration if needed
1224     data.UpgradeConfig()
1225
1226     self._config_data = data
1227     # reset the last serial as -1 so that the next write will cause
1228     # ssconf update
1229     self._last_cluster_serial = -1
1230
1231     # And finally run our (custom) config upgrade sequence
1232     self._UpgradeConfig()
1233
1234   def _UpgradeConfig(self):
1235     """Run upgrade steps that cannot be done purely in the objects.
1236
1237     This is because some data elements need uniqueness across the
1238     whole configuration, etc.
1239
1240     @warning: this function will call L{_WriteConfig()}, so it needs
1241         to either be called with the lock held or from a safe place
1242         (the constructor)
1243
1244     """
1245     modified = False
1246     for item in self._AllUUIDObjects():
1247       if item.uuid is None:
1248         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1249         modified = True
1250     if modified:
1251       self._WriteConfig()
1252       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1253       # only called at config init time, without the lock held
1254       self.DropECReservations(_UPGRADE_CONFIG_JID)
1255
1256   def _DistributeConfig(self, feedback_fn):
1257     """Distribute the configuration to the other nodes.
1258
1259     Currently, this only copies the configuration file. In the future,
1260     it could be used to encapsulate the 2/3-phase update mechanism.
1261
1262     """
1263     if self._offline:
1264       return True
1265
1266     bad = False
1267
1268     node_list = []
1269     addr_list = []
1270     myhostname = self._my_hostname
1271     # we can skip checking whether _UnlockedGetNodeInfo returns None
1272     # since the node list comes from _UnlocketGetNodeList, and we are
1273     # called with the lock held, so no modifications should take place
1274     # in between
1275     for node_name in self._UnlockedGetNodeList():
1276       if node_name == myhostname:
1277         continue
1278       node_info = self._UnlockedGetNodeInfo(node_name)
1279       if not node_info.master_candidate:
1280         continue
1281       node_list.append(node_info.name)
1282       addr_list.append(node_info.primary_ip)
1283
1284     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1285                                             address_list=addr_list)
1286     for to_node, to_result in result.items():
1287       msg = to_result.fail_msg
1288       if msg:
1289         msg = ("Copy of file %s to node %s failed: %s" %
1290                (self._cfg_file, to_node, msg))
1291         logging.error(msg)
1292
1293         if feedback_fn:
1294           feedback_fn(msg)
1295
1296         bad = True
1297
1298     return not bad
1299
1300   def _WriteConfig(self, destination=None, feedback_fn=None):
1301     """Write the configuration data to persistent storage.
1302
1303     """
1304     assert feedback_fn is None or callable(feedback_fn)
1305
1306     # Warn on config errors, but don't abort the save - the
1307     # configuration has already been modified, and we can't revert;
1308     # the best we can do is to warn the user and save as is, leaving
1309     # recovery to the user
1310     config_errors = self._UnlockedVerifyConfig()
1311     if config_errors:
1312       errmsg = ("Configuration data is not consistent: %s" %
1313                 (utils.CommaJoin(config_errors)))
1314       logging.critical(errmsg)
1315       if feedback_fn:
1316         feedback_fn(errmsg)
1317
1318     if destination is None:
1319       destination = self._cfg_file
1320     self._BumpSerialNo()
1321     txt = serializer.Dump(self._config_data.ToDict())
1322
1323     utils.WriteFile(destination, data=txt)
1324
1325     self.write_count += 1
1326
1327     # and redistribute the config file to master candidates
1328     self._DistributeConfig(feedback_fn)
1329
1330     # Write ssconf files on all nodes (including locally)
1331     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1332       if not self._offline:
1333         result = rpc.RpcRunner.call_write_ssconf_files(
1334           self._UnlockedGetOnlineNodeList(),
1335           self._UnlockedGetSsconfValues())
1336
1337         for nname, nresu in result.items():
1338           msg = nresu.fail_msg
1339           if msg:
1340             errmsg = ("Error while uploading ssconf files to"
1341                       " node %s: %s" % (nname, msg))
1342             logging.warning(errmsg)
1343
1344             if feedback_fn:
1345               feedback_fn(errmsg)
1346
1347       self._last_cluster_serial = self._config_data.cluster.serial_no
1348
1349   def _UnlockedGetSsconfValues(self):
1350     """Return the values needed by ssconf.
1351
1352     @rtype: dict
1353     @return: a dictionary with keys the ssconf names and values their
1354         associated value
1355
1356     """
1357     fn = "\n".join
1358     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1359     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1360     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1361     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1362                     for ninfo in node_info]
1363     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1364                     for ninfo in node_info]
1365
1366     instance_data = fn(instance_names)
1367     off_data = fn(node.name for node in node_info if node.offline)
1368     on_data = fn(node.name for node in node_info if not node.offline)
1369     mc_data = fn(node.name for node in node_info if node.master_candidate)
1370     mc_ips_data = fn(node.primary_ip for node in node_info
1371                      if node.master_candidate)
1372     node_data = fn(node_names)
1373     node_pri_ips_data = fn(node_pri_ips)
1374     node_snd_ips_data = fn(node_snd_ips)
1375
1376     cluster = self._config_data.cluster
1377     cluster_tags = fn(cluster.GetTags())
1378
1379     hypervisor_list = fn(cluster.enabled_hypervisors)
1380
1381     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1382
1383     return {
1384       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1385       constants.SS_CLUSTER_TAGS: cluster_tags,
1386       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1387       constants.SS_MASTER_CANDIDATES: mc_data,
1388       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1389       constants.SS_MASTER_IP: cluster.master_ip,
1390       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1391       constants.SS_MASTER_NODE: cluster.master_node,
1392       constants.SS_NODE_LIST: node_data,
1393       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1394       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1395       constants.SS_OFFLINE_NODES: off_data,
1396       constants.SS_ONLINE_NODES: on_data,
1397       constants.SS_INSTANCE_LIST: instance_data,
1398       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1399       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1400       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1401       constants.SS_UID_POOL: uid_pool,
1402       }
1403
1404   @locking.ssynchronized(_config_lock, shared=1)
1405   def GetVGName(self):
1406     """Return the volume group name.
1407
1408     """
1409     return self._config_data.cluster.volume_group_name
1410
1411   @locking.ssynchronized(_config_lock)
1412   def SetVGName(self, vg_name):
1413     """Set the volume group name.
1414
1415     """
1416     self._config_data.cluster.volume_group_name = vg_name
1417     self._config_data.cluster.serial_no += 1
1418     self._WriteConfig()
1419
1420   @locking.ssynchronized(_config_lock, shared=1)
1421   def GetMACPrefix(self):
1422     """Return the mac prefix.
1423
1424     """
1425     return self._config_data.cluster.mac_prefix
1426
1427   @locking.ssynchronized(_config_lock, shared=1)
1428   def GetClusterInfo(self):
1429     """Returns information about the cluster
1430
1431     @rtype: L{objects.Cluster}
1432     @return: the cluster object
1433
1434     """
1435     return self._config_data.cluster
1436
1437   @locking.ssynchronized(_config_lock)
1438   def Update(self, target, feedback_fn):
1439     """Notify function to be called after updates.
1440
1441     This function must be called when an object (as returned by
1442     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1443     caller wants the modifications saved to the backing store. Note
1444     that all modified objects will be saved, but the target argument
1445     is the one the caller wants to ensure that it's saved.
1446
1447     @param target: an instance of either L{objects.Cluster},
1448         L{objects.Node} or L{objects.Instance} which is existing in
1449         the cluster
1450     @param feedback_fn: Callable feedback function
1451
1452     """
1453     if self._config_data is None:
1454       raise errors.ProgrammerError("Configuration file not read,"
1455                                    " cannot save.")
1456     update_serial = False
1457     if isinstance(target, objects.Cluster):
1458       test = target == self._config_data.cluster
1459     elif isinstance(target, objects.Node):
1460       test = target in self._config_data.nodes.values()
1461       update_serial = True
1462     elif isinstance(target, objects.Instance):
1463       test = target in self._config_data.instances.values()
1464     else:
1465       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1466                                    " ConfigWriter.Update" % type(target))
1467     if not test:
1468       raise errors.ConfigurationError("Configuration updated since object"
1469                                       " has been read or unknown object")
1470     target.serial_no += 1
1471     target.mtime = now = time.time()
1472
1473     if update_serial:
1474       # for node updates, we need to increase the cluster serial too
1475       self._config_data.cluster.serial_no += 1
1476       self._config_data.cluster.mtime = now
1477
1478     if isinstance(target, objects.Instance):
1479       self._UnlockedReleaseDRBDMinors(target.name)
1480
1481     self._WriteConfig(feedback_fn=feedback_fn)
1482
1483   @locking.ssynchronized(_config_lock)
1484   def DropECReservations(self, ec_id):
1485     """Drop per-execution-context reservations
1486
1487     """
1488     for rm in self._all_rms:
1489       rm.DropECReservations(ec_id)