cmdlib: Don't sort list of nodes if names are given
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 # pylint: disable-msg=R0904
35 # R0904: Too many public methods
36
37 import os
38 import random
39 import logging
40 import time
41
42 from ganeti import errors
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import constants
46 from ganeti import rpc
47 from ganeti import objects
48 from ganeti import serializer
49 from ganeti import uidpool
50 from ganeti import netutils
51 from ganeti import runtime
52
53
54 _config_lock = locking.SharedLock("ConfigWriter")
55
56 # job id used for resource management at config upgrade time
57 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
58
59
60 def _ValidateConfig(data):
61   """Verifies that a configuration objects looks valid.
62
63   This only verifies the version of the configuration.
64
65   @raise errors.ConfigurationError: if the version differs from what
66       we expect
67
68   """
69   if data.version != constants.CONFIG_VERSION:
70     raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
71
72
73 class TemporaryReservationManager:
74   """A temporary resource reservation manager.
75
76   This is used to reserve resources in a job, before using them, making sure
77   other jobs cannot get them in the meantime.
78
79   """
80   def __init__(self):
81     self._ec_reserved = {}
82
83   def Reserved(self, resource):
84     for holder_reserved in self._ec_reserved.values():
85       if resource in holder_reserved:
86         return True
87     return False
88
89   def Reserve(self, ec_id, resource):
90     if self.Reserved(resource):
91       raise errors.ReservationError("Duplicate reservation for resource '%s'"
92                                     % str(resource))
93     if ec_id not in self._ec_reserved:
94       self._ec_reserved[ec_id] = set([resource])
95     else:
96       self._ec_reserved[ec_id].add(resource)
97
98   def DropECReservations(self, ec_id):
99     if ec_id in self._ec_reserved:
100       del self._ec_reserved[ec_id]
101
102   def GetReserved(self):
103     all_reserved = set()
104     for holder_reserved in self._ec_reserved.values():
105       all_reserved.update(holder_reserved)
106     return all_reserved
107
108   def Generate(self, existing, generate_one_fn, ec_id):
109     """Generate a new resource of this type
110
111     """
112     assert callable(generate_one_fn)
113
114     all_elems = self.GetReserved()
115     all_elems.update(existing)
116     retries = 64
117     while retries > 0:
118       new_resource = generate_one_fn()
119       if new_resource is not None and new_resource not in all_elems:
120         break
121     else:
122       raise errors.ConfigurationError("Not able generate new resource"
123                                       " (last tried: %s)" % new_resource)
124     self.Reserve(ec_id, new_resource)
125     return new_resource
126
127
128 class ConfigWriter:
129   """The interface to the cluster configuration.
130
131   @ivar _temporary_lvs: reservation manager for temporary LVs
132   @ivar _all_rms: a list of all temporary reservation managers
133
134   """
135   def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
136                accept_foreign=False):
137     self.write_count = 0
138     self._lock = _config_lock
139     self._config_data = None
140     self._offline = offline
141     if cfg_file is None:
142       self._cfg_file = constants.CLUSTER_CONF_FILE
143     else:
144       self._cfg_file = cfg_file
145     self._getents = _getents
146     self._temporary_ids = TemporaryReservationManager()
147     self._temporary_drbds = {}
148     self._temporary_macs = TemporaryReservationManager()
149     self._temporary_secrets = TemporaryReservationManager()
150     self._temporary_lvs = TemporaryReservationManager()
151     self._all_rms = [self._temporary_ids, self._temporary_macs,
152                      self._temporary_secrets, self._temporary_lvs]
153     # Note: in order to prevent errors when resolving our name in
154     # _DistributeConfig, we compute it here once and reuse it; it's
155     # better to raise an error before starting to modify the config
156     # file than after it was modified
157     self._my_hostname = netutils.Hostname.GetSysName()
158     self._last_cluster_serial = -1
159     self._cfg_id = None
160     self._OpenConfig(accept_foreign)
161
162   # this method needs to be static, so that we can call it on the class
163   @staticmethod
164   def IsCluster():
165     """Check if the cluster is configured.
166
167     """
168     return os.path.exists(constants.CLUSTER_CONF_FILE)
169
170   def _GenerateOneMAC(self):
171     """Generate one mac address
172
173     """
174     prefix = self._config_data.cluster.mac_prefix
175     byte1 = random.randrange(0, 256)
176     byte2 = random.randrange(0, 256)
177     byte3 = random.randrange(0, 256)
178     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
179     return mac
180
181   @locking.ssynchronized(_config_lock, shared=1)
182   def GetNdParams(self, node):
183     """Get the node params populated with cluster defaults.
184
185     @type node: L{object.Node}
186     @param node: The node we want to know the params for
187     @return: A dict with the filled in node params
188
189     """
190     nodegroup = self._UnlockedGetNodeGroup(node.group)
191     return self._config_data.cluster.FillND(node, nodegroup)
192
193   @locking.ssynchronized(_config_lock, shared=1)
194   def GenerateMAC(self, ec_id):
195     """Generate a MAC for an instance.
196
197     This should check the current instances for duplicates.
198
199     """
200     existing = self._AllMACs()
201     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
202
203   @locking.ssynchronized(_config_lock, shared=1)
204   def ReserveMAC(self, mac, ec_id):
205     """Reserve a MAC for an instance.
206
207     This only checks instances managed by this cluster, it does not
208     check for potential collisions elsewhere.
209
210     """
211     all_macs = self._AllMACs()
212     if mac in all_macs:
213       raise errors.ReservationError("mac already in use")
214     else:
215       self._temporary_macs.Reserve(mac, ec_id)
216
217   @locking.ssynchronized(_config_lock, shared=1)
218   def ReserveLV(self, lv_name, ec_id):
219     """Reserve an VG/LV pair for an instance.
220
221     @type lv_name: string
222     @param lv_name: the logical volume name to reserve
223
224     """
225     all_lvs = self._AllLVs()
226     if lv_name in all_lvs:
227       raise errors.ReservationError("LV already in use")
228     else:
229       self._temporary_lvs.Reserve(lv_name, ec_id)
230
231   @locking.ssynchronized(_config_lock, shared=1)
232   def GenerateDRBDSecret(self, ec_id):
233     """Generate a DRBD secret.
234
235     This checks the current disks for duplicates.
236
237     """
238     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
239                                             utils.GenerateSecret,
240                                             ec_id)
241
242   def _AllLVs(self):
243     """Compute the list of all LVs.
244
245     """
246     lvnames = set()
247     for instance in self._config_data.instances.values():
248       node_data = instance.MapLVsByNode()
249       for lv_list in node_data.values():
250         lvnames.update(lv_list)
251     return lvnames
252
253   def _AllIDs(self, include_temporary):
254     """Compute the list of all UUIDs and names we have.
255
256     @type include_temporary: boolean
257     @param include_temporary: whether to include the _temporary_ids set
258     @rtype: set
259     @return: a set of IDs
260
261     """
262     existing = set()
263     if include_temporary:
264       existing.update(self._temporary_ids.GetReserved())
265     existing.update(self._AllLVs())
266     existing.update(self._config_data.instances.keys())
267     existing.update(self._config_data.nodes.keys())
268     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
269     return existing
270
271   def _GenerateUniqueID(self, ec_id):
272     """Generate an unique UUID.
273
274     This checks the current node, instances and disk names for
275     duplicates.
276
277     @rtype: string
278     @return: the unique id
279
280     """
281     existing = self._AllIDs(include_temporary=False)
282     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
283
284   @locking.ssynchronized(_config_lock, shared=1)
285   def GenerateUniqueID(self, ec_id):
286     """Generate an unique ID.
287
288     This is just a wrapper over the unlocked version.
289
290     @type ec_id: string
291     @param ec_id: unique id for the job to reserve the id to
292
293     """
294     return self._GenerateUniqueID(ec_id)
295
296   def _AllMACs(self):
297     """Return all MACs present in the config.
298
299     @rtype: list
300     @return: the list of all MACs
301
302     """
303     result = []
304     for instance in self._config_data.instances.values():
305       for nic in instance.nics:
306         result.append(nic.mac)
307
308     return result
309
310   def _AllDRBDSecrets(self):
311     """Return all DRBD secrets present in the config.
312
313     @rtype: list
314     @return: the list of all DRBD secrets
315
316     """
317     def helper(disk, result):
318       """Recursively gather secrets from this disk."""
319       if disk.dev_type == constants.DT_DRBD8:
320         result.append(disk.logical_id[5])
321       if disk.children:
322         for child in disk.children:
323           helper(child, result)
324
325     result = []
326     for instance in self._config_data.instances.values():
327       for disk in instance.disks:
328         helper(disk, result)
329
330     return result
331
332   def _CheckDiskIDs(self, disk, l_ids, p_ids):
333     """Compute duplicate disk IDs
334
335     @type disk: L{objects.Disk}
336     @param disk: the disk at which to start searching
337     @type l_ids: list
338     @param l_ids: list of current logical ids
339     @type p_ids: list
340     @param p_ids: list of current physical ids
341     @rtype: list
342     @return: a list of error messages
343
344     """
345     result = []
346     if disk.logical_id is not None:
347       if disk.logical_id in l_ids:
348         result.append("duplicate logical id %s" % str(disk.logical_id))
349       else:
350         l_ids.append(disk.logical_id)
351     if disk.physical_id is not None:
352       if disk.physical_id in p_ids:
353         result.append("duplicate physical id %s" % str(disk.physical_id))
354       else:
355         p_ids.append(disk.physical_id)
356
357     if disk.children:
358       for child in disk.children:
359         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
360     return result
361
362   def _UnlockedVerifyConfig(self):
363     """Verify function.
364
365     @rtype: list
366     @return: a list of error messages; a non-empty list signifies
367         configuration errors
368
369     """
370     result = []
371     seen_macs = []
372     ports = {}
373     data = self._config_data
374     seen_lids = []
375     seen_pids = []
376
377     # global cluster checks
378     if not data.cluster.enabled_hypervisors:
379       result.append("enabled hypervisors list doesn't have any entries")
380     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
381     if invalid_hvs:
382       result.append("enabled hypervisors contains invalid entries: %s" %
383                     invalid_hvs)
384     missing_hvp = (set(data.cluster.enabled_hypervisors) -
385                    set(data.cluster.hvparams.keys()))
386     if missing_hvp:
387       result.append("hypervisor parameters missing for the enabled"
388                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
389
390     if data.cluster.master_node not in data.nodes:
391       result.append("cluster has invalid primary node '%s'" %
392                     data.cluster.master_node)
393
394     # per-instance checks
395     for instance_name in data.instances:
396       instance = data.instances[instance_name]
397       if instance.name != instance_name:
398         result.append("instance '%s' is indexed by wrong name '%s'" %
399                       (instance.name, instance_name))
400       if instance.primary_node not in data.nodes:
401         result.append("instance '%s' has invalid primary node '%s'" %
402                       (instance_name, instance.primary_node))
403       for snode in instance.secondary_nodes:
404         if snode not in data.nodes:
405           result.append("instance '%s' has invalid secondary node '%s'" %
406                         (instance_name, snode))
407       for idx, nic in enumerate(instance.nics):
408         if nic.mac in seen_macs:
409           result.append("instance '%s' has NIC %d mac %s duplicate" %
410                         (instance_name, idx, nic.mac))
411         else:
412           seen_macs.append(nic.mac)
413
414       # gather the drbd ports for duplicate checks
415       for dsk in instance.disks:
416         if dsk.dev_type in constants.LDS_DRBD:
417           tcp_port = dsk.logical_id[2]
418           if tcp_port not in ports:
419             ports[tcp_port] = []
420           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
421       # gather network port reservation
422       net_port = getattr(instance, "network_port", None)
423       if net_port is not None:
424         if net_port not in ports:
425           ports[net_port] = []
426         ports[net_port].append((instance.name, "network port"))
427
428       # instance disk verify
429       for idx, disk in enumerate(instance.disks):
430         result.extend(["instance '%s' disk %d error: %s" %
431                        (instance.name, idx, msg) for msg in disk.Verify()])
432         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
433
434     # cluster-wide pool of free ports
435     for free_port in data.cluster.tcpudp_port_pool:
436       if free_port not in ports:
437         ports[free_port] = []
438       ports[free_port].append(("cluster", "port marked as free"))
439
440     # compute tcp/udp duplicate ports
441     keys = ports.keys()
442     keys.sort()
443     for pnum in keys:
444       pdata = ports[pnum]
445       if len(pdata) > 1:
446         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
447         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
448
449     # highest used tcp port check
450     if keys:
451       if keys[-1] > data.cluster.highest_used_port:
452         result.append("Highest used port mismatch, saved %s, computed %s" %
453                       (data.cluster.highest_used_port, keys[-1]))
454
455     if not data.nodes[data.cluster.master_node].master_candidate:
456       result.append("Master node is not a master candidate")
457
458     # master candidate checks
459     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
460     if mc_now < mc_max:
461       result.append("Not enough master candidates: actual %d, target %d" %
462                     (mc_now, mc_max))
463
464     # node checks
465     for node_name, node in data.nodes.items():
466       if node.name != node_name:
467         result.append("Node '%s' is indexed by wrong name '%s'" %
468                       (node.name, node_name))
469       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
470         result.append("Node %s state is invalid: master_candidate=%s,"
471                       " drain=%s, offline=%s" %
472                       (node.name, node.master_candidate, node.drained,
473                        node.offline))
474
475     # nodegroups checks
476     nodegroups_names = set()
477     for nodegroup_uuid in data.nodegroups:
478       nodegroup = data.nodegroups[nodegroup_uuid]
479       if nodegroup.uuid != nodegroup_uuid:
480         result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
481                       % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
482       if utils.UUID_RE.match(nodegroup.name.lower()):
483         result.append("node group '%s' (uuid: '%s') has uuid-like name" %
484                       (nodegroup.name, nodegroup.uuid))
485       if nodegroup.name in nodegroups_names:
486         result.append("duplicate node group name '%s'" % nodegroup.name)
487       else:
488         nodegroups_names.add(nodegroup.name)
489
490     # drbd minors check
491     _, duplicates = self._UnlockedComputeDRBDMap()
492     for node, minor, instance_a, instance_b in duplicates:
493       result.append("DRBD minor %d on node %s is assigned twice to instances"
494                     " %s and %s" % (minor, node, instance_a, instance_b))
495
496     # IP checks
497     default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
498     ips = {}
499
500     def _AddIpAddress(ip, name):
501       ips.setdefault(ip, []).append(name)
502
503     _AddIpAddress(data.cluster.master_ip, "cluster_ip")
504
505     for node in data.nodes.values():
506       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
507       if node.secondary_ip != node.primary_ip:
508         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
509
510     for instance in data.instances.values():
511       for idx, nic in enumerate(instance.nics):
512         if nic.ip is None:
513           continue
514
515         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
516         nic_mode = nicparams[constants.NIC_MODE]
517         nic_link = nicparams[constants.NIC_LINK]
518
519         if nic_mode == constants.NIC_MODE_BRIDGED:
520           link = "bridge:%s" % nic_link
521         elif nic_mode == constants.NIC_MODE_ROUTED:
522           link = "route:%s" % nic_link
523         else:
524           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
525
526         _AddIpAddress("%s/%s" % (link, nic.ip),
527                       "instance:%s/nic:%d" % (instance.name, idx))
528
529     for ip, owners in ips.items():
530       if len(owners) > 1:
531         result.append("IP address %s is used by multiple owners: %s" %
532                       (ip, utils.CommaJoin(owners)))
533
534     return result
535
536   @locking.ssynchronized(_config_lock, shared=1)
537   def VerifyConfig(self):
538     """Verify function.
539
540     This is just a wrapper over L{_UnlockedVerifyConfig}.
541
542     @rtype: list
543     @return: a list of error messages; a non-empty list signifies
544         configuration errors
545
546     """
547     return self._UnlockedVerifyConfig()
548
549   def _UnlockedSetDiskID(self, disk, node_name):
550     """Convert the unique ID to the ID needed on the target nodes.
551
552     This is used only for drbd, which needs ip/port configuration.
553
554     The routine descends down and updates its children also, because
555     this helps when the only the top device is passed to the remote
556     node.
557
558     This function is for internal use, when the config lock is already held.
559
560     """
561     if disk.children:
562       for child in disk.children:
563         self._UnlockedSetDiskID(child, node_name)
564
565     if disk.logical_id is None and disk.physical_id is not None:
566       return
567     if disk.dev_type == constants.LD_DRBD8:
568       pnode, snode, port, pminor, sminor, secret = disk.logical_id
569       if node_name not in (pnode, snode):
570         raise errors.ConfigurationError("DRBD device not knowing node %s" %
571                                         node_name)
572       pnode_info = self._UnlockedGetNodeInfo(pnode)
573       snode_info = self._UnlockedGetNodeInfo(snode)
574       if pnode_info is None or snode_info is None:
575         raise errors.ConfigurationError("Can't find primary or secondary node"
576                                         " for %s" % str(disk))
577       p_data = (pnode_info.secondary_ip, port)
578       s_data = (snode_info.secondary_ip, port)
579       if pnode == node_name:
580         disk.physical_id = p_data + s_data + (pminor, secret)
581       else: # it must be secondary, we tested above
582         disk.physical_id = s_data + p_data + (sminor, secret)
583     else:
584       disk.physical_id = disk.logical_id
585     return
586
587   @locking.ssynchronized(_config_lock)
588   def SetDiskID(self, disk, node_name):
589     """Convert the unique ID to the ID needed on the target nodes.
590
591     This is used only for drbd, which needs ip/port configuration.
592
593     The routine descends down and updates its children also, because
594     this helps when the only the top device is passed to the remote
595     node.
596
597     """
598     return self._UnlockedSetDiskID(disk, node_name)
599
600   @locking.ssynchronized(_config_lock)
601   def AddTcpUdpPort(self, port):
602     """Adds a new port to the available port pool.
603
604     """
605     if not isinstance(port, int):
606       raise errors.ProgrammerError("Invalid type passed for port")
607
608     self._config_data.cluster.tcpudp_port_pool.add(port)
609     self._WriteConfig()
610
611   @locking.ssynchronized(_config_lock, shared=1)
612   def GetPortList(self):
613     """Returns a copy of the current port list.
614
615     """
616     return self._config_data.cluster.tcpudp_port_pool.copy()
617
618   @locking.ssynchronized(_config_lock)
619   def AllocatePort(self):
620     """Allocate a port.
621
622     The port will be taken from the available port pool or from the
623     default port range (and in this case we increase
624     highest_used_port).
625
626     """
627     # If there are TCP/IP ports configured, we use them first.
628     if self._config_data.cluster.tcpudp_port_pool:
629       port = self._config_data.cluster.tcpudp_port_pool.pop()
630     else:
631       port = self._config_data.cluster.highest_used_port + 1
632       if port >= constants.LAST_DRBD_PORT:
633         raise errors.ConfigurationError("The highest used port is greater"
634                                         " than %s. Aborting." %
635                                         constants.LAST_DRBD_PORT)
636       self._config_data.cluster.highest_used_port = port
637
638     self._WriteConfig()
639     return port
640
641   def _UnlockedComputeDRBDMap(self):
642     """Compute the used DRBD minor/nodes.
643
644     @rtype: (dict, list)
645     @return: dictionary of node_name: dict of minor: instance_name;
646         the returned dict will have all the nodes in it (even if with
647         an empty list), and a list of duplicates; if the duplicates
648         list is not empty, the configuration is corrupted and its caller
649         should raise an exception
650
651     """
652     def _AppendUsedPorts(instance_name, disk, used):
653       duplicates = []
654       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
655         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
656         for node, port in ((node_a, minor_a), (node_b, minor_b)):
657           assert node in used, ("Node '%s' of instance '%s' not found"
658                                 " in node list" % (node, instance_name))
659           if port in used[node]:
660             duplicates.append((node, port, instance_name, used[node][port]))
661           else:
662             used[node][port] = instance_name
663       if disk.children:
664         for child in disk.children:
665           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
666       return duplicates
667
668     duplicates = []
669     my_dict = dict((node, {}) for node in self._config_data.nodes)
670     for instance in self._config_data.instances.itervalues():
671       for disk in instance.disks:
672         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
673     for (node, minor), instance in self._temporary_drbds.iteritems():
674       if minor in my_dict[node] and my_dict[node][minor] != instance:
675         duplicates.append((node, minor, instance, my_dict[node][minor]))
676       else:
677         my_dict[node][minor] = instance
678     return my_dict, duplicates
679
680   @locking.ssynchronized(_config_lock)
681   def ComputeDRBDMap(self):
682     """Compute the used DRBD minor/nodes.
683
684     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
685
686     @return: dictionary of node_name: dict of minor: instance_name;
687         the returned dict will have all the nodes in it (even if with
688         an empty list).
689
690     """
691     d_map, duplicates = self._UnlockedComputeDRBDMap()
692     if duplicates:
693       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
694                                       str(duplicates))
695     return d_map
696
697   @locking.ssynchronized(_config_lock)
698   def AllocateDRBDMinor(self, nodes, instance):
699     """Allocate a drbd minor.
700
701     The free minor will be automatically computed from the existing
702     devices. A node can be given multiple times in order to allocate
703     multiple minors. The result is the list of minors, in the same
704     order as the passed nodes.
705
706     @type instance: string
707     @param instance: the instance for which we allocate minors
708
709     """
710     assert isinstance(instance, basestring), \
711            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
712
713     d_map, duplicates = self._UnlockedComputeDRBDMap()
714     if duplicates:
715       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
716                                       str(duplicates))
717     result = []
718     for nname in nodes:
719       ndata = d_map[nname]
720       if not ndata:
721         # no minors used, we can start at 0
722         result.append(0)
723         ndata[0] = instance
724         self._temporary_drbds[(nname, 0)] = instance
725         continue
726       keys = ndata.keys()
727       keys.sort()
728       ffree = utils.FirstFree(keys)
729       if ffree is None:
730         # return the next minor
731         # TODO: implement high-limit check
732         minor = keys[-1] + 1
733       else:
734         minor = ffree
735       # double-check minor against current instances
736       assert minor not in d_map[nname], \
737              ("Attempt to reuse allocated DRBD minor %d on node %s,"
738               " already allocated to instance %s" %
739               (minor, nname, d_map[nname][minor]))
740       ndata[minor] = instance
741       # double-check minor against reservation
742       r_key = (nname, minor)
743       assert r_key not in self._temporary_drbds, \
744              ("Attempt to reuse reserved DRBD minor %d on node %s,"
745               " reserved for instance %s" %
746               (minor, nname, self._temporary_drbds[r_key]))
747       self._temporary_drbds[r_key] = instance
748       result.append(minor)
749     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
750                   nodes, result)
751     return result
752
753   def _UnlockedReleaseDRBDMinors(self, instance):
754     """Release temporary drbd minors allocated for a given instance.
755
756     @type instance: string
757     @param instance: the instance for which temporary minors should be
758                      released
759
760     """
761     assert isinstance(instance, basestring), \
762            "Invalid argument passed to ReleaseDRBDMinors"
763     for key, name in self._temporary_drbds.items():
764       if name == instance:
765         del self._temporary_drbds[key]
766
767   @locking.ssynchronized(_config_lock)
768   def ReleaseDRBDMinors(self, instance):
769     """Release temporary drbd minors allocated for a given instance.
770
771     This should be called on the error paths, on the success paths
772     it's automatically called by the ConfigWriter add and update
773     functions.
774
775     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
776
777     @type instance: string
778     @param instance: the instance for which temporary minors should be
779                      released
780
781     """
782     self._UnlockedReleaseDRBDMinors(instance)
783
784   @locking.ssynchronized(_config_lock, shared=1)
785   def GetConfigVersion(self):
786     """Get the configuration version.
787
788     @return: Config version
789
790     """
791     return self._config_data.version
792
793   @locking.ssynchronized(_config_lock, shared=1)
794   def GetClusterName(self):
795     """Get cluster name.
796
797     @return: Cluster name
798
799     """
800     return self._config_data.cluster.cluster_name
801
802   @locking.ssynchronized(_config_lock, shared=1)
803   def GetMasterNode(self):
804     """Get the hostname of the master node for this cluster.
805
806     @return: Master hostname
807
808     """
809     return self._config_data.cluster.master_node
810
811   @locking.ssynchronized(_config_lock, shared=1)
812   def GetMasterIP(self):
813     """Get the IP of the master node for this cluster.
814
815     @return: Master IP
816
817     """
818     return self._config_data.cluster.master_ip
819
820   @locking.ssynchronized(_config_lock, shared=1)
821   def GetMasterNetdev(self):
822     """Get the master network device for this cluster.
823
824     """
825     return self._config_data.cluster.master_netdev
826
827   @locking.ssynchronized(_config_lock, shared=1)
828   def GetFileStorageDir(self):
829     """Get the file storage dir for this cluster.
830
831     """
832     return self._config_data.cluster.file_storage_dir
833
834   @locking.ssynchronized(_config_lock, shared=1)
835   def GetHypervisorType(self):
836     """Get the hypervisor type for this cluster.
837
838     """
839     return self._config_data.cluster.enabled_hypervisors[0]
840
841   @locking.ssynchronized(_config_lock, shared=1)
842   def GetHostKey(self):
843     """Return the rsa hostkey from the config.
844
845     @rtype: string
846     @return: the rsa hostkey
847
848     """
849     return self._config_data.cluster.rsahostkeypub
850
851   @locking.ssynchronized(_config_lock, shared=1)
852   def GetDefaultIAllocator(self):
853     """Get the default instance allocator for this cluster.
854
855     """
856     return self._config_data.cluster.default_iallocator
857
858   @locking.ssynchronized(_config_lock, shared=1)
859   def GetPrimaryIPFamily(self):
860     """Get cluster primary ip family.
861
862     @return: primary ip family
863
864     """
865     return self._config_data.cluster.primary_ip_family
866
867   @locking.ssynchronized(_config_lock, shared=1)
868   def LookupNodeGroup(self, target):
869     """Lookup a node group's UUID.
870
871     @type target: string or None
872     @param target: group name or UUID or None to look for the default
873     @rtype: string
874     @return: nodegroup UUID
875     @raises errors.OpPrereqError: when the target group cannot be found
876
877     """
878     if target is None:
879       if len(self._config_data.nodegroups) != 1:
880         raise errors.OpPrereqError("More than one node group exists. Target"
881                                    " group must be specified explicitely.")
882       else:
883         return self._config_data.nodegroups.keys()[0]
884     if target in self._config_data.nodegroups:
885       return target
886     for nodegroup in self._config_data.nodegroups.values():
887       if nodegroup.name == target:
888         return nodegroup.uuid
889     raise errors.OpPrereqError("Node group '%s' not found" % target,
890                                errors.ECODE_NOENT)
891
892   def _UnlockedGetNodeGroup(self, uuid):
893     """Lookup a node group.
894
895     @type uuid: string
896     @param uuid: group UUID
897     @rtype: L{objects.NodeGroup} or None
898     @return: nodegroup object, or None if not found
899
900     """
901     if uuid not in self._config_data.nodegroups:
902       return None
903
904     return self._config_data.nodegroups[uuid]
905
906   @locking.ssynchronized(_config_lock, shared=1)
907   def GetNodeGroup(self, uuid):
908     """Lookup a node group.
909
910     @type uuid: string
911     @param uuid: group UUID
912     @rtype: L{objects.NodeGroup} or None
913     @return: nodegroup object, or None if not found
914
915     """
916     return self._UnlockedGetNodeGroup(uuid)
917
918   @locking.ssynchronized(_config_lock, shared=1)
919   def GetAllNodeGroupsInfo(self):
920     """Get the configuration of all node groups.
921
922     """
923     return dict(self._config_data.nodegroups)
924
925   @locking.ssynchronized(_config_lock, shared=1)
926   def GetNodeGroupList(self):
927     """Get a list of node groups.
928
929     """
930     return self._config_data.nodegroups.keys()
931
932   @locking.ssynchronized(_config_lock)
933   def AddInstance(self, instance, ec_id):
934     """Add an instance to the config.
935
936     This should be used after creating a new instance.
937
938     @type instance: L{objects.Instance}
939     @param instance: the instance object
940
941     """
942     if not isinstance(instance, objects.Instance):
943       raise errors.ProgrammerError("Invalid type passed to AddInstance")
944
945     if instance.disk_template != constants.DT_DISKLESS:
946       all_lvs = instance.MapLVsByNode()
947       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
948
949     all_macs = self._AllMACs()
950     for nic in instance.nics:
951       if nic.mac in all_macs:
952         raise errors.ConfigurationError("Cannot add instance %s:"
953                                         " MAC address '%s' already in use." %
954                                         (instance.name, nic.mac))
955
956     self._EnsureUUID(instance, ec_id)
957
958     instance.serial_no = 1
959     instance.ctime = instance.mtime = time.time()
960     self._config_data.instances[instance.name] = instance
961     self._config_data.cluster.serial_no += 1
962     self._UnlockedReleaseDRBDMinors(instance.name)
963     self._WriteConfig()
964
965   def _EnsureUUID(self, item, ec_id):
966     """Ensures a given object has a valid UUID.
967
968     @param item: the instance or node to be checked
969     @param ec_id: the execution context id for the uuid reservation
970
971     """
972     if not item.uuid:
973       item.uuid = self._GenerateUniqueID(ec_id)
974     elif item.uuid in self._AllIDs(include_temporary=True):
975       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
976                                       " in use" % (item.name, item.uuid))
977
978   def _SetInstanceStatus(self, instance_name, status):
979     """Set the instance's status to a given value.
980
981     """
982     assert isinstance(status, bool), \
983            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
984
985     if instance_name not in self._config_data.instances:
986       raise errors.ConfigurationError("Unknown instance '%s'" %
987                                       instance_name)
988     instance = self._config_data.instances[instance_name]
989     if instance.admin_up != status:
990       instance.admin_up = status
991       instance.serial_no += 1
992       instance.mtime = time.time()
993       self._WriteConfig()
994
995   @locking.ssynchronized(_config_lock)
996   def MarkInstanceUp(self, instance_name):
997     """Mark the instance status to up in the config.
998
999     """
1000     self._SetInstanceStatus(instance_name, True)
1001
1002   @locking.ssynchronized(_config_lock)
1003   def RemoveInstance(self, instance_name):
1004     """Remove the instance from the configuration.
1005
1006     """
1007     if instance_name not in self._config_data.instances:
1008       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1009     del self._config_data.instances[instance_name]
1010     self._config_data.cluster.serial_no += 1
1011     self._WriteConfig()
1012
1013   @locking.ssynchronized(_config_lock)
1014   def RenameInstance(self, old_name, new_name):
1015     """Rename an instance.
1016
1017     This needs to be done in ConfigWriter and not by RemoveInstance
1018     combined with AddInstance as only we can guarantee an atomic
1019     rename.
1020
1021     """
1022     if old_name not in self._config_data.instances:
1023       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
1024     inst = self._config_data.instances[old_name]
1025     del self._config_data.instances[old_name]
1026     inst.name = new_name
1027
1028     for disk in inst.disks:
1029       if disk.dev_type == constants.LD_FILE:
1030         # rename the file paths in logical and physical id
1031         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
1032         disk.physical_id = disk.logical_id = (disk.logical_id[0],
1033                                               utils.PathJoin(file_storage_dir,
1034                                                              inst.name,
1035                                                              disk.iv_name))
1036
1037     # Force update of ssconf files
1038     self._config_data.cluster.serial_no += 1
1039
1040     self._config_data.instances[inst.name] = inst
1041     self._WriteConfig()
1042
1043   @locking.ssynchronized(_config_lock)
1044   def MarkInstanceDown(self, instance_name):
1045     """Mark the status of an instance to down in the configuration.
1046
1047     """
1048     self._SetInstanceStatus(instance_name, False)
1049
1050   def _UnlockedGetInstanceList(self):
1051     """Get the list of instances.
1052
1053     This function is for internal use, when the config lock is already held.
1054
1055     """
1056     return self._config_data.instances.keys()
1057
1058   @locking.ssynchronized(_config_lock, shared=1)
1059   def GetInstanceList(self):
1060     """Get the list of instances.
1061
1062     @return: array of instances, ex. ['instance2.example.com',
1063         'instance1.example.com']
1064
1065     """
1066     return self._UnlockedGetInstanceList()
1067
1068   @locking.ssynchronized(_config_lock, shared=1)
1069   def ExpandInstanceName(self, short_name):
1070     """Attempt to expand an incomplete instance name.
1071
1072     """
1073     return utils.MatchNameComponent(short_name,
1074                                     self._config_data.instances.keys(),
1075                                     case_sensitive=False)
1076
1077   def _UnlockedGetInstanceInfo(self, instance_name):
1078     """Returns information about an instance.
1079
1080     This function is for internal use, when the config lock is already held.
1081
1082     """
1083     if instance_name not in self._config_data.instances:
1084       return None
1085
1086     return self._config_data.instances[instance_name]
1087
1088   @locking.ssynchronized(_config_lock, shared=1)
1089   def GetInstanceInfo(self, instance_name):
1090     """Returns information about an instance.
1091
1092     It takes the information from the configuration file. Other information of
1093     an instance are taken from the live systems.
1094
1095     @param instance_name: name of the instance, e.g.
1096         I{instance1.example.com}
1097
1098     @rtype: L{objects.Instance}
1099     @return: the instance object
1100
1101     """
1102     return self._UnlockedGetInstanceInfo(instance_name)
1103
1104   @locking.ssynchronized(_config_lock, shared=1)
1105   def GetAllInstancesInfo(self):
1106     """Get the configuration of all instances.
1107
1108     @rtype: dict
1109     @return: dict of (instance, instance_info), where instance_info is what
1110               would GetInstanceInfo return for the node
1111
1112     """
1113     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1114                     for instance in self._UnlockedGetInstanceList()])
1115     return my_dict
1116
1117   @locking.ssynchronized(_config_lock)
1118   def AddNode(self, node, ec_id):
1119     """Add a node to the configuration.
1120
1121     @type node: L{objects.Node}
1122     @param node: a Node instance
1123
1124     """
1125     logging.info("Adding node %s to configuration", node.name)
1126
1127     self._EnsureUUID(node, ec_id)
1128
1129     node.serial_no = 1
1130     node.ctime = node.mtime = time.time()
1131     self._UnlockedAddNodeToGroup(node.name, node.group)
1132     self._config_data.nodes[node.name] = node
1133     self._config_data.cluster.serial_no += 1
1134     self._WriteConfig()
1135
1136   @locking.ssynchronized(_config_lock)
1137   def RemoveNode(self, node_name):
1138     """Remove a node from the configuration.
1139
1140     """
1141     logging.info("Removing node %s from configuration", node_name)
1142
1143     if node_name not in self._config_data.nodes:
1144       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1145
1146     self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1147     del self._config_data.nodes[node_name]
1148     self._config_data.cluster.serial_no += 1
1149     self._WriteConfig()
1150
1151   @locking.ssynchronized(_config_lock, shared=1)
1152   def ExpandNodeName(self, short_name):
1153     """Attempt to expand an incomplete instance name.
1154
1155     """
1156     return utils.MatchNameComponent(short_name,
1157                                     self._config_data.nodes.keys(),
1158                                     case_sensitive=False)
1159
1160   def _UnlockedGetNodeInfo(self, node_name):
1161     """Get the configuration of a node, as stored in the config.
1162
1163     This function is for internal use, when the config lock is already
1164     held.
1165
1166     @param node_name: the node name, e.g. I{node1.example.com}
1167
1168     @rtype: L{objects.Node}
1169     @return: the node object
1170
1171     """
1172     if node_name not in self._config_data.nodes:
1173       return None
1174
1175     return self._config_data.nodes[node_name]
1176
1177   @locking.ssynchronized(_config_lock, shared=1)
1178   def GetNodeInfo(self, node_name):
1179     """Get the configuration of a node, as stored in the config.
1180
1181     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1182
1183     @param node_name: the node name, e.g. I{node1.example.com}
1184
1185     @rtype: L{objects.Node}
1186     @return: the node object
1187
1188     """
1189     return self._UnlockedGetNodeInfo(node_name)
1190
1191   @locking.ssynchronized(_config_lock, shared=1)
1192   def GetNodeInstances(self, node_name):
1193     """Get the instances of a node, as stored in the config.
1194
1195     @param node_name: the node name, e.g. I{node1.example.com}
1196
1197     @rtype: (list, list)
1198     @return: a tuple with two lists: the primary and the secondary instances
1199
1200     """
1201     pri = []
1202     sec = []
1203     for inst in self._config_data.instances.values():
1204       if inst.primary_node == node_name:
1205         pri.append(inst.name)
1206       if node_name in inst.secondary_nodes:
1207         sec.append(inst.name)
1208     return (pri, sec)
1209
1210   def _UnlockedGetNodeList(self):
1211     """Return the list of nodes which are in the configuration.
1212
1213     This function is for internal use, when the config lock is already
1214     held.
1215
1216     @rtype: list
1217
1218     """
1219     return self._config_data.nodes.keys()
1220
1221   @locking.ssynchronized(_config_lock, shared=1)
1222   def GetNodeList(self):
1223     """Return the list of nodes which are in the configuration.
1224
1225     """
1226     return self._UnlockedGetNodeList()
1227
1228   def _UnlockedGetOnlineNodeList(self):
1229     """Return the list of nodes which are online.
1230
1231     """
1232     all_nodes = [self._UnlockedGetNodeInfo(node)
1233                  for node in self._UnlockedGetNodeList()]
1234     return [node.name for node in all_nodes if not node.offline]
1235
1236   @locking.ssynchronized(_config_lock, shared=1)
1237   def GetOnlineNodeList(self):
1238     """Return the list of nodes which are online.
1239
1240     """
1241     return self._UnlockedGetOnlineNodeList()
1242
1243   @locking.ssynchronized(_config_lock, shared=1)
1244   def GetNonVmCapableNodeList(self):
1245     """Return the list of nodes which are not vm capable.
1246
1247     """
1248     all_nodes = [self._UnlockedGetNodeInfo(node)
1249                  for node in self._UnlockedGetNodeList()]
1250     return [node.name for node in all_nodes if not node.vm_capable]
1251
1252   @locking.ssynchronized(_config_lock, shared=1)
1253   def GetAllNodesInfo(self):
1254     """Get the configuration of all nodes.
1255
1256     @rtype: dict
1257     @return: dict of (node, node_info), where node_info is what
1258               would GetNodeInfo return for the node
1259
1260     """
1261     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1262                     for node in self._UnlockedGetNodeList()])
1263     return my_dict
1264
1265   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1266     """Get the number of current and maximum desired and possible candidates.
1267
1268     @type exceptions: list
1269     @param exceptions: if passed, list of nodes that should be ignored
1270     @rtype: tuple
1271     @return: tuple of (current, desired and possible, possible)
1272
1273     """
1274     mc_now = mc_should = mc_max = 0
1275     for node in self._config_data.nodes.values():
1276       if exceptions and node.name in exceptions:
1277         continue
1278       if not (node.offline or node.drained) and node.master_capable:
1279         mc_max += 1
1280       if node.master_candidate:
1281         mc_now += 1
1282     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1283     return (mc_now, mc_should, mc_max)
1284
1285   @locking.ssynchronized(_config_lock, shared=1)
1286   def GetMasterCandidateStats(self, exceptions=None):
1287     """Get the number of current and maximum possible candidates.
1288
1289     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1290
1291     @type exceptions: list
1292     @param exceptions: if passed, list of nodes that should be ignored
1293     @rtype: tuple
1294     @return: tuple of (current, max)
1295
1296     """
1297     return self._UnlockedGetMasterCandidateStats(exceptions)
1298
1299   @locking.ssynchronized(_config_lock)
1300   def MaintainCandidatePool(self, exceptions):
1301     """Try to grow the candidate pool to the desired size.
1302
1303     @type exceptions: list
1304     @param exceptions: if passed, list of nodes that should be ignored
1305     @rtype: list
1306     @return: list with the adjusted nodes (L{objects.Node} instances)
1307
1308     """
1309     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1310     mod_list = []
1311     if mc_now < mc_max:
1312       node_list = self._config_data.nodes.keys()
1313       random.shuffle(node_list)
1314       for name in node_list:
1315         if mc_now >= mc_max:
1316           break
1317         node = self._config_data.nodes[name]
1318         if (node.master_candidate or node.offline or node.drained or
1319             node.name in exceptions or not node.master_capable):
1320           continue
1321         mod_list.append(node)
1322         node.master_candidate = True
1323         node.serial_no += 1
1324         mc_now += 1
1325       if mc_now != mc_max:
1326         # this should not happen
1327         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1328                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1329       if mod_list:
1330         self._config_data.cluster.serial_no += 1
1331         self._WriteConfig()
1332
1333     return mod_list
1334
1335   def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1336     """Add a given node to the specified group.
1337
1338     """
1339     if nodegroup_uuid not in self._config_data.nodegroups:
1340       # This can happen if a node group gets deleted between its lookup and
1341       # when we're adding the first node to it, since we don't keep a lock in
1342       # the meantime. It's ok though, as we'll fail cleanly if the node group
1343       # is not found anymore.
1344       raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1345     if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1346       self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1347
1348   def _UnlockedRemoveNodeFromGroup(self, node):
1349     """Remove a given node from its group.
1350
1351     """
1352     nodegroup = node.group
1353     if nodegroup not in self._config_data.nodegroups:
1354       logging.warning("Warning: node '%s' has unknown node group '%s'"
1355                       " (while being removed from it)", node.name, nodegroup)
1356     nodegroup_obj = self._config_data.nodegroups[nodegroup]
1357     if node.name not in nodegroup_obj.members:
1358       logging.warning("Warning: node '%s' not a member of its node group '%s'"
1359                       " (while being removed from it)", node.name, nodegroup)
1360     else:
1361       nodegroup_obj.members.remove(node.name)
1362
1363   def _BumpSerialNo(self):
1364     """Bump up the serial number of the config.
1365
1366     """
1367     self._config_data.serial_no += 1
1368     self._config_data.mtime = time.time()
1369
1370   def _AllUUIDObjects(self):
1371     """Returns all objects with uuid attributes.
1372
1373     """
1374     return (self._config_data.instances.values() +
1375             self._config_data.nodes.values() +
1376             self._config_data.nodegroups.values() +
1377             [self._config_data.cluster])
1378
1379   def _OpenConfig(self, accept_foreign):
1380     """Read the config data from disk.
1381
1382     """
1383     raw_data = utils.ReadFile(self._cfg_file)
1384
1385     try:
1386       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1387     except Exception, err:
1388       raise errors.ConfigurationError(err)
1389
1390     # Make sure the configuration has the right version
1391     _ValidateConfig(data)
1392
1393     if (not hasattr(data, 'cluster') or
1394         not hasattr(data.cluster, 'rsahostkeypub')):
1395       raise errors.ConfigurationError("Incomplete configuration"
1396                                       " (missing cluster.rsahostkeypub)")
1397
1398     if data.cluster.master_node != self._my_hostname and not accept_foreign:
1399       msg = ("The configuration denotes node %s as master, while my"
1400              " hostname is %s; opening a foreign configuration is only"
1401              " possible in accept_foreign mode" %
1402              (data.cluster.master_node, self._my_hostname))
1403       raise errors.ConfigurationError(msg)
1404
1405     # Upgrade configuration if needed
1406     data.UpgradeConfig()
1407
1408     self._config_data = data
1409     # reset the last serial as -1 so that the next write will cause
1410     # ssconf update
1411     self._last_cluster_serial = -1
1412
1413     # And finally run our (custom) config upgrade sequence
1414     self._UpgradeConfig()
1415
1416     self._cfg_id = utils.GetFileID(path=self._cfg_file)
1417
1418   def _UpgradeConfig(self):
1419     """Run upgrade steps that cannot be done purely in the objects.
1420
1421     This is because some data elements need uniqueness across the
1422     whole configuration, etc.
1423
1424     @warning: this function will call L{_WriteConfig()}, but also
1425         L{DropECReservations} so it needs to be called only from a
1426         "safe" place (the constructor). If one wanted to call it with
1427         the lock held, a DropECReservationUnlocked would need to be
1428         created first, to avoid causing deadlock.
1429
1430     """
1431     modified = False
1432     for item in self._AllUUIDObjects():
1433       if item.uuid is None:
1434         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1435         modified = True
1436     if not self._config_data.nodegroups:
1437       default_nodegroup_uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1438       default_nodegroup = objects.NodeGroup(
1439           uuid=default_nodegroup_uuid,
1440           name="default",
1441           members=[],
1442           )
1443       self._config_data.nodegroups[default_nodegroup_uuid] = default_nodegroup
1444       modified = True
1445     for node in self._config_data.nodes.values():
1446       if not node.group:
1447         node.group = self.LookupNodeGroup(None)
1448         modified = True
1449       # This is technically *not* an upgrade, but needs to be done both when
1450       # nodegroups are being added, and upon normally loading the config,
1451       # because the members list of a node group is discarded upon
1452       # serializing/deserializing the object.
1453       self._UnlockedAddNodeToGroup(node.name, node.group)
1454     if modified:
1455       self._WriteConfig()
1456       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1457       # only called at config init time, without the lock held
1458       self.DropECReservations(_UPGRADE_CONFIG_JID)
1459
1460   def _DistributeConfig(self, feedback_fn):
1461     """Distribute the configuration to the other nodes.
1462
1463     Currently, this only copies the configuration file. In the future,
1464     it could be used to encapsulate the 2/3-phase update mechanism.
1465
1466     """
1467     if self._offline:
1468       return True
1469
1470     bad = False
1471
1472     node_list = []
1473     addr_list = []
1474     myhostname = self._my_hostname
1475     # we can skip checking whether _UnlockedGetNodeInfo returns None
1476     # since the node list comes from _UnlocketGetNodeList, and we are
1477     # called with the lock held, so no modifications should take place
1478     # in between
1479     for node_name in self._UnlockedGetNodeList():
1480       if node_name == myhostname:
1481         continue
1482       node_info = self._UnlockedGetNodeInfo(node_name)
1483       if not node_info.master_candidate:
1484         continue
1485       node_list.append(node_info.name)
1486       addr_list.append(node_info.primary_ip)
1487
1488     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1489                                             address_list=addr_list)
1490     for to_node, to_result in result.items():
1491       msg = to_result.fail_msg
1492       if msg:
1493         msg = ("Copy of file %s to node %s failed: %s" %
1494                (self._cfg_file, to_node, msg))
1495         logging.error(msg)
1496
1497         if feedback_fn:
1498           feedback_fn(msg)
1499
1500         bad = True
1501
1502     return not bad
1503
1504   def _WriteConfig(self, destination=None, feedback_fn=None):
1505     """Write the configuration data to persistent storage.
1506
1507     """
1508     assert feedback_fn is None or callable(feedback_fn)
1509
1510     # Warn on config errors, but don't abort the save - the
1511     # configuration has already been modified, and we can't revert;
1512     # the best we can do is to warn the user and save as is, leaving
1513     # recovery to the user
1514     config_errors = self._UnlockedVerifyConfig()
1515     if config_errors:
1516       errmsg = ("Configuration data is not consistent: %s" %
1517                 (utils.CommaJoin(config_errors)))
1518       logging.critical(errmsg)
1519       if feedback_fn:
1520         feedback_fn(errmsg)
1521
1522     if destination is None:
1523       destination = self._cfg_file
1524     self._BumpSerialNo()
1525     txt = serializer.Dump(self._config_data.ToDict())
1526
1527     getents = self._getents()
1528     try:
1529       fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
1530                                close=False, gid=getents.confd_gid, mode=0640)
1531     except errors.LockError:
1532       raise errors.ConfigurationError("The configuration file has been"
1533                                       " modified since the last write, cannot"
1534                                       " update")
1535     try:
1536       self._cfg_id = utils.GetFileID(fd=fd)
1537     finally:
1538       os.close(fd)
1539
1540     self.write_count += 1
1541
1542     # and redistribute the config file to master candidates
1543     self._DistributeConfig(feedback_fn)
1544
1545     # Write ssconf files on all nodes (including locally)
1546     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1547       if not self._offline:
1548         result = rpc.RpcRunner.call_write_ssconf_files(
1549           self._UnlockedGetOnlineNodeList(),
1550           self._UnlockedGetSsconfValues())
1551
1552         for nname, nresu in result.items():
1553           msg = nresu.fail_msg
1554           if msg:
1555             errmsg = ("Error while uploading ssconf files to"
1556                       " node %s: %s" % (nname, msg))
1557             logging.warning(errmsg)
1558
1559             if feedback_fn:
1560               feedback_fn(errmsg)
1561
1562       self._last_cluster_serial = self._config_data.cluster.serial_no
1563
1564   def _UnlockedGetSsconfValues(self):
1565     """Return the values needed by ssconf.
1566
1567     @rtype: dict
1568     @return: a dictionary with keys the ssconf names and values their
1569         associated value
1570
1571     """
1572     fn = "\n".join
1573     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1574     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1575     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1576     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1577                     for ninfo in node_info]
1578     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1579                     for ninfo in node_info]
1580
1581     instance_data = fn(instance_names)
1582     off_data = fn(node.name for node in node_info if node.offline)
1583     on_data = fn(node.name for node in node_info if not node.offline)
1584     mc_data = fn(node.name for node in node_info if node.master_candidate)
1585     mc_ips_data = fn(node.primary_ip for node in node_info
1586                      if node.master_candidate)
1587     node_data = fn(node_names)
1588     node_pri_ips_data = fn(node_pri_ips)
1589     node_snd_ips_data = fn(node_snd_ips)
1590
1591     cluster = self._config_data.cluster
1592     cluster_tags = fn(cluster.GetTags())
1593
1594     hypervisor_list = fn(cluster.enabled_hypervisors)
1595
1596     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1597
1598     nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1599                   self._config_data.nodegroups.values()]
1600     nodegroups_data = fn(utils.NiceSort(nodegroups))
1601
1602     return {
1603       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1604       constants.SS_CLUSTER_TAGS: cluster_tags,
1605       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1606       constants.SS_MASTER_CANDIDATES: mc_data,
1607       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1608       constants.SS_MASTER_IP: cluster.master_ip,
1609       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1610       constants.SS_MASTER_NODE: cluster.master_node,
1611       constants.SS_NODE_LIST: node_data,
1612       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1613       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1614       constants.SS_OFFLINE_NODES: off_data,
1615       constants.SS_ONLINE_NODES: on_data,
1616       constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1617       constants.SS_INSTANCE_LIST: instance_data,
1618       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1619       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1620       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1621       constants.SS_UID_POOL: uid_pool,
1622       constants.SS_NODEGROUPS: nodegroups_data,
1623       }
1624
1625   @locking.ssynchronized(_config_lock, shared=1)
1626   def GetSsconfValues(self):
1627     """Wrapper using lock around _UnlockedGetSsconf().
1628
1629     """
1630     return self._UnlockedGetSsconfValues()
1631
1632   @locking.ssynchronized(_config_lock, shared=1)
1633   def GetVGName(self):
1634     """Return the volume group name.
1635
1636     """
1637     return self._config_data.cluster.volume_group_name
1638
1639   @locking.ssynchronized(_config_lock)
1640   def SetVGName(self, vg_name):
1641     """Set the volume group name.
1642
1643     """
1644     self._config_data.cluster.volume_group_name = vg_name
1645     self._config_data.cluster.serial_no += 1
1646     self._WriteConfig()
1647
1648   @locking.ssynchronized(_config_lock, shared=1)
1649   def GetDRBDHelper(self):
1650     """Return DRBD usermode helper.
1651
1652     """
1653     return self._config_data.cluster.drbd_usermode_helper
1654
1655   @locking.ssynchronized(_config_lock)
1656   def SetDRBDHelper(self, drbd_helper):
1657     """Set DRBD usermode helper.
1658
1659     """
1660     self._config_data.cluster.drbd_usermode_helper = drbd_helper
1661     self._config_data.cluster.serial_no += 1
1662     self._WriteConfig()
1663
1664   @locking.ssynchronized(_config_lock, shared=1)
1665   def GetMACPrefix(self):
1666     """Return the mac prefix.
1667
1668     """
1669     return self._config_data.cluster.mac_prefix
1670
1671   @locking.ssynchronized(_config_lock, shared=1)
1672   def GetClusterInfo(self):
1673     """Returns information about the cluster
1674
1675     @rtype: L{objects.Cluster}
1676     @return: the cluster object
1677
1678     """
1679     return self._config_data.cluster
1680
1681   @locking.ssynchronized(_config_lock, shared=1)
1682   def HasAnyDiskOfType(self, dev_type):
1683     """Check if in there is at disk of the given type in the configuration.
1684
1685     """
1686     return self._config_data.HasAnyDiskOfType(dev_type)
1687
1688   @locking.ssynchronized(_config_lock)
1689   def Update(self, target, feedback_fn):
1690     """Notify function to be called after updates.
1691
1692     This function must be called when an object (as returned by
1693     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1694     caller wants the modifications saved to the backing store. Note
1695     that all modified objects will be saved, but the target argument
1696     is the one the caller wants to ensure that it's saved.
1697
1698     @param target: an instance of either L{objects.Cluster},
1699         L{objects.Node} or L{objects.Instance} which is existing in
1700         the cluster
1701     @param feedback_fn: Callable feedback function
1702
1703     """
1704     if self._config_data is None:
1705       raise errors.ProgrammerError("Configuration file not read,"
1706                                    " cannot save.")
1707     update_serial = False
1708     if isinstance(target, objects.Cluster):
1709       test = target == self._config_data.cluster
1710     elif isinstance(target, objects.Node):
1711       test = target in self._config_data.nodes.values()
1712       update_serial = True
1713     elif isinstance(target, objects.Instance):
1714       test = target in self._config_data.instances.values()
1715     else:
1716       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1717                                    " ConfigWriter.Update" % type(target))
1718     if not test:
1719       raise errors.ConfigurationError("Configuration updated since object"
1720                                       " has been read or unknown object")
1721     target.serial_no += 1
1722     target.mtime = now = time.time()
1723
1724     if update_serial:
1725       # for node updates, we need to increase the cluster serial too
1726       self._config_data.cluster.serial_no += 1
1727       self._config_data.cluster.mtime = now
1728
1729     if isinstance(target, objects.Instance):
1730       self._UnlockedReleaseDRBDMinors(target.name)
1731
1732     self._WriteConfig(feedback_fn=feedback_fn)
1733
1734   @locking.ssynchronized(_config_lock)
1735   def DropECReservations(self, ec_id):
1736     """Drop per-execution-context reservations
1737
1738     """
1739     for rm in self._all_rms:
1740       rm.DropECReservations(ec_id)