Bump version to 2.5.0~rc1
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 # pylint: disable=R0904
35 # R0904: Too many public methods
36
37 import os
38 import random
39 import logging
40 import time
41
42 from ganeti import errors
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import constants
46 from ganeti import rpc
47 from ganeti import objects
48 from ganeti import serializer
49 from ganeti import uidpool
50 from ganeti import netutils
51 from ganeti import runtime
52
53
54 _config_lock = locking.SharedLock("ConfigWriter")
55
56 # job id used for resource management at config upgrade time
57 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
58
59
60 def _ValidateConfig(data):
61   """Verifies that a configuration objects looks valid.
62
63   This only verifies the version of the configuration.
64
65   @raise errors.ConfigurationError: if the version differs from what
66       we expect
67
68   """
69   if data.version != constants.CONFIG_VERSION:
70     raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
71
72
73 class TemporaryReservationManager:
74   """A temporary resource reservation manager.
75
76   This is used to reserve resources in a job, before using them, making sure
77   other jobs cannot get them in the meantime.
78
79   """
80   def __init__(self):
81     self._ec_reserved = {}
82
83   def Reserved(self, resource):
84     for holder_reserved in self._ec_reserved.values():
85       if resource in holder_reserved:
86         return True
87     return False
88
89   def Reserve(self, ec_id, resource):
90     if self.Reserved(resource):
91       raise errors.ReservationError("Duplicate reservation for resource '%s'"
92                                     % str(resource))
93     if ec_id not in self._ec_reserved:
94       self._ec_reserved[ec_id] = set([resource])
95     else:
96       self._ec_reserved[ec_id].add(resource)
97
98   def DropECReservations(self, ec_id):
99     if ec_id in self._ec_reserved:
100       del self._ec_reserved[ec_id]
101
102   def GetReserved(self):
103     all_reserved = set()
104     for holder_reserved in self._ec_reserved.values():
105       all_reserved.update(holder_reserved)
106     return all_reserved
107
108   def Generate(self, existing, generate_one_fn, ec_id):
109     """Generate a new resource of this type
110
111     """
112     assert callable(generate_one_fn)
113
114     all_elems = self.GetReserved()
115     all_elems.update(existing)
116     retries = 64
117     while retries > 0:
118       new_resource = generate_one_fn()
119       if new_resource is not None and new_resource not in all_elems:
120         break
121     else:
122       raise errors.ConfigurationError("Not able generate new resource"
123                                       " (last tried: %s)" % new_resource)
124     self.Reserve(ec_id, new_resource)
125     return new_resource
126
127
128 def _MatchNameComponentIgnoreCase(short_name, names):
129   """Wrapper around L{utils.text.MatchNameComponent}.
130
131   """
132   return utils.MatchNameComponent(short_name, names, case_sensitive=False)
133
134
135 class ConfigWriter:
136   """The interface to the cluster configuration.
137
138   @ivar _temporary_lvs: reservation manager for temporary LVs
139   @ivar _all_rms: a list of all temporary reservation managers
140
141   """
142   def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
143                accept_foreign=False):
144     self.write_count = 0
145     self._lock = _config_lock
146     self._config_data = None
147     self._offline = offline
148     if cfg_file is None:
149       self._cfg_file = constants.CLUSTER_CONF_FILE
150     else:
151       self._cfg_file = cfg_file
152     self._getents = _getents
153     self._temporary_ids = TemporaryReservationManager()
154     self._temporary_drbds = {}
155     self._temporary_macs = TemporaryReservationManager()
156     self._temporary_secrets = TemporaryReservationManager()
157     self._temporary_lvs = TemporaryReservationManager()
158     self._all_rms = [self._temporary_ids, self._temporary_macs,
159                      self._temporary_secrets, self._temporary_lvs]
160     # Note: in order to prevent errors when resolving our name in
161     # _DistributeConfig, we compute it here once and reuse it; it's
162     # better to raise an error before starting to modify the config
163     # file than after it was modified
164     self._my_hostname = netutils.Hostname.GetSysName()
165     self._last_cluster_serial = -1
166     self._cfg_id = None
167     self._OpenConfig(accept_foreign)
168
169   # this method needs to be static, so that we can call it on the class
170   @staticmethod
171   def IsCluster():
172     """Check if the cluster is configured.
173
174     """
175     return os.path.exists(constants.CLUSTER_CONF_FILE)
176
177   def _GenerateOneMAC(self):
178     """Generate one mac address
179
180     """
181     prefix = self._config_data.cluster.mac_prefix
182     byte1 = random.randrange(0, 256)
183     byte2 = random.randrange(0, 256)
184     byte3 = random.randrange(0, 256)
185     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
186     return mac
187
188   @locking.ssynchronized(_config_lock, shared=1)
189   def GetNdParams(self, node):
190     """Get the node params populated with cluster defaults.
191
192     @type node: L{objects.Node}
193     @param node: The node we want to know the params for
194     @return: A dict with the filled in node params
195
196     """
197     nodegroup = self._UnlockedGetNodeGroup(node.group)
198     return self._config_data.cluster.FillND(node, nodegroup)
199
200   @locking.ssynchronized(_config_lock, shared=1)
201   def GenerateMAC(self, ec_id):
202     """Generate a MAC for an instance.
203
204     This should check the current instances for duplicates.
205
206     """
207     existing = self._AllMACs()
208     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
209
210   @locking.ssynchronized(_config_lock, shared=1)
211   def ReserveMAC(self, mac, ec_id):
212     """Reserve a MAC for an instance.
213
214     This only checks instances managed by this cluster, it does not
215     check for potential collisions elsewhere.
216
217     """
218     all_macs = self._AllMACs()
219     if mac in all_macs:
220       raise errors.ReservationError("mac already in use")
221     else:
222       self._temporary_macs.Reserve(ec_id, mac)
223
224   @locking.ssynchronized(_config_lock, shared=1)
225   def ReserveLV(self, lv_name, ec_id):
226     """Reserve an VG/LV pair for an instance.
227
228     @type lv_name: string
229     @param lv_name: the logical volume name to reserve
230
231     """
232     all_lvs = self._AllLVs()
233     if lv_name in all_lvs:
234       raise errors.ReservationError("LV already in use")
235     else:
236       self._temporary_lvs.Reserve(ec_id, lv_name)
237
238   @locking.ssynchronized(_config_lock, shared=1)
239   def GenerateDRBDSecret(self, ec_id):
240     """Generate a DRBD secret.
241
242     This checks the current disks for duplicates.
243
244     """
245     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
246                                             utils.GenerateSecret,
247                                             ec_id)
248
249   def _AllLVs(self):
250     """Compute the list of all LVs.
251
252     """
253     lvnames = set()
254     for instance in self._config_data.instances.values():
255       node_data = instance.MapLVsByNode()
256       for lv_list in node_data.values():
257         lvnames.update(lv_list)
258     return lvnames
259
260   def _AllIDs(self, include_temporary):
261     """Compute the list of all UUIDs and names we have.
262
263     @type include_temporary: boolean
264     @param include_temporary: whether to include the _temporary_ids set
265     @rtype: set
266     @return: a set of IDs
267
268     """
269     existing = set()
270     if include_temporary:
271       existing.update(self._temporary_ids.GetReserved())
272     existing.update(self._AllLVs())
273     existing.update(self._config_data.instances.keys())
274     existing.update(self._config_data.nodes.keys())
275     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
276     return existing
277
278   def _GenerateUniqueID(self, ec_id):
279     """Generate an unique UUID.
280
281     This checks the current node, instances and disk names for
282     duplicates.
283
284     @rtype: string
285     @return: the unique id
286
287     """
288     existing = self._AllIDs(include_temporary=False)
289     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
290
291   @locking.ssynchronized(_config_lock, shared=1)
292   def GenerateUniqueID(self, ec_id):
293     """Generate an unique ID.
294
295     This is just a wrapper over the unlocked version.
296
297     @type ec_id: string
298     @param ec_id: unique id for the job to reserve the id to
299
300     """
301     return self._GenerateUniqueID(ec_id)
302
303   def _AllMACs(self):
304     """Return all MACs present in the config.
305
306     @rtype: list
307     @return: the list of all MACs
308
309     """
310     result = []
311     for instance in self._config_data.instances.values():
312       for nic in instance.nics:
313         result.append(nic.mac)
314
315     return result
316
317   def _AllDRBDSecrets(self):
318     """Return all DRBD secrets present in the config.
319
320     @rtype: list
321     @return: the list of all DRBD secrets
322
323     """
324     def helper(disk, result):
325       """Recursively gather secrets from this disk."""
326       if disk.dev_type == constants.DT_DRBD8:
327         result.append(disk.logical_id[5])
328       if disk.children:
329         for child in disk.children:
330           helper(child, result)
331
332     result = []
333     for instance in self._config_data.instances.values():
334       for disk in instance.disks:
335         helper(disk, result)
336
337     return result
338
339   def _CheckDiskIDs(self, disk, l_ids, p_ids):
340     """Compute duplicate disk IDs
341
342     @type disk: L{objects.Disk}
343     @param disk: the disk at which to start searching
344     @type l_ids: list
345     @param l_ids: list of current logical ids
346     @type p_ids: list
347     @param p_ids: list of current physical ids
348     @rtype: list
349     @return: a list of error messages
350
351     """
352     result = []
353     if disk.logical_id is not None:
354       if disk.logical_id in l_ids:
355         result.append("duplicate logical id %s" % str(disk.logical_id))
356       else:
357         l_ids.append(disk.logical_id)
358     if disk.physical_id is not None:
359       if disk.physical_id in p_ids:
360         result.append("duplicate physical id %s" % str(disk.physical_id))
361       else:
362         p_ids.append(disk.physical_id)
363
364     if disk.children:
365       for child in disk.children:
366         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
367     return result
368
369   def _UnlockedVerifyConfig(self):
370     """Verify function.
371
372     @rtype: list
373     @return: a list of error messages; a non-empty list signifies
374         configuration errors
375
376     """
377     # pylint: disable=R0914
378     result = []
379     seen_macs = []
380     ports = {}
381     data = self._config_data
382     cluster = data.cluster
383     seen_lids = []
384     seen_pids = []
385
386     # global cluster checks
387     if not cluster.enabled_hypervisors:
388       result.append("enabled hypervisors list doesn't have any entries")
389     invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
390     if invalid_hvs:
391       result.append("enabled hypervisors contains invalid entries: %s" %
392                     invalid_hvs)
393     missing_hvp = (set(cluster.enabled_hypervisors) -
394                    set(cluster.hvparams.keys()))
395     if missing_hvp:
396       result.append("hypervisor parameters missing for the enabled"
397                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
398
399     if cluster.master_node not in data.nodes:
400       result.append("cluster has invalid primary node '%s'" %
401                     cluster.master_node)
402
403     def _helper(owner, attr, value, template):
404       try:
405         utils.ForceDictType(value, template)
406       except errors.GenericError, err:
407         result.append("%s has invalid %s: %s" % (owner, attr, err))
408
409     def _helper_nic(owner, params):
410       try:
411         objects.NIC.CheckParameterSyntax(params)
412       except errors.ConfigurationError, err:
413         result.append("%s has invalid nicparams: %s" % (owner, err))
414
415     # check cluster parameters
416     _helper("cluster", "beparams", cluster.SimpleFillBE({}),
417             constants.BES_PARAMETER_TYPES)
418     _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
419             constants.NICS_PARAMETER_TYPES)
420     _helper_nic("cluster", cluster.SimpleFillNIC({}))
421     _helper("cluster", "ndparams", cluster.SimpleFillND({}),
422             constants.NDS_PARAMETER_TYPES)
423
424     # per-instance checks
425     for instance_name in data.instances:
426       instance = data.instances[instance_name]
427       if instance.name != instance_name:
428         result.append("instance '%s' is indexed by wrong name '%s'" %
429                       (instance.name, instance_name))
430       if instance.primary_node not in data.nodes:
431         result.append("instance '%s' has invalid primary node '%s'" %
432                       (instance_name, instance.primary_node))
433       for snode in instance.secondary_nodes:
434         if snode not in data.nodes:
435           result.append("instance '%s' has invalid secondary node '%s'" %
436                         (instance_name, snode))
437       for idx, nic in enumerate(instance.nics):
438         if nic.mac in seen_macs:
439           result.append("instance '%s' has NIC %d mac %s duplicate" %
440                         (instance_name, idx, nic.mac))
441         else:
442           seen_macs.append(nic.mac)
443         if nic.nicparams:
444           filled = cluster.SimpleFillNIC(nic.nicparams)
445           owner = "instance %s nic %d" % (instance.name, idx)
446           _helper(owner, "nicparams",
447                   filled, constants.NICS_PARAMETER_TYPES)
448           _helper_nic(owner, filled)
449
450       # parameter checks
451       if instance.beparams:
452         _helper("instance %s" % instance.name, "beparams",
453                 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)
454
455       # gather the drbd ports for duplicate checks
456       for dsk in instance.disks:
457         if dsk.dev_type in constants.LDS_DRBD:
458           tcp_port = dsk.logical_id[2]
459           if tcp_port not in ports:
460             ports[tcp_port] = []
461           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
462       # gather network port reservation
463       net_port = getattr(instance, "network_port", None)
464       if net_port is not None:
465         if net_port not in ports:
466           ports[net_port] = []
467         ports[net_port].append((instance.name, "network port"))
468
469       # instance disk verify
470       for idx, disk in enumerate(instance.disks):
471         result.extend(["instance '%s' disk %d error: %s" %
472                        (instance.name, idx, msg) for msg in disk.Verify()])
473         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
474
475     # cluster-wide pool of free ports
476     for free_port in cluster.tcpudp_port_pool:
477       if free_port not in ports:
478         ports[free_port] = []
479       ports[free_port].append(("cluster", "port marked as free"))
480
481     # compute tcp/udp duplicate ports
482     keys = ports.keys()
483     keys.sort()
484     for pnum in keys:
485       pdata = ports[pnum]
486       if len(pdata) > 1:
487         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
488         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
489
490     # highest used tcp port check
491     if keys:
492       if keys[-1] > cluster.highest_used_port:
493         result.append("Highest used port mismatch, saved %s, computed %s" %
494                       (cluster.highest_used_port, keys[-1]))
495
496     if not data.nodes[cluster.master_node].master_candidate:
497       result.append("Master node is not a master candidate")
498
499     # master candidate checks
500     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
501     if mc_now < mc_max:
502       result.append("Not enough master candidates: actual %d, target %d" %
503                     (mc_now, mc_max))
504
505     # node checks
506     for node_name, node in data.nodes.items():
507       if node.name != node_name:
508         result.append("Node '%s' is indexed by wrong name '%s'" %
509                       (node.name, node_name))
510       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
511         result.append("Node %s state is invalid: master_candidate=%s,"
512                       " drain=%s, offline=%s" %
513                       (node.name, node.master_candidate, node.drained,
514                        node.offline))
515       if node.group not in data.nodegroups:
516         result.append("Node '%s' has invalid group '%s'" %
517                       (node.name, node.group))
518       else:
519         _helper("node %s" % node.name, "ndparams",
520                 cluster.FillND(node, data.nodegroups[node.group]),
521                 constants.NDS_PARAMETER_TYPES)
522
523     # nodegroups checks
524     nodegroups_names = set()
525     for nodegroup_uuid in data.nodegroups:
526       nodegroup = data.nodegroups[nodegroup_uuid]
527       if nodegroup.uuid != nodegroup_uuid:
528         result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
529                       % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
530       if utils.UUID_RE.match(nodegroup.name.lower()):
531         result.append("node group '%s' (uuid: '%s') has uuid-like name" %
532                       (nodegroup.name, nodegroup.uuid))
533       if nodegroup.name in nodegroups_names:
534         result.append("duplicate node group name '%s'" % nodegroup.name)
535       else:
536         nodegroups_names.add(nodegroup.name)
537       if nodegroup.ndparams:
538         _helper("group %s" % nodegroup.name, "ndparams",
539                 cluster.SimpleFillND(nodegroup.ndparams),
540                 constants.NDS_PARAMETER_TYPES)
541
542     # drbd minors check
543     _, duplicates = self._UnlockedComputeDRBDMap()
544     for node, minor, instance_a, instance_b in duplicates:
545       result.append("DRBD minor %d on node %s is assigned twice to instances"
546                     " %s and %s" % (minor, node, instance_a, instance_b))
547
548     # IP checks
549     default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
550     ips = {}
551
552     def _AddIpAddress(ip, name):
553       ips.setdefault(ip, []).append(name)
554
555     _AddIpAddress(cluster.master_ip, "cluster_ip")
556
557     for node in data.nodes.values():
558       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
559       if node.secondary_ip != node.primary_ip:
560         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
561
562     for instance in data.instances.values():
563       for idx, nic in enumerate(instance.nics):
564         if nic.ip is None:
565           continue
566
567         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
568         nic_mode = nicparams[constants.NIC_MODE]
569         nic_link = nicparams[constants.NIC_LINK]
570
571         if nic_mode == constants.NIC_MODE_BRIDGED:
572           link = "bridge:%s" % nic_link
573         elif nic_mode == constants.NIC_MODE_ROUTED:
574           link = "route:%s" % nic_link
575         else:
576           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
577
578         _AddIpAddress("%s/%s" % (link, nic.ip),
579                       "instance:%s/nic:%d" % (instance.name, idx))
580
581     for ip, owners in ips.items():
582       if len(owners) > 1:
583         result.append("IP address %s is used by multiple owners: %s" %
584                       (ip, utils.CommaJoin(owners)))
585
586     return result
587
588   @locking.ssynchronized(_config_lock, shared=1)
589   def VerifyConfig(self):
590     """Verify function.
591
592     This is just a wrapper over L{_UnlockedVerifyConfig}.
593
594     @rtype: list
595     @return: a list of error messages; a non-empty list signifies
596         configuration errors
597
598     """
599     return self._UnlockedVerifyConfig()
600
601   def _UnlockedSetDiskID(self, disk, node_name):
602     """Convert the unique ID to the ID needed on the target nodes.
603
604     This is used only for drbd, which needs ip/port configuration.
605
606     The routine descends down and updates its children also, because
607     this helps when the only the top device is passed to the remote
608     node.
609
610     This function is for internal use, when the config lock is already held.
611
612     """
613     if disk.children:
614       for child in disk.children:
615         self._UnlockedSetDiskID(child, node_name)
616
617     if disk.logical_id is None and disk.physical_id is not None:
618       return
619     if disk.dev_type == constants.LD_DRBD8:
620       pnode, snode, port, pminor, sminor, secret = disk.logical_id
621       if node_name not in (pnode, snode):
622         raise errors.ConfigurationError("DRBD device not knowing node %s" %
623                                         node_name)
624       pnode_info = self._UnlockedGetNodeInfo(pnode)
625       snode_info = self._UnlockedGetNodeInfo(snode)
626       if pnode_info is None or snode_info is None:
627         raise errors.ConfigurationError("Can't find primary or secondary node"
628                                         " for %s" % str(disk))
629       p_data = (pnode_info.secondary_ip, port)
630       s_data = (snode_info.secondary_ip, port)
631       if pnode == node_name:
632         disk.physical_id = p_data + s_data + (pminor, secret)
633       else: # it must be secondary, we tested above
634         disk.physical_id = s_data + p_data + (sminor, secret)
635     else:
636       disk.physical_id = disk.logical_id
637     return
638
639   @locking.ssynchronized(_config_lock)
640   def SetDiskID(self, disk, node_name):
641     """Convert the unique ID to the ID needed on the target nodes.
642
643     This is used only for drbd, which needs ip/port configuration.
644
645     The routine descends down and updates its children also, because
646     this helps when the only the top device is passed to the remote
647     node.
648
649     """
650     return self._UnlockedSetDiskID(disk, node_name)
651
652   @locking.ssynchronized(_config_lock)
653   def AddTcpUdpPort(self, port):
654     """Adds a new port to the available port pool.
655
656     """
657     if not isinstance(port, int):
658       raise errors.ProgrammerError("Invalid type passed for port")
659
660     self._config_data.cluster.tcpudp_port_pool.add(port)
661     self._WriteConfig()
662
663   @locking.ssynchronized(_config_lock, shared=1)
664   def GetPortList(self):
665     """Returns a copy of the current port list.
666
667     """
668     return self._config_data.cluster.tcpudp_port_pool.copy()
669
670   @locking.ssynchronized(_config_lock)
671   def AllocatePort(self):
672     """Allocate a port.
673
674     The port will be taken from the available port pool or from the
675     default port range (and in this case we increase
676     highest_used_port).
677
678     """
679     # If there are TCP/IP ports configured, we use them first.
680     if self._config_data.cluster.tcpudp_port_pool:
681       port = self._config_data.cluster.tcpudp_port_pool.pop()
682     else:
683       port = self._config_data.cluster.highest_used_port + 1
684       if port >= constants.LAST_DRBD_PORT:
685         raise errors.ConfigurationError("The highest used port is greater"
686                                         " than %s. Aborting." %
687                                         constants.LAST_DRBD_PORT)
688       self._config_data.cluster.highest_used_port = port
689
690     self._WriteConfig()
691     return port
692
693   def _UnlockedComputeDRBDMap(self):
694     """Compute the used DRBD minor/nodes.
695
696     @rtype: (dict, list)
697     @return: dictionary of node_name: dict of minor: instance_name;
698         the returned dict will have all the nodes in it (even if with
699         an empty list), and a list of duplicates; if the duplicates
700         list is not empty, the configuration is corrupted and its caller
701         should raise an exception
702
703     """
704     def _AppendUsedPorts(instance_name, disk, used):
705       duplicates = []
706       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
707         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
708         for node, port in ((node_a, minor_a), (node_b, minor_b)):
709           assert node in used, ("Node '%s' of instance '%s' not found"
710                                 " in node list" % (node, instance_name))
711           if port in used[node]:
712             duplicates.append((node, port, instance_name, used[node][port]))
713           else:
714             used[node][port] = instance_name
715       if disk.children:
716         for child in disk.children:
717           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
718       return duplicates
719
720     duplicates = []
721     my_dict = dict((node, {}) for node in self._config_data.nodes)
722     for instance in self._config_data.instances.itervalues():
723       for disk in instance.disks:
724         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
725     for (node, minor), instance in self._temporary_drbds.iteritems():
726       if minor in my_dict[node] and my_dict[node][minor] != instance:
727         duplicates.append((node, minor, instance, my_dict[node][minor]))
728       else:
729         my_dict[node][minor] = instance
730     return my_dict, duplicates
731
732   @locking.ssynchronized(_config_lock)
733   def ComputeDRBDMap(self):
734     """Compute the used DRBD minor/nodes.
735
736     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
737
738     @return: dictionary of node_name: dict of minor: instance_name;
739         the returned dict will have all the nodes in it (even if with
740         an empty list).
741
742     """
743     d_map, duplicates = self._UnlockedComputeDRBDMap()
744     if duplicates:
745       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
746                                       str(duplicates))
747     return d_map
748
749   @locking.ssynchronized(_config_lock)
750   def AllocateDRBDMinor(self, nodes, instance):
751     """Allocate a drbd minor.
752
753     The free minor will be automatically computed from the existing
754     devices. A node can be given multiple times in order to allocate
755     multiple minors. The result is the list of minors, in the same
756     order as the passed nodes.
757
758     @type instance: string
759     @param instance: the instance for which we allocate minors
760
761     """
762     assert isinstance(instance, basestring), \
763            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
764
765     d_map, duplicates = self._UnlockedComputeDRBDMap()
766     if duplicates:
767       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
768                                       str(duplicates))
769     result = []
770     for nname in nodes:
771       ndata = d_map[nname]
772       if not ndata:
773         # no minors used, we can start at 0
774         result.append(0)
775         ndata[0] = instance
776         self._temporary_drbds[(nname, 0)] = instance
777         continue
778       keys = ndata.keys()
779       keys.sort()
780       ffree = utils.FirstFree(keys)
781       if ffree is None:
782         # return the next minor
783         # TODO: implement high-limit check
784         minor = keys[-1] + 1
785       else:
786         minor = ffree
787       # double-check minor against current instances
788       assert minor not in d_map[nname], \
789              ("Attempt to reuse allocated DRBD minor %d on node %s,"
790               " already allocated to instance %s" %
791               (minor, nname, d_map[nname][minor]))
792       ndata[minor] = instance
793       # double-check minor against reservation
794       r_key = (nname, minor)
795       assert r_key not in self._temporary_drbds, \
796              ("Attempt to reuse reserved DRBD minor %d on node %s,"
797               " reserved for instance %s" %
798               (minor, nname, self._temporary_drbds[r_key]))
799       self._temporary_drbds[r_key] = instance
800       result.append(minor)
801     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
802                   nodes, result)
803     return result
804
805   def _UnlockedReleaseDRBDMinors(self, instance):
806     """Release temporary drbd minors allocated for a given instance.
807
808     @type instance: string
809     @param instance: the instance for which temporary minors should be
810                      released
811
812     """
813     assert isinstance(instance, basestring), \
814            "Invalid argument passed to ReleaseDRBDMinors"
815     for key, name in self._temporary_drbds.items():
816       if name == instance:
817         del self._temporary_drbds[key]
818
819   @locking.ssynchronized(_config_lock)
820   def ReleaseDRBDMinors(self, instance):
821     """Release temporary drbd minors allocated for a given instance.
822
823     This should be called on the error paths, on the success paths
824     it's automatically called by the ConfigWriter add and update
825     functions.
826
827     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
828
829     @type instance: string
830     @param instance: the instance for which temporary minors should be
831                      released
832
833     """
834     self._UnlockedReleaseDRBDMinors(instance)
835
836   @locking.ssynchronized(_config_lock, shared=1)
837   def GetConfigVersion(self):
838     """Get the configuration version.
839
840     @return: Config version
841
842     """
843     return self._config_data.version
844
845   @locking.ssynchronized(_config_lock, shared=1)
846   def GetClusterName(self):
847     """Get cluster name.
848
849     @return: Cluster name
850
851     """
852     return self._config_data.cluster.cluster_name
853
854   @locking.ssynchronized(_config_lock, shared=1)
855   def GetMasterNode(self):
856     """Get the hostname of the master node for this cluster.
857
858     @return: Master hostname
859
860     """
861     return self._config_data.cluster.master_node
862
863   @locking.ssynchronized(_config_lock, shared=1)
864   def GetMasterIP(self):
865     """Get the IP of the master node for this cluster.
866
867     @return: Master IP
868
869     """
870     return self._config_data.cluster.master_ip
871
872   @locking.ssynchronized(_config_lock, shared=1)
873   def GetMasterNetdev(self):
874     """Get the master network device for this cluster.
875
876     """
877     return self._config_data.cluster.master_netdev
878
879   @locking.ssynchronized(_config_lock, shared=1)
880   def GetFileStorageDir(self):
881     """Get the file storage dir for this cluster.
882
883     """
884     return self._config_data.cluster.file_storage_dir
885
886   @locking.ssynchronized(_config_lock, shared=1)
887   def GetSharedFileStorageDir(self):
888     """Get the shared file storage dir for this cluster.
889
890     """
891     return self._config_data.cluster.shared_file_storage_dir
892
893   @locking.ssynchronized(_config_lock, shared=1)
894   def GetHypervisorType(self):
895     """Get the hypervisor type for this cluster.
896
897     """
898     return self._config_data.cluster.enabled_hypervisors[0]
899
900   @locking.ssynchronized(_config_lock, shared=1)
901   def GetHostKey(self):
902     """Return the rsa hostkey from the config.
903
904     @rtype: string
905     @return: the rsa hostkey
906
907     """
908     return self._config_data.cluster.rsahostkeypub
909
910   @locking.ssynchronized(_config_lock, shared=1)
911   def GetDefaultIAllocator(self):
912     """Get the default instance allocator for this cluster.
913
914     """
915     return self._config_data.cluster.default_iallocator
916
917   @locking.ssynchronized(_config_lock, shared=1)
918   def GetPrimaryIPFamily(self):
919     """Get cluster primary ip family.
920
921     @return: primary ip family
922
923     """
924     return self._config_data.cluster.primary_ip_family
925
926   @locking.ssynchronized(_config_lock)
927   def AddNodeGroup(self, group, ec_id, check_uuid=True):
928     """Add a node group to the configuration.
929
930     This method calls group.UpgradeConfig() to fill any missing attributes
931     according to their default values.
932
933     @type group: L{objects.NodeGroup}
934     @param group: the NodeGroup object to add
935     @type ec_id: string
936     @param ec_id: unique id for the job to use when creating a missing UUID
937     @type check_uuid: bool
938     @param check_uuid: add an UUID to the group if it doesn't have one or, if
939                        it does, ensure that it does not exist in the
940                        configuration already
941
942     """
943     self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
944     self._WriteConfig()
945
946   def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
947     """Add a node group to the configuration.
948
949     """
950     logging.info("Adding node group %s to configuration", group.name)
951
952     # Some code might need to add a node group with a pre-populated UUID
953     # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
954     # the "does this UUID" exist already check.
955     if check_uuid:
956       self._EnsureUUID(group, ec_id)
957
958     try:
959       existing_uuid = self._UnlockedLookupNodeGroup(group.name)
960     except errors.OpPrereqError:
961       pass
962     else:
963       raise errors.OpPrereqError("Desired group name '%s' already exists as a"
964                                  " node group (UUID: %s)" %
965                                  (group.name, existing_uuid),
966                                  errors.ECODE_EXISTS)
967
968     group.serial_no = 1
969     group.ctime = group.mtime = time.time()
970     group.UpgradeConfig()
971
972     self._config_data.nodegroups[group.uuid] = group
973     self._config_data.cluster.serial_no += 1
974
975   @locking.ssynchronized(_config_lock)
976   def RemoveNodeGroup(self, group_uuid):
977     """Remove a node group from the configuration.
978
979     @type group_uuid: string
980     @param group_uuid: the UUID of the node group to remove
981
982     """
983     logging.info("Removing node group %s from configuration", group_uuid)
984
985     if group_uuid not in self._config_data.nodegroups:
986       raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)
987
988     assert len(self._config_data.nodegroups) != 1, \
989             "Group '%s' is the only group, cannot be removed" % group_uuid
990
991     del self._config_data.nodegroups[group_uuid]
992     self._config_data.cluster.serial_no += 1
993     self._WriteConfig()
994
995   def _UnlockedLookupNodeGroup(self, target):
996     """Lookup a node group's UUID.
997
998     @type target: string or None
999     @param target: group name or UUID or None to look for the default
1000     @rtype: string
1001     @return: nodegroup UUID
1002     @raises errors.OpPrereqError: when the target group cannot be found
1003
1004     """
1005     if target is None:
1006       if len(self._config_data.nodegroups) != 1:
1007         raise errors.OpPrereqError("More than one node group exists. Target"
1008                                    " group must be specified explicitely.")
1009       else:
1010         return self._config_data.nodegroups.keys()[0]
1011     if target in self._config_data.nodegroups:
1012       return target
1013     for nodegroup in self._config_data.nodegroups.values():
1014       if nodegroup.name == target:
1015         return nodegroup.uuid
1016     raise errors.OpPrereqError("Node group '%s' not found" % target,
1017                                errors.ECODE_NOENT)
1018
1019   @locking.ssynchronized(_config_lock, shared=1)
1020   def LookupNodeGroup(self, target):
1021     """Lookup a node group's UUID.
1022
1023     This function is just a wrapper over L{_UnlockedLookupNodeGroup}.
1024
1025     @type target: string or None
1026     @param target: group name or UUID or None to look for the default
1027     @rtype: string
1028     @return: nodegroup UUID
1029
1030     """
1031     return self._UnlockedLookupNodeGroup(target)
1032
1033   def _UnlockedGetNodeGroup(self, uuid):
1034     """Lookup a node group.
1035
1036     @type uuid: string
1037     @param uuid: group UUID
1038     @rtype: L{objects.NodeGroup} or None
1039     @return: nodegroup object, or None if not found
1040
1041     """
1042     if uuid not in self._config_data.nodegroups:
1043       return None
1044
1045     return self._config_data.nodegroups[uuid]
1046
1047   @locking.ssynchronized(_config_lock, shared=1)
1048   def GetNodeGroup(self, uuid):
1049     """Lookup a node group.
1050
1051     @type uuid: string
1052     @param uuid: group UUID
1053     @rtype: L{objects.NodeGroup} or None
1054     @return: nodegroup object, or None if not found
1055
1056     """
1057     return self._UnlockedGetNodeGroup(uuid)
1058
1059   @locking.ssynchronized(_config_lock, shared=1)
1060   def GetAllNodeGroupsInfo(self):
1061     """Get the configuration of all node groups.
1062
1063     """
1064     return dict(self._config_data.nodegroups)
1065
1066   @locking.ssynchronized(_config_lock, shared=1)
1067   def GetNodeGroupList(self):
1068     """Get a list of node groups.
1069
1070     """
1071     return self._config_data.nodegroups.keys()
1072
1073   @locking.ssynchronized(_config_lock, shared=1)
1074   def GetNodeGroupMembersByNodes(self, nodes):
1075     """Get nodes which are member in the same nodegroups as the given nodes.
1076
1077     """
1078     ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
1079     return frozenset(member_name
1080                      for node_name in nodes
1081                      for member_name in
1082                        self._UnlockedGetNodeGroup(ngfn(node_name)).members)
1083
1084   @locking.ssynchronized(_config_lock)
1085   def AddInstance(self, instance, ec_id):
1086     """Add an instance to the config.
1087
1088     This should be used after creating a new instance.
1089
1090     @type instance: L{objects.Instance}
1091     @param instance: the instance object
1092
1093     """
1094     if not isinstance(instance, objects.Instance):
1095       raise errors.ProgrammerError("Invalid type passed to AddInstance")
1096
1097     if instance.disk_template != constants.DT_DISKLESS:
1098       all_lvs = instance.MapLVsByNode()
1099       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
1100
1101     all_macs = self._AllMACs()
1102     for nic in instance.nics:
1103       if nic.mac in all_macs:
1104         raise errors.ConfigurationError("Cannot add instance %s:"
1105                                         " MAC address '%s' already in use." %
1106                                         (instance.name, nic.mac))
1107
1108     self._EnsureUUID(instance, ec_id)
1109
1110     instance.serial_no = 1
1111     instance.ctime = instance.mtime = time.time()
1112     self._config_data.instances[instance.name] = instance
1113     self._config_data.cluster.serial_no += 1
1114     self._UnlockedReleaseDRBDMinors(instance.name)
1115     self._WriteConfig()
1116
1117   def _EnsureUUID(self, item, ec_id):
1118     """Ensures a given object has a valid UUID.
1119
1120     @param item: the instance or node to be checked
1121     @param ec_id: the execution context id for the uuid reservation
1122
1123     """
1124     if not item.uuid:
1125       item.uuid = self._GenerateUniqueID(ec_id)
1126     elif item.uuid in self._AllIDs(include_temporary=True):
1127       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
1128                                       " in use" % (item.name, item.uuid))
1129
1130   def _SetInstanceStatus(self, instance_name, status):
1131     """Set the instance's status to a given value.
1132
1133     """
1134     assert isinstance(status, bool), \
1135            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
1136
1137     if instance_name not in self._config_data.instances:
1138       raise errors.ConfigurationError("Unknown instance '%s'" %
1139                                       instance_name)
1140     instance = self._config_data.instances[instance_name]
1141     if instance.admin_up != status:
1142       instance.admin_up = status
1143       instance.serial_no += 1
1144       instance.mtime = time.time()
1145       self._WriteConfig()
1146
1147   @locking.ssynchronized(_config_lock)
1148   def MarkInstanceUp(self, instance_name):
1149     """Mark the instance status to up in the config.
1150
1151     """
1152     self._SetInstanceStatus(instance_name, True)
1153
1154   @locking.ssynchronized(_config_lock)
1155   def RemoveInstance(self, instance_name):
1156     """Remove the instance from the configuration.
1157
1158     """
1159     if instance_name not in self._config_data.instances:
1160       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1161     del self._config_data.instances[instance_name]
1162     self._config_data.cluster.serial_no += 1
1163     self._WriteConfig()
1164
1165   @locking.ssynchronized(_config_lock)
1166   def RenameInstance(self, old_name, new_name):
1167     """Rename an instance.
1168
1169     This needs to be done in ConfigWriter and not by RemoveInstance
1170     combined with AddInstance as only we can guarantee an atomic
1171     rename.
1172
1173     """
1174     if old_name not in self._config_data.instances:
1175       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
1176     inst = self._config_data.instances[old_name]
1177     del self._config_data.instances[old_name]
1178     inst.name = new_name
1179
1180     for disk in inst.disks:
1181       if disk.dev_type == constants.LD_FILE:
1182         # rename the file paths in logical and physical id
1183         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
1184         disk_fname = "disk%s" % disk.iv_name.split("/")[1]
1185         disk.physical_id = disk.logical_id = (disk.logical_id[0],
1186                                               utils.PathJoin(file_storage_dir,
1187                                                              inst.name,
1188                                                              disk_fname))
1189
1190     # Force update of ssconf files
1191     self._config_data.cluster.serial_no += 1
1192
1193     self._config_data.instances[inst.name] = inst
1194     self._WriteConfig()
1195
1196   @locking.ssynchronized(_config_lock)
1197   def MarkInstanceDown(self, instance_name):
1198     """Mark the status of an instance to down in the configuration.
1199
1200     """
1201     self._SetInstanceStatus(instance_name, False)
1202
1203   def _UnlockedGetInstanceList(self):
1204     """Get the list of instances.
1205
1206     This function is for internal use, when the config lock is already held.
1207
1208     """
1209     return self._config_data.instances.keys()
1210
1211   @locking.ssynchronized(_config_lock, shared=1)
1212   def GetInstanceList(self):
1213     """Get the list of instances.
1214
1215     @return: array of instances, ex. ['instance2.example.com',
1216         'instance1.example.com']
1217
1218     """
1219     return self._UnlockedGetInstanceList()
1220
1221   def ExpandInstanceName(self, short_name):
1222     """Attempt to expand an incomplete instance name.
1223
1224     """
1225     # Locking is done in L{ConfigWriter.GetInstanceList}
1226     return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())
1227
1228   def _UnlockedGetInstanceInfo(self, instance_name):
1229     """Returns information about an instance.
1230
1231     This function is for internal use, when the config lock is already held.
1232
1233     """
1234     if instance_name not in self._config_data.instances:
1235       return None
1236
1237     return self._config_data.instances[instance_name]
1238
1239   @locking.ssynchronized(_config_lock, shared=1)
1240   def GetInstanceInfo(self, instance_name):
1241     """Returns information about an instance.
1242
1243     It takes the information from the configuration file. Other information of
1244     an instance are taken from the live systems.
1245
1246     @param instance_name: name of the instance, e.g.
1247         I{instance1.example.com}
1248
1249     @rtype: L{objects.Instance}
1250     @return: the instance object
1251
1252     """
1253     return self._UnlockedGetInstanceInfo(instance_name)
1254
1255   @locking.ssynchronized(_config_lock, shared=1)
1256   def GetInstanceNodeGroups(self, instance_name, primary_only=False):
1257     """Returns set of node group UUIDs for instance's nodes.
1258
1259     @rtype: frozenset
1260
1261     """
1262     instance = self._UnlockedGetInstanceInfo(instance_name)
1263     if not instance:
1264       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1265
1266     if primary_only:
1267       nodes = [instance.primary_node]
1268     else:
1269       nodes = instance.all_nodes
1270
1271     return frozenset(self._UnlockedGetNodeInfo(node_name).group
1272                      for node_name in nodes)
1273
1274   @locking.ssynchronized(_config_lock, shared=1)
1275   def GetMultiInstanceInfo(self, instances):
1276     """Get the configuration of multiple instances.
1277
1278     @param instances: list of instance names
1279     @rtype: list
1280     @return: list of tuples (instance, instance_info), where
1281         instance_info is what would GetInstanceInfo return for the
1282         node, while keeping the original order
1283
1284     """
1285     return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]
1286
1287   @locking.ssynchronized(_config_lock, shared=1)
1288   def GetAllInstancesInfo(self):
1289     """Get the configuration of all instances.
1290
1291     @rtype: dict
1292     @return: dict of (instance, instance_info), where instance_info is what
1293               would GetInstanceInfo return for the node
1294
1295     """
1296     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1297                     for instance in self._UnlockedGetInstanceList()])
1298     return my_dict
1299
1300   @locking.ssynchronized(_config_lock)
1301   def AddNode(self, node, ec_id):
1302     """Add a node to the configuration.
1303
1304     @type node: L{objects.Node}
1305     @param node: a Node instance
1306
1307     """
1308     logging.info("Adding node %s to configuration", node.name)
1309
1310     self._EnsureUUID(node, ec_id)
1311
1312     node.serial_no = 1
1313     node.ctime = node.mtime = time.time()
1314     self._UnlockedAddNodeToGroup(node.name, node.group)
1315     self._config_data.nodes[node.name] = node
1316     self._config_data.cluster.serial_no += 1
1317     self._WriteConfig()
1318
1319   @locking.ssynchronized(_config_lock)
1320   def RemoveNode(self, node_name):
1321     """Remove a node from the configuration.
1322
1323     """
1324     logging.info("Removing node %s from configuration", node_name)
1325
1326     if node_name not in self._config_data.nodes:
1327       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1328
1329     self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1330     del self._config_data.nodes[node_name]
1331     self._config_data.cluster.serial_no += 1
1332     self._WriteConfig()
1333
1334   def ExpandNodeName(self, short_name):
1335     """Attempt to expand an incomplete node name.
1336
1337     """
1338     # Locking is done in L{ConfigWriter.GetNodeList}
1339     return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1340
1341   def _UnlockedGetNodeInfo(self, node_name):
1342     """Get the configuration of a node, as stored in the config.
1343
1344     This function is for internal use, when the config lock is already
1345     held.
1346
1347     @param node_name: the node name, e.g. I{node1.example.com}
1348
1349     @rtype: L{objects.Node}
1350     @return: the node object
1351
1352     """
1353     if node_name not in self._config_data.nodes:
1354       return None
1355
1356     return self._config_data.nodes[node_name]
1357
1358   @locking.ssynchronized(_config_lock, shared=1)
1359   def GetNodeInfo(self, node_name):
1360     """Get the configuration of a node, as stored in the config.
1361
1362     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1363
1364     @param node_name: the node name, e.g. I{node1.example.com}
1365
1366     @rtype: L{objects.Node}
1367     @return: the node object
1368
1369     """
1370     return self._UnlockedGetNodeInfo(node_name)
1371
1372   @locking.ssynchronized(_config_lock, shared=1)
1373   def GetNodeInstances(self, node_name):
1374     """Get the instances of a node, as stored in the config.
1375
1376     @param node_name: the node name, e.g. I{node1.example.com}
1377
1378     @rtype: (list, list)
1379     @return: a tuple with two lists: the primary and the secondary instances
1380
1381     """
1382     pri = []
1383     sec = []
1384     for inst in self._config_data.instances.values():
1385       if inst.primary_node == node_name:
1386         pri.append(inst.name)
1387       if node_name in inst.secondary_nodes:
1388         sec.append(inst.name)
1389     return (pri, sec)
1390
1391   @locking.ssynchronized(_config_lock, shared=1)
1392   def GetNodeGroupInstances(self, uuid, primary_only=False):
1393     """Get the instances of a node group.
1394
1395     @param uuid: Node group UUID
1396     @param primary_only: Whether to only consider primary nodes
1397     @rtype: frozenset
1398     @return: List of instance names in node group
1399
1400     """
1401     if primary_only:
1402       nodes_fn = lambda inst: [inst.primary_node]
1403     else:
1404       nodes_fn = lambda inst: inst.all_nodes
1405
1406     return frozenset(inst.name
1407                      for inst in self._config_data.instances.values()
1408                      for node_name in nodes_fn(inst)
1409                      if self._UnlockedGetNodeInfo(node_name).group == uuid)
1410
1411   def _UnlockedGetNodeList(self):
1412     """Return the list of nodes which are in the configuration.
1413
1414     This function is for internal use, when the config lock is already
1415     held.
1416
1417     @rtype: list
1418
1419     """
1420     return self._config_data.nodes.keys()
1421
1422   @locking.ssynchronized(_config_lock, shared=1)
1423   def GetNodeList(self):
1424     """Return the list of nodes which are in the configuration.
1425
1426     """
1427     return self._UnlockedGetNodeList()
1428
1429   def _UnlockedGetOnlineNodeList(self):
1430     """Return the list of nodes which are online.
1431
1432     """
1433     all_nodes = [self._UnlockedGetNodeInfo(node)
1434                  for node in self._UnlockedGetNodeList()]
1435     return [node.name for node in all_nodes if not node.offline]
1436
1437   @locking.ssynchronized(_config_lock, shared=1)
1438   def GetOnlineNodeList(self):
1439     """Return the list of nodes which are online.
1440
1441     """
1442     return self._UnlockedGetOnlineNodeList()
1443
1444   @locking.ssynchronized(_config_lock, shared=1)
1445   def GetVmCapableNodeList(self):
1446     """Return the list of nodes which are not vm capable.
1447
1448     """
1449     all_nodes = [self._UnlockedGetNodeInfo(node)
1450                  for node in self._UnlockedGetNodeList()]
1451     return [node.name for node in all_nodes if node.vm_capable]
1452
1453   @locking.ssynchronized(_config_lock, shared=1)
1454   def GetNonVmCapableNodeList(self):
1455     """Return the list of nodes which are not vm capable.
1456
1457     """
1458     all_nodes = [self._UnlockedGetNodeInfo(node)
1459                  for node in self._UnlockedGetNodeList()]
1460     return [node.name for node in all_nodes if not node.vm_capable]
1461
1462   @locking.ssynchronized(_config_lock, shared=1)
1463   def GetMultiNodeInfo(self, nodes):
1464     """Get the configuration of multiple nodes.
1465
1466     @param nodes: list of node names
1467     @rtype: list
1468     @return: list of tuples of (node, node_info), where node_info is
1469         what would GetNodeInfo return for the node, in the original
1470         order
1471
1472     """
1473     return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]
1474
1475   @locking.ssynchronized(_config_lock, shared=1)
1476   def GetAllNodesInfo(self):
1477     """Get the configuration of all nodes.
1478
1479     @rtype: dict
1480     @return: dict of (node, node_info), where node_info is what
1481               would GetNodeInfo return for the node
1482
1483     """
1484     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1485                     for node in self._UnlockedGetNodeList()])
1486     return my_dict
1487
1488   @locking.ssynchronized(_config_lock, shared=1)
1489   def GetNodeGroupsFromNodes(self, nodes):
1490     """Returns groups for a list of nodes.
1491
1492     @type nodes: list of string
1493     @param nodes: List of node names
1494     @rtype: frozenset
1495
1496     """
1497     return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)
1498
1499   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1500     """Get the number of current and maximum desired and possible candidates.
1501
1502     @type exceptions: list
1503     @param exceptions: if passed, list of nodes that should be ignored
1504     @rtype: tuple
1505     @return: tuple of (current, desired and possible, possible)
1506
1507     """
1508     mc_now = mc_should = mc_max = 0
1509     for node in self._config_data.nodes.values():
1510       if exceptions and node.name in exceptions:
1511         continue
1512       if not (node.offline or node.drained) and node.master_capable:
1513         mc_max += 1
1514       if node.master_candidate:
1515         mc_now += 1
1516     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1517     return (mc_now, mc_should, mc_max)
1518
1519   @locking.ssynchronized(_config_lock, shared=1)
1520   def GetMasterCandidateStats(self, exceptions=None):
1521     """Get the number of current and maximum possible candidates.
1522
1523     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1524
1525     @type exceptions: list
1526     @param exceptions: if passed, list of nodes that should be ignored
1527     @rtype: tuple
1528     @return: tuple of (current, max)
1529
1530     """
1531     return self._UnlockedGetMasterCandidateStats(exceptions)
1532
1533   @locking.ssynchronized(_config_lock)
1534   def MaintainCandidatePool(self, exceptions):
1535     """Try to grow the candidate pool to the desired size.
1536
1537     @type exceptions: list
1538     @param exceptions: if passed, list of nodes that should be ignored
1539     @rtype: list
1540     @return: list with the adjusted nodes (L{objects.Node} instances)
1541
1542     """
1543     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1544     mod_list = []
1545     if mc_now < mc_max:
1546       node_list = self._config_data.nodes.keys()
1547       random.shuffle(node_list)
1548       for name in node_list:
1549         if mc_now >= mc_max:
1550           break
1551         node = self._config_data.nodes[name]
1552         if (node.master_candidate or node.offline or node.drained or
1553             node.name in exceptions or not node.master_capable):
1554           continue
1555         mod_list.append(node)
1556         node.master_candidate = True
1557         node.serial_no += 1
1558         mc_now += 1
1559       if mc_now != mc_max:
1560         # this should not happen
1561         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1562                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1563       if mod_list:
1564         self._config_data.cluster.serial_no += 1
1565         self._WriteConfig()
1566
1567     return mod_list
1568
1569   def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1570     """Add a given node to the specified group.
1571
1572     """
1573     if nodegroup_uuid not in self._config_data.nodegroups:
1574       # This can happen if a node group gets deleted between its lookup and
1575       # when we're adding the first node to it, since we don't keep a lock in
1576       # the meantime. It's ok though, as we'll fail cleanly if the node group
1577       # is not found anymore.
1578       raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1579     if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1580       self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1581
1582   def _UnlockedRemoveNodeFromGroup(self, node):
1583     """Remove a given node from its group.
1584
1585     """
1586     nodegroup = node.group
1587     if nodegroup not in self._config_data.nodegroups:
1588       logging.warning("Warning: node '%s' has unknown node group '%s'"
1589                       " (while being removed from it)", node.name, nodegroup)
1590     nodegroup_obj = self._config_data.nodegroups[nodegroup]
1591     if node.name not in nodegroup_obj.members:
1592       logging.warning("Warning: node '%s' not a member of its node group '%s'"
1593                       " (while being removed from it)", node.name, nodegroup)
1594     else:
1595       nodegroup_obj.members.remove(node.name)
1596
1597   def _BumpSerialNo(self):
1598     """Bump up the serial number of the config.
1599
1600     """
1601     self._config_data.serial_no += 1
1602     self._config_data.mtime = time.time()
1603
1604   def _AllUUIDObjects(self):
1605     """Returns all objects with uuid attributes.
1606
1607     """
1608     return (self._config_data.instances.values() +
1609             self._config_data.nodes.values() +
1610             self._config_data.nodegroups.values() +
1611             [self._config_data.cluster])
1612
1613   def _OpenConfig(self, accept_foreign):
1614     """Read the config data from disk.
1615
1616     """
1617     raw_data = utils.ReadFile(self._cfg_file)
1618
1619     try:
1620       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1621     except Exception, err:
1622       raise errors.ConfigurationError(err)
1623
1624     # Make sure the configuration has the right version
1625     _ValidateConfig(data)
1626
1627     if (not hasattr(data, 'cluster') or
1628         not hasattr(data.cluster, 'rsahostkeypub')):
1629       raise errors.ConfigurationError("Incomplete configuration"
1630                                       " (missing cluster.rsahostkeypub)")
1631
1632     if data.cluster.master_node != self._my_hostname and not accept_foreign:
1633       msg = ("The configuration denotes node %s as master, while my"
1634              " hostname is %s; opening a foreign configuration is only"
1635              " possible in accept_foreign mode" %
1636              (data.cluster.master_node, self._my_hostname))
1637       raise errors.ConfigurationError(msg)
1638
1639     # Upgrade configuration if needed
1640     data.UpgradeConfig()
1641
1642     self._config_data = data
1643     # reset the last serial as -1 so that the next write will cause
1644     # ssconf update
1645     self._last_cluster_serial = -1
1646
1647     # And finally run our (custom) config upgrade sequence
1648     self._UpgradeConfig()
1649
1650     self._cfg_id = utils.GetFileID(path=self._cfg_file)
1651
1652   def _UpgradeConfig(self):
1653     """Run upgrade steps that cannot be done purely in the objects.
1654
1655     This is because some data elements need uniqueness across the
1656     whole configuration, etc.
1657
1658     @warning: this function will call L{_WriteConfig()}, but also
1659         L{DropECReservations} so it needs to be called only from a
1660         "safe" place (the constructor). If one wanted to call it with
1661         the lock held, a DropECReservationUnlocked would need to be
1662         created first, to avoid causing deadlock.
1663
1664     """
1665     modified = False
1666     for item in self._AllUUIDObjects():
1667       if item.uuid is None:
1668         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1669         modified = True
1670     if not self._config_data.nodegroups:
1671       default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
1672       default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
1673                                             members=[])
1674       self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
1675       modified = True
1676     for node in self._config_data.nodes.values():
1677       if not node.group:
1678         node.group = self.LookupNodeGroup(None)
1679         modified = True
1680       # This is technically *not* an upgrade, but needs to be done both when
1681       # nodegroups are being added, and upon normally loading the config,
1682       # because the members list of a node group is discarded upon
1683       # serializing/deserializing the object.
1684       self._UnlockedAddNodeToGroup(node.name, node.group)
1685     if modified:
1686       self._WriteConfig()
1687       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1688       # only called at config init time, without the lock held
1689       self.DropECReservations(_UPGRADE_CONFIG_JID)
1690
1691   def _DistributeConfig(self, feedback_fn):
1692     """Distribute the configuration to the other nodes.
1693
1694     Currently, this only copies the configuration file. In the future,
1695     it could be used to encapsulate the 2/3-phase update mechanism.
1696
1697     """
1698     if self._offline:
1699       return True
1700
1701     bad = False
1702
1703     node_list = []
1704     addr_list = []
1705     myhostname = self._my_hostname
1706     # we can skip checking whether _UnlockedGetNodeInfo returns None
1707     # since the node list comes from _UnlocketGetNodeList, and we are
1708     # called with the lock held, so no modifications should take place
1709     # in between
1710     for node_name in self._UnlockedGetNodeList():
1711       if node_name == myhostname:
1712         continue
1713       node_info = self._UnlockedGetNodeInfo(node_name)
1714       if not node_info.master_candidate:
1715         continue
1716       node_list.append(node_info.name)
1717       addr_list.append(node_info.primary_ip)
1718
1719     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1720                                             address_list=addr_list)
1721     for to_node, to_result in result.items():
1722       msg = to_result.fail_msg
1723       if msg:
1724         msg = ("Copy of file %s to node %s failed: %s" %
1725                (self._cfg_file, to_node, msg))
1726         logging.error(msg)
1727
1728         if feedback_fn:
1729           feedback_fn(msg)
1730
1731         bad = True
1732
1733     return not bad
1734
1735   def _WriteConfig(self, destination=None, feedback_fn=None):
1736     """Write the configuration data to persistent storage.
1737
1738     """
1739     assert feedback_fn is None or callable(feedback_fn)
1740
1741     # Warn on config errors, but don't abort the save - the
1742     # configuration has already been modified, and we can't revert;
1743     # the best we can do is to warn the user and save as is, leaving
1744     # recovery to the user
1745     config_errors = self._UnlockedVerifyConfig()
1746     if config_errors:
1747       errmsg = ("Configuration data is not consistent: %s" %
1748                 (utils.CommaJoin(config_errors)))
1749       logging.critical(errmsg)
1750       if feedback_fn:
1751         feedback_fn(errmsg)
1752
1753     if destination is None:
1754       destination = self._cfg_file
1755     self._BumpSerialNo()
1756     txt = serializer.Dump(self._config_data.ToDict())
1757
1758     getents = self._getents()
1759     try:
1760       fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
1761                                close=False, gid=getents.confd_gid, mode=0640)
1762     except errors.LockError:
1763       raise errors.ConfigurationError("The configuration file has been"
1764                                       " modified since the last write, cannot"
1765                                       " update")
1766     try:
1767       self._cfg_id = utils.GetFileID(fd=fd)
1768     finally:
1769       os.close(fd)
1770
1771     self.write_count += 1
1772
1773     # and redistribute the config file to master candidates
1774     self._DistributeConfig(feedback_fn)
1775
1776     # Write ssconf files on all nodes (including locally)
1777     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1778       if not self._offline:
1779         result = rpc.RpcRunner.call_write_ssconf_files(
1780           self._UnlockedGetOnlineNodeList(),
1781           self._UnlockedGetSsconfValues())
1782
1783         for nname, nresu in result.items():
1784           msg = nresu.fail_msg
1785           if msg:
1786             errmsg = ("Error while uploading ssconf files to"
1787                       " node %s: %s" % (nname, msg))
1788             logging.warning(errmsg)
1789
1790             if feedback_fn:
1791               feedback_fn(errmsg)
1792
1793       self._last_cluster_serial = self._config_data.cluster.serial_no
1794
1795   def _UnlockedGetSsconfValues(self):
1796     """Return the values needed by ssconf.
1797
1798     @rtype: dict
1799     @return: a dictionary with keys the ssconf names and values their
1800         associated value
1801
1802     """
1803     fn = "\n".join
1804     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1805     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1806     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1807     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1808                     for ninfo in node_info]
1809     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1810                     for ninfo in node_info]
1811
1812     instance_data = fn(instance_names)
1813     off_data = fn(node.name for node in node_info if node.offline)
1814     on_data = fn(node.name for node in node_info if not node.offline)
1815     mc_data = fn(node.name for node in node_info if node.master_candidate)
1816     mc_ips_data = fn(node.primary_ip for node in node_info
1817                      if node.master_candidate)
1818     node_data = fn(node_names)
1819     node_pri_ips_data = fn(node_pri_ips)
1820     node_snd_ips_data = fn(node_snd_ips)
1821
1822     cluster = self._config_data.cluster
1823     cluster_tags = fn(cluster.GetTags())
1824
1825     hypervisor_list = fn(cluster.enabled_hypervisors)
1826
1827     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1828
1829     nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1830                   self._config_data.nodegroups.values()]
1831     nodegroups_data = fn(utils.NiceSort(nodegroups))
1832
1833     ssconf_values = {
1834       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1835       constants.SS_CLUSTER_TAGS: cluster_tags,
1836       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1837       constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
1838       constants.SS_MASTER_CANDIDATES: mc_data,
1839       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1840       constants.SS_MASTER_IP: cluster.master_ip,
1841       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1842       constants.SS_MASTER_NODE: cluster.master_node,
1843       constants.SS_NODE_LIST: node_data,
1844       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1845       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1846       constants.SS_OFFLINE_NODES: off_data,
1847       constants.SS_ONLINE_NODES: on_data,
1848       constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1849       constants.SS_INSTANCE_LIST: instance_data,
1850       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1851       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1852       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1853       constants.SS_UID_POOL: uid_pool,
1854       constants.SS_NODEGROUPS: nodegroups_data,
1855       }
1856     bad_values = [(k, v) for k, v in ssconf_values.items()
1857                   if not isinstance(v, (str, basestring))]
1858     if bad_values:
1859       err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
1860       raise errors.ConfigurationError("Some ssconf key(s) have non-string"
1861                                       " values: %s" % err)
1862     return ssconf_values
1863
1864   @locking.ssynchronized(_config_lock, shared=1)
1865   def GetSsconfValues(self):
1866     """Wrapper using lock around _UnlockedGetSsconf().
1867
1868     """
1869     return self._UnlockedGetSsconfValues()
1870
1871   @locking.ssynchronized(_config_lock, shared=1)
1872   def GetVGName(self):
1873     """Return the volume group name.
1874
1875     """
1876     return self._config_data.cluster.volume_group_name
1877
1878   @locking.ssynchronized(_config_lock)
1879   def SetVGName(self, vg_name):
1880     """Set the volume group name.
1881
1882     """
1883     self._config_data.cluster.volume_group_name = vg_name
1884     self._config_data.cluster.serial_no += 1
1885     self._WriteConfig()
1886
1887   @locking.ssynchronized(_config_lock, shared=1)
1888   def GetDRBDHelper(self):
1889     """Return DRBD usermode helper.
1890
1891     """
1892     return self._config_data.cluster.drbd_usermode_helper
1893
1894   @locking.ssynchronized(_config_lock)
1895   def SetDRBDHelper(self, drbd_helper):
1896     """Set DRBD usermode helper.
1897
1898     """
1899     self._config_data.cluster.drbd_usermode_helper = drbd_helper
1900     self._config_data.cluster.serial_no += 1
1901     self._WriteConfig()
1902
1903   @locking.ssynchronized(_config_lock, shared=1)
1904   def GetMACPrefix(self):
1905     """Return the mac prefix.
1906
1907     """
1908     return self._config_data.cluster.mac_prefix
1909
1910   @locking.ssynchronized(_config_lock, shared=1)
1911   def GetClusterInfo(self):
1912     """Returns information about the cluster
1913
1914     @rtype: L{objects.Cluster}
1915     @return: the cluster object
1916
1917     """
1918     return self._config_data.cluster
1919
1920   @locking.ssynchronized(_config_lock, shared=1)
1921   def HasAnyDiskOfType(self, dev_type):
1922     """Check if in there is at disk of the given type in the configuration.
1923
1924     """
1925     return self._config_data.HasAnyDiskOfType(dev_type)
1926
1927   @locking.ssynchronized(_config_lock)
1928   def Update(self, target, feedback_fn):
1929     """Notify function to be called after updates.
1930
1931     This function must be called when an object (as returned by
1932     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1933     caller wants the modifications saved to the backing store. Note
1934     that all modified objects will be saved, but the target argument
1935     is the one the caller wants to ensure that it's saved.
1936
1937     @param target: an instance of either L{objects.Cluster},
1938         L{objects.Node} or L{objects.Instance} which is existing in
1939         the cluster
1940     @param feedback_fn: Callable feedback function
1941
1942     """
1943     if self._config_data is None:
1944       raise errors.ProgrammerError("Configuration file not read,"
1945                                    " cannot save.")
1946     update_serial = False
1947     if isinstance(target, objects.Cluster):
1948       test = target == self._config_data.cluster
1949     elif isinstance(target, objects.Node):
1950       test = target in self._config_data.nodes.values()
1951       update_serial = True
1952     elif isinstance(target, objects.Instance):
1953       test = target in self._config_data.instances.values()
1954     elif isinstance(target, objects.NodeGroup):
1955       test = target in self._config_data.nodegroups.values()
1956     else:
1957       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1958                                    " ConfigWriter.Update" % type(target))
1959     if not test:
1960       raise errors.ConfigurationError("Configuration updated since object"
1961                                       " has been read or unknown object")
1962     target.serial_no += 1
1963     target.mtime = now = time.time()
1964
1965     if update_serial:
1966       # for node updates, we need to increase the cluster serial too
1967       self._config_data.cluster.serial_no += 1
1968       self._config_data.cluster.mtime = now
1969
1970     if isinstance(target, objects.Instance):
1971       self._UnlockedReleaseDRBDMinors(target.name)
1972
1973     self._WriteConfig(feedback_fn=feedback_fn)
1974
1975   @locking.ssynchronized(_config_lock)
1976   def DropECReservations(self, ec_id):
1977     """Drop per-execution-context reservations
1978
1979     """
1980     for rm in self._all_rms:
1981       rm.DropECReservations(ec_id)