http.client: Remove HTTP client pool code
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 # pylint: disable=R0904
35 # R0904: Too many public methods
36
37 import os
38 import random
39 import logging
40 import time
41
42 from ganeti import errors
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import constants
46 from ganeti import rpc
47 from ganeti import objects
48 from ganeti import serializer
49 from ganeti import uidpool
50 from ganeti import netutils
51 from ganeti import runtime
52
53
54 _config_lock = locking.SharedLock("ConfigWriter")
55
56 # job id used for resource management at config upgrade time
57 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
58
59
60 def _ValidateConfig(data):
61   """Verifies that a configuration objects looks valid.
62
63   This only verifies the version of the configuration.
64
65   @raise errors.ConfigurationError: if the version differs from what
66       we expect
67
68   """
69   if data.version != constants.CONFIG_VERSION:
70     raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
71
72
73 class TemporaryReservationManager:
74   """A temporary resource reservation manager.
75
76   This is used to reserve resources in a job, before using them, making sure
77   other jobs cannot get them in the meantime.
78
79   """
80   def __init__(self):
81     self._ec_reserved = {}
82
83   def Reserved(self, resource):
84     for holder_reserved in self._ec_reserved.values():
85       if resource in holder_reserved:
86         return True
87     return False
88
89   def Reserve(self, ec_id, resource):
90     if self.Reserved(resource):
91       raise errors.ReservationError("Duplicate reservation for resource '%s'"
92                                     % str(resource))
93     if ec_id not in self._ec_reserved:
94       self._ec_reserved[ec_id] = set([resource])
95     else:
96       self._ec_reserved[ec_id].add(resource)
97
98   def DropECReservations(self, ec_id):
99     if ec_id in self._ec_reserved:
100       del self._ec_reserved[ec_id]
101
102   def GetReserved(self):
103     all_reserved = set()
104     for holder_reserved in self._ec_reserved.values():
105       all_reserved.update(holder_reserved)
106     return all_reserved
107
108   def Generate(self, existing, generate_one_fn, ec_id):
109     """Generate a new resource of this type
110
111     """
112     assert callable(generate_one_fn)
113
114     all_elems = self.GetReserved()
115     all_elems.update(existing)
116     retries = 64
117     while retries > 0:
118       new_resource = generate_one_fn()
119       if new_resource is not None and new_resource not in all_elems:
120         break
121     else:
122       raise errors.ConfigurationError("Not able generate new resource"
123                                       " (last tried: %s)" % new_resource)
124     self.Reserve(ec_id, new_resource)
125     return new_resource
126
127
128 def _MatchNameComponentIgnoreCase(short_name, names):
129   """Wrapper around L{utils.text.MatchNameComponent}.
130
131   """
132   return utils.MatchNameComponent(short_name, names, case_sensitive=False)
133
134
135 class ConfigWriter:
136   """The interface to the cluster configuration.
137
138   @ivar _temporary_lvs: reservation manager for temporary LVs
139   @ivar _all_rms: a list of all temporary reservation managers
140
141   """
142   def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
143                accept_foreign=False):
144     self.write_count = 0
145     self._lock = _config_lock
146     self._config_data = None
147     self._offline = offline
148     if cfg_file is None:
149       self._cfg_file = constants.CLUSTER_CONF_FILE
150     else:
151       self._cfg_file = cfg_file
152     self._getents = _getents
153     self._temporary_ids = TemporaryReservationManager()
154     self._temporary_drbds = {}
155     self._temporary_macs = TemporaryReservationManager()
156     self._temporary_secrets = TemporaryReservationManager()
157     self._temporary_lvs = TemporaryReservationManager()
158     self._all_rms = [self._temporary_ids, self._temporary_macs,
159                      self._temporary_secrets, self._temporary_lvs]
160     # Note: in order to prevent errors when resolving our name in
161     # _DistributeConfig, we compute it here once and reuse it; it's
162     # better to raise an error before starting to modify the config
163     # file than after it was modified
164     self._my_hostname = netutils.Hostname.GetSysName()
165     self._last_cluster_serial = -1
166     self._cfg_id = None
167     self._OpenConfig(accept_foreign)
168
169   # this method needs to be static, so that we can call it on the class
170   @staticmethod
171   def IsCluster():
172     """Check if the cluster is configured.
173
174     """
175     return os.path.exists(constants.CLUSTER_CONF_FILE)
176
177   def _GenerateOneMAC(self):
178     """Generate one mac address
179
180     """
181     prefix = self._config_data.cluster.mac_prefix
182     byte1 = random.randrange(0, 256)
183     byte2 = random.randrange(0, 256)
184     byte3 = random.randrange(0, 256)
185     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
186     return mac
187
188   @locking.ssynchronized(_config_lock, shared=1)
189   def GetNdParams(self, node):
190     """Get the node params populated with cluster defaults.
191
192     @type node: L{objects.Node}
193     @param node: The node we want to know the params for
194     @return: A dict with the filled in node params
195
196     """
197     nodegroup = self._UnlockedGetNodeGroup(node.group)
198     return self._config_data.cluster.FillND(node, nodegroup)
199
200   @locking.ssynchronized(_config_lock, shared=1)
201   def GenerateMAC(self, ec_id):
202     """Generate a MAC for an instance.
203
204     This should check the current instances for duplicates.
205
206     """
207     existing = self._AllMACs()
208     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
209
210   @locking.ssynchronized(_config_lock, shared=1)
211   def ReserveMAC(self, mac, ec_id):
212     """Reserve a MAC for an instance.
213
214     This only checks instances managed by this cluster, it does not
215     check for potential collisions elsewhere.
216
217     """
218     all_macs = self._AllMACs()
219     if mac in all_macs:
220       raise errors.ReservationError("mac already in use")
221     else:
222       self._temporary_macs.Reserve(ec_id, mac)
223
224   @locking.ssynchronized(_config_lock, shared=1)
225   def ReserveLV(self, lv_name, ec_id):
226     """Reserve an VG/LV pair for an instance.
227
228     @type lv_name: string
229     @param lv_name: the logical volume name to reserve
230
231     """
232     all_lvs = self._AllLVs()
233     if lv_name in all_lvs:
234       raise errors.ReservationError("LV already in use")
235     else:
236       self._temporary_lvs.Reserve(ec_id, lv_name)
237
238   @locking.ssynchronized(_config_lock, shared=1)
239   def GenerateDRBDSecret(self, ec_id):
240     """Generate a DRBD secret.
241
242     This checks the current disks for duplicates.
243
244     """
245     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
246                                             utils.GenerateSecret,
247                                             ec_id)
248
249   def _AllLVs(self):
250     """Compute the list of all LVs.
251
252     """
253     lvnames = set()
254     for instance in self._config_data.instances.values():
255       node_data = instance.MapLVsByNode()
256       for lv_list in node_data.values():
257         lvnames.update(lv_list)
258     return lvnames
259
260   def _AllIDs(self, include_temporary):
261     """Compute the list of all UUIDs and names we have.
262
263     @type include_temporary: boolean
264     @param include_temporary: whether to include the _temporary_ids set
265     @rtype: set
266     @return: a set of IDs
267
268     """
269     existing = set()
270     if include_temporary:
271       existing.update(self._temporary_ids.GetReserved())
272     existing.update(self._AllLVs())
273     existing.update(self._config_data.instances.keys())
274     existing.update(self._config_data.nodes.keys())
275     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
276     return existing
277
278   def _GenerateUniqueID(self, ec_id):
279     """Generate an unique UUID.
280
281     This checks the current node, instances and disk names for
282     duplicates.
283
284     @rtype: string
285     @return: the unique id
286
287     """
288     existing = self._AllIDs(include_temporary=False)
289     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
290
291   @locking.ssynchronized(_config_lock, shared=1)
292   def GenerateUniqueID(self, ec_id):
293     """Generate an unique ID.
294
295     This is just a wrapper over the unlocked version.
296
297     @type ec_id: string
298     @param ec_id: unique id for the job to reserve the id to
299
300     """
301     return self._GenerateUniqueID(ec_id)
302
303   def _AllMACs(self):
304     """Return all MACs present in the config.
305
306     @rtype: list
307     @return: the list of all MACs
308
309     """
310     result = []
311     for instance in self._config_data.instances.values():
312       for nic in instance.nics:
313         result.append(nic.mac)
314
315     return result
316
317   def _AllDRBDSecrets(self):
318     """Return all DRBD secrets present in the config.
319
320     @rtype: list
321     @return: the list of all DRBD secrets
322
323     """
324     def helper(disk, result):
325       """Recursively gather secrets from this disk."""
326       if disk.dev_type == constants.DT_DRBD8:
327         result.append(disk.logical_id[5])
328       if disk.children:
329         for child in disk.children:
330           helper(child, result)
331
332     result = []
333     for instance in self._config_data.instances.values():
334       for disk in instance.disks:
335         helper(disk, result)
336
337     return result
338
339   def _CheckDiskIDs(self, disk, l_ids, p_ids):
340     """Compute duplicate disk IDs
341
342     @type disk: L{objects.Disk}
343     @param disk: the disk at which to start searching
344     @type l_ids: list
345     @param l_ids: list of current logical ids
346     @type p_ids: list
347     @param p_ids: list of current physical ids
348     @rtype: list
349     @return: a list of error messages
350
351     """
352     result = []
353     if disk.logical_id is not None:
354       if disk.logical_id in l_ids:
355         result.append("duplicate logical id %s" % str(disk.logical_id))
356       else:
357         l_ids.append(disk.logical_id)
358     if disk.physical_id is not None:
359       if disk.physical_id in p_ids:
360         result.append("duplicate physical id %s" % str(disk.physical_id))
361       else:
362         p_ids.append(disk.physical_id)
363
364     if disk.children:
365       for child in disk.children:
366         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
367     return result
368
369   def _UnlockedVerifyConfig(self):
370     """Verify function.
371
372     @rtype: list
373     @return: a list of error messages; a non-empty list signifies
374         configuration errors
375
376     """
377     # pylint: disable=R0914
378     result = []
379     seen_macs = []
380     ports = {}
381     data = self._config_data
382     cluster = data.cluster
383     seen_lids = []
384     seen_pids = []
385
386     # global cluster checks
387     if not cluster.enabled_hypervisors:
388       result.append("enabled hypervisors list doesn't have any entries")
389     invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
390     if invalid_hvs:
391       result.append("enabled hypervisors contains invalid entries: %s" %
392                     invalid_hvs)
393     missing_hvp = (set(cluster.enabled_hypervisors) -
394                    set(cluster.hvparams.keys()))
395     if missing_hvp:
396       result.append("hypervisor parameters missing for the enabled"
397                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
398
399     if cluster.master_node not in data.nodes:
400       result.append("cluster has invalid primary node '%s'" %
401                     cluster.master_node)
402
403     def _helper(owner, attr, value, template):
404       try:
405         utils.ForceDictType(value, template)
406       except errors.GenericError, err:
407         result.append("%s has invalid %s: %s" % (owner, attr, err))
408
409     def _helper_nic(owner, params):
410       try:
411         objects.NIC.CheckParameterSyntax(params)
412       except errors.ConfigurationError, err:
413         result.append("%s has invalid nicparams: %s" % (owner, err))
414
415     # check cluster parameters
416     _helper("cluster", "beparams", cluster.SimpleFillBE({}),
417             constants.BES_PARAMETER_TYPES)
418     _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
419             constants.NICS_PARAMETER_TYPES)
420     _helper_nic("cluster", cluster.SimpleFillNIC({}))
421     _helper("cluster", "ndparams", cluster.SimpleFillND({}),
422             constants.NDS_PARAMETER_TYPES)
423
424     # per-instance checks
425     for instance_name in data.instances:
426       instance = data.instances[instance_name]
427       if instance.name != instance_name:
428         result.append("instance '%s' is indexed by wrong name '%s'" %
429                       (instance.name, instance_name))
430       if instance.primary_node not in data.nodes:
431         result.append("instance '%s' has invalid primary node '%s'" %
432                       (instance_name, instance.primary_node))
433       for snode in instance.secondary_nodes:
434         if snode not in data.nodes:
435           result.append("instance '%s' has invalid secondary node '%s'" %
436                         (instance_name, snode))
437       for idx, nic in enumerate(instance.nics):
438         if nic.mac in seen_macs:
439           result.append("instance '%s' has NIC %d mac %s duplicate" %
440                         (instance_name, idx, nic.mac))
441         else:
442           seen_macs.append(nic.mac)
443         if nic.nicparams:
444           filled = cluster.SimpleFillNIC(nic.nicparams)
445           owner = "instance %s nic %d" % (instance.name, idx)
446           _helper(owner, "nicparams",
447                   filled, constants.NICS_PARAMETER_TYPES)
448           _helper_nic(owner, filled)
449
450       # parameter checks
451       if instance.beparams:
452         _helper("instance %s" % instance.name, "beparams",
453                 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)
454
455       # gather the drbd ports for duplicate checks
456       for dsk in instance.disks:
457         if dsk.dev_type in constants.LDS_DRBD:
458           tcp_port = dsk.logical_id[2]
459           if tcp_port not in ports:
460             ports[tcp_port] = []
461           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
462       # gather network port reservation
463       net_port = getattr(instance, "network_port", None)
464       if net_port is not None:
465         if net_port not in ports:
466           ports[net_port] = []
467         ports[net_port].append((instance.name, "network port"))
468
469       # instance disk verify
470       for idx, disk in enumerate(instance.disks):
471         result.extend(["instance '%s' disk %d error: %s" %
472                        (instance.name, idx, msg) for msg in disk.Verify()])
473         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
474
475     # cluster-wide pool of free ports
476     for free_port in cluster.tcpudp_port_pool:
477       if free_port not in ports:
478         ports[free_port] = []
479       ports[free_port].append(("cluster", "port marked as free"))
480
481     # compute tcp/udp duplicate ports
482     keys = ports.keys()
483     keys.sort()
484     for pnum in keys:
485       pdata = ports[pnum]
486       if len(pdata) > 1:
487         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
488         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
489
490     # highest used tcp port check
491     if keys:
492       if keys[-1] > cluster.highest_used_port:
493         result.append("Highest used port mismatch, saved %s, computed %s" %
494                       (cluster.highest_used_port, keys[-1]))
495
496     if not data.nodes[cluster.master_node].master_candidate:
497       result.append("Master node is not a master candidate")
498
499     # master candidate checks
500     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
501     if mc_now < mc_max:
502       result.append("Not enough master candidates: actual %d, target %d" %
503                     (mc_now, mc_max))
504
505     # node checks
506     for node_name, node in data.nodes.items():
507       if node.name != node_name:
508         result.append("Node '%s' is indexed by wrong name '%s'" %
509                       (node.name, node_name))
510       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
511         result.append("Node %s state is invalid: master_candidate=%s,"
512                       " drain=%s, offline=%s" %
513                       (node.name, node.master_candidate, node.drained,
514                        node.offline))
515       if node.group not in data.nodegroups:
516         result.append("Node '%s' has invalid group '%s'" %
517                       (node.name, node.group))
518       else:
519         _helper("node %s" % node.name, "ndparams",
520                 cluster.FillND(node, data.nodegroups[node.group]),
521                 constants.NDS_PARAMETER_TYPES)
522
523     # nodegroups checks
524     nodegroups_names = set()
525     for nodegroup_uuid in data.nodegroups:
526       nodegroup = data.nodegroups[nodegroup_uuid]
527       if nodegroup.uuid != nodegroup_uuid:
528         result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
529                       % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
530       if utils.UUID_RE.match(nodegroup.name.lower()):
531         result.append("node group '%s' (uuid: '%s') has uuid-like name" %
532                       (nodegroup.name, nodegroup.uuid))
533       if nodegroup.name in nodegroups_names:
534         result.append("duplicate node group name '%s'" % nodegroup.name)
535       else:
536         nodegroups_names.add(nodegroup.name)
537       if nodegroup.ndparams:
538         _helper("group %s" % nodegroup.name, "ndparams",
539                 cluster.SimpleFillND(nodegroup.ndparams),
540                 constants.NDS_PARAMETER_TYPES)
541
542     # drbd minors check
543     _, duplicates = self._UnlockedComputeDRBDMap()
544     for node, minor, instance_a, instance_b in duplicates:
545       result.append("DRBD minor %d on node %s is assigned twice to instances"
546                     " %s and %s" % (minor, node, instance_a, instance_b))
547
548     # IP checks
549     default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
550     ips = {}
551
552     def _AddIpAddress(ip, name):
553       ips.setdefault(ip, []).append(name)
554
555     _AddIpAddress(cluster.master_ip, "cluster_ip")
556
557     for node in data.nodes.values():
558       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
559       if node.secondary_ip != node.primary_ip:
560         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
561
562     for instance in data.instances.values():
563       for idx, nic in enumerate(instance.nics):
564         if nic.ip is None:
565           continue
566
567         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
568         nic_mode = nicparams[constants.NIC_MODE]
569         nic_link = nicparams[constants.NIC_LINK]
570
571         if nic_mode == constants.NIC_MODE_BRIDGED:
572           link = "bridge:%s" % nic_link
573         elif nic_mode == constants.NIC_MODE_ROUTED:
574           link = "route:%s" % nic_link
575         else:
576           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
577
578         _AddIpAddress("%s/%s" % (link, nic.ip),
579                       "instance:%s/nic:%d" % (instance.name, idx))
580
581     for ip, owners in ips.items():
582       if len(owners) > 1:
583         result.append("IP address %s is used by multiple owners: %s" %
584                       (ip, utils.CommaJoin(owners)))
585
586     return result
587
588   @locking.ssynchronized(_config_lock, shared=1)
589   def VerifyConfig(self):
590     """Verify function.
591
592     This is just a wrapper over L{_UnlockedVerifyConfig}.
593
594     @rtype: list
595     @return: a list of error messages; a non-empty list signifies
596         configuration errors
597
598     """
599     return self._UnlockedVerifyConfig()
600
601   def _UnlockedSetDiskID(self, disk, node_name):
602     """Convert the unique ID to the ID needed on the target nodes.
603
604     This is used only for drbd, which needs ip/port configuration.
605
606     The routine descends down and updates its children also, because
607     this helps when the only the top device is passed to the remote
608     node.
609
610     This function is for internal use, when the config lock is already held.
611
612     """
613     if disk.children:
614       for child in disk.children:
615         self._UnlockedSetDiskID(child, node_name)
616
617     if disk.logical_id is None and disk.physical_id is not None:
618       return
619     if disk.dev_type == constants.LD_DRBD8:
620       pnode, snode, port, pminor, sminor, secret = disk.logical_id
621       if node_name not in (pnode, snode):
622         raise errors.ConfigurationError("DRBD device not knowing node %s" %
623                                         node_name)
624       pnode_info = self._UnlockedGetNodeInfo(pnode)
625       snode_info = self._UnlockedGetNodeInfo(snode)
626       if pnode_info is None or snode_info is None:
627         raise errors.ConfigurationError("Can't find primary or secondary node"
628                                         " for %s" % str(disk))
629       p_data = (pnode_info.secondary_ip, port)
630       s_data = (snode_info.secondary_ip, port)
631       if pnode == node_name:
632         disk.physical_id = p_data + s_data + (pminor, secret)
633       else: # it must be secondary, we tested above
634         disk.physical_id = s_data + p_data + (sminor, secret)
635     else:
636       disk.physical_id = disk.logical_id
637     return
638
639   @locking.ssynchronized(_config_lock)
640   def SetDiskID(self, disk, node_name):
641     """Convert the unique ID to the ID needed on the target nodes.
642
643     This is used only for drbd, which needs ip/port configuration.
644
645     The routine descends down and updates its children also, because
646     this helps when the only the top device is passed to the remote
647     node.
648
649     """
650     return self._UnlockedSetDiskID(disk, node_name)
651
652   @locking.ssynchronized(_config_lock)
653   def AddTcpUdpPort(self, port):
654     """Adds a new port to the available port pool.
655
656     """
657     if not isinstance(port, int):
658       raise errors.ProgrammerError("Invalid type passed for port")
659
660     self._config_data.cluster.tcpudp_port_pool.add(port)
661     self._WriteConfig()
662
663   @locking.ssynchronized(_config_lock, shared=1)
664   def GetPortList(self):
665     """Returns a copy of the current port list.
666
667     """
668     return self._config_data.cluster.tcpudp_port_pool.copy()
669
670   @locking.ssynchronized(_config_lock)
671   def AllocatePort(self):
672     """Allocate a port.
673
674     The port will be taken from the available port pool or from the
675     default port range (and in this case we increase
676     highest_used_port).
677
678     """
679     # If there are TCP/IP ports configured, we use them first.
680     if self._config_data.cluster.tcpudp_port_pool:
681       port = self._config_data.cluster.tcpudp_port_pool.pop()
682     else:
683       port = self._config_data.cluster.highest_used_port + 1
684       if port >= constants.LAST_DRBD_PORT:
685         raise errors.ConfigurationError("The highest used port is greater"
686                                         " than %s. Aborting." %
687                                         constants.LAST_DRBD_PORT)
688       self._config_data.cluster.highest_used_port = port
689
690     self._WriteConfig()
691     return port
692
693   def _UnlockedComputeDRBDMap(self):
694     """Compute the used DRBD minor/nodes.
695
696     @rtype: (dict, list)
697     @return: dictionary of node_name: dict of minor: instance_name;
698         the returned dict will have all the nodes in it (even if with
699         an empty list), and a list of duplicates; if the duplicates
700         list is not empty, the configuration is corrupted and its caller
701         should raise an exception
702
703     """
704     def _AppendUsedPorts(instance_name, disk, used):
705       duplicates = []
706       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
707         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
708         for node, port in ((node_a, minor_a), (node_b, minor_b)):
709           assert node in used, ("Node '%s' of instance '%s' not found"
710                                 " in node list" % (node, instance_name))
711           if port in used[node]:
712             duplicates.append((node, port, instance_name, used[node][port]))
713           else:
714             used[node][port] = instance_name
715       if disk.children:
716         for child in disk.children:
717           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
718       return duplicates
719
720     duplicates = []
721     my_dict = dict((node, {}) for node in self._config_data.nodes)
722     for instance in self._config_data.instances.itervalues():
723       for disk in instance.disks:
724         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
725     for (node, minor), instance in self._temporary_drbds.iteritems():
726       if minor in my_dict[node] and my_dict[node][minor] != instance:
727         duplicates.append((node, minor, instance, my_dict[node][minor]))
728       else:
729         my_dict[node][minor] = instance
730     return my_dict, duplicates
731
732   @locking.ssynchronized(_config_lock)
733   def ComputeDRBDMap(self):
734     """Compute the used DRBD minor/nodes.
735
736     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
737
738     @return: dictionary of node_name: dict of minor: instance_name;
739         the returned dict will have all the nodes in it (even if with
740         an empty list).
741
742     """
743     d_map, duplicates = self._UnlockedComputeDRBDMap()
744     if duplicates:
745       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
746                                       str(duplicates))
747     return d_map
748
749   @locking.ssynchronized(_config_lock)
750   def AllocateDRBDMinor(self, nodes, instance):
751     """Allocate a drbd minor.
752
753     The free minor will be automatically computed from the existing
754     devices. A node can be given multiple times in order to allocate
755     multiple minors. The result is the list of minors, in the same
756     order as the passed nodes.
757
758     @type instance: string
759     @param instance: the instance for which we allocate minors
760
761     """
762     assert isinstance(instance, basestring), \
763            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
764
765     d_map, duplicates = self._UnlockedComputeDRBDMap()
766     if duplicates:
767       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
768                                       str(duplicates))
769     result = []
770     for nname in nodes:
771       ndata = d_map[nname]
772       if not ndata:
773         # no minors used, we can start at 0
774         result.append(0)
775         ndata[0] = instance
776         self._temporary_drbds[(nname, 0)] = instance
777         continue
778       keys = ndata.keys()
779       keys.sort()
780       ffree = utils.FirstFree(keys)
781       if ffree is None:
782         # return the next minor
783         # TODO: implement high-limit check
784         minor = keys[-1] + 1
785       else:
786         minor = ffree
787       # double-check minor against current instances
788       assert minor not in d_map[nname], \
789              ("Attempt to reuse allocated DRBD minor %d on node %s,"
790               " already allocated to instance %s" %
791               (minor, nname, d_map[nname][minor]))
792       ndata[minor] = instance
793       # double-check minor against reservation
794       r_key = (nname, minor)
795       assert r_key not in self._temporary_drbds, \
796              ("Attempt to reuse reserved DRBD minor %d on node %s,"
797               " reserved for instance %s" %
798               (minor, nname, self._temporary_drbds[r_key]))
799       self._temporary_drbds[r_key] = instance
800       result.append(minor)
801     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
802                   nodes, result)
803     return result
804
805   def _UnlockedReleaseDRBDMinors(self, instance):
806     """Release temporary drbd minors allocated for a given instance.
807
808     @type instance: string
809     @param instance: the instance for which temporary minors should be
810                      released
811
812     """
813     assert isinstance(instance, basestring), \
814            "Invalid argument passed to ReleaseDRBDMinors"
815     for key, name in self._temporary_drbds.items():
816       if name == instance:
817         del self._temporary_drbds[key]
818
819   @locking.ssynchronized(_config_lock)
820   def ReleaseDRBDMinors(self, instance):
821     """Release temporary drbd minors allocated for a given instance.
822
823     This should be called on the error paths, on the success paths
824     it's automatically called by the ConfigWriter add and update
825     functions.
826
827     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
828
829     @type instance: string
830     @param instance: the instance for which temporary minors should be
831                      released
832
833     """
834     self._UnlockedReleaseDRBDMinors(instance)
835
836   @locking.ssynchronized(_config_lock, shared=1)
837   def GetConfigVersion(self):
838     """Get the configuration version.
839
840     @return: Config version
841
842     """
843     return self._config_data.version
844
845   @locking.ssynchronized(_config_lock, shared=1)
846   def GetClusterName(self):
847     """Get cluster name.
848
849     @return: Cluster name
850
851     """
852     return self._config_data.cluster.cluster_name
853
854   @locking.ssynchronized(_config_lock, shared=1)
855   def GetMasterNode(self):
856     """Get the hostname of the master node for this cluster.
857
858     @return: Master hostname
859
860     """
861     return self._config_data.cluster.master_node
862
863   @locking.ssynchronized(_config_lock, shared=1)
864   def GetMasterIP(self):
865     """Get the IP of the master node for this cluster.
866
867     @return: Master IP
868
869     """
870     return self._config_data.cluster.master_ip
871
872   @locking.ssynchronized(_config_lock, shared=1)
873   def GetMasterNetdev(self):
874     """Get the master network device for this cluster.
875
876     """
877     return self._config_data.cluster.master_netdev
878
879   @locking.ssynchronized(_config_lock, shared=1)
880   def GetMasterNetmask(self):
881     """Get the netmask of the master node for this cluster.
882
883     """
884     return self._config_data.cluster.master_netmask
885
886   @locking.ssynchronized(_config_lock, shared=1)
887   def GetFileStorageDir(self):
888     """Get the file storage dir for this cluster.
889
890     """
891     return self._config_data.cluster.file_storage_dir
892
893   @locking.ssynchronized(_config_lock, shared=1)
894   def GetSharedFileStorageDir(self):
895     """Get the shared file storage dir for this cluster.
896
897     """
898     return self._config_data.cluster.shared_file_storage_dir
899
900   @locking.ssynchronized(_config_lock, shared=1)
901   def GetHypervisorType(self):
902     """Get the hypervisor type for this cluster.
903
904     """
905     return self._config_data.cluster.enabled_hypervisors[0]
906
907   @locking.ssynchronized(_config_lock, shared=1)
908   def GetHostKey(self):
909     """Return the rsa hostkey from the config.
910
911     @rtype: string
912     @return: the rsa hostkey
913
914     """
915     return self._config_data.cluster.rsahostkeypub
916
917   @locking.ssynchronized(_config_lock, shared=1)
918   def GetDefaultIAllocator(self):
919     """Get the default instance allocator for this cluster.
920
921     """
922     return self._config_data.cluster.default_iallocator
923
924   @locking.ssynchronized(_config_lock, shared=1)
925   def GetPrimaryIPFamily(self):
926     """Get cluster primary ip family.
927
928     @return: primary ip family
929
930     """
931     return self._config_data.cluster.primary_ip_family
932
933   @locking.ssynchronized(_config_lock)
934   def AddNodeGroup(self, group, ec_id, check_uuid=True):
935     """Add a node group to the configuration.
936
937     This method calls group.UpgradeConfig() to fill any missing attributes
938     according to their default values.
939
940     @type group: L{objects.NodeGroup}
941     @param group: the NodeGroup object to add
942     @type ec_id: string
943     @param ec_id: unique id for the job to use when creating a missing UUID
944     @type check_uuid: bool
945     @param check_uuid: add an UUID to the group if it doesn't have one or, if
946                        it does, ensure that it does not exist in the
947                        configuration already
948
949     """
950     self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
951     self._WriteConfig()
952
953   def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
954     """Add a node group to the configuration.
955
956     """
957     logging.info("Adding node group %s to configuration", group.name)
958
959     # Some code might need to add a node group with a pre-populated UUID
960     # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
961     # the "does this UUID" exist already check.
962     if check_uuid:
963       self._EnsureUUID(group, ec_id)
964
965     try:
966       existing_uuid = self._UnlockedLookupNodeGroup(group.name)
967     except errors.OpPrereqError:
968       pass
969     else:
970       raise errors.OpPrereqError("Desired group name '%s' already exists as a"
971                                  " node group (UUID: %s)" %
972                                  (group.name, existing_uuid),
973                                  errors.ECODE_EXISTS)
974
975     group.serial_no = 1
976     group.ctime = group.mtime = time.time()
977     group.UpgradeConfig()
978
979     self._config_data.nodegroups[group.uuid] = group
980     self._config_data.cluster.serial_no += 1
981
982   @locking.ssynchronized(_config_lock)
983   def RemoveNodeGroup(self, group_uuid):
984     """Remove a node group from the configuration.
985
986     @type group_uuid: string
987     @param group_uuid: the UUID of the node group to remove
988
989     """
990     logging.info("Removing node group %s from configuration", group_uuid)
991
992     if group_uuid not in self._config_data.nodegroups:
993       raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)
994
995     assert len(self._config_data.nodegroups) != 1, \
996             "Group '%s' is the only group, cannot be removed" % group_uuid
997
998     del self._config_data.nodegroups[group_uuid]
999     self._config_data.cluster.serial_no += 1
1000     self._WriteConfig()
1001
1002   def _UnlockedLookupNodeGroup(self, target):
1003     """Lookup a node group's UUID.
1004
1005     @type target: string or None
1006     @param target: group name or UUID or None to look for the default
1007     @rtype: string
1008     @return: nodegroup UUID
1009     @raises errors.OpPrereqError: when the target group cannot be found
1010
1011     """
1012     if target is None:
1013       if len(self._config_data.nodegroups) != 1:
1014         raise errors.OpPrereqError("More than one node group exists. Target"
1015                                    " group must be specified explicitely.")
1016       else:
1017         return self._config_data.nodegroups.keys()[0]
1018     if target in self._config_data.nodegroups:
1019       return target
1020     for nodegroup in self._config_data.nodegroups.values():
1021       if nodegroup.name == target:
1022         return nodegroup.uuid
1023     raise errors.OpPrereqError("Node group '%s' not found" % target,
1024                                errors.ECODE_NOENT)
1025
1026   @locking.ssynchronized(_config_lock, shared=1)
1027   def LookupNodeGroup(self, target):
1028     """Lookup a node group's UUID.
1029
1030     This function is just a wrapper over L{_UnlockedLookupNodeGroup}.
1031
1032     @type target: string or None
1033     @param target: group name or UUID or None to look for the default
1034     @rtype: string
1035     @return: nodegroup UUID
1036
1037     """
1038     return self._UnlockedLookupNodeGroup(target)
1039
1040   def _UnlockedGetNodeGroup(self, uuid):
1041     """Lookup a node group.
1042
1043     @type uuid: string
1044     @param uuid: group UUID
1045     @rtype: L{objects.NodeGroup} or None
1046     @return: nodegroup object, or None if not found
1047
1048     """
1049     if uuid not in self._config_data.nodegroups:
1050       return None
1051
1052     return self._config_data.nodegroups[uuid]
1053
1054   @locking.ssynchronized(_config_lock, shared=1)
1055   def GetNodeGroup(self, uuid):
1056     """Lookup a node group.
1057
1058     @type uuid: string
1059     @param uuid: group UUID
1060     @rtype: L{objects.NodeGroup} or None
1061     @return: nodegroup object, or None if not found
1062
1063     """
1064     return self._UnlockedGetNodeGroup(uuid)
1065
1066   @locking.ssynchronized(_config_lock, shared=1)
1067   def GetAllNodeGroupsInfo(self):
1068     """Get the configuration of all node groups.
1069
1070     """
1071     return dict(self._config_data.nodegroups)
1072
1073   @locking.ssynchronized(_config_lock, shared=1)
1074   def GetNodeGroupList(self):
1075     """Get a list of node groups.
1076
1077     """
1078     return self._config_data.nodegroups.keys()
1079
1080   @locking.ssynchronized(_config_lock, shared=1)
1081   def GetNodeGroupMembersByNodes(self, nodes):
1082     """Get nodes which are member in the same nodegroups as the given nodes.
1083
1084     """
1085     ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
1086     return frozenset(member_name
1087                      for node_name in nodes
1088                      for member_name in
1089                        self._UnlockedGetNodeGroup(ngfn(node_name)).members)
1090
1091   @locking.ssynchronized(_config_lock)
1092   def AddInstance(self, instance, ec_id):
1093     """Add an instance to the config.
1094
1095     This should be used after creating a new instance.
1096
1097     @type instance: L{objects.Instance}
1098     @param instance: the instance object
1099
1100     """
1101     if not isinstance(instance, objects.Instance):
1102       raise errors.ProgrammerError("Invalid type passed to AddInstance")
1103
1104     if instance.disk_template != constants.DT_DISKLESS:
1105       all_lvs = instance.MapLVsByNode()
1106       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
1107
1108     all_macs = self._AllMACs()
1109     for nic in instance.nics:
1110       if nic.mac in all_macs:
1111         raise errors.ConfigurationError("Cannot add instance %s:"
1112                                         " MAC address '%s' already in use." %
1113                                         (instance.name, nic.mac))
1114
1115     self._EnsureUUID(instance, ec_id)
1116
1117     instance.serial_no = 1
1118     instance.ctime = instance.mtime = time.time()
1119     self._config_data.instances[instance.name] = instance
1120     self._config_data.cluster.serial_no += 1
1121     self._UnlockedReleaseDRBDMinors(instance.name)
1122     self._WriteConfig()
1123
1124   def _EnsureUUID(self, item, ec_id):
1125     """Ensures a given object has a valid UUID.
1126
1127     @param item: the instance or node to be checked
1128     @param ec_id: the execution context id for the uuid reservation
1129
1130     """
1131     if not item.uuid:
1132       item.uuid = self._GenerateUniqueID(ec_id)
1133     elif item.uuid in self._AllIDs(include_temporary=True):
1134       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
1135                                       " in use" % (item.name, item.uuid))
1136
1137   def _SetInstanceStatus(self, instance_name, status):
1138     """Set the instance's status to a given value.
1139
1140     """
1141     assert isinstance(status, bool), \
1142            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
1143
1144     if instance_name not in self._config_data.instances:
1145       raise errors.ConfigurationError("Unknown instance '%s'" %
1146                                       instance_name)
1147     instance = self._config_data.instances[instance_name]
1148     if instance.admin_up != status:
1149       instance.admin_up = status
1150       instance.serial_no += 1
1151       instance.mtime = time.time()
1152       self._WriteConfig()
1153
1154   @locking.ssynchronized(_config_lock)
1155   def MarkInstanceUp(self, instance_name):
1156     """Mark the instance status to up in the config.
1157
1158     """
1159     self._SetInstanceStatus(instance_name, True)
1160
1161   @locking.ssynchronized(_config_lock)
1162   def RemoveInstance(self, instance_name):
1163     """Remove the instance from the configuration.
1164
1165     """
1166     if instance_name not in self._config_data.instances:
1167       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1168     del self._config_data.instances[instance_name]
1169     self._config_data.cluster.serial_no += 1
1170     self._WriteConfig()
1171
1172   @locking.ssynchronized(_config_lock)
1173   def RenameInstance(self, old_name, new_name):
1174     """Rename an instance.
1175
1176     This needs to be done in ConfigWriter and not by RemoveInstance
1177     combined with AddInstance as only we can guarantee an atomic
1178     rename.
1179
1180     """
1181     if old_name not in self._config_data.instances:
1182       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
1183     inst = self._config_data.instances[old_name]
1184     del self._config_data.instances[old_name]
1185     inst.name = new_name
1186
1187     for disk in inst.disks:
1188       if disk.dev_type == constants.LD_FILE:
1189         # rename the file paths in logical and physical id
1190         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
1191         disk_fname = "disk%s" % disk.iv_name.split("/")[1]
1192         disk.physical_id = disk.logical_id = (disk.logical_id[0],
1193                                               utils.PathJoin(file_storage_dir,
1194                                                              inst.name,
1195                                                              disk_fname))
1196
1197     # Force update of ssconf files
1198     self._config_data.cluster.serial_no += 1
1199
1200     self._config_data.instances[inst.name] = inst
1201     self._WriteConfig()
1202
1203   @locking.ssynchronized(_config_lock)
1204   def MarkInstanceDown(self, instance_name):
1205     """Mark the status of an instance to down in the configuration.
1206
1207     """
1208     self._SetInstanceStatus(instance_name, False)
1209
1210   def _UnlockedGetInstanceList(self):
1211     """Get the list of instances.
1212
1213     This function is for internal use, when the config lock is already held.
1214
1215     """
1216     return self._config_data.instances.keys()
1217
1218   @locking.ssynchronized(_config_lock, shared=1)
1219   def GetInstanceList(self):
1220     """Get the list of instances.
1221
1222     @return: array of instances, ex. ['instance2.example.com',
1223         'instance1.example.com']
1224
1225     """
1226     return self._UnlockedGetInstanceList()
1227
1228   def ExpandInstanceName(self, short_name):
1229     """Attempt to expand an incomplete instance name.
1230
1231     """
1232     # Locking is done in L{ConfigWriter.GetInstanceList}
1233     return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())
1234
1235   def _UnlockedGetInstanceInfo(self, instance_name):
1236     """Returns information about an instance.
1237
1238     This function is for internal use, when the config lock is already held.
1239
1240     """
1241     if instance_name not in self._config_data.instances:
1242       return None
1243
1244     return self._config_data.instances[instance_name]
1245
1246   @locking.ssynchronized(_config_lock, shared=1)
1247   def GetInstanceInfo(self, instance_name):
1248     """Returns information about an instance.
1249
1250     It takes the information from the configuration file. Other information of
1251     an instance are taken from the live systems.
1252
1253     @param instance_name: name of the instance, e.g.
1254         I{instance1.example.com}
1255
1256     @rtype: L{objects.Instance}
1257     @return: the instance object
1258
1259     """
1260     return self._UnlockedGetInstanceInfo(instance_name)
1261
1262   @locking.ssynchronized(_config_lock, shared=1)
1263   def GetInstanceNodeGroups(self, instance_name, primary_only=False):
1264     """Returns set of node group UUIDs for instance's nodes.
1265
1266     @rtype: frozenset
1267
1268     """
1269     instance = self._UnlockedGetInstanceInfo(instance_name)
1270     if not instance:
1271       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1272
1273     if primary_only:
1274       nodes = [instance.primary_node]
1275     else:
1276       nodes = instance.all_nodes
1277
1278     return frozenset(self._UnlockedGetNodeInfo(node_name).group
1279                      for node_name in nodes)
1280
1281   @locking.ssynchronized(_config_lock, shared=1)
1282   def GetMultiInstanceInfo(self, instances):
1283     """Get the configuration of multiple instances.
1284
1285     @param instances: list of instance names
1286     @rtype: list
1287     @return: list of tuples (instance, instance_info), where
1288         instance_info is what would GetInstanceInfo return for the
1289         node, while keeping the original order
1290
1291     """
1292     return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]
1293
1294   @locking.ssynchronized(_config_lock, shared=1)
1295   def GetAllInstancesInfo(self):
1296     """Get the configuration of all instances.
1297
1298     @rtype: dict
1299     @return: dict of (instance, instance_info), where instance_info is what
1300               would GetInstanceInfo return for the node
1301
1302     """
1303     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1304                     for instance in self._UnlockedGetInstanceList()])
1305     return my_dict
1306
1307   @locking.ssynchronized(_config_lock)
1308   def AddNode(self, node, ec_id):
1309     """Add a node to the configuration.
1310
1311     @type node: L{objects.Node}
1312     @param node: a Node instance
1313
1314     """
1315     logging.info("Adding node %s to configuration", node.name)
1316
1317     self._EnsureUUID(node, ec_id)
1318
1319     node.serial_no = 1
1320     node.ctime = node.mtime = time.time()
1321     self._UnlockedAddNodeToGroup(node.name, node.group)
1322     self._config_data.nodes[node.name] = node
1323     self._config_data.cluster.serial_no += 1
1324     self._WriteConfig()
1325
1326   @locking.ssynchronized(_config_lock)
1327   def RemoveNode(self, node_name):
1328     """Remove a node from the configuration.
1329
1330     """
1331     logging.info("Removing node %s from configuration", node_name)
1332
1333     if node_name not in self._config_data.nodes:
1334       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1335
1336     self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1337     del self._config_data.nodes[node_name]
1338     self._config_data.cluster.serial_no += 1
1339     self._WriteConfig()
1340
1341   def ExpandNodeName(self, short_name):
1342     """Attempt to expand an incomplete node name.
1343
1344     """
1345     # Locking is done in L{ConfigWriter.GetNodeList}
1346     return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1347
1348   def _UnlockedGetNodeInfo(self, node_name):
1349     """Get the configuration of a node, as stored in the config.
1350
1351     This function is for internal use, when the config lock is already
1352     held.
1353
1354     @param node_name: the node name, e.g. I{node1.example.com}
1355
1356     @rtype: L{objects.Node}
1357     @return: the node object
1358
1359     """
1360     if node_name not in self._config_data.nodes:
1361       return None
1362
1363     return self._config_data.nodes[node_name]
1364
1365   @locking.ssynchronized(_config_lock, shared=1)
1366   def GetNodeInfo(self, node_name):
1367     """Get the configuration of a node, as stored in the config.
1368
1369     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1370
1371     @param node_name: the node name, e.g. I{node1.example.com}
1372
1373     @rtype: L{objects.Node}
1374     @return: the node object
1375
1376     """
1377     return self._UnlockedGetNodeInfo(node_name)
1378
1379   @locking.ssynchronized(_config_lock, shared=1)
1380   def GetNodeInstances(self, node_name):
1381     """Get the instances of a node, as stored in the config.
1382
1383     @param node_name: the node name, e.g. I{node1.example.com}
1384
1385     @rtype: (list, list)
1386     @return: a tuple with two lists: the primary and the secondary instances
1387
1388     """
1389     pri = []
1390     sec = []
1391     for inst in self._config_data.instances.values():
1392       if inst.primary_node == node_name:
1393         pri.append(inst.name)
1394       if node_name in inst.secondary_nodes:
1395         sec.append(inst.name)
1396     return (pri, sec)
1397
1398   @locking.ssynchronized(_config_lock, shared=1)
1399   def GetNodeGroupInstances(self, uuid, primary_only=False):
1400     """Get the instances of a node group.
1401
1402     @param uuid: Node group UUID
1403     @param primary_only: Whether to only consider primary nodes
1404     @rtype: frozenset
1405     @return: List of instance names in node group
1406
1407     """
1408     if primary_only:
1409       nodes_fn = lambda inst: [inst.primary_node]
1410     else:
1411       nodes_fn = lambda inst: inst.all_nodes
1412
1413     return frozenset(inst.name
1414                      for inst in self._config_data.instances.values()
1415                      for node_name in nodes_fn(inst)
1416                      if self._UnlockedGetNodeInfo(node_name).group == uuid)
1417
1418   def _UnlockedGetNodeList(self):
1419     """Return the list of nodes which are in the configuration.
1420
1421     This function is for internal use, when the config lock is already
1422     held.
1423
1424     @rtype: list
1425
1426     """
1427     return self._config_data.nodes.keys()
1428
1429   @locking.ssynchronized(_config_lock, shared=1)
1430   def GetNodeList(self):
1431     """Return the list of nodes which are in the configuration.
1432
1433     """
1434     return self._UnlockedGetNodeList()
1435
1436   def _UnlockedGetOnlineNodeList(self):
1437     """Return the list of nodes which are online.
1438
1439     """
1440     all_nodes = [self._UnlockedGetNodeInfo(node)
1441                  for node in self._UnlockedGetNodeList()]
1442     return [node.name for node in all_nodes if not node.offline]
1443
1444   @locking.ssynchronized(_config_lock, shared=1)
1445   def GetOnlineNodeList(self):
1446     """Return the list of nodes which are online.
1447
1448     """
1449     return self._UnlockedGetOnlineNodeList()
1450
1451   @locking.ssynchronized(_config_lock, shared=1)
1452   def GetVmCapableNodeList(self):
1453     """Return the list of nodes which are not vm capable.
1454
1455     """
1456     all_nodes = [self._UnlockedGetNodeInfo(node)
1457                  for node in self._UnlockedGetNodeList()]
1458     return [node.name for node in all_nodes if node.vm_capable]
1459
1460   @locking.ssynchronized(_config_lock, shared=1)
1461   def GetNonVmCapableNodeList(self):
1462     """Return the list of nodes which are not vm capable.
1463
1464     """
1465     all_nodes = [self._UnlockedGetNodeInfo(node)
1466                  for node in self._UnlockedGetNodeList()]
1467     return [node.name for node in all_nodes if not node.vm_capable]
1468
1469   @locking.ssynchronized(_config_lock, shared=1)
1470   def GetMultiNodeInfo(self, nodes):
1471     """Get the configuration of multiple nodes.
1472
1473     @param nodes: list of node names
1474     @rtype: list
1475     @return: list of tuples of (node, node_info), where node_info is
1476         what would GetNodeInfo return for the node, in the original
1477         order
1478
1479     """
1480     return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]
1481
1482   @locking.ssynchronized(_config_lock, shared=1)
1483   def GetAllNodesInfo(self):
1484     """Get the configuration of all nodes.
1485
1486     @rtype: dict
1487     @return: dict of (node, node_info), where node_info is what
1488               would GetNodeInfo return for the node
1489
1490     """
1491     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1492                     for node in self._UnlockedGetNodeList()])
1493     return my_dict
1494
1495   @locking.ssynchronized(_config_lock, shared=1)
1496   def GetNodeGroupsFromNodes(self, nodes):
1497     """Returns groups for a list of nodes.
1498
1499     @type nodes: list of string
1500     @param nodes: List of node names
1501     @rtype: frozenset
1502
1503     """
1504     return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)
1505
1506   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1507     """Get the number of current and maximum desired and possible candidates.
1508
1509     @type exceptions: list
1510     @param exceptions: if passed, list of nodes that should be ignored
1511     @rtype: tuple
1512     @return: tuple of (current, desired and possible, possible)
1513
1514     """
1515     mc_now = mc_should = mc_max = 0
1516     for node in self._config_data.nodes.values():
1517       if exceptions and node.name in exceptions:
1518         continue
1519       if not (node.offline or node.drained) and node.master_capable:
1520         mc_max += 1
1521       if node.master_candidate:
1522         mc_now += 1
1523     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1524     return (mc_now, mc_should, mc_max)
1525
1526   @locking.ssynchronized(_config_lock, shared=1)
1527   def GetMasterCandidateStats(self, exceptions=None):
1528     """Get the number of current and maximum possible candidates.
1529
1530     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1531
1532     @type exceptions: list
1533     @param exceptions: if passed, list of nodes that should be ignored
1534     @rtype: tuple
1535     @return: tuple of (current, max)
1536
1537     """
1538     return self._UnlockedGetMasterCandidateStats(exceptions)
1539
1540   @locking.ssynchronized(_config_lock)
1541   def MaintainCandidatePool(self, exceptions):
1542     """Try to grow the candidate pool to the desired size.
1543
1544     @type exceptions: list
1545     @param exceptions: if passed, list of nodes that should be ignored
1546     @rtype: list
1547     @return: list with the adjusted nodes (L{objects.Node} instances)
1548
1549     """
1550     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1551     mod_list = []
1552     if mc_now < mc_max:
1553       node_list = self._config_data.nodes.keys()
1554       random.shuffle(node_list)
1555       for name in node_list:
1556         if mc_now >= mc_max:
1557           break
1558         node = self._config_data.nodes[name]
1559         if (node.master_candidate or node.offline or node.drained or
1560             node.name in exceptions or not node.master_capable):
1561           continue
1562         mod_list.append(node)
1563         node.master_candidate = True
1564         node.serial_no += 1
1565         mc_now += 1
1566       if mc_now != mc_max:
1567         # this should not happen
1568         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1569                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1570       if mod_list:
1571         self._config_data.cluster.serial_no += 1
1572         self._WriteConfig()
1573
1574     return mod_list
1575
1576   def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1577     """Add a given node to the specified group.
1578
1579     """
1580     if nodegroup_uuid not in self._config_data.nodegroups:
1581       # This can happen if a node group gets deleted between its lookup and
1582       # when we're adding the first node to it, since we don't keep a lock in
1583       # the meantime. It's ok though, as we'll fail cleanly if the node group
1584       # is not found anymore.
1585       raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1586     if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1587       self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1588
1589   def _UnlockedRemoveNodeFromGroup(self, node):
1590     """Remove a given node from its group.
1591
1592     """
1593     nodegroup = node.group
1594     if nodegroup not in self._config_data.nodegroups:
1595       logging.warning("Warning: node '%s' has unknown node group '%s'"
1596                       " (while being removed from it)", node.name, nodegroup)
1597     nodegroup_obj = self._config_data.nodegroups[nodegroup]
1598     if node.name not in nodegroup_obj.members:
1599       logging.warning("Warning: node '%s' not a member of its node group '%s'"
1600                       " (while being removed from it)", node.name, nodegroup)
1601     else:
1602       nodegroup_obj.members.remove(node.name)
1603
1604   def _BumpSerialNo(self):
1605     """Bump up the serial number of the config.
1606
1607     """
1608     self._config_data.serial_no += 1
1609     self._config_data.mtime = time.time()
1610
1611   def _AllUUIDObjects(self):
1612     """Returns all objects with uuid attributes.
1613
1614     """
1615     return (self._config_data.instances.values() +
1616             self._config_data.nodes.values() +
1617             self._config_data.nodegroups.values() +
1618             [self._config_data.cluster])
1619
1620   def _OpenConfig(self, accept_foreign):
1621     """Read the config data from disk.
1622
1623     """
1624     raw_data = utils.ReadFile(self._cfg_file)
1625
1626     try:
1627       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1628     except Exception, err:
1629       raise errors.ConfigurationError(err)
1630
1631     # Make sure the configuration has the right version
1632     _ValidateConfig(data)
1633
1634     if (not hasattr(data, 'cluster') or
1635         not hasattr(data.cluster, 'rsahostkeypub')):
1636       raise errors.ConfigurationError("Incomplete configuration"
1637                                       " (missing cluster.rsahostkeypub)")
1638
1639     if data.cluster.master_node != self._my_hostname and not accept_foreign:
1640       msg = ("The configuration denotes node %s as master, while my"
1641              " hostname is %s; opening a foreign configuration is only"
1642              " possible in accept_foreign mode" %
1643              (data.cluster.master_node, self._my_hostname))
1644       raise errors.ConfigurationError(msg)
1645
1646     # Upgrade configuration if needed
1647     data.UpgradeConfig()
1648
1649     self._config_data = data
1650     # reset the last serial as -1 so that the next write will cause
1651     # ssconf update
1652     self._last_cluster_serial = -1
1653
1654     # And finally run our (custom) config upgrade sequence
1655     self._UpgradeConfig()
1656
1657     self._cfg_id = utils.GetFileID(path=self._cfg_file)
1658
1659   def _UpgradeConfig(self):
1660     """Run upgrade steps that cannot be done purely in the objects.
1661
1662     This is because some data elements need uniqueness across the
1663     whole configuration, etc.
1664
1665     @warning: this function will call L{_WriteConfig()}, but also
1666         L{DropECReservations} so it needs to be called only from a
1667         "safe" place (the constructor). If one wanted to call it with
1668         the lock held, a DropECReservationUnlocked would need to be
1669         created first, to avoid causing deadlock.
1670
1671     """
1672     modified = False
1673     for item in self._AllUUIDObjects():
1674       if item.uuid is None:
1675         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1676         modified = True
1677     if not self._config_data.nodegroups:
1678       default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
1679       default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
1680                                             members=[])
1681       self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
1682       modified = True
1683     for node in self._config_data.nodes.values():
1684       if not node.group:
1685         node.group = self.LookupNodeGroup(None)
1686         modified = True
1687       # This is technically *not* an upgrade, but needs to be done both when
1688       # nodegroups are being added, and upon normally loading the config,
1689       # because the members list of a node group is discarded upon
1690       # serializing/deserializing the object.
1691       self._UnlockedAddNodeToGroup(node.name, node.group)
1692     if modified:
1693       self._WriteConfig()
1694       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1695       # only called at config init time, without the lock held
1696       self.DropECReservations(_UPGRADE_CONFIG_JID)
1697
1698   def _DistributeConfig(self, feedback_fn):
1699     """Distribute the configuration to the other nodes.
1700
1701     Currently, this only copies the configuration file. In the future,
1702     it could be used to encapsulate the 2/3-phase update mechanism.
1703
1704     """
1705     if self._offline:
1706       return True
1707
1708     bad = False
1709
1710     node_list = []
1711     addr_list = []
1712     myhostname = self._my_hostname
1713     # we can skip checking whether _UnlockedGetNodeInfo returns None
1714     # since the node list comes from _UnlocketGetNodeList, and we are
1715     # called with the lock held, so no modifications should take place
1716     # in between
1717     for node_name in self._UnlockedGetNodeList():
1718       if node_name == myhostname:
1719         continue
1720       node_info = self._UnlockedGetNodeInfo(node_name)
1721       if not node_info.master_candidate:
1722         continue
1723       node_list.append(node_info.name)
1724       addr_list.append(node_info.primary_ip)
1725
1726     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1727                                             address_list=addr_list)
1728     for to_node, to_result in result.items():
1729       msg = to_result.fail_msg
1730       if msg:
1731         msg = ("Copy of file %s to node %s failed: %s" %
1732                (self._cfg_file, to_node, msg))
1733         logging.error(msg)
1734
1735         if feedback_fn:
1736           feedback_fn(msg)
1737
1738         bad = True
1739
1740     return not bad
1741
1742   def _WriteConfig(self, destination=None, feedback_fn=None):
1743     """Write the configuration data to persistent storage.
1744
1745     """
1746     assert feedback_fn is None or callable(feedback_fn)
1747
1748     # Warn on config errors, but don't abort the save - the
1749     # configuration has already been modified, and we can't revert;
1750     # the best we can do is to warn the user and save as is, leaving
1751     # recovery to the user
1752     config_errors = self._UnlockedVerifyConfig()
1753     if config_errors:
1754       errmsg = ("Configuration data is not consistent: %s" %
1755                 (utils.CommaJoin(config_errors)))
1756       logging.critical(errmsg)
1757       if feedback_fn:
1758         feedback_fn(errmsg)
1759
1760     if destination is None:
1761       destination = self._cfg_file
1762     self._BumpSerialNo()
1763     txt = serializer.Dump(self._config_data.ToDict())
1764
1765     getents = self._getents()
1766     try:
1767       fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
1768                                close=False, gid=getents.confd_gid, mode=0640)
1769     except errors.LockError:
1770       raise errors.ConfigurationError("The configuration file has been"
1771                                       " modified since the last write, cannot"
1772                                       " update")
1773     try:
1774       self._cfg_id = utils.GetFileID(fd=fd)
1775     finally:
1776       os.close(fd)
1777
1778     self.write_count += 1
1779
1780     # and redistribute the config file to master candidates
1781     self._DistributeConfig(feedback_fn)
1782
1783     # Write ssconf files on all nodes (including locally)
1784     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1785       if not self._offline:
1786         result = rpc.RpcRunner.call_write_ssconf_files(
1787           self._UnlockedGetOnlineNodeList(),
1788           self._UnlockedGetSsconfValues())
1789
1790         for nname, nresu in result.items():
1791           msg = nresu.fail_msg
1792           if msg:
1793             errmsg = ("Error while uploading ssconf files to"
1794                       " node %s: %s" % (nname, msg))
1795             logging.warning(errmsg)
1796
1797             if feedback_fn:
1798               feedback_fn(errmsg)
1799
1800       self._last_cluster_serial = self._config_data.cluster.serial_no
1801
1802   def _UnlockedGetSsconfValues(self):
1803     """Return the values needed by ssconf.
1804
1805     @rtype: dict
1806     @return: a dictionary with keys the ssconf names and values their
1807         associated value
1808
1809     """
1810     fn = "\n".join
1811     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1812     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1813     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1814     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1815                     for ninfo in node_info]
1816     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1817                     for ninfo in node_info]
1818
1819     instance_data = fn(instance_names)
1820     off_data = fn(node.name for node in node_info if node.offline)
1821     on_data = fn(node.name for node in node_info if not node.offline)
1822     mc_data = fn(node.name for node in node_info if node.master_candidate)
1823     mc_ips_data = fn(node.primary_ip for node in node_info
1824                      if node.master_candidate)
1825     node_data = fn(node_names)
1826     node_pri_ips_data = fn(node_pri_ips)
1827     node_snd_ips_data = fn(node_snd_ips)
1828
1829     cluster = self._config_data.cluster
1830     cluster_tags = fn(cluster.GetTags())
1831
1832     hypervisor_list = fn(cluster.enabled_hypervisors)
1833
1834     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1835
1836     nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1837                   self._config_data.nodegroups.values()]
1838     nodegroups_data = fn(utils.NiceSort(nodegroups))
1839
1840     ssconf_values = {
1841       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1842       constants.SS_CLUSTER_TAGS: cluster_tags,
1843       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1844       constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
1845       constants.SS_MASTER_CANDIDATES: mc_data,
1846       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1847       constants.SS_MASTER_IP: cluster.master_ip,
1848       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1849       constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
1850       constants.SS_MASTER_NODE: cluster.master_node,
1851       constants.SS_NODE_LIST: node_data,
1852       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1853       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1854       constants.SS_OFFLINE_NODES: off_data,
1855       constants.SS_ONLINE_NODES: on_data,
1856       constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1857       constants.SS_INSTANCE_LIST: instance_data,
1858       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1859       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1860       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1861       constants.SS_UID_POOL: uid_pool,
1862       constants.SS_NODEGROUPS: nodegroups_data,
1863       }
1864     bad_values = [(k, v) for k, v in ssconf_values.items()
1865                   if not isinstance(v, (str, basestring))]
1866     if bad_values:
1867       err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
1868       raise errors.ConfigurationError("Some ssconf key(s) have non-string"
1869                                       " values: %s" % err)
1870     return ssconf_values
1871
1872   @locking.ssynchronized(_config_lock, shared=1)
1873   def GetSsconfValues(self):
1874     """Wrapper using lock around _UnlockedGetSsconf().
1875
1876     """
1877     return self._UnlockedGetSsconfValues()
1878
1879   @locking.ssynchronized(_config_lock, shared=1)
1880   def GetVGName(self):
1881     """Return the volume group name.
1882
1883     """
1884     return self._config_data.cluster.volume_group_name
1885
1886   @locking.ssynchronized(_config_lock)
1887   def SetVGName(self, vg_name):
1888     """Set the volume group name.
1889
1890     """
1891     self._config_data.cluster.volume_group_name = vg_name
1892     self._config_data.cluster.serial_no += 1
1893     self._WriteConfig()
1894
1895   @locking.ssynchronized(_config_lock, shared=1)
1896   def GetDRBDHelper(self):
1897     """Return DRBD usermode helper.
1898
1899     """
1900     return self._config_data.cluster.drbd_usermode_helper
1901
1902   @locking.ssynchronized(_config_lock)
1903   def SetDRBDHelper(self, drbd_helper):
1904     """Set DRBD usermode helper.
1905
1906     """
1907     self._config_data.cluster.drbd_usermode_helper = drbd_helper
1908     self._config_data.cluster.serial_no += 1
1909     self._WriteConfig()
1910
1911   @locking.ssynchronized(_config_lock, shared=1)
1912   def GetMACPrefix(self):
1913     """Return the mac prefix.
1914
1915     """
1916     return self._config_data.cluster.mac_prefix
1917
1918   @locking.ssynchronized(_config_lock, shared=1)
1919   def GetClusterInfo(self):
1920     """Returns information about the cluster
1921
1922     @rtype: L{objects.Cluster}
1923     @return: the cluster object
1924
1925     """
1926     return self._config_data.cluster
1927
1928   @locking.ssynchronized(_config_lock, shared=1)
1929   def HasAnyDiskOfType(self, dev_type):
1930     """Check if in there is at disk of the given type in the configuration.
1931
1932     """
1933     return self._config_data.HasAnyDiskOfType(dev_type)
1934
1935   @locking.ssynchronized(_config_lock)
1936   def Update(self, target, feedback_fn):
1937     """Notify function to be called after updates.
1938
1939     This function must be called when an object (as returned by
1940     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1941     caller wants the modifications saved to the backing store. Note
1942     that all modified objects will be saved, but the target argument
1943     is the one the caller wants to ensure that it's saved.
1944
1945     @param target: an instance of either L{objects.Cluster},
1946         L{objects.Node} or L{objects.Instance} which is existing in
1947         the cluster
1948     @param feedback_fn: Callable feedback function
1949
1950     """
1951     if self._config_data is None:
1952       raise errors.ProgrammerError("Configuration file not read,"
1953                                    " cannot save.")
1954     update_serial = False
1955     if isinstance(target, objects.Cluster):
1956       test = target == self._config_data.cluster
1957     elif isinstance(target, objects.Node):
1958       test = target in self._config_data.nodes.values()
1959       update_serial = True
1960     elif isinstance(target, objects.Instance):
1961       test = target in self._config_data.instances.values()
1962     elif isinstance(target, objects.NodeGroup):
1963       test = target in self._config_data.nodegroups.values()
1964     else:
1965       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1966                                    " ConfigWriter.Update" % type(target))
1967     if not test:
1968       raise errors.ConfigurationError("Configuration updated since object"
1969                                       " has been read or unknown object")
1970     target.serial_no += 1
1971     target.mtime = now = time.time()
1972
1973     if update_serial:
1974       # for node updates, we need to increase the cluster serial too
1975       self._config_data.cluster.serial_no += 1
1976       self._config_data.cluster.mtime = now
1977
1978     if isinstance(target, objects.Instance):
1979       self._UnlockedReleaseDRBDMinors(target.name)
1980
1981     self._WriteConfig(feedback_fn=feedback_fn)
1982
1983   @locking.ssynchronized(_config_lock)
1984   def DropECReservations(self, ec_id):
1985     """Drop per-execution-context reservations
1986
1987     """
1988     for rm in self._all_rms:
1989       rm.DropECReservations(ec_id)