rlib2: Set tag operation param “name” to None for cluster tags
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import random
36 import logging
37 import time
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46 from ganeti import uidpool
47 from ganeti import netutils
48
49
50 _config_lock = locking.SharedLock("ConfigWriter")
51
52 # job id used for resource management at config upgrade time
53 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
54
55
56 def _ValidateConfig(data):
57   """Verifies that a configuration objects looks valid.
58
59   This only verifies the version of the configuration.
60
61   @raise errors.ConfigurationError: if the version differs from what
62       we expect
63
64   """
65   if data.version != constants.CONFIG_VERSION:
66     raise errors.ConfigurationError("Cluster configuration version"
67                                     " mismatch, got %s instead of %s" %
68                                     (data.version,
69                                      constants.CONFIG_VERSION))
70
71
72 class TemporaryReservationManager:
73   """A temporary resource reservation manager.
74
75   This is used to reserve resources in a job, before using them, making sure
76   other jobs cannot get them in the meantime.
77
78   """
79   def __init__(self):
80     self._ec_reserved = {}
81
82   def Reserved(self, resource):
83     for holder_reserved in self._ec_reserved.items():
84       if resource in holder_reserved:
85         return True
86     return False
87
88   def Reserve(self, ec_id, resource):
89     if self.Reserved(resource):
90       raise errors.ReservationError("Duplicate reservation for resource: %s." %
91                                     (resource))
92     if ec_id not in self._ec_reserved:
93       self._ec_reserved[ec_id] = set([resource])
94     else:
95       self._ec_reserved[ec_id].add(resource)
96
97   def DropECReservations(self, ec_id):
98     if ec_id in self._ec_reserved:
99       del self._ec_reserved[ec_id]
100
101   def GetReserved(self):
102     all_reserved = set()
103     for holder_reserved in self._ec_reserved.values():
104       all_reserved.update(holder_reserved)
105     return all_reserved
106
107   def Generate(self, existing, generate_one_fn, ec_id):
108     """Generate a new resource of this type
109
110     """
111     assert callable(generate_one_fn)
112
113     all_elems = self.GetReserved()
114     all_elems.update(existing)
115     retries = 64
116     while retries > 0:
117       new_resource = generate_one_fn()
118       if new_resource is not None and new_resource not in all_elems:
119         break
120     else:
121       raise errors.ConfigurationError("Not able generate new resource"
122                                       " (last tried: %s)" % new_resource)
123     self.Reserve(ec_id, new_resource)
124     return new_resource
125
126
127 class ConfigWriter:
128   """The interface to the cluster configuration.
129
130   @ivar _temporary_lvs: reservation manager for temporary LVs
131   @ivar _all_rms: a list of all temporary reservation managers
132
133   """
134   def __init__(self, cfg_file=None, offline=False):
135     self.write_count = 0
136     self._lock = _config_lock
137     self._config_data = None
138     self._offline = offline
139     if cfg_file is None:
140       self._cfg_file = constants.CLUSTER_CONF_FILE
141     else:
142       self._cfg_file = cfg_file
143     self._temporary_ids = TemporaryReservationManager()
144     self._temporary_drbds = {}
145     self._temporary_macs = TemporaryReservationManager()
146     self._temporary_secrets = TemporaryReservationManager()
147     self._temporary_lvs = TemporaryReservationManager()
148     self._all_rms = [self._temporary_ids, self._temporary_macs,
149                      self._temporary_secrets, self._temporary_lvs]
150     # Note: in order to prevent errors when resolving our name in
151     # _DistributeConfig, we compute it here once and reuse it; it's
152     # better to raise an error before starting to modify the config
153     # file than after it was modified
154     self._my_hostname = netutils.HostInfo().name
155     self._last_cluster_serial = -1
156     self._OpenConfig()
157
158   # this method needs to be static, so that we can call it on the class
159   @staticmethod
160   def IsCluster():
161     """Check if the cluster is configured.
162
163     """
164     return os.path.exists(constants.CLUSTER_CONF_FILE)
165
166   def _GenerateOneMAC(self):
167     """Generate one mac address
168
169     """
170     prefix = self._config_data.cluster.mac_prefix
171     byte1 = random.randrange(0, 256)
172     byte2 = random.randrange(0, 256)
173     byte3 = random.randrange(0, 256)
174     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
175     return mac
176
177   @locking.ssynchronized(_config_lock, shared=1)
178   def GenerateMAC(self, ec_id):
179     """Generate a MAC for an instance.
180
181     This should check the current instances for duplicates.
182
183     """
184     existing = self._AllMACs()
185     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
186
187   @locking.ssynchronized(_config_lock, shared=1)
188   def ReserveMAC(self, mac, ec_id):
189     """Reserve a MAC for an instance.
190
191     This only checks instances managed by this cluster, it does not
192     check for potential collisions elsewhere.
193
194     """
195     all_macs = self._AllMACs()
196     if mac in all_macs:
197       raise errors.ReservationError("mac already in use")
198     else:
199       self._temporary_macs.Reserve(mac, ec_id)
200
201   @locking.ssynchronized(_config_lock, shared=1)
202   def ReserveLV(self, lv_name, ec_id):
203     """Reserve an VG/LV pair for an instance.
204
205     @type lv_name: string
206     @param lv_name: the logical volume name to reserve
207
208     """
209     all_lvs = self._AllLVs()
210     if lv_name in all_lvs:
211       raise errors.ReservationError("LV already in use")
212     else:
213       self._temporary_lvs.Reserve(lv_name, ec_id)
214
215   @locking.ssynchronized(_config_lock, shared=1)
216   def GenerateDRBDSecret(self, ec_id):
217     """Generate a DRBD secret.
218
219     This checks the current disks for duplicates.
220
221     """
222     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
223                                             utils.GenerateSecret,
224                                             ec_id)
225
226   def _AllLVs(self):
227     """Compute the list of all LVs.
228
229     """
230     lvnames = set()
231     for instance in self._config_data.instances.values():
232       node_data = instance.MapLVsByNode()
233       for lv_list in node_data.values():
234         lvnames.update(lv_list)
235     return lvnames
236
237   def _AllIDs(self, include_temporary):
238     """Compute the list of all UUIDs and names we have.
239
240     @type include_temporary: boolean
241     @param include_temporary: whether to include the _temporary_ids set
242     @rtype: set
243     @return: a set of IDs
244
245     """
246     existing = set()
247     if include_temporary:
248       existing.update(self._temporary_ids.GetReserved())
249     existing.update(self._AllLVs())
250     existing.update(self._config_data.instances.keys())
251     existing.update(self._config_data.nodes.keys())
252     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
253     return existing
254
255   def _GenerateUniqueID(self, ec_id):
256     """Generate an unique UUID.
257
258     This checks the current node, instances and disk names for
259     duplicates.
260
261     @rtype: string
262     @return: the unique id
263
264     """
265     existing = self._AllIDs(include_temporary=False)
266     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
267
268   @locking.ssynchronized(_config_lock, shared=1)
269   def GenerateUniqueID(self, ec_id):
270     """Generate an unique ID.
271
272     This is just a wrapper over the unlocked version.
273
274     @type ec_id: string
275     @param ec_id: unique id for the job to reserve the id to
276
277     """
278     return self._GenerateUniqueID(ec_id)
279
280   def _AllMACs(self):
281     """Return all MACs present in the config.
282
283     @rtype: list
284     @return: the list of all MACs
285
286     """
287     result = []
288     for instance in self._config_data.instances.values():
289       for nic in instance.nics:
290         result.append(nic.mac)
291
292     return result
293
294   def _AllDRBDSecrets(self):
295     """Return all DRBD secrets present in the config.
296
297     @rtype: list
298     @return: the list of all DRBD secrets
299
300     """
301     def helper(disk, result):
302       """Recursively gather secrets from this disk."""
303       if disk.dev_type == constants.DT_DRBD8:
304         result.append(disk.logical_id[5])
305       if disk.children:
306         for child in disk.children:
307           helper(child, result)
308
309     result = []
310     for instance in self._config_data.instances.values():
311       for disk in instance.disks:
312         helper(disk, result)
313
314     return result
315
316   def _CheckDiskIDs(self, disk, l_ids, p_ids):
317     """Compute duplicate disk IDs
318
319     @type disk: L{objects.Disk}
320     @param disk: the disk at which to start searching
321     @type l_ids: list
322     @param l_ids: list of current logical ids
323     @type p_ids: list
324     @param p_ids: list of current physical ids
325     @rtype: list
326     @return: a list of error messages
327
328     """
329     result = []
330     if disk.logical_id is not None:
331       if disk.logical_id in l_ids:
332         result.append("duplicate logical id %s" % str(disk.logical_id))
333       else:
334         l_ids.append(disk.logical_id)
335     if disk.physical_id is not None:
336       if disk.physical_id in p_ids:
337         result.append("duplicate physical id %s" % str(disk.physical_id))
338       else:
339         p_ids.append(disk.physical_id)
340
341     if disk.children:
342       for child in disk.children:
343         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
344     return result
345
346   def _UnlockedVerifyConfig(self):
347     """Verify function.
348
349     @rtype: list
350     @return: a list of error messages; a non-empty list signifies
351         configuration errors
352
353     """
354     result = []
355     seen_macs = []
356     ports = {}
357     data = self._config_data
358     seen_lids = []
359     seen_pids = []
360
361     # global cluster checks
362     if not data.cluster.enabled_hypervisors:
363       result.append("enabled hypervisors list doesn't have any entries")
364     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
365     if invalid_hvs:
366       result.append("enabled hypervisors contains invalid entries: %s" %
367                     invalid_hvs)
368     missing_hvp = (set(data.cluster.enabled_hypervisors) -
369                    set(data.cluster.hvparams.keys()))
370     if missing_hvp:
371       result.append("hypervisor parameters missing for the enabled"
372                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
373
374     if data.cluster.master_node not in data.nodes:
375       result.append("cluster has invalid primary node '%s'" %
376                     data.cluster.master_node)
377
378     # per-instance checks
379     for instance_name in data.instances:
380       instance = data.instances[instance_name]
381       if instance.name != instance_name:
382         result.append("instance '%s' is indexed by wrong name '%s'" %
383                       (instance.name, instance_name))
384       if instance.primary_node not in data.nodes:
385         result.append("instance '%s' has invalid primary node '%s'" %
386                       (instance_name, instance.primary_node))
387       for snode in instance.secondary_nodes:
388         if snode not in data.nodes:
389           result.append("instance '%s' has invalid secondary node '%s'" %
390                         (instance_name, snode))
391       for idx, nic in enumerate(instance.nics):
392         if nic.mac in seen_macs:
393           result.append("instance '%s' has NIC %d mac %s duplicate" %
394                         (instance_name, idx, nic.mac))
395         else:
396           seen_macs.append(nic.mac)
397
398       # gather the drbd ports for duplicate checks
399       for dsk in instance.disks:
400         if dsk.dev_type in constants.LDS_DRBD:
401           tcp_port = dsk.logical_id[2]
402           if tcp_port not in ports:
403             ports[tcp_port] = []
404           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
405       # gather network port reservation
406       net_port = getattr(instance, "network_port", None)
407       if net_port is not None:
408         if net_port not in ports:
409           ports[net_port] = []
410         ports[net_port].append((instance.name, "network port"))
411
412       # instance disk verify
413       for idx, disk in enumerate(instance.disks):
414         result.extend(["instance '%s' disk %d error: %s" %
415                        (instance.name, idx, msg) for msg in disk.Verify()])
416         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
417
418     # cluster-wide pool of free ports
419     for free_port in data.cluster.tcpudp_port_pool:
420       if free_port not in ports:
421         ports[free_port] = []
422       ports[free_port].append(("cluster", "port marked as free"))
423
424     # compute tcp/udp duplicate ports
425     keys = ports.keys()
426     keys.sort()
427     for pnum in keys:
428       pdata = ports[pnum]
429       if len(pdata) > 1:
430         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
431         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
432
433     # highest used tcp port check
434     if keys:
435       if keys[-1] > data.cluster.highest_used_port:
436         result.append("Highest used port mismatch, saved %s, computed %s" %
437                       (data.cluster.highest_used_port, keys[-1]))
438
439     if not data.nodes[data.cluster.master_node].master_candidate:
440       result.append("Master node is not a master candidate")
441
442     # master candidate checks
443     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
444     if mc_now < mc_max:
445       result.append("Not enough master candidates: actual %d, target %d" %
446                     (mc_now, mc_max))
447
448     # node checks
449     for node_name, node in data.nodes.items():
450       if node.name != node_name:
451         result.append("Node '%s' is indexed by wrong name '%s'" %
452                       (node.name, node_name))
453       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
454         result.append("Node %s state is invalid: master_candidate=%s,"
455                       " drain=%s, offline=%s" %
456                       (node.name, node.master_candidate, node.drain,
457                        node.offline))
458
459     # drbd minors check
460     _, duplicates = self._UnlockedComputeDRBDMap()
461     for node, minor, instance_a, instance_b in duplicates:
462       result.append("DRBD minor %d on node %s is assigned twice to instances"
463                     " %s and %s" % (minor, node, instance_a, instance_b))
464
465     # IP checks
466     default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
467     ips = {}
468
469     def _AddIpAddress(ip, name):
470       ips.setdefault(ip, []).append(name)
471
472     _AddIpAddress(data.cluster.master_ip, "cluster_ip")
473
474     for node in data.nodes.values():
475       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
476       if node.secondary_ip != node.primary_ip:
477         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
478
479     for instance in data.instances.values():
480       for idx, nic in enumerate(instance.nics):
481         if nic.ip is None:
482           continue
483
484         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
485         nic_mode = nicparams[constants.NIC_MODE]
486         nic_link = nicparams[constants.NIC_LINK]
487
488         if nic_mode == constants.NIC_MODE_BRIDGED:
489           link = "bridge:%s" % nic_link
490         elif nic_mode == constants.NIC_MODE_ROUTED:
491           link = "route:%s" % nic_link
492         else:
493           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
494
495         _AddIpAddress("%s/%s" % (link, nic.ip),
496                       "instance:%s/nic:%d" % (instance.name, idx))
497
498     for ip, owners in ips.items():
499       if len(owners) > 1:
500         result.append("IP address %s is used by multiple owners: %s" %
501                       (ip, utils.CommaJoin(owners)))
502
503     return result
504
505   @locking.ssynchronized(_config_lock, shared=1)
506   def VerifyConfig(self):
507     """Verify function.
508
509     This is just a wrapper over L{_UnlockedVerifyConfig}.
510
511     @rtype: list
512     @return: a list of error messages; a non-empty list signifies
513         configuration errors
514
515     """
516     return self._UnlockedVerifyConfig()
517
518   def _UnlockedSetDiskID(self, disk, node_name):
519     """Convert the unique ID to the ID needed on the target nodes.
520
521     This is used only for drbd, which needs ip/port configuration.
522
523     The routine descends down and updates its children also, because
524     this helps when the only the top device is passed to the remote
525     node.
526
527     This function is for internal use, when the config lock is already held.
528
529     """
530     if disk.children:
531       for child in disk.children:
532         self._UnlockedSetDiskID(child, node_name)
533
534     if disk.logical_id is None and disk.physical_id is not None:
535       return
536     if disk.dev_type == constants.LD_DRBD8:
537       pnode, snode, port, pminor, sminor, secret = disk.logical_id
538       if node_name not in (pnode, snode):
539         raise errors.ConfigurationError("DRBD device not knowing node %s" %
540                                         node_name)
541       pnode_info = self._UnlockedGetNodeInfo(pnode)
542       snode_info = self._UnlockedGetNodeInfo(snode)
543       if pnode_info is None or snode_info is None:
544         raise errors.ConfigurationError("Can't find primary or secondary node"
545                                         " for %s" % str(disk))
546       p_data = (pnode_info.secondary_ip, port)
547       s_data = (snode_info.secondary_ip, port)
548       if pnode == node_name:
549         disk.physical_id = p_data + s_data + (pminor, secret)
550       else: # it must be secondary, we tested above
551         disk.physical_id = s_data + p_data + (sminor, secret)
552     else:
553       disk.physical_id = disk.logical_id
554     return
555
556   @locking.ssynchronized(_config_lock)
557   def SetDiskID(self, disk, node_name):
558     """Convert the unique ID to the ID needed on the target nodes.
559
560     This is used only for drbd, which needs ip/port configuration.
561
562     The routine descends down and updates its children also, because
563     this helps when the only the top device is passed to the remote
564     node.
565
566     """
567     return self._UnlockedSetDiskID(disk, node_name)
568
569   @locking.ssynchronized(_config_lock)
570   def AddTcpUdpPort(self, port):
571     """Adds a new port to the available port pool.
572
573     """
574     if not isinstance(port, int):
575       raise errors.ProgrammerError("Invalid type passed for port")
576
577     self._config_data.cluster.tcpudp_port_pool.add(port)
578     self._WriteConfig()
579
580   @locking.ssynchronized(_config_lock, shared=1)
581   def GetPortList(self):
582     """Returns a copy of the current port list.
583
584     """
585     return self._config_data.cluster.tcpudp_port_pool.copy()
586
587   @locking.ssynchronized(_config_lock)
588   def AllocatePort(self):
589     """Allocate a port.
590
591     The port will be taken from the available port pool or from the
592     default port range (and in this case we increase
593     highest_used_port).
594
595     """
596     # If there are TCP/IP ports configured, we use them first.
597     if self._config_data.cluster.tcpudp_port_pool:
598       port = self._config_data.cluster.tcpudp_port_pool.pop()
599     else:
600       port = self._config_data.cluster.highest_used_port + 1
601       if port >= constants.LAST_DRBD_PORT:
602         raise errors.ConfigurationError("The highest used port is greater"
603                                         " than %s. Aborting." %
604                                         constants.LAST_DRBD_PORT)
605       self._config_data.cluster.highest_used_port = port
606
607     self._WriteConfig()
608     return port
609
610   def _UnlockedComputeDRBDMap(self):
611     """Compute the used DRBD minor/nodes.
612
613     @rtype: (dict, list)
614     @return: dictionary of node_name: dict of minor: instance_name;
615         the returned dict will have all the nodes in it (even if with
616         an empty list), and a list of duplicates; if the duplicates
617         list is not empty, the configuration is corrupted and its caller
618         should raise an exception
619
620     """
621     def _AppendUsedPorts(instance_name, disk, used):
622       duplicates = []
623       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
624         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
625         for node, port in ((node_a, minor_a), (node_b, minor_b)):
626           assert node in used, ("Node '%s' of instance '%s' not found"
627                                 " in node list" % (node, instance_name))
628           if port in used[node]:
629             duplicates.append((node, port, instance_name, used[node][port]))
630           else:
631             used[node][port] = instance_name
632       if disk.children:
633         for child in disk.children:
634           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
635       return duplicates
636
637     duplicates = []
638     my_dict = dict((node, {}) for node in self._config_data.nodes)
639     for instance in self._config_data.instances.itervalues():
640       for disk in instance.disks:
641         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
642     for (node, minor), instance in self._temporary_drbds.iteritems():
643       if minor in my_dict[node] and my_dict[node][minor] != instance:
644         duplicates.append((node, minor, instance, my_dict[node][minor]))
645       else:
646         my_dict[node][minor] = instance
647     return my_dict, duplicates
648
649   @locking.ssynchronized(_config_lock)
650   def ComputeDRBDMap(self):
651     """Compute the used DRBD minor/nodes.
652
653     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
654
655     @return: dictionary of node_name: dict of minor: instance_name;
656         the returned dict will have all the nodes in it (even if with
657         an empty list).
658
659     """
660     d_map, duplicates = self._UnlockedComputeDRBDMap()
661     if duplicates:
662       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
663                                       str(duplicates))
664     return d_map
665
666   @locking.ssynchronized(_config_lock)
667   def AllocateDRBDMinor(self, nodes, instance):
668     """Allocate a drbd minor.
669
670     The free minor will be automatically computed from the existing
671     devices. A node can be given multiple times in order to allocate
672     multiple minors. The result is the list of minors, in the same
673     order as the passed nodes.
674
675     @type instance: string
676     @param instance: the instance for which we allocate minors
677
678     """
679     assert isinstance(instance, basestring), \
680            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
681
682     d_map, duplicates = self._UnlockedComputeDRBDMap()
683     if duplicates:
684       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
685                                       str(duplicates))
686     result = []
687     for nname in nodes:
688       ndata = d_map[nname]
689       if not ndata:
690         # no minors used, we can start at 0
691         result.append(0)
692         ndata[0] = instance
693         self._temporary_drbds[(nname, 0)] = instance
694         continue
695       keys = ndata.keys()
696       keys.sort()
697       ffree = utils.FirstFree(keys)
698       if ffree is None:
699         # return the next minor
700         # TODO: implement high-limit check
701         minor = keys[-1] + 1
702       else:
703         minor = ffree
704       # double-check minor against current instances
705       assert minor not in d_map[nname], \
706              ("Attempt to reuse allocated DRBD minor %d on node %s,"
707               " already allocated to instance %s" %
708               (minor, nname, d_map[nname][minor]))
709       ndata[minor] = instance
710       # double-check minor against reservation
711       r_key = (nname, minor)
712       assert r_key not in self._temporary_drbds, \
713              ("Attempt to reuse reserved DRBD minor %d on node %s,"
714               " reserved for instance %s" %
715               (minor, nname, self._temporary_drbds[r_key]))
716       self._temporary_drbds[r_key] = instance
717       result.append(minor)
718     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
719                   nodes, result)
720     return result
721
722   def _UnlockedReleaseDRBDMinors(self, instance):
723     """Release temporary drbd minors allocated for a given instance.
724
725     @type instance: string
726     @param instance: the instance for which temporary minors should be
727                      released
728
729     """
730     assert isinstance(instance, basestring), \
731            "Invalid argument passed to ReleaseDRBDMinors"
732     for key, name in self._temporary_drbds.items():
733       if name == instance:
734         del self._temporary_drbds[key]
735
736   @locking.ssynchronized(_config_lock)
737   def ReleaseDRBDMinors(self, instance):
738     """Release temporary drbd minors allocated for a given instance.
739
740     This should be called on the error paths, on the success paths
741     it's automatically called by the ConfigWriter add and update
742     functions.
743
744     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
745
746     @type instance: string
747     @param instance: the instance for which temporary minors should be
748                      released
749
750     """
751     self._UnlockedReleaseDRBDMinors(instance)
752
753   @locking.ssynchronized(_config_lock, shared=1)
754   def GetConfigVersion(self):
755     """Get the configuration version.
756
757     @return: Config version
758
759     """
760     return self._config_data.version
761
762   @locking.ssynchronized(_config_lock, shared=1)
763   def GetClusterName(self):
764     """Get cluster name.
765
766     @return: Cluster name
767
768     """
769     return self._config_data.cluster.cluster_name
770
771   @locking.ssynchronized(_config_lock, shared=1)
772   def GetMasterNode(self):
773     """Get the hostname of the master node for this cluster.
774
775     @return: Master hostname
776
777     """
778     return self._config_data.cluster.master_node
779
780   @locking.ssynchronized(_config_lock, shared=1)
781   def GetMasterIP(self):
782     """Get the IP of the master node for this cluster.
783
784     @return: Master IP
785
786     """
787     return self._config_data.cluster.master_ip
788
789   @locking.ssynchronized(_config_lock, shared=1)
790   def GetMasterNetdev(self):
791     """Get the master network device for this cluster.
792
793     """
794     return self._config_data.cluster.master_netdev
795
796   @locking.ssynchronized(_config_lock, shared=1)
797   def GetFileStorageDir(self):
798     """Get the file storage dir for this cluster.
799
800     """
801     return self._config_data.cluster.file_storage_dir
802
803   @locking.ssynchronized(_config_lock, shared=1)
804   def GetHypervisorType(self):
805     """Get the hypervisor type for this cluster.
806
807     """
808     return self._config_data.cluster.enabled_hypervisors[0]
809
810   @locking.ssynchronized(_config_lock, shared=1)
811   def GetHostKey(self):
812     """Return the rsa hostkey from the config.
813
814     @rtype: string
815     @return: the rsa hostkey
816
817     """
818     return self._config_data.cluster.rsahostkeypub
819
820   @locking.ssynchronized(_config_lock, shared=1)
821   def GetDefaultIAllocator(self):
822     """Get the default instance allocator for this cluster.
823
824     """
825     return self._config_data.cluster.default_iallocator
826
827   @locking.ssynchronized(_config_lock)
828   def AddInstance(self, instance, ec_id):
829     """Add an instance to the config.
830
831     This should be used after creating a new instance.
832
833     @type instance: L{objects.Instance}
834     @param instance: the instance object
835
836     """
837     if not isinstance(instance, objects.Instance):
838       raise errors.ProgrammerError("Invalid type passed to AddInstance")
839
840     if instance.disk_template != constants.DT_DISKLESS:
841       all_lvs = instance.MapLVsByNode()
842       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
843
844     all_macs = self._AllMACs()
845     for nic in instance.nics:
846       if nic.mac in all_macs:
847         raise errors.ConfigurationError("Cannot add instance %s:"
848                                         " MAC address '%s' already in use." %
849                                         (instance.name, nic.mac))
850
851     self._EnsureUUID(instance, ec_id)
852
853     instance.serial_no = 1
854     instance.ctime = instance.mtime = time.time()
855     self._config_data.instances[instance.name] = instance
856     self._config_data.cluster.serial_no += 1
857     self._UnlockedReleaseDRBDMinors(instance.name)
858     self._WriteConfig()
859
860   def _EnsureUUID(self, item, ec_id):
861     """Ensures a given object has a valid UUID.
862
863     @param item: the instance or node to be checked
864     @param ec_id: the execution context id for the uuid reservation
865
866     """
867     if not item.uuid:
868       item.uuid = self._GenerateUniqueID(ec_id)
869     elif item.uuid in self._AllIDs(include_temporary=True):
870       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
871                                       " in use" % (item.name, item.uuid))
872
873   def _SetInstanceStatus(self, instance_name, status):
874     """Set the instance's status to a given value.
875
876     """
877     assert isinstance(status, bool), \
878            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
879
880     if instance_name not in self._config_data.instances:
881       raise errors.ConfigurationError("Unknown instance '%s'" %
882                                       instance_name)
883     instance = self._config_data.instances[instance_name]
884     if instance.admin_up != status:
885       instance.admin_up = status
886       instance.serial_no += 1
887       instance.mtime = time.time()
888       self._WriteConfig()
889
890   @locking.ssynchronized(_config_lock)
891   def MarkInstanceUp(self, instance_name):
892     """Mark the instance status to up in the config.
893
894     """
895     self._SetInstanceStatus(instance_name, True)
896
897   @locking.ssynchronized(_config_lock)
898   def RemoveInstance(self, instance_name):
899     """Remove the instance from the configuration.
900
901     """
902     if instance_name not in self._config_data.instances:
903       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
904     del self._config_data.instances[instance_name]
905     self._config_data.cluster.serial_no += 1
906     self._WriteConfig()
907
908   @locking.ssynchronized(_config_lock)
909   def RenameInstance(self, old_name, new_name):
910     """Rename an instance.
911
912     This needs to be done in ConfigWriter and not by RemoveInstance
913     combined with AddInstance as only we can guarantee an atomic
914     rename.
915
916     """
917     if old_name not in self._config_data.instances:
918       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
919     inst = self._config_data.instances[old_name]
920     del self._config_data.instances[old_name]
921     inst.name = new_name
922
923     for disk in inst.disks:
924       if disk.dev_type == constants.LD_FILE:
925         # rename the file paths in logical and physical id
926         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
927         disk.physical_id = disk.logical_id = (disk.logical_id[0],
928                                               utils.PathJoin(file_storage_dir,
929                                                              inst.name,
930                                                              disk.iv_name))
931
932     self._config_data.instances[inst.name] = inst
933     self._WriteConfig()
934
935   @locking.ssynchronized(_config_lock)
936   def MarkInstanceDown(self, instance_name):
937     """Mark the status of an instance to down in the configuration.
938
939     """
940     self._SetInstanceStatus(instance_name, False)
941
942   def _UnlockedGetInstanceList(self):
943     """Get the list of instances.
944
945     This function is for internal use, when the config lock is already held.
946
947     """
948     return self._config_data.instances.keys()
949
950   @locking.ssynchronized(_config_lock, shared=1)
951   def GetInstanceList(self):
952     """Get the list of instances.
953
954     @return: array of instances, ex. ['instance2.example.com',
955         'instance1.example.com']
956
957     """
958     return self._UnlockedGetInstanceList()
959
960   @locking.ssynchronized(_config_lock, shared=1)
961   def ExpandInstanceName(self, short_name):
962     """Attempt to expand an incomplete instance name.
963
964     """
965     return utils.MatchNameComponent(short_name,
966                                     self._config_data.instances.keys(),
967                                     case_sensitive=False)
968
969   def _UnlockedGetInstanceInfo(self, instance_name):
970     """Returns information about an instance.
971
972     This function is for internal use, when the config lock is already held.
973
974     """
975     if instance_name not in self._config_data.instances:
976       return None
977
978     return self._config_data.instances[instance_name]
979
980   @locking.ssynchronized(_config_lock, shared=1)
981   def GetInstanceInfo(self, instance_name):
982     """Returns information about an instance.
983
984     It takes the information from the configuration file. Other information of
985     an instance are taken from the live systems.
986
987     @param instance_name: name of the instance, e.g.
988         I{instance1.example.com}
989
990     @rtype: L{objects.Instance}
991     @return: the instance object
992
993     """
994     return self._UnlockedGetInstanceInfo(instance_name)
995
996   @locking.ssynchronized(_config_lock, shared=1)
997   def GetAllInstancesInfo(self):
998     """Get the configuration of all instances.
999
1000     @rtype: dict
1001     @return: dict of (instance, instance_info), where instance_info is what
1002               would GetInstanceInfo return for the node
1003
1004     """
1005     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1006                     for instance in self._UnlockedGetInstanceList()])
1007     return my_dict
1008
1009   @locking.ssynchronized(_config_lock)
1010   def AddNode(self, node, ec_id):
1011     """Add a node to the configuration.
1012
1013     @type node: L{objects.Node}
1014     @param node: a Node instance
1015
1016     """
1017     logging.info("Adding node %s to configuration", node.name)
1018
1019     self._EnsureUUID(node, ec_id)
1020
1021     node.serial_no = 1
1022     node.ctime = node.mtime = time.time()
1023     self._config_data.nodes[node.name] = node
1024     self._config_data.cluster.serial_no += 1
1025     self._WriteConfig()
1026
1027   @locking.ssynchronized(_config_lock)
1028   def RemoveNode(self, node_name):
1029     """Remove a node from the configuration.
1030
1031     """
1032     logging.info("Removing node %s from configuration", node_name)
1033
1034     if node_name not in self._config_data.nodes:
1035       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1036
1037     del self._config_data.nodes[node_name]
1038     self._config_data.cluster.serial_no += 1
1039     self._WriteConfig()
1040
1041   @locking.ssynchronized(_config_lock, shared=1)
1042   def ExpandNodeName(self, short_name):
1043     """Attempt to expand an incomplete instance name.
1044
1045     """
1046     return utils.MatchNameComponent(short_name,
1047                                     self._config_data.nodes.keys(),
1048                                     case_sensitive=False)
1049
1050   def _UnlockedGetNodeInfo(self, node_name):
1051     """Get the configuration of a node, as stored in the config.
1052
1053     This function is for internal use, when the config lock is already
1054     held.
1055
1056     @param node_name: the node name, e.g. I{node1.example.com}
1057
1058     @rtype: L{objects.Node}
1059     @return: the node object
1060
1061     """
1062     if node_name not in self._config_data.nodes:
1063       return None
1064
1065     return self._config_data.nodes[node_name]
1066
1067   @locking.ssynchronized(_config_lock, shared=1)
1068   def GetNodeInfo(self, node_name):
1069     """Get the configuration of a node, as stored in the config.
1070
1071     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1072
1073     @param node_name: the node name, e.g. I{node1.example.com}
1074
1075     @rtype: L{objects.Node}
1076     @return: the node object
1077
1078     """
1079     return self._UnlockedGetNodeInfo(node_name)
1080
1081   def _UnlockedGetNodeList(self):
1082     """Return the list of nodes which are in the configuration.
1083
1084     This function is for internal use, when the config lock is already
1085     held.
1086
1087     @rtype: list
1088
1089     """
1090     return self._config_data.nodes.keys()
1091
1092   @locking.ssynchronized(_config_lock, shared=1)
1093   def GetNodeList(self):
1094     """Return the list of nodes which are in the configuration.
1095
1096     """
1097     return self._UnlockedGetNodeList()
1098
1099   def _UnlockedGetOnlineNodeList(self):
1100     """Return the list of nodes which are online.
1101
1102     """
1103     all_nodes = [self._UnlockedGetNodeInfo(node)
1104                  for node in self._UnlockedGetNodeList()]
1105     return [node.name for node in all_nodes if not node.offline]
1106
1107   @locking.ssynchronized(_config_lock, shared=1)
1108   def GetOnlineNodeList(self):
1109     """Return the list of nodes which are online.
1110
1111     """
1112     return self._UnlockedGetOnlineNodeList()
1113
1114   @locking.ssynchronized(_config_lock, shared=1)
1115   def GetAllNodesInfo(self):
1116     """Get the configuration of all nodes.
1117
1118     @rtype: dict
1119     @return: dict of (node, node_info), where node_info is what
1120               would GetNodeInfo return for the node
1121
1122     """
1123     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1124                     for node in self._UnlockedGetNodeList()])
1125     return my_dict
1126
1127   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1128     """Get the number of current and maximum desired and possible candidates.
1129
1130     @type exceptions: list
1131     @param exceptions: if passed, list of nodes that should be ignored
1132     @rtype: tuple
1133     @return: tuple of (current, desired and possible, possible)
1134
1135     """
1136     mc_now = mc_should = mc_max = 0
1137     for node in self._config_data.nodes.values():
1138       if exceptions and node.name in exceptions:
1139         continue
1140       if not (node.offline or node.drained):
1141         mc_max += 1
1142       if node.master_candidate:
1143         mc_now += 1
1144     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1145     return (mc_now, mc_should, mc_max)
1146
1147   @locking.ssynchronized(_config_lock, shared=1)
1148   def GetMasterCandidateStats(self, exceptions=None):
1149     """Get the number of current and maximum possible candidates.
1150
1151     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1152
1153     @type exceptions: list
1154     @param exceptions: if passed, list of nodes that should be ignored
1155     @rtype: tuple
1156     @return: tuple of (current, max)
1157
1158     """
1159     return self._UnlockedGetMasterCandidateStats(exceptions)
1160
1161   @locking.ssynchronized(_config_lock)
1162   def MaintainCandidatePool(self, exceptions):
1163     """Try to grow the candidate pool to the desired size.
1164
1165     @type exceptions: list
1166     @param exceptions: if passed, list of nodes that should be ignored
1167     @rtype: list
1168     @return: list with the adjusted nodes (L{objects.Node} instances)
1169
1170     """
1171     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1172     mod_list = []
1173     if mc_now < mc_max:
1174       node_list = self._config_data.nodes.keys()
1175       random.shuffle(node_list)
1176       for name in node_list:
1177         if mc_now >= mc_max:
1178           break
1179         node = self._config_data.nodes[name]
1180         if (node.master_candidate or node.offline or node.drained or
1181             node.name in exceptions):
1182           continue
1183         mod_list.append(node)
1184         node.master_candidate = True
1185         node.serial_no += 1
1186         mc_now += 1
1187       if mc_now != mc_max:
1188         # this should not happen
1189         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1190                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1191       if mod_list:
1192         self._config_data.cluster.serial_no += 1
1193         self._WriteConfig()
1194
1195     return mod_list
1196
1197   def _BumpSerialNo(self):
1198     """Bump up the serial number of the config.
1199
1200     """
1201     self._config_data.serial_no += 1
1202     self._config_data.mtime = time.time()
1203
1204   def _AllUUIDObjects(self):
1205     """Returns all objects with uuid attributes.
1206
1207     """
1208     return (self._config_data.instances.values() +
1209             self._config_data.nodes.values() +
1210             [self._config_data.cluster])
1211
1212   def _OpenConfig(self):
1213     """Read the config data from disk.
1214
1215     """
1216     raw_data = utils.ReadFile(self._cfg_file)
1217
1218     try:
1219       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1220     except Exception, err:
1221       raise errors.ConfigurationError(err)
1222
1223     # Make sure the configuration has the right version
1224     _ValidateConfig(data)
1225
1226     if (not hasattr(data, 'cluster') or
1227         not hasattr(data.cluster, 'rsahostkeypub')):
1228       raise errors.ConfigurationError("Incomplete configuration"
1229                                       " (missing cluster.rsahostkeypub)")
1230
1231     # Upgrade configuration if needed
1232     data.UpgradeConfig()
1233
1234     self._config_data = data
1235     # reset the last serial as -1 so that the next write will cause
1236     # ssconf update
1237     self._last_cluster_serial = -1
1238
1239     # And finally run our (custom) config upgrade sequence
1240     self._UpgradeConfig()
1241
1242   def _UpgradeConfig(self):
1243     """Run upgrade steps that cannot be done purely in the objects.
1244
1245     This is because some data elements need uniqueness across the
1246     whole configuration, etc.
1247
1248     @warning: this function will call L{_WriteConfig()}, but also
1249         L{DropECReservations} so it needs to be called only from a
1250         "safe" place (the constructor). If one wanted to call it with
1251         the lock held, a DropECReservationUnlocked would need to be
1252         created first, to avoid causing deadlock.
1253
1254     """
1255     modified = False
1256     for item in self._AllUUIDObjects():
1257       if item.uuid is None:
1258         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1259         modified = True
1260     if modified:
1261       self._WriteConfig()
1262       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1263       # only called at config init time, without the lock held
1264       self.DropECReservations(_UPGRADE_CONFIG_JID)
1265
1266   def _DistributeConfig(self, feedback_fn):
1267     """Distribute the configuration to the other nodes.
1268
1269     Currently, this only copies the configuration file. In the future,
1270     it could be used to encapsulate the 2/3-phase update mechanism.
1271
1272     """
1273     if self._offline:
1274       return True
1275
1276     bad = False
1277
1278     node_list = []
1279     addr_list = []
1280     myhostname = self._my_hostname
1281     # we can skip checking whether _UnlockedGetNodeInfo returns None
1282     # since the node list comes from _UnlocketGetNodeList, and we are
1283     # called with the lock held, so no modifications should take place
1284     # in between
1285     for node_name in self._UnlockedGetNodeList():
1286       if node_name == myhostname:
1287         continue
1288       node_info = self._UnlockedGetNodeInfo(node_name)
1289       if not node_info.master_candidate:
1290         continue
1291       node_list.append(node_info.name)
1292       addr_list.append(node_info.primary_ip)
1293
1294     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1295                                             address_list=addr_list)
1296     for to_node, to_result in result.items():
1297       msg = to_result.fail_msg
1298       if msg:
1299         msg = ("Copy of file %s to node %s failed: %s" %
1300                (self._cfg_file, to_node, msg))
1301         logging.error(msg)
1302
1303         if feedback_fn:
1304           feedback_fn(msg)
1305
1306         bad = True
1307
1308     return not bad
1309
1310   def _WriteConfig(self, destination=None, feedback_fn=None):
1311     """Write the configuration data to persistent storage.
1312
1313     """
1314     assert feedback_fn is None or callable(feedback_fn)
1315
1316     # Warn on config errors, but don't abort the save - the
1317     # configuration has already been modified, and we can't revert;
1318     # the best we can do is to warn the user and save as is, leaving
1319     # recovery to the user
1320     config_errors = self._UnlockedVerifyConfig()
1321     if config_errors:
1322       errmsg = ("Configuration data is not consistent: %s" %
1323                 (utils.CommaJoin(config_errors)))
1324       logging.critical(errmsg)
1325       if feedback_fn:
1326         feedback_fn(errmsg)
1327
1328     if destination is None:
1329       destination = self._cfg_file
1330     self._BumpSerialNo()
1331     txt = serializer.Dump(self._config_data.ToDict())
1332
1333     utils.WriteFile(destination, data=txt)
1334
1335     self.write_count += 1
1336
1337     # and redistribute the config file to master candidates
1338     self._DistributeConfig(feedback_fn)
1339
1340     # Write ssconf files on all nodes (including locally)
1341     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1342       if not self._offline:
1343         result = rpc.RpcRunner.call_write_ssconf_files(
1344           self._UnlockedGetOnlineNodeList(),
1345           self._UnlockedGetSsconfValues())
1346
1347         for nname, nresu in result.items():
1348           msg = nresu.fail_msg
1349           if msg:
1350             errmsg = ("Error while uploading ssconf files to"
1351                       " node %s: %s" % (nname, msg))
1352             logging.warning(errmsg)
1353
1354             if feedback_fn:
1355               feedback_fn(errmsg)
1356
1357       self._last_cluster_serial = self._config_data.cluster.serial_no
1358
1359   def _UnlockedGetSsconfValues(self):
1360     """Return the values needed by ssconf.
1361
1362     @rtype: dict
1363     @return: a dictionary with keys the ssconf names and values their
1364         associated value
1365
1366     """
1367     fn = "\n".join
1368     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1369     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1370     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1371     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1372                     for ninfo in node_info]
1373     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1374                     for ninfo in node_info]
1375
1376     instance_data = fn(instance_names)
1377     off_data = fn(node.name for node in node_info if node.offline)
1378     on_data = fn(node.name for node in node_info if not node.offline)
1379     mc_data = fn(node.name for node in node_info if node.master_candidate)
1380     mc_ips_data = fn(node.primary_ip for node in node_info
1381                      if node.master_candidate)
1382     node_data = fn(node_names)
1383     node_pri_ips_data = fn(node_pri_ips)
1384     node_snd_ips_data = fn(node_snd_ips)
1385
1386     cluster = self._config_data.cluster
1387     cluster_tags = fn(cluster.GetTags())
1388
1389     hypervisor_list = fn(cluster.enabled_hypervisors)
1390
1391     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1392
1393     return {
1394       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1395       constants.SS_CLUSTER_TAGS: cluster_tags,
1396       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1397       constants.SS_MASTER_CANDIDATES: mc_data,
1398       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1399       constants.SS_MASTER_IP: cluster.master_ip,
1400       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1401       constants.SS_MASTER_NODE: cluster.master_node,
1402       constants.SS_NODE_LIST: node_data,
1403       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1404       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1405       constants.SS_OFFLINE_NODES: off_data,
1406       constants.SS_ONLINE_NODES: on_data,
1407       constants.SS_INSTANCE_LIST: instance_data,
1408       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1409       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1410       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1411       constants.SS_UID_POOL: uid_pool,
1412       }
1413
1414   @locking.ssynchronized(_config_lock, shared=1)
1415   def GetVGName(self):
1416     """Return the volume group name.
1417
1418     """
1419     return self._config_data.cluster.volume_group_name
1420
1421   @locking.ssynchronized(_config_lock)
1422   def SetVGName(self, vg_name):
1423     """Set the volume group name.
1424
1425     """
1426     self._config_data.cluster.volume_group_name = vg_name
1427     self._config_data.cluster.serial_no += 1
1428     self._WriteConfig()
1429
1430   @locking.ssynchronized(_config_lock, shared=1)
1431   def GetDRBDHelper(self):
1432     """Return DRBD usermode helper.
1433
1434     """
1435     return self._config_data.cluster.drbd_usermode_helper
1436
1437   @locking.ssynchronized(_config_lock)
1438   def SetDRBDHelper(self, drbd_helper):
1439     """Set DRBD usermode helper.
1440
1441     """
1442     self._config_data.cluster.drbd_usermode_helper = drbd_helper
1443     self._config_data.cluster.serial_no += 1
1444     self._WriteConfig()
1445
1446   @locking.ssynchronized(_config_lock, shared=1)
1447   def GetMACPrefix(self):
1448     """Return the mac prefix.
1449
1450     """
1451     return self._config_data.cluster.mac_prefix
1452
1453   @locking.ssynchronized(_config_lock, shared=1)
1454   def GetClusterInfo(self):
1455     """Returns information about the cluster
1456
1457     @rtype: L{objects.Cluster}
1458     @return: the cluster object
1459
1460     """
1461     return self._config_data.cluster
1462
1463   @locking.ssynchronized(_config_lock, shared=1)
1464   def HasAnyDiskOfType(self, dev_type):
1465     """Check if in there is at disk of the given type in the configuration.
1466
1467     """
1468     return self._config_data.HasAnyDiskOfType(dev_type)
1469
1470   @locking.ssynchronized(_config_lock)
1471   def Update(self, target, feedback_fn):
1472     """Notify function to be called after updates.
1473
1474     This function must be called when an object (as returned by
1475     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1476     caller wants the modifications saved to the backing store. Note
1477     that all modified objects will be saved, but the target argument
1478     is the one the caller wants to ensure that it's saved.
1479
1480     @param target: an instance of either L{objects.Cluster},
1481         L{objects.Node} or L{objects.Instance} which is existing in
1482         the cluster
1483     @param feedback_fn: Callable feedback function
1484
1485     """
1486     if self._config_data is None:
1487       raise errors.ProgrammerError("Configuration file not read,"
1488                                    " cannot save.")
1489     update_serial = False
1490     if isinstance(target, objects.Cluster):
1491       test = target == self._config_data.cluster
1492     elif isinstance(target, objects.Node):
1493       test = target in self._config_data.nodes.values()
1494       update_serial = True
1495     elif isinstance(target, objects.Instance):
1496       test = target in self._config_data.instances.values()
1497     else:
1498       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1499                                    " ConfigWriter.Update" % type(target))
1500     if not test:
1501       raise errors.ConfigurationError("Configuration updated since object"
1502                                       " has been read or unknown object")
1503     target.serial_no += 1
1504     target.mtime = now = time.time()
1505
1506     if update_serial:
1507       # for node updates, we need to increase the cluster serial too
1508       self._config_data.cluster.serial_no += 1
1509       self._config_data.cluster.mtime = now
1510
1511     if isinstance(target, objects.Instance):
1512       self._UnlockedReleaseDRBDMinors(target.name)
1513
1514     self._WriteConfig(feedback_fn=feedback_fn)
1515
1516   @locking.ssynchronized(_config_lock)
1517   def DropECReservations(self, ec_id):
1518     """Drop per-execution-context reservations
1519
1520     """
1521     for rm in self._all_rms:
1522       rm.DropECReservations(ec_id)