workerpool: Add support for task priority
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 # pylint: disable-msg=R0904
35 # R0904: Too many public methods
36
37 import os
38 import random
39 import logging
40 import time
41
42 from ganeti import errors
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import constants
46 from ganeti import rpc
47 from ganeti import objects
48 from ganeti import serializer
49 from ganeti import uidpool
50 from ganeti import netutils
51
52
53 _config_lock = locking.SharedLock("ConfigWriter")
54
55 # job id used for resource management at config upgrade time
56 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
57
58
59 def _ValidateConfig(data):
60   """Verifies that a configuration objects looks valid.
61
62   This only verifies the version of the configuration.
63
64   @raise errors.ConfigurationError: if the version differs from what
65       we expect
66
67   """
68   if data.version != constants.CONFIG_VERSION:
69     raise errors.ConfigurationError("Cluster configuration version"
70                                     " mismatch, got %s instead of %s" %
71                                     (data.version,
72                                      constants.CONFIG_VERSION))
73
74
75 class TemporaryReservationManager:
76   """A temporary resource reservation manager.
77
78   This is used to reserve resources in a job, before using them, making sure
79   other jobs cannot get them in the meantime.
80
81   """
82   def __init__(self):
83     self._ec_reserved = {}
84
85   def Reserved(self, resource):
86     for holder_reserved in self._ec_reserved.items():
87       if resource in holder_reserved:
88         return True
89     return False
90
91   def Reserve(self, ec_id, resource):
92     if self.Reserved(resource):
93       raise errors.ReservationError("Duplicate reservation for resource: %s." %
94                                     (resource))
95     if ec_id not in self._ec_reserved:
96       self._ec_reserved[ec_id] = set([resource])
97     else:
98       self._ec_reserved[ec_id].add(resource)
99
100   def DropECReservations(self, ec_id):
101     if ec_id in self._ec_reserved:
102       del self._ec_reserved[ec_id]
103
104   def GetReserved(self):
105     all_reserved = set()
106     for holder_reserved in self._ec_reserved.values():
107       all_reserved.update(holder_reserved)
108     return all_reserved
109
110   def Generate(self, existing, generate_one_fn, ec_id):
111     """Generate a new resource of this type
112
113     """
114     assert callable(generate_one_fn)
115
116     all_elems = self.GetReserved()
117     all_elems.update(existing)
118     retries = 64
119     while retries > 0:
120       new_resource = generate_one_fn()
121       if new_resource is not None and new_resource not in all_elems:
122         break
123     else:
124       raise errors.ConfigurationError("Not able generate new resource"
125                                       " (last tried: %s)" % new_resource)
126     self.Reserve(ec_id, new_resource)
127     return new_resource
128
129
130 class ConfigWriter:
131   """The interface to the cluster configuration.
132
133   @ivar _temporary_lvs: reservation manager for temporary LVs
134   @ivar _all_rms: a list of all temporary reservation managers
135
136   """
137   def __init__(self, cfg_file=None, offline=False):
138     self.write_count = 0
139     self._lock = _config_lock
140     self._config_data = None
141     self._offline = offline
142     if cfg_file is None:
143       self._cfg_file = constants.CLUSTER_CONF_FILE
144     else:
145       self._cfg_file = cfg_file
146     self._temporary_ids = TemporaryReservationManager()
147     self._temporary_drbds = {}
148     self._temporary_macs = TemporaryReservationManager()
149     self._temporary_secrets = TemporaryReservationManager()
150     self._temporary_lvs = TemporaryReservationManager()
151     self._all_rms = [self._temporary_ids, self._temporary_macs,
152                      self._temporary_secrets, self._temporary_lvs]
153     # Note: in order to prevent errors when resolving our name in
154     # _DistributeConfig, we compute it here once and reuse it; it's
155     # better to raise an error before starting to modify the config
156     # file than after it was modified
157     self._my_hostname = netutils.Hostname.GetSysName()
158     self._last_cluster_serial = -1
159     self._OpenConfig()
160
161   # this method needs to be static, so that we can call it on the class
162   @staticmethod
163   def IsCluster():
164     """Check if the cluster is configured.
165
166     """
167     return os.path.exists(constants.CLUSTER_CONF_FILE)
168
169   def _GenerateOneMAC(self):
170     """Generate one mac address
171
172     """
173     prefix = self._config_data.cluster.mac_prefix
174     byte1 = random.randrange(0, 256)
175     byte2 = random.randrange(0, 256)
176     byte3 = random.randrange(0, 256)
177     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
178     return mac
179
180   @locking.ssynchronized(_config_lock, shared=1)
181   def GenerateMAC(self, ec_id):
182     """Generate a MAC for an instance.
183
184     This should check the current instances for duplicates.
185
186     """
187     existing = self._AllMACs()
188     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
189
190   @locking.ssynchronized(_config_lock, shared=1)
191   def ReserveMAC(self, mac, ec_id):
192     """Reserve a MAC for an instance.
193
194     This only checks instances managed by this cluster, it does not
195     check for potential collisions elsewhere.
196
197     """
198     all_macs = self._AllMACs()
199     if mac in all_macs:
200       raise errors.ReservationError("mac already in use")
201     else:
202       self._temporary_macs.Reserve(mac, ec_id)
203
204   @locking.ssynchronized(_config_lock, shared=1)
205   def ReserveLV(self, lv_name, ec_id):
206     """Reserve an VG/LV pair for an instance.
207
208     @type lv_name: string
209     @param lv_name: the logical volume name to reserve
210
211     """
212     all_lvs = self._AllLVs()
213     if lv_name in all_lvs:
214       raise errors.ReservationError("LV already in use")
215     else:
216       self._temporary_lvs.Reserve(lv_name, ec_id)
217
218   @locking.ssynchronized(_config_lock, shared=1)
219   def GenerateDRBDSecret(self, ec_id):
220     """Generate a DRBD secret.
221
222     This checks the current disks for duplicates.
223
224     """
225     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
226                                             utils.GenerateSecret,
227                                             ec_id)
228
229   def _AllLVs(self):
230     """Compute the list of all LVs.
231
232     """
233     lvnames = set()
234     for instance in self._config_data.instances.values():
235       node_data = instance.MapLVsByNode()
236       for lv_list in node_data.values():
237         lvnames.update(lv_list)
238     return lvnames
239
240   def _AllIDs(self, include_temporary):
241     """Compute the list of all UUIDs and names we have.
242
243     @type include_temporary: boolean
244     @param include_temporary: whether to include the _temporary_ids set
245     @rtype: set
246     @return: a set of IDs
247
248     """
249     existing = set()
250     if include_temporary:
251       existing.update(self._temporary_ids.GetReserved())
252     existing.update(self._AllLVs())
253     existing.update(self._config_data.instances.keys())
254     existing.update(self._config_data.nodes.keys())
255     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
256     return existing
257
258   def _GenerateUniqueID(self, ec_id):
259     """Generate an unique UUID.
260
261     This checks the current node, instances and disk names for
262     duplicates.
263
264     @rtype: string
265     @return: the unique id
266
267     """
268     existing = self._AllIDs(include_temporary=False)
269     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
270
271   @locking.ssynchronized(_config_lock, shared=1)
272   def GenerateUniqueID(self, ec_id):
273     """Generate an unique ID.
274
275     This is just a wrapper over the unlocked version.
276
277     @type ec_id: string
278     @param ec_id: unique id for the job to reserve the id to
279
280     """
281     return self._GenerateUniqueID(ec_id)
282
283   def _AllMACs(self):
284     """Return all MACs present in the config.
285
286     @rtype: list
287     @return: the list of all MACs
288
289     """
290     result = []
291     for instance in self._config_data.instances.values():
292       for nic in instance.nics:
293         result.append(nic.mac)
294
295     return result
296
297   def _AllDRBDSecrets(self):
298     """Return all DRBD secrets present in the config.
299
300     @rtype: list
301     @return: the list of all DRBD secrets
302
303     """
304     def helper(disk, result):
305       """Recursively gather secrets from this disk."""
306       if disk.dev_type == constants.DT_DRBD8:
307         result.append(disk.logical_id[5])
308       if disk.children:
309         for child in disk.children:
310           helper(child, result)
311
312     result = []
313     for instance in self._config_data.instances.values():
314       for disk in instance.disks:
315         helper(disk, result)
316
317     return result
318
319   def _CheckDiskIDs(self, disk, l_ids, p_ids):
320     """Compute duplicate disk IDs
321
322     @type disk: L{objects.Disk}
323     @param disk: the disk at which to start searching
324     @type l_ids: list
325     @param l_ids: list of current logical ids
326     @type p_ids: list
327     @param p_ids: list of current physical ids
328     @rtype: list
329     @return: a list of error messages
330
331     """
332     result = []
333     if disk.logical_id is not None:
334       if disk.logical_id in l_ids:
335         result.append("duplicate logical id %s" % str(disk.logical_id))
336       else:
337         l_ids.append(disk.logical_id)
338     if disk.physical_id is not None:
339       if disk.physical_id in p_ids:
340         result.append("duplicate physical id %s" % str(disk.physical_id))
341       else:
342         p_ids.append(disk.physical_id)
343
344     if disk.children:
345       for child in disk.children:
346         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
347     return result
348
349   def _UnlockedVerifyConfig(self):
350     """Verify function.
351
352     @rtype: list
353     @return: a list of error messages; a non-empty list signifies
354         configuration errors
355
356     """
357     result = []
358     seen_macs = []
359     ports = {}
360     data = self._config_data
361     seen_lids = []
362     seen_pids = []
363
364     # global cluster checks
365     if not data.cluster.enabled_hypervisors:
366       result.append("enabled hypervisors list doesn't have any entries")
367     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
368     if invalid_hvs:
369       result.append("enabled hypervisors contains invalid entries: %s" %
370                     invalid_hvs)
371     missing_hvp = (set(data.cluster.enabled_hypervisors) -
372                    set(data.cluster.hvparams.keys()))
373     if missing_hvp:
374       result.append("hypervisor parameters missing for the enabled"
375                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
376
377     if data.cluster.master_node not in data.nodes:
378       result.append("cluster has invalid primary node '%s'" %
379                     data.cluster.master_node)
380
381     # per-instance checks
382     for instance_name in data.instances:
383       instance = data.instances[instance_name]
384       if instance.name != instance_name:
385         result.append("instance '%s' is indexed by wrong name '%s'" %
386                       (instance.name, instance_name))
387       if instance.primary_node not in data.nodes:
388         result.append("instance '%s' has invalid primary node '%s'" %
389                       (instance_name, instance.primary_node))
390       for snode in instance.secondary_nodes:
391         if snode not in data.nodes:
392           result.append("instance '%s' has invalid secondary node '%s'" %
393                         (instance_name, snode))
394       for idx, nic in enumerate(instance.nics):
395         if nic.mac in seen_macs:
396           result.append("instance '%s' has NIC %d mac %s duplicate" %
397                         (instance_name, idx, nic.mac))
398         else:
399           seen_macs.append(nic.mac)
400
401       # gather the drbd ports for duplicate checks
402       for dsk in instance.disks:
403         if dsk.dev_type in constants.LDS_DRBD:
404           tcp_port = dsk.logical_id[2]
405           if tcp_port not in ports:
406             ports[tcp_port] = []
407           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
408       # gather network port reservation
409       net_port = getattr(instance, "network_port", None)
410       if net_port is not None:
411         if net_port not in ports:
412           ports[net_port] = []
413         ports[net_port].append((instance.name, "network port"))
414
415       # instance disk verify
416       for idx, disk in enumerate(instance.disks):
417         result.extend(["instance '%s' disk %d error: %s" %
418                        (instance.name, idx, msg) for msg in disk.Verify()])
419         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
420
421     # cluster-wide pool of free ports
422     for free_port in data.cluster.tcpudp_port_pool:
423       if free_port not in ports:
424         ports[free_port] = []
425       ports[free_port].append(("cluster", "port marked as free"))
426
427     # compute tcp/udp duplicate ports
428     keys = ports.keys()
429     keys.sort()
430     for pnum in keys:
431       pdata = ports[pnum]
432       if len(pdata) > 1:
433         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
434         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
435
436     # highest used tcp port check
437     if keys:
438       if keys[-1] > data.cluster.highest_used_port:
439         result.append("Highest used port mismatch, saved %s, computed %s" %
440                       (data.cluster.highest_used_port, keys[-1]))
441
442     if not data.nodes[data.cluster.master_node].master_candidate:
443       result.append("Master node is not a master candidate")
444
445     # master candidate checks
446     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
447     if mc_now < mc_max:
448       result.append("Not enough master candidates: actual %d, target %d" %
449                     (mc_now, mc_max))
450
451     # node checks
452     for node_name, node in data.nodes.items():
453       if node.name != node_name:
454         result.append("Node '%s' is indexed by wrong name '%s'" %
455                       (node.name, node_name))
456       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
457         result.append("Node %s state is invalid: master_candidate=%s,"
458                       " drain=%s, offline=%s" %
459                       (node.name, node.master_candidate, node.drain,
460                        node.offline))
461
462     # drbd minors check
463     _, duplicates = self._UnlockedComputeDRBDMap()
464     for node, minor, instance_a, instance_b in duplicates:
465       result.append("DRBD minor %d on node %s is assigned twice to instances"
466                     " %s and %s" % (minor, node, instance_a, instance_b))
467
468     # IP checks
469     default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
470     ips = {}
471
472     def _AddIpAddress(ip, name):
473       ips.setdefault(ip, []).append(name)
474
475     _AddIpAddress(data.cluster.master_ip, "cluster_ip")
476
477     for node in data.nodes.values():
478       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
479       if node.secondary_ip != node.primary_ip:
480         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
481
482     for instance in data.instances.values():
483       for idx, nic in enumerate(instance.nics):
484         if nic.ip is None:
485           continue
486
487         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
488         nic_mode = nicparams[constants.NIC_MODE]
489         nic_link = nicparams[constants.NIC_LINK]
490
491         if nic_mode == constants.NIC_MODE_BRIDGED:
492           link = "bridge:%s" % nic_link
493         elif nic_mode == constants.NIC_MODE_ROUTED:
494           link = "route:%s" % nic_link
495         else:
496           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
497
498         _AddIpAddress("%s/%s" % (link, nic.ip),
499                       "instance:%s/nic:%d" % (instance.name, idx))
500
501     for ip, owners in ips.items():
502       if len(owners) > 1:
503         result.append("IP address %s is used by multiple owners: %s" %
504                       (ip, utils.CommaJoin(owners)))
505
506     return result
507
508   @locking.ssynchronized(_config_lock, shared=1)
509   def VerifyConfig(self):
510     """Verify function.
511
512     This is just a wrapper over L{_UnlockedVerifyConfig}.
513
514     @rtype: list
515     @return: a list of error messages; a non-empty list signifies
516         configuration errors
517
518     """
519     return self._UnlockedVerifyConfig()
520
521   def _UnlockedSetDiskID(self, disk, node_name):
522     """Convert the unique ID to the ID needed on the target nodes.
523
524     This is used only for drbd, which needs ip/port configuration.
525
526     The routine descends down and updates its children also, because
527     this helps when the only the top device is passed to the remote
528     node.
529
530     This function is for internal use, when the config lock is already held.
531
532     """
533     if disk.children:
534       for child in disk.children:
535         self._UnlockedSetDiskID(child, node_name)
536
537     if disk.logical_id is None and disk.physical_id is not None:
538       return
539     if disk.dev_type == constants.LD_DRBD8:
540       pnode, snode, port, pminor, sminor, secret = disk.logical_id
541       if node_name not in (pnode, snode):
542         raise errors.ConfigurationError("DRBD device not knowing node %s" %
543                                         node_name)
544       pnode_info = self._UnlockedGetNodeInfo(pnode)
545       snode_info = self._UnlockedGetNodeInfo(snode)
546       if pnode_info is None or snode_info is None:
547         raise errors.ConfigurationError("Can't find primary or secondary node"
548                                         " for %s" % str(disk))
549       p_data = (pnode_info.secondary_ip, port)
550       s_data = (snode_info.secondary_ip, port)
551       if pnode == node_name:
552         disk.physical_id = p_data + s_data + (pminor, secret)
553       else: # it must be secondary, we tested above
554         disk.physical_id = s_data + p_data + (sminor, secret)
555     else:
556       disk.physical_id = disk.logical_id
557     return
558
559   @locking.ssynchronized(_config_lock)
560   def SetDiskID(self, disk, node_name):
561     """Convert the unique ID to the ID needed on the target nodes.
562
563     This is used only for drbd, which needs ip/port configuration.
564
565     The routine descends down and updates its children also, because
566     this helps when the only the top device is passed to the remote
567     node.
568
569     """
570     return self._UnlockedSetDiskID(disk, node_name)
571
572   @locking.ssynchronized(_config_lock)
573   def AddTcpUdpPort(self, port):
574     """Adds a new port to the available port pool.
575
576     """
577     if not isinstance(port, int):
578       raise errors.ProgrammerError("Invalid type passed for port")
579
580     self._config_data.cluster.tcpudp_port_pool.add(port)
581     self._WriteConfig()
582
583   @locking.ssynchronized(_config_lock, shared=1)
584   def GetPortList(self):
585     """Returns a copy of the current port list.
586
587     """
588     return self._config_data.cluster.tcpudp_port_pool.copy()
589
590   @locking.ssynchronized(_config_lock)
591   def AllocatePort(self):
592     """Allocate a port.
593
594     The port will be taken from the available port pool or from the
595     default port range (and in this case we increase
596     highest_used_port).
597
598     """
599     # If there are TCP/IP ports configured, we use them first.
600     if self._config_data.cluster.tcpudp_port_pool:
601       port = self._config_data.cluster.tcpudp_port_pool.pop()
602     else:
603       port = self._config_data.cluster.highest_used_port + 1
604       if port >= constants.LAST_DRBD_PORT:
605         raise errors.ConfigurationError("The highest used port is greater"
606                                         " than %s. Aborting." %
607                                         constants.LAST_DRBD_PORT)
608       self._config_data.cluster.highest_used_port = port
609
610     self._WriteConfig()
611     return port
612
613   def _UnlockedComputeDRBDMap(self):
614     """Compute the used DRBD minor/nodes.
615
616     @rtype: (dict, list)
617     @return: dictionary of node_name: dict of minor: instance_name;
618         the returned dict will have all the nodes in it (even if with
619         an empty list), and a list of duplicates; if the duplicates
620         list is not empty, the configuration is corrupted and its caller
621         should raise an exception
622
623     """
624     def _AppendUsedPorts(instance_name, disk, used):
625       duplicates = []
626       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
627         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
628         for node, port in ((node_a, minor_a), (node_b, minor_b)):
629           assert node in used, ("Node '%s' of instance '%s' not found"
630                                 " in node list" % (node, instance_name))
631           if port in used[node]:
632             duplicates.append((node, port, instance_name, used[node][port]))
633           else:
634             used[node][port] = instance_name
635       if disk.children:
636         for child in disk.children:
637           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
638       return duplicates
639
640     duplicates = []
641     my_dict = dict((node, {}) for node in self._config_data.nodes)
642     for instance in self._config_data.instances.itervalues():
643       for disk in instance.disks:
644         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
645     for (node, minor), instance in self._temporary_drbds.iteritems():
646       if minor in my_dict[node] and my_dict[node][minor] != instance:
647         duplicates.append((node, minor, instance, my_dict[node][minor]))
648       else:
649         my_dict[node][minor] = instance
650     return my_dict, duplicates
651
652   @locking.ssynchronized(_config_lock)
653   def ComputeDRBDMap(self):
654     """Compute the used DRBD minor/nodes.
655
656     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
657
658     @return: dictionary of node_name: dict of minor: instance_name;
659         the returned dict will have all the nodes in it (even if with
660         an empty list).
661
662     """
663     d_map, duplicates = self._UnlockedComputeDRBDMap()
664     if duplicates:
665       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
666                                       str(duplicates))
667     return d_map
668
669   @locking.ssynchronized(_config_lock)
670   def AllocateDRBDMinor(self, nodes, instance):
671     """Allocate a drbd minor.
672
673     The free minor will be automatically computed from the existing
674     devices. A node can be given multiple times in order to allocate
675     multiple minors. The result is the list of minors, in the same
676     order as the passed nodes.
677
678     @type instance: string
679     @param instance: the instance for which we allocate minors
680
681     """
682     assert isinstance(instance, basestring), \
683            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
684
685     d_map, duplicates = self._UnlockedComputeDRBDMap()
686     if duplicates:
687       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
688                                       str(duplicates))
689     result = []
690     for nname in nodes:
691       ndata = d_map[nname]
692       if not ndata:
693         # no minors used, we can start at 0
694         result.append(0)
695         ndata[0] = instance
696         self._temporary_drbds[(nname, 0)] = instance
697         continue
698       keys = ndata.keys()
699       keys.sort()
700       ffree = utils.FirstFree(keys)
701       if ffree is None:
702         # return the next minor
703         # TODO: implement high-limit check
704         minor = keys[-1] + 1
705       else:
706         minor = ffree
707       # double-check minor against current instances
708       assert minor not in d_map[nname], \
709              ("Attempt to reuse allocated DRBD minor %d on node %s,"
710               " already allocated to instance %s" %
711               (minor, nname, d_map[nname][minor]))
712       ndata[minor] = instance
713       # double-check minor against reservation
714       r_key = (nname, minor)
715       assert r_key not in self._temporary_drbds, \
716              ("Attempt to reuse reserved DRBD minor %d on node %s,"
717               " reserved for instance %s" %
718               (minor, nname, self._temporary_drbds[r_key]))
719       self._temporary_drbds[r_key] = instance
720       result.append(minor)
721     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
722                   nodes, result)
723     return result
724
725   def _UnlockedReleaseDRBDMinors(self, instance):
726     """Release temporary drbd minors allocated for a given instance.
727
728     @type instance: string
729     @param instance: the instance for which temporary minors should be
730                      released
731
732     """
733     assert isinstance(instance, basestring), \
734            "Invalid argument passed to ReleaseDRBDMinors"
735     for key, name in self._temporary_drbds.items():
736       if name == instance:
737         del self._temporary_drbds[key]
738
739   @locking.ssynchronized(_config_lock)
740   def ReleaseDRBDMinors(self, instance):
741     """Release temporary drbd minors allocated for a given instance.
742
743     This should be called on the error paths, on the success paths
744     it's automatically called by the ConfigWriter add and update
745     functions.
746
747     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
748
749     @type instance: string
750     @param instance: the instance for which temporary minors should be
751                      released
752
753     """
754     self._UnlockedReleaseDRBDMinors(instance)
755
756   @locking.ssynchronized(_config_lock, shared=1)
757   def GetConfigVersion(self):
758     """Get the configuration version.
759
760     @return: Config version
761
762     """
763     return self._config_data.version
764
765   @locking.ssynchronized(_config_lock, shared=1)
766   def GetClusterName(self):
767     """Get cluster name.
768
769     @return: Cluster name
770
771     """
772     return self._config_data.cluster.cluster_name
773
774   @locking.ssynchronized(_config_lock, shared=1)
775   def GetMasterNode(self):
776     """Get the hostname of the master node for this cluster.
777
778     @return: Master hostname
779
780     """
781     return self._config_data.cluster.master_node
782
783   @locking.ssynchronized(_config_lock, shared=1)
784   def GetMasterIP(self):
785     """Get the IP of the master node for this cluster.
786
787     @return: Master IP
788
789     """
790     return self._config_data.cluster.master_ip
791
792   @locking.ssynchronized(_config_lock, shared=1)
793   def GetMasterNetdev(self):
794     """Get the master network device for this cluster.
795
796     """
797     return self._config_data.cluster.master_netdev
798
799   @locking.ssynchronized(_config_lock, shared=1)
800   def GetFileStorageDir(self):
801     """Get the file storage dir for this cluster.
802
803     """
804     return self._config_data.cluster.file_storage_dir
805
806   @locking.ssynchronized(_config_lock, shared=1)
807   def GetHypervisorType(self):
808     """Get the hypervisor type for this cluster.
809
810     """
811     return self._config_data.cluster.enabled_hypervisors[0]
812
813   @locking.ssynchronized(_config_lock, shared=1)
814   def GetHostKey(self):
815     """Return the rsa hostkey from the config.
816
817     @rtype: string
818     @return: the rsa hostkey
819
820     """
821     return self._config_data.cluster.rsahostkeypub
822
823   @locking.ssynchronized(_config_lock, shared=1)
824   def GetDefaultIAllocator(self):
825     """Get the default instance allocator for this cluster.
826
827     """
828     return self._config_data.cluster.default_iallocator
829
830   @locking.ssynchronized(_config_lock, shared=1)
831   def GetPrimaryIPFamily(self):
832     """Get cluster primary ip family.
833
834     @return: primary ip family
835
836     """
837     return self._config_data.cluster.primary_ip_family
838
839   @locking.ssynchronized(_config_lock)
840   def AddInstance(self, instance, ec_id):
841     """Add an instance to the config.
842
843     This should be used after creating a new instance.
844
845     @type instance: L{objects.Instance}
846     @param instance: the instance object
847
848     """
849     if not isinstance(instance, objects.Instance):
850       raise errors.ProgrammerError("Invalid type passed to AddInstance")
851
852     if instance.disk_template != constants.DT_DISKLESS:
853       all_lvs = instance.MapLVsByNode()
854       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
855
856     all_macs = self._AllMACs()
857     for nic in instance.nics:
858       if nic.mac in all_macs:
859         raise errors.ConfigurationError("Cannot add instance %s:"
860                                         " MAC address '%s' already in use." %
861                                         (instance.name, nic.mac))
862
863     self._EnsureUUID(instance, ec_id)
864
865     instance.serial_no = 1
866     instance.ctime = instance.mtime = time.time()
867     self._config_data.instances[instance.name] = instance
868     self._config_data.cluster.serial_no += 1
869     self._UnlockedReleaseDRBDMinors(instance.name)
870     self._WriteConfig()
871
872   def _EnsureUUID(self, item, ec_id):
873     """Ensures a given object has a valid UUID.
874
875     @param item: the instance or node to be checked
876     @param ec_id: the execution context id for the uuid reservation
877
878     """
879     if not item.uuid:
880       item.uuid = self._GenerateUniqueID(ec_id)
881     elif item.uuid in self._AllIDs(include_temporary=True):
882       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
883                                       " in use" % (item.name, item.uuid))
884
885   def _SetInstanceStatus(self, instance_name, status):
886     """Set the instance's status to a given value.
887
888     """
889     assert isinstance(status, bool), \
890            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
891
892     if instance_name not in self._config_data.instances:
893       raise errors.ConfigurationError("Unknown instance '%s'" %
894                                       instance_name)
895     instance = self._config_data.instances[instance_name]
896     if instance.admin_up != status:
897       instance.admin_up = status
898       instance.serial_no += 1
899       instance.mtime = time.time()
900       self._WriteConfig()
901
902   @locking.ssynchronized(_config_lock)
903   def MarkInstanceUp(self, instance_name):
904     """Mark the instance status to up in the config.
905
906     """
907     self._SetInstanceStatus(instance_name, True)
908
909   @locking.ssynchronized(_config_lock)
910   def RemoveInstance(self, instance_name):
911     """Remove the instance from the configuration.
912
913     """
914     if instance_name not in self._config_data.instances:
915       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
916     del self._config_data.instances[instance_name]
917     self._config_data.cluster.serial_no += 1
918     self._WriteConfig()
919
920   @locking.ssynchronized(_config_lock)
921   def RenameInstance(self, old_name, new_name):
922     """Rename an instance.
923
924     This needs to be done in ConfigWriter and not by RemoveInstance
925     combined with AddInstance as only we can guarantee an atomic
926     rename.
927
928     """
929     if old_name not in self._config_data.instances:
930       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
931     inst = self._config_data.instances[old_name]
932     del self._config_data.instances[old_name]
933     inst.name = new_name
934
935     for disk in inst.disks:
936       if disk.dev_type == constants.LD_FILE:
937         # rename the file paths in logical and physical id
938         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
939         disk.physical_id = disk.logical_id = (disk.logical_id[0],
940                                               utils.PathJoin(file_storage_dir,
941                                                              inst.name,
942                                                              disk.iv_name))
943
944     self._config_data.instances[inst.name] = inst
945     self._WriteConfig()
946
947   @locking.ssynchronized(_config_lock)
948   def MarkInstanceDown(self, instance_name):
949     """Mark the status of an instance to down in the configuration.
950
951     """
952     self._SetInstanceStatus(instance_name, False)
953
954   def _UnlockedGetInstanceList(self):
955     """Get the list of instances.
956
957     This function is for internal use, when the config lock is already held.
958
959     """
960     return self._config_data.instances.keys()
961
962   @locking.ssynchronized(_config_lock, shared=1)
963   def GetInstanceList(self):
964     """Get the list of instances.
965
966     @return: array of instances, ex. ['instance2.example.com',
967         'instance1.example.com']
968
969     """
970     return self._UnlockedGetInstanceList()
971
972   @locking.ssynchronized(_config_lock, shared=1)
973   def ExpandInstanceName(self, short_name):
974     """Attempt to expand an incomplete instance name.
975
976     """
977     return utils.MatchNameComponent(short_name,
978                                     self._config_data.instances.keys(),
979                                     case_sensitive=False)
980
981   def _UnlockedGetInstanceInfo(self, instance_name):
982     """Returns information about an instance.
983
984     This function is for internal use, when the config lock is already held.
985
986     """
987     if instance_name not in self._config_data.instances:
988       return None
989
990     return self._config_data.instances[instance_name]
991
992   @locking.ssynchronized(_config_lock, shared=1)
993   def GetInstanceInfo(self, instance_name):
994     """Returns information about an instance.
995
996     It takes the information from the configuration file. Other information of
997     an instance are taken from the live systems.
998
999     @param instance_name: name of the instance, e.g.
1000         I{instance1.example.com}
1001
1002     @rtype: L{objects.Instance}
1003     @return: the instance object
1004
1005     """
1006     return self._UnlockedGetInstanceInfo(instance_name)
1007
1008   @locking.ssynchronized(_config_lock, shared=1)
1009   def GetAllInstancesInfo(self):
1010     """Get the configuration of all instances.
1011
1012     @rtype: dict
1013     @return: dict of (instance, instance_info), where instance_info is what
1014               would GetInstanceInfo return for the node
1015
1016     """
1017     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1018                     for instance in self._UnlockedGetInstanceList()])
1019     return my_dict
1020
1021   @locking.ssynchronized(_config_lock)
1022   def AddNode(self, node, ec_id):
1023     """Add a node to the configuration.
1024
1025     @type node: L{objects.Node}
1026     @param node: a Node instance
1027
1028     """
1029     logging.info("Adding node %s to configuration", node.name)
1030
1031     self._EnsureUUID(node, ec_id)
1032
1033     node.serial_no = 1
1034     node.ctime = node.mtime = time.time()
1035     self._config_data.nodes[node.name] = node
1036     self._config_data.cluster.serial_no += 1
1037     self._WriteConfig()
1038
1039   @locking.ssynchronized(_config_lock)
1040   def RemoveNode(self, node_name):
1041     """Remove a node from the configuration.
1042
1043     """
1044     logging.info("Removing node %s from configuration", node_name)
1045
1046     if node_name not in self._config_data.nodes:
1047       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1048
1049     del self._config_data.nodes[node_name]
1050     self._config_data.cluster.serial_no += 1
1051     self._WriteConfig()
1052
1053   @locking.ssynchronized(_config_lock, shared=1)
1054   def ExpandNodeName(self, short_name):
1055     """Attempt to expand an incomplete instance name.
1056
1057     """
1058     return utils.MatchNameComponent(short_name,
1059                                     self._config_data.nodes.keys(),
1060                                     case_sensitive=False)
1061
1062   def _UnlockedGetNodeInfo(self, node_name):
1063     """Get the configuration of a node, as stored in the config.
1064
1065     This function is for internal use, when the config lock is already
1066     held.
1067
1068     @param node_name: the node name, e.g. I{node1.example.com}
1069
1070     @rtype: L{objects.Node}
1071     @return: the node object
1072
1073     """
1074     if node_name not in self._config_data.nodes:
1075       return None
1076
1077     return self._config_data.nodes[node_name]
1078
1079   @locking.ssynchronized(_config_lock, shared=1)
1080   def GetNodeInfo(self, node_name):
1081     """Get the configuration of a node, as stored in the config.
1082
1083     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1084
1085     @param node_name: the node name, e.g. I{node1.example.com}
1086
1087     @rtype: L{objects.Node}
1088     @return: the node object
1089
1090     """
1091     return self._UnlockedGetNodeInfo(node_name)
1092
1093   def _UnlockedGetNodeList(self):
1094     """Return the list of nodes which are in the configuration.
1095
1096     This function is for internal use, when the config lock is already
1097     held.
1098
1099     @rtype: list
1100
1101     """
1102     return self._config_data.nodes.keys()
1103
1104   @locking.ssynchronized(_config_lock, shared=1)
1105   def GetNodeList(self):
1106     """Return the list of nodes which are in the configuration.
1107
1108     """
1109     return self._UnlockedGetNodeList()
1110
1111   def _UnlockedGetOnlineNodeList(self):
1112     """Return the list of nodes which are online.
1113
1114     """
1115     all_nodes = [self._UnlockedGetNodeInfo(node)
1116                  for node in self._UnlockedGetNodeList()]
1117     return [node.name for node in all_nodes if not node.offline]
1118
1119   @locking.ssynchronized(_config_lock, shared=1)
1120   def GetOnlineNodeList(self):
1121     """Return the list of nodes which are online.
1122
1123     """
1124     return self._UnlockedGetOnlineNodeList()
1125
1126   @locking.ssynchronized(_config_lock, shared=1)
1127   def GetAllNodesInfo(self):
1128     """Get the configuration of all nodes.
1129
1130     @rtype: dict
1131     @return: dict of (node, node_info), where node_info is what
1132               would GetNodeInfo return for the node
1133
1134     """
1135     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1136                     for node in self._UnlockedGetNodeList()])
1137     return my_dict
1138
1139   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1140     """Get the number of current and maximum desired and possible candidates.
1141
1142     @type exceptions: list
1143     @param exceptions: if passed, list of nodes that should be ignored
1144     @rtype: tuple
1145     @return: tuple of (current, desired and possible, possible)
1146
1147     """
1148     mc_now = mc_should = mc_max = 0
1149     for node in self._config_data.nodes.values():
1150       if exceptions and node.name in exceptions:
1151         continue
1152       if not (node.offline or node.drained):
1153         mc_max += 1
1154       if node.master_candidate:
1155         mc_now += 1
1156     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1157     return (mc_now, mc_should, mc_max)
1158
1159   @locking.ssynchronized(_config_lock, shared=1)
1160   def GetMasterCandidateStats(self, exceptions=None):
1161     """Get the number of current and maximum possible candidates.
1162
1163     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1164
1165     @type exceptions: list
1166     @param exceptions: if passed, list of nodes that should be ignored
1167     @rtype: tuple
1168     @return: tuple of (current, max)
1169
1170     """
1171     return self._UnlockedGetMasterCandidateStats(exceptions)
1172
1173   @locking.ssynchronized(_config_lock)
1174   def MaintainCandidatePool(self, exceptions):
1175     """Try to grow the candidate pool to the desired size.
1176
1177     @type exceptions: list
1178     @param exceptions: if passed, list of nodes that should be ignored
1179     @rtype: list
1180     @return: list with the adjusted nodes (L{objects.Node} instances)
1181
1182     """
1183     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1184     mod_list = []
1185     if mc_now < mc_max:
1186       node_list = self._config_data.nodes.keys()
1187       random.shuffle(node_list)
1188       for name in node_list:
1189         if mc_now >= mc_max:
1190           break
1191         node = self._config_data.nodes[name]
1192         if (node.master_candidate or node.offline or node.drained or
1193             node.name in exceptions):
1194           continue
1195         mod_list.append(node)
1196         node.master_candidate = True
1197         node.serial_no += 1
1198         mc_now += 1
1199       if mc_now != mc_max:
1200         # this should not happen
1201         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1202                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1203       if mod_list:
1204         self._config_data.cluster.serial_no += 1
1205         self._WriteConfig()
1206
1207     return mod_list
1208
1209   def _BumpSerialNo(self):
1210     """Bump up the serial number of the config.
1211
1212     """
1213     self._config_data.serial_no += 1
1214     self._config_data.mtime = time.time()
1215
1216   def _AllUUIDObjects(self):
1217     """Returns all objects with uuid attributes.
1218
1219     """
1220     return (self._config_data.instances.values() +
1221             self._config_data.nodes.values() +
1222             [self._config_data.cluster])
1223
1224   def _OpenConfig(self):
1225     """Read the config data from disk.
1226
1227     """
1228     raw_data = utils.ReadFile(self._cfg_file)
1229
1230     try:
1231       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1232     except Exception, err:
1233       raise errors.ConfigurationError(err)
1234
1235     # Make sure the configuration has the right version
1236     _ValidateConfig(data)
1237
1238     if (not hasattr(data, 'cluster') or
1239         not hasattr(data.cluster, 'rsahostkeypub')):
1240       raise errors.ConfigurationError("Incomplete configuration"
1241                                       " (missing cluster.rsahostkeypub)")
1242
1243     # Upgrade configuration if needed
1244     data.UpgradeConfig()
1245
1246     self._config_data = data
1247     # reset the last serial as -1 so that the next write will cause
1248     # ssconf update
1249     self._last_cluster_serial = -1
1250
1251     # And finally run our (custom) config upgrade sequence
1252     self._UpgradeConfig()
1253
1254   def _UpgradeConfig(self):
1255     """Run upgrade steps that cannot be done purely in the objects.
1256
1257     This is because some data elements need uniqueness across the
1258     whole configuration, etc.
1259
1260     @warning: this function will call L{_WriteConfig()}, but also
1261         L{DropECReservations} so it needs to be called only from a
1262         "safe" place (the constructor). If one wanted to call it with
1263         the lock held, a DropECReservationUnlocked would need to be
1264         created first, to avoid causing deadlock.
1265
1266     """
1267     modified = False
1268     for item in self._AllUUIDObjects():
1269       if item.uuid is None:
1270         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1271         modified = True
1272     if modified:
1273       self._WriteConfig()
1274       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1275       # only called at config init time, without the lock held
1276       self.DropECReservations(_UPGRADE_CONFIG_JID)
1277
1278   def _DistributeConfig(self, feedback_fn):
1279     """Distribute the configuration to the other nodes.
1280
1281     Currently, this only copies the configuration file. In the future,
1282     it could be used to encapsulate the 2/3-phase update mechanism.
1283
1284     """
1285     if self._offline:
1286       return True
1287
1288     bad = False
1289
1290     node_list = []
1291     addr_list = []
1292     myhostname = self._my_hostname
1293     # we can skip checking whether _UnlockedGetNodeInfo returns None
1294     # since the node list comes from _UnlocketGetNodeList, and we are
1295     # called with the lock held, so no modifications should take place
1296     # in between
1297     for node_name in self._UnlockedGetNodeList():
1298       if node_name == myhostname:
1299         continue
1300       node_info = self._UnlockedGetNodeInfo(node_name)
1301       if not node_info.master_candidate:
1302         continue
1303       node_list.append(node_info.name)
1304       addr_list.append(node_info.primary_ip)
1305
1306     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1307                                             address_list=addr_list)
1308     for to_node, to_result in result.items():
1309       msg = to_result.fail_msg
1310       if msg:
1311         msg = ("Copy of file %s to node %s failed: %s" %
1312                (self._cfg_file, to_node, msg))
1313         logging.error(msg)
1314
1315         if feedback_fn:
1316           feedback_fn(msg)
1317
1318         bad = True
1319
1320     return not bad
1321
1322   def _WriteConfig(self, destination=None, feedback_fn=None):
1323     """Write the configuration data to persistent storage.
1324
1325     """
1326     assert feedback_fn is None or callable(feedback_fn)
1327
1328     # Warn on config errors, but don't abort the save - the
1329     # configuration has already been modified, and we can't revert;
1330     # the best we can do is to warn the user and save as is, leaving
1331     # recovery to the user
1332     config_errors = self._UnlockedVerifyConfig()
1333     if config_errors:
1334       errmsg = ("Configuration data is not consistent: %s" %
1335                 (utils.CommaJoin(config_errors)))
1336       logging.critical(errmsg)
1337       if feedback_fn:
1338         feedback_fn(errmsg)
1339
1340     if destination is None:
1341       destination = self._cfg_file
1342     self._BumpSerialNo()
1343     txt = serializer.Dump(self._config_data.ToDict())
1344
1345     utils.WriteFile(destination, data=txt)
1346
1347     self.write_count += 1
1348
1349     # and redistribute the config file to master candidates
1350     self._DistributeConfig(feedback_fn)
1351
1352     # Write ssconf files on all nodes (including locally)
1353     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1354       if not self._offline:
1355         result = rpc.RpcRunner.call_write_ssconf_files(
1356           self._UnlockedGetOnlineNodeList(),
1357           self._UnlockedGetSsconfValues())
1358
1359         for nname, nresu in result.items():
1360           msg = nresu.fail_msg
1361           if msg:
1362             errmsg = ("Error while uploading ssconf files to"
1363                       " node %s: %s" % (nname, msg))
1364             logging.warning(errmsg)
1365
1366             if feedback_fn:
1367               feedback_fn(errmsg)
1368
1369       self._last_cluster_serial = self._config_data.cluster.serial_no
1370
1371   def _UnlockedGetSsconfValues(self):
1372     """Return the values needed by ssconf.
1373
1374     @rtype: dict
1375     @return: a dictionary with keys the ssconf names and values their
1376         associated value
1377
1378     """
1379     fn = "\n".join
1380     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1381     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1382     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1383     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1384                     for ninfo in node_info]
1385     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1386                     for ninfo in node_info]
1387
1388     instance_data = fn(instance_names)
1389     off_data = fn(node.name for node in node_info if node.offline)
1390     on_data = fn(node.name for node in node_info if not node.offline)
1391     mc_data = fn(node.name for node in node_info if node.master_candidate)
1392     mc_ips_data = fn(node.primary_ip for node in node_info
1393                      if node.master_candidate)
1394     node_data = fn(node_names)
1395     node_pri_ips_data = fn(node_pri_ips)
1396     node_snd_ips_data = fn(node_snd_ips)
1397
1398     cluster = self._config_data.cluster
1399     cluster_tags = fn(cluster.GetTags())
1400
1401     hypervisor_list = fn(cluster.enabled_hypervisors)
1402
1403     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1404
1405     return {
1406       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1407       constants.SS_CLUSTER_TAGS: cluster_tags,
1408       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1409       constants.SS_MASTER_CANDIDATES: mc_data,
1410       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1411       constants.SS_MASTER_IP: cluster.master_ip,
1412       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1413       constants.SS_MASTER_NODE: cluster.master_node,
1414       constants.SS_NODE_LIST: node_data,
1415       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1416       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1417       constants.SS_OFFLINE_NODES: off_data,
1418       constants.SS_ONLINE_NODES: on_data,
1419       constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1420       constants.SS_INSTANCE_LIST: instance_data,
1421       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1422       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1423       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1424       constants.SS_UID_POOL: uid_pool,
1425       }
1426
1427   @locking.ssynchronized(_config_lock, shared=1)
1428   def GetSsconfValues(self):
1429     """Wrapper using lock around _UnlockedGetSsconf().
1430
1431     """
1432     return self._UnlockedGetSsconfValues()
1433
1434   @locking.ssynchronized(_config_lock, shared=1)
1435   def GetVGName(self):
1436     """Return the volume group name.
1437
1438     """
1439     return self._config_data.cluster.volume_group_name
1440
1441   @locking.ssynchronized(_config_lock)
1442   def SetVGName(self, vg_name):
1443     """Set the volume group name.
1444
1445     """
1446     self._config_data.cluster.volume_group_name = vg_name
1447     self._config_data.cluster.serial_no += 1
1448     self._WriteConfig()
1449
1450   @locking.ssynchronized(_config_lock, shared=1)
1451   def GetDRBDHelper(self):
1452     """Return DRBD usermode helper.
1453
1454     """
1455     return self._config_data.cluster.drbd_usermode_helper
1456
1457   @locking.ssynchronized(_config_lock)
1458   def SetDRBDHelper(self, drbd_helper):
1459     """Set DRBD usermode helper.
1460
1461     """
1462     self._config_data.cluster.drbd_usermode_helper = drbd_helper
1463     self._config_data.cluster.serial_no += 1
1464     self._WriteConfig()
1465
1466   @locking.ssynchronized(_config_lock, shared=1)
1467   def GetMACPrefix(self):
1468     """Return the mac prefix.
1469
1470     """
1471     return self._config_data.cluster.mac_prefix
1472
1473   @locking.ssynchronized(_config_lock, shared=1)
1474   def GetClusterInfo(self):
1475     """Returns information about the cluster
1476
1477     @rtype: L{objects.Cluster}
1478     @return: the cluster object
1479
1480     """
1481     return self._config_data.cluster
1482
1483   @locking.ssynchronized(_config_lock, shared=1)
1484   def HasAnyDiskOfType(self, dev_type):
1485     """Check if in there is at disk of the given type in the configuration.
1486
1487     """
1488     return self._config_data.HasAnyDiskOfType(dev_type)
1489
1490   @locking.ssynchronized(_config_lock)
1491   def Update(self, target, feedback_fn):
1492     """Notify function to be called after updates.
1493
1494     This function must be called when an object (as returned by
1495     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1496     caller wants the modifications saved to the backing store. Note
1497     that all modified objects will be saved, but the target argument
1498     is the one the caller wants to ensure that it's saved.
1499
1500     @param target: an instance of either L{objects.Cluster},
1501         L{objects.Node} or L{objects.Instance} which is existing in
1502         the cluster
1503     @param feedback_fn: Callable feedback function
1504
1505     """
1506     if self._config_data is None:
1507       raise errors.ProgrammerError("Configuration file not read,"
1508                                    " cannot save.")
1509     update_serial = False
1510     if isinstance(target, objects.Cluster):
1511       test = target == self._config_data.cluster
1512     elif isinstance(target, objects.Node):
1513       test = target in self._config_data.nodes.values()
1514       update_serial = True
1515     elif isinstance(target, objects.Instance):
1516       test = target in self._config_data.instances.values()
1517     else:
1518       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1519                                    " ConfigWriter.Update" % type(target))
1520     if not test:
1521       raise errors.ConfigurationError("Configuration updated since object"
1522                                       " has been read or unknown object")
1523     target.serial_no += 1
1524     target.mtime = now = time.time()
1525
1526     if update_serial:
1527       # for node updates, we need to increase the cluster serial too
1528       self._config_data.cluster.serial_no += 1
1529       self._config_data.cluster.mtime = now
1530
1531     if isinstance(target, objects.Instance):
1532       self._UnlockedReleaseDRBDMinors(target.name)
1533
1534     self._WriteConfig(feedback_fn=feedback_fn)
1535
1536   @locking.ssynchronized(_config_lock)
1537   def DropECReservations(self, ec_id):
1538     """Drop per-execution-context reservations
1539
1540     """
1541     for rm in self._all_rms:
1542       rm.DropECReservations(ec_id)