Add QA test for “gnt-debug delay”
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 # pylint: disable=R0904
35 # R0904: Too many public methods
36
37 import os
38 import random
39 import logging
40 import time
41
42 from ganeti import errors
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import constants
46 from ganeti import rpc
47 from ganeti import objects
48 from ganeti import serializer
49 from ganeti import uidpool
50 from ganeti import netutils
51 from ganeti import runtime
52
53
54 _config_lock = locking.SharedLock("ConfigWriter")
55
56 # job id used for resource management at config upgrade time
57 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
58
59
60 def _ValidateConfig(data):
61   """Verifies that a configuration objects looks valid.
62
63   This only verifies the version of the configuration.
64
65   @raise errors.ConfigurationError: if the version differs from what
66       we expect
67
68   """
69   if data.version != constants.CONFIG_VERSION:
70     raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
71
72
73 class TemporaryReservationManager:
74   """A temporary resource reservation manager.
75
76   This is used to reserve resources in a job, before using them, making sure
77   other jobs cannot get them in the meantime.
78
79   """
80   def __init__(self):
81     self._ec_reserved = {}
82
83   def Reserved(self, resource):
84     for holder_reserved in self._ec_reserved.values():
85       if resource in holder_reserved:
86         return True
87     return False
88
89   def Reserve(self, ec_id, resource):
90     if self.Reserved(resource):
91       raise errors.ReservationError("Duplicate reservation for resource '%s'"
92                                     % str(resource))
93     if ec_id not in self._ec_reserved:
94       self._ec_reserved[ec_id] = set([resource])
95     else:
96       self._ec_reserved[ec_id].add(resource)
97
98   def DropECReservations(self, ec_id):
99     if ec_id in self._ec_reserved:
100       del self._ec_reserved[ec_id]
101
102   def GetReserved(self):
103     all_reserved = set()
104     for holder_reserved in self._ec_reserved.values():
105       all_reserved.update(holder_reserved)
106     return all_reserved
107
108   def Generate(self, existing, generate_one_fn, ec_id):
109     """Generate a new resource of this type
110
111     """
112     assert callable(generate_one_fn)
113
114     all_elems = self.GetReserved()
115     all_elems.update(existing)
116     retries = 64
117     while retries > 0:
118       new_resource = generate_one_fn()
119       if new_resource is not None and new_resource not in all_elems:
120         break
121     else:
122       raise errors.ConfigurationError("Not able generate new resource"
123                                       " (last tried: %s)" % new_resource)
124     self.Reserve(ec_id, new_resource)
125     return new_resource
126
127
128 def _MatchNameComponentIgnoreCase(short_name, names):
129   """Wrapper around L{utils.text.MatchNameComponent}.
130
131   """
132   return utils.MatchNameComponent(short_name, names, case_sensitive=False)
133
134
135 class ConfigWriter:
136   """The interface to the cluster configuration.
137
138   @ivar _temporary_lvs: reservation manager for temporary LVs
139   @ivar _all_rms: a list of all temporary reservation managers
140
141   """
142   def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
143                accept_foreign=False):
144     self.write_count = 0
145     self._lock = _config_lock
146     self._config_data = None
147     self._offline = offline
148     if cfg_file is None:
149       self._cfg_file = constants.CLUSTER_CONF_FILE
150     else:
151       self._cfg_file = cfg_file
152     self._getents = _getents
153     self._temporary_ids = TemporaryReservationManager()
154     self._temporary_drbds = {}
155     self._temporary_macs = TemporaryReservationManager()
156     self._temporary_secrets = TemporaryReservationManager()
157     self._temporary_lvs = TemporaryReservationManager()
158     self._all_rms = [self._temporary_ids, self._temporary_macs,
159                      self._temporary_secrets, self._temporary_lvs]
160     # Note: in order to prevent errors when resolving our name in
161     # _DistributeConfig, we compute it here once and reuse it; it's
162     # better to raise an error before starting to modify the config
163     # file than after it was modified
164     self._my_hostname = netutils.Hostname.GetSysName()
165     self._last_cluster_serial = -1
166     self._cfg_id = None
167     self._context = None
168     self._OpenConfig(accept_foreign)
169
170   def _GetRpc(self, address_list):
171     """Returns RPC runner for configuration.
172
173     """
174     return rpc.ConfigRunner(self._context, address_list)
175
176   def SetContext(self, context):
177     """Sets Ganeti context.
178
179     """
180     self._context = context
181
182   # this method needs to be static, so that we can call it on the class
183   @staticmethod
184   def IsCluster():
185     """Check if the cluster is configured.
186
187     """
188     return os.path.exists(constants.CLUSTER_CONF_FILE)
189
190   def _GenerateOneMAC(self):
191     """Generate one mac address
192
193     """
194     prefix = self._config_data.cluster.mac_prefix
195     byte1 = random.randrange(0, 256)
196     byte2 = random.randrange(0, 256)
197     byte3 = random.randrange(0, 256)
198     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
199     return mac
200
201   @locking.ssynchronized(_config_lock, shared=1)
202   def GetNdParams(self, node):
203     """Get the node params populated with cluster defaults.
204
205     @type node: L{objects.Node}
206     @param node: The node we want to know the params for
207     @return: A dict with the filled in node params
208
209     """
210     nodegroup = self._UnlockedGetNodeGroup(node.group)
211     return self._config_data.cluster.FillND(node, nodegroup)
212
213   @locking.ssynchronized(_config_lock, shared=1)
214   def GenerateMAC(self, ec_id):
215     """Generate a MAC for an instance.
216
217     This should check the current instances for duplicates.
218
219     """
220     existing = self._AllMACs()
221     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
222
223   @locking.ssynchronized(_config_lock, shared=1)
224   def ReserveMAC(self, mac, ec_id):
225     """Reserve a MAC for an instance.
226
227     This only checks instances managed by this cluster, it does not
228     check for potential collisions elsewhere.
229
230     """
231     all_macs = self._AllMACs()
232     if mac in all_macs:
233       raise errors.ReservationError("mac already in use")
234     else:
235       self._temporary_macs.Reserve(ec_id, mac)
236
237   @locking.ssynchronized(_config_lock, shared=1)
238   def ReserveLV(self, lv_name, ec_id):
239     """Reserve an VG/LV pair for an instance.
240
241     @type lv_name: string
242     @param lv_name: the logical volume name to reserve
243
244     """
245     all_lvs = self._AllLVs()
246     if lv_name in all_lvs:
247       raise errors.ReservationError("LV already in use")
248     else:
249       self._temporary_lvs.Reserve(ec_id, lv_name)
250
251   @locking.ssynchronized(_config_lock, shared=1)
252   def GenerateDRBDSecret(self, ec_id):
253     """Generate a DRBD secret.
254
255     This checks the current disks for duplicates.
256
257     """
258     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
259                                             utils.GenerateSecret,
260                                             ec_id)
261
262   def _AllLVs(self):
263     """Compute the list of all LVs.
264
265     """
266     lvnames = set()
267     for instance in self._config_data.instances.values():
268       node_data = instance.MapLVsByNode()
269       for lv_list in node_data.values():
270         lvnames.update(lv_list)
271     return lvnames
272
273   def _AllIDs(self, include_temporary):
274     """Compute the list of all UUIDs and names we have.
275
276     @type include_temporary: boolean
277     @param include_temporary: whether to include the _temporary_ids set
278     @rtype: set
279     @return: a set of IDs
280
281     """
282     existing = set()
283     if include_temporary:
284       existing.update(self._temporary_ids.GetReserved())
285     existing.update(self._AllLVs())
286     existing.update(self._config_data.instances.keys())
287     existing.update(self._config_data.nodes.keys())
288     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
289     return existing
290
291   def _GenerateUniqueID(self, ec_id):
292     """Generate an unique UUID.
293
294     This checks the current node, instances and disk names for
295     duplicates.
296
297     @rtype: string
298     @return: the unique id
299
300     """
301     existing = self._AllIDs(include_temporary=False)
302     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
303
304   @locking.ssynchronized(_config_lock, shared=1)
305   def GenerateUniqueID(self, ec_id):
306     """Generate an unique ID.
307
308     This is just a wrapper over the unlocked version.
309
310     @type ec_id: string
311     @param ec_id: unique id for the job to reserve the id to
312
313     """
314     return self._GenerateUniqueID(ec_id)
315
316   def _AllMACs(self):
317     """Return all MACs present in the config.
318
319     @rtype: list
320     @return: the list of all MACs
321
322     """
323     result = []
324     for instance in self._config_data.instances.values():
325       for nic in instance.nics:
326         result.append(nic.mac)
327
328     return result
329
330   def _AllDRBDSecrets(self):
331     """Return all DRBD secrets present in the config.
332
333     @rtype: list
334     @return: the list of all DRBD secrets
335
336     """
337     def helper(disk, result):
338       """Recursively gather secrets from this disk."""
339       if disk.dev_type == constants.DT_DRBD8:
340         result.append(disk.logical_id[5])
341       if disk.children:
342         for child in disk.children:
343           helper(child, result)
344
345     result = []
346     for instance in self._config_data.instances.values():
347       for disk in instance.disks:
348         helper(disk, result)
349
350     return result
351
352   def _CheckDiskIDs(self, disk, l_ids, p_ids):
353     """Compute duplicate disk IDs
354
355     @type disk: L{objects.Disk}
356     @param disk: the disk at which to start searching
357     @type l_ids: list
358     @param l_ids: list of current logical ids
359     @type p_ids: list
360     @param p_ids: list of current physical ids
361     @rtype: list
362     @return: a list of error messages
363
364     """
365     result = []
366     if disk.logical_id is not None:
367       if disk.logical_id in l_ids:
368         result.append("duplicate logical id %s" % str(disk.logical_id))
369       else:
370         l_ids.append(disk.logical_id)
371     if disk.physical_id is not None:
372       if disk.physical_id in p_ids:
373         result.append("duplicate physical id %s" % str(disk.physical_id))
374       else:
375         p_ids.append(disk.physical_id)
376
377     if disk.children:
378       for child in disk.children:
379         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
380     return result
381
382   def _UnlockedVerifyConfig(self):
383     """Verify function.
384
385     @rtype: list
386     @return: a list of error messages; a non-empty list signifies
387         configuration errors
388
389     """
390     # pylint: disable=R0914
391     result = []
392     seen_macs = []
393     ports = {}
394     data = self._config_data
395     cluster = data.cluster
396     seen_lids = []
397     seen_pids = []
398
399     # global cluster checks
400     if not cluster.enabled_hypervisors:
401       result.append("enabled hypervisors list doesn't have any entries")
402     invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
403     if invalid_hvs:
404       result.append("enabled hypervisors contains invalid entries: %s" %
405                     invalid_hvs)
406     missing_hvp = (set(cluster.enabled_hypervisors) -
407                    set(cluster.hvparams.keys()))
408     if missing_hvp:
409       result.append("hypervisor parameters missing for the enabled"
410                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
411
412     if cluster.master_node not in data.nodes:
413       result.append("cluster has invalid primary node '%s'" %
414                     cluster.master_node)
415
416     def _helper(owner, attr, value, template):
417       try:
418         utils.ForceDictType(value, template)
419       except errors.GenericError, err:
420         result.append("%s has invalid %s: %s" % (owner, attr, err))
421
422     def _helper_nic(owner, params):
423       try:
424         objects.NIC.CheckParameterSyntax(params)
425       except errors.ConfigurationError, err:
426         result.append("%s has invalid nicparams: %s" % (owner, err))
427
428     # check cluster parameters
429     _helper("cluster", "beparams", cluster.SimpleFillBE({}),
430             constants.BES_PARAMETER_TYPES)
431     _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
432             constants.NICS_PARAMETER_TYPES)
433     _helper_nic("cluster", cluster.SimpleFillNIC({}))
434     _helper("cluster", "ndparams", cluster.SimpleFillND({}),
435             constants.NDS_PARAMETER_TYPES)
436
437     # per-instance checks
438     for instance_name in data.instances:
439       instance = data.instances[instance_name]
440       if instance.name != instance_name:
441         result.append("instance '%s' is indexed by wrong name '%s'" %
442                       (instance.name, instance_name))
443       if instance.primary_node not in data.nodes:
444         result.append("instance '%s' has invalid primary node '%s'" %
445                       (instance_name, instance.primary_node))
446       for snode in instance.secondary_nodes:
447         if snode not in data.nodes:
448           result.append("instance '%s' has invalid secondary node '%s'" %
449                         (instance_name, snode))
450       for idx, nic in enumerate(instance.nics):
451         if nic.mac in seen_macs:
452           result.append("instance '%s' has NIC %d mac %s duplicate" %
453                         (instance_name, idx, nic.mac))
454         else:
455           seen_macs.append(nic.mac)
456         if nic.nicparams:
457           filled = cluster.SimpleFillNIC(nic.nicparams)
458           owner = "instance %s nic %d" % (instance.name, idx)
459           _helper(owner, "nicparams",
460                   filled, constants.NICS_PARAMETER_TYPES)
461           _helper_nic(owner, filled)
462
463       # parameter checks
464       if instance.beparams:
465         _helper("instance %s" % instance.name, "beparams",
466                 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)
467
468       # gather the drbd ports for duplicate checks
469       for dsk in instance.disks:
470         if dsk.dev_type in constants.LDS_DRBD:
471           tcp_port = dsk.logical_id[2]
472           if tcp_port not in ports:
473             ports[tcp_port] = []
474           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
475       # gather network port reservation
476       net_port = getattr(instance, "network_port", None)
477       if net_port is not None:
478         if net_port not in ports:
479           ports[net_port] = []
480         ports[net_port].append((instance.name, "network port"))
481
482       # instance disk verify
483       for idx, disk in enumerate(instance.disks):
484         result.extend(["instance '%s' disk %d error: %s" %
485                        (instance.name, idx, msg) for msg in disk.Verify()])
486         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
487
488     # cluster-wide pool of free ports
489     for free_port in cluster.tcpudp_port_pool:
490       if free_port not in ports:
491         ports[free_port] = []
492       ports[free_port].append(("cluster", "port marked as free"))
493
494     # compute tcp/udp duplicate ports
495     keys = ports.keys()
496     keys.sort()
497     for pnum in keys:
498       pdata = ports[pnum]
499       if len(pdata) > 1:
500         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
501         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
502
503     # highest used tcp port check
504     if keys:
505       if keys[-1] > cluster.highest_used_port:
506         result.append("Highest used port mismatch, saved %s, computed %s" %
507                       (cluster.highest_used_port, keys[-1]))
508
509     if not data.nodes[cluster.master_node].master_candidate:
510       result.append("Master node is not a master candidate")
511
512     # master candidate checks
513     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
514     if mc_now < mc_max:
515       result.append("Not enough master candidates: actual %d, target %d" %
516                     (mc_now, mc_max))
517
518     # node checks
519     for node_name, node in data.nodes.items():
520       if node.name != node_name:
521         result.append("Node '%s' is indexed by wrong name '%s'" %
522                       (node.name, node_name))
523       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
524         result.append("Node %s state is invalid: master_candidate=%s,"
525                       " drain=%s, offline=%s" %
526                       (node.name, node.master_candidate, node.drained,
527                        node.offline))
528       if node.group not in data.nodegroups:
529         result.append("Node '%s' has invalid group '%s'" %
530                       (node.name, node.group))
531       else:
532         _helper("node %s" % node.name, "ndparams",
533                 cluster.FillND(node, data.nodegroups[node.group]),
534                 constants.NDS_PARAMETER_TYPES)
535
536     # nodegroups checks
537     nodegroups_names = set()
538     for nodegroup_uuid in data.nodegroups:
539       nodegroup = data.nodegroups[nodegroup_uuid]
540       if nodegroup.uuid != nodegroup_uuid:
541         result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
542                       % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
543       if utils.UUID_RE.match(nodegroup.name.lower()):
544         result.append("node group '%s' (uuid: '%s') has uuid-like name" %
545                       (nodegroup.name, nodegroup.uuid))
546       if nodegroup.name in nodegroups_names:
547         result.append("duplicate node group name '%s'" % nodegroup.name)
548       else:
549         nodegroups_names.add(nodegroup.name)
550       if nodegroup.ndparams:
551         _helper("group %s" % nodegroup.name, "ndparams",
552                 cluster.SimpleFillND(nodegroup.ndparams),
553                 constants.NDS_PARAMETER_TYPES)
554
555     # drbd minors check
556     _, duplicates = self._UnlockedComputeDRBDMap()
557     for node, minor, instance_a, instance_b in duplicates:
558       result.append("DRBD minor %d on node %s is assigned twice to instances"
559                     " %s and %s" % (minor, node, instance_a, instance_b))
560
561     # IP checks
562     default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
563     ips = {}
564
565     def _AddIpAddress(ip, name):
566       ips.setdefault(ip, []).append(name)
567
568     _AddIpAddress(cluster.master_ip, "cluster_ip")
569
570     for node in data.nodes.values():
571       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
572       if node.secondary_ip != node.primary_ip:
573         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
574
575     for instance in data.instances.values():
576       for idx, nic in enumerate(instance.nics):
577         if nic.ip is None:
578           continue
579
580         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
581         nic_mode = nicparams[constants.NIC_MODE]
582         nic_link = nicparams[constants.NIC_LINK]
583
584         if nic_mode == constants.NIC_MODE_BRIDGED:
585           link = "bridge:%s" % nic_link
586         elif nic_mode == constants.NIC_MODE_ROUTED:
587           link = "route:%s" % nic_link
588         else:
589           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
590
591         _AddIpAddress("%s/%s" % (link, nic.ip),
592                       "instance:%s/nic:%d" % (instance.name, idx))
593
594     for ip, owners in ips.items():
595       if len(owners) > 1:
596         result.append("IP address %s is used by multiple owners: %s" %
597                       (ip, utils.CommaJoin(owners)))
598
599     return result
600
601   @locking.ssynchronized(_config_lock, shared=1)
602   def VerifyConfig(self):
603     """Verify function.
604
605     This is just a wrapper over L{_UnlockedVerifyConfig}.
606
607     @rtype: list
608     @return: a list of error messages; a non-empty list signifies
609         configuration errors
610
611     """
612     return self._UnlockedVerifyConfig()
613
614   def _UnlockedSetDiskID(self, disk, node_name):
615     """Convert the unique ID to the ID needed on the target nodes.
616
617     This is used only for drbd, which needs ip/port configuration.
618
619     The routine descends down and updates its children also, because
620     this helps when the only the top device is passed to the remote
621     node.
622
623     This function is for internal use, when the config lock is already held.
624
625     """
626     if disk.children:
627       for child in disk.children:
628         self._UnlockedSetDiskID(child, node_name)
629
630     if disk.logical_id is None and disk.physical_id is not None:
631       return
632     if disk.dev_type == constants.LD_DRBD8:
633       pnode, snode, port, pminor, sminor, secret = disk.logical_id
634       if node_name not in (pnode, snode):
635         raise errors.ConfigurationError("DRBD device not knowing node %s" %
636                                         node_name)
637       pnode_info = self._UnlockedGetNodeInfo(pnode)
638       snode_info = self._UnlockedGetNodeInfo(snode)
639       if pnode_info is None or snode_info is None:
640         raise errors.ConfigurationError("Can't find primary or secondary node"
641                                         " for %s" % str(disk))
642       p_data = (pnode_info.secondary_ip, port)
643       s_data = (snode_info.secondary_ip, port)
644       if pnode == node_name:
645         disk.physical_id = p_data + s_data + (pminor, secret)
646       else: # it must be secondary, we tested above
647         disk.physical_id = s_data + p_data + (sminor, secret)
648     else:
649       disk.physical_id = disk.logical_id
650     return
651
652   @locking.ssynchronized(_config_lock)
653   def SetDiskID(self, disk, node_name):
654     """Convert the unique ID to the ID needed on the target nodes.
655
656     This is used only for drbd, which needs ip/port configuration.
657
658     The routine descends down and updates its children also, because
659     this helps when the only the top device is passed to the remote
660     node.
661
662     """
663     return self._UnlockedSetDiskID(disk, node_name)
664
665   @locking.ssynchronized(_config_lock)
666   def AddTcpUdpPort(self, port):
667     """Adds a new port to the available port pool.
668
669     """
670     if not isinstance(port, int):
671       raise errors.ProgrammerError("Invalid type passed for port")
672
673     self._config_data.cluster.tcpudp_port_pool.add(port)
674     self._WriteConfig()
675
676   @locking.ssynchronized(_config_lock, shared=1)
677   def GetPortList(self):
678     """Returns a copy of the current port list.
679
680     """
681     return self._config_data.cluster.tcpudp_port_pool.copy()
682
683   @locking.ssynchronized(_config_lock)
684   def AllocatePort(self):
685     """Allocate a port.
686
687     The port will be taken from the available port pool or from the
688     default port range (and in this case we increase
689     highest_used_port).
690
691     """
692     # If there are TCP/IP ports configured, we use them first.
693     if self._config_data.cluster.tcpudp_port_pool:
694       port = self._config_data.cluster.tcpudp_port_pool.pop()
695     else:
696       port = self._config_data.cluster.highest_used_port + 1
697       if port >= constants.LAST_DRBD_PORT:
698         raise errors.ConfigurationError("The highest used port is greater"
699                                         " than %s. Aborting." %
700                                         constants.LAST_DRBD_PORT)
701       self._config_data.cluster.highest_used_port = port
702
703     self._WriteConfig()
704     return port
705
706   def _UnlockedComputeDRBDMap(self):
707     """Compute the used DRBD minor/nodes.
708
709     @rtype: (dict, list)
710     @return: dictionary of node_name: dict of minor: instance_name;
711         the returned dict will have all the nodes in it (even if with
712         an empty list), and a list of duplicates; if the duplicates
713         list is not empty, the configuration is corrupted and its caller
714         should raise an exception
715
716     """
717     def _AppendUsedPorts(instance_name, disk, used):
718       duplicates = []
719       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
720         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
721         for node, port in ((node_a, minor_a), (node_b, minor_b)):
722           assert node in used, ("Node '%s' of instance '%s' not found"
723                                 " in node list" % (node, instance_name))
724           if port in used[node]:
725             duplicates.append((node, port, instance_name, used[node][port]))
726           else:
727             used[node][port] = instance_name
728       if disk.children:
729         for child in disk.children:
730           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
731       return duplicates
732
733     duplicates = []
734     my_dict = dict((node, {}) for node in self._config_data.nodes)
735     for instance in self._config_data.instances.itervalues():
736       for disk in instance.disks:
737         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
738     for (node, minor), instance in self._temporary_drbds.iteritems():
739       if minor in my_dict[node] and my_dict[node][minor] != instance:
740         duplicates.append((node, minor, instance, my_dict[node][minor]))
741       else:
742         my_dict[node][minor] = instance
743     return my_dict, duplicates
744
745   @locking.ssynchronized(_config_lock)
746   def ComputeDRBDMap(self):
747     """Compute the used DRBD minor/nodes.
748
749     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
750
751     @return: dictionary of node_name: dict of minor: instance_name;
752         the returned dict will have all the nodes in it (even if with
753         an empty list).
754
755     """
756     d_map, duplicates = self._UnlockedComputeDRBDMap()
757     if duplicates:
758       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
759                                       str(duplicates))
760     return d_map
761
762   @locking.ssynchronized(_config_lock)
763   def AllocateDRBDMinor(self, nodes, instance):
764     """Allocate a drbd minor.
765
766     The free minor will be automatically computed from the existing
767     devices. A node can be given multiple times in order to allocate
768     multiple minors. The result is the list of minors, in the same
769     order as the passed nodes.
770
771     @type instance: string
772     @param instance: the instance for which we allocate minors
773
774     """
775     assert isinstance(instance, basestring), \
776            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
777
778     d_map, duplicates = self._UnlockedComputeDRBDMap()
779     if duplicates:
780       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
781                                       str(duplicates))
782     result = []
783     for nname in nodes:
784       ndata = d_map[nname]
785       if not ndata:
786         # no minors used, we can start at 0
787         result.append(0)
788         ndata[0] = instance
789         self._temporary_drbds[(nname, 0)] = instance
790         continue
791       keys = ndata.keys()
792       keys.sort()
793       ffree = utils.FirstFree(keys)
794       if ffree is None:
795         # return the next minor
796         # TODO: implement high-limit check
797         minor = keys[-1] + 1
798       else:
799         minor = ffree
800       # double-check minor against current instances
801       assert minor not in d_map[nname], \
802              ("Attempt to reuse allocated DRBD minor %d on node %s,"
803               " already allocated to instance %s" %
804               (minor, nname, d_map[nname][minor]))
805       ndata[minor] = instance
806       # double-check minor against reservation
807       r_key = (nname, minor)
808       assert r_key not in self._temporary_drbds, \
809              ("Attempt to reuse reserved DRBD minor %d on node %s,"
810               " reserved for instance %s" %
811               (minor, nname, self._temporary_drbds[r_key]))
812       self._temporary_drbds[r_key] = instance
813       result.append(minor)
814     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
815                   nodes, result)
816     return result
817
818   def _UnlockedReleaseDRBDMinors(self, instance):
819     """Release temporary drbd minors allocated for a given instance.
820
821     @type instance: string
822     @param instance: the instance for which temporary minors should be
823                      released
824
825     """
826     assert isinstance(instance, basestring), \
827            "Invalid argument passed to ReleaseDRBDMinors"
828     for key, name in self._temporary_drbds.items():
829       if name == instance:
830         del self._temporary_drbds[key]
831
832   @locking.ssynchronized(_config_lock)
833   def ReleaseDRBDMinors(self, instance):
834     """Release temporary drbd minors allocated for a given instance.
835
836     This should be called on the error paths, on the success paths
837     it's automatically called by the ConfigWriter add and update
838     functions.
839
840     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
841
842     @type instance: string
843     @param instance: the instance for which temporary minors should be
844                      released
845
846     """
847     self._UnlockedReleaseDRBDMinors(instance)
848
849   @locking.ssynchronized(_config_lock, shared=1)
850   def GetConfigVersion(self):
851     """Get the configuration version.
852
853     @return: Config version
854
855     """
856     return self._config_data.version
857
858   @locking.ssynchronized(_config_lock, shared=1)
859   def GetClusterName(self):
860     """Get cluster name.
861
862     @return: Cluster name
863
864     """
865     return self._config_data.cluster.cluster_name
866
867   @locking.ssynchronized(_config_lock, shared=1)
868   def GetMasterNode(self):
869     """Get the hostname of the master node for this cluster.
870
871     @return: Master hostname
872
873     """
874     return self._config_data.cluster.master_node
875
876   @locking.ssynchronized(_config_lock, shared=1)
877   def GetMasterIP(self):
878     """Get the IP of the master node for this cluster.
879
880     @return: Master IP
881
882     """
883     return self._config_data.cluster.master_ip
884
885   @locking.ssynchronized(_config_lock, shared=1)
886   def GetMasterNetdev(self):
887     """Get the master network device for this cluster.
888
889     """
890     return self._config_data.cluster.master_netdev
891
892   @locking.ssynchronized(_config_lock, shared=1)
893   def GetMasterNetmask(self):
894     """Get the netmask of the master node for this cluster.
895
896     """
897     return self._config_data.cluster.master_netmask
898
899   @locking.ssynchronized(_config_lock, shared=1)
900   def GetFileStorageDir(self):
901     """Get the file storage dir for this cluster.
902
903     """
904     return self._config_data.cluster.file_storage_dir
905
906   @locking.ssynchronized(_config_lock, shared=1)
907   def GetSharedFileStorageDir(self):
908     """Get the shared file storage dir for this cluster.
909
910     """
911     return self._config_data.cluster.shared_file_storage_dir
912
913   @locking.ssynchronized(_config_lock, shared=1)
914   def GetHypervisorType(self):
915     """Get the hypervisor type for this cluster.
916
917     """
918     return self._config_data.cluster.enabled_hypervisors[0]
919
920   @locking.ssynchronized(_config_lock, shared=1)
921   def GetHostKey(self):
922     """Return the rsa hostkey from the config.
923
924     @rtype: string
925     @return: the rsa hostkey
926
927     """
928     return self._config_data.cluster.rsahostkeypub
929
930   @locking.ssynchronized(_config_lock, shared=1)
931   def GetDefaultIAllocator(self):
932     """Get the default instance allocator for this cluster.
933
934     """
935     return self._config_data.cluster.default_iallocator
936
937   @locking.ssynchronized(_config_lock, shared=1)
938   def GetPrimaryIPFamily(self):
939     """Get cluster primary ip family.
940
941     @return: primary ip family
942
943     """
944     return self._config_data.cluster.primary_ip_family
945
946   @locking.ssynchronized(_config_lock, shared=1)
947   def GetMasterNetworkParameters(self):
948     """Get network parameters of the master node.
949
950     @rtype: L{object.MasterNetworkParameters}
951     @return: network parameters of the master node
952
953     """
954     cluster = self._config_data.cluster
955     result = objects.MasterNetworkParameters(name=cluster.master_node,
956       ip=cluster.master_ip,
957       netmask=cluster.master_netmask,
958       netdev=cluster.master_netdev,
959       ip_family=cluster.primary_ip_family)
960
961     return result
962
963   @locking.ssynchronized(_config_lock)
964   def AddNodeGroup(self, group, ec_id, check_uuid=True):
965     """Add a node group to the configuration.
966
967     This method calls group.UpgradeConfig() to fill any missing attributes
968     according to their default values.
969
970     @type group: L{objects.NodeGroup}
971     @param group: the NodeGroup object to add
972     @type ec_id: string
973     @param ec_id: unique id for the job to use when creating a missing UUID
974     @type check_uuid: bool
975     @param check_uuid: add an UUID to the group if it doesn't have one or, if
976                        it does, ensure that it does not exist in the
977                        configuration already
978
979     """
980     self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
981     self._WriteConfig()
982
983   def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
984     """Add a node group to the configuration.
985
986     """
987     logging.info("Adding node group %s to configuration", group.name)
988
989     # Some code might need to add a node group with a pre-populated UUID
990     # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
991     # the "does this UUID" exist already check.
992     if check_uuid:
993       self._EnsureUUID(group, ec_id)
994
995     try:
996       existing_uuid = self._UnlockedLookupNodeGroup(group.name)
997     except errors.OpPrereqError:
998       pass
999     else:
1000       raise errors.OpPrereqError("Desired group name '%s' already exists as a"
1001                                  " node group (UUID: %s)" %
1002                                  (group.name, existing_uuid),
1003                                  errors.ECODE_EXISTS)
1004
1005     group.serial_no = 1
1006     group.ctime = group.mtime = time.time()
1007     group.UpgradeConfig()
1008
1009     self._config_data.nodegroups[group.uuid] = group
1010     self._config_data.cluster.serial_no += 1
1011
1012   @locking.ssynchronized(_config_lock)
1013   def RemoveNodeGroup(self, group_uuid):
1014     """Remove a node group from the configuration.
1015
1016     @type group_uuid: string
1017     @param group_uuid: the UUID of the node group to remove
1018
1019     """
1020     logging.info("Removing node group %s from configuration", group_uuid)
1021
1022     if group_uuid not in self._config_data.nodegroups:
1023       raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)
1024
1025     assert len(self._config_data.nodegroups) != 1, \
1026             "Group '%s' is the only group, cannot be removed" % group_uuid
1027
1028     del self._config_data.nodegroups[group_uuid]
1029     self._config_data.cluster.serial_no += 1
1030     self._WriteConfig()
1031
1032   def _UnlockedLookupNodeGroup(self, target):
1033     """Lookup a node group's UUID.
1034
1035     @type target: string or None
1036     @param target: group name or UUID or None to look for the default
1037     @rtype: string
1038     @return: nodegroup UUID
1039     @raises errors.OpPrereqError: when the target group cannot be found
1040
1041     """
1042     if target is None:
1043       if len(self._config_data.nodegroups) != 1:
1044         raise errors.OpPrereqError("More than one node group exists. Target"
1045                                    " group must be specified explicitely.")
1046       else:
1047         return self._config_data.nodegroups.keys()[0]
1048     if target in self._config_data.nodegroups:
1049       return target
1050     for nodegroup in self._config_data.nodegroups.values():
1051       if nodegroup.name == target:
1052         return nodegroup.uuid
1053     raise errors.OpPrereqError("Node group '%s' not found" % target,
1054                                errors.ECODE_NOENT)
1055
1056   @locking.ssynchronized(_config_lock, shared=1)
1057   def LookupNodeGroup(self, target):
1058     """Lookup a node group's UUID.
1059
1060     This function is just a wrapper over L{_UnlockedLookupNodeGroup}.
1061
1062     @type target: string or None
1063     @param target: group name or UUID or None to look for the default
1064     @rtype: string
1065     @return: nodegroup UUID
1066
1067     """
1068     return self._UnlockedLookupNodeGroup(target)
1069
1070   def _UnlockedGetNodeGroup(self, uuid):
1071     """Lookup a node group.
1072
1073     @type uuid: string
1074     @param uuid: group UUID
1075     @rtype: L{objects.NodeGroup} or None
1076     @return: nodegroup object, or None if not found
1077
1078     """
1079     if uuid not in self._config_data.nodegroups:
1080       return None
1081
1082     return self._config_data.nodegroups[uuid]
1083
1084   @locking.ssynchronized(_config_lock, shared=1)
1085   def GetNodeGroup(self, uuid):
1086     """Lookup a node group.
1087
1088     @type uuid: string
1089     @param uuid: group UUID
1090     @rtype: L{objects.NodeGroup} or None
1091     @return: nodegroup object, or None if not found
1092
1093     """
1094     return self._UnlockedGetNodeGroup(uuid)
1095
1096   @locking.ssynchronized(_config_lock, shared=1)
1097   def GetAllNodeGroupsInfo(self):
1098     """Get the configuration of all node groups.
1099
1100     """
1101     return dict(self._config_data.nodegroups)
1102
1103   @locking.ssynchronized(_config_lock, shared=1)
1104   def GetNodeGroupList(self):
1105     """Get a list of node groups.
1106
1107     """
1108     return self._config_data.nodegroups.keys()
1109
1110   @locking.ssynchronized(_config_lock, shared=1)
1111   def GetNodeGroupMembersByNodes(self, nodes):
1112     """Get nodes which are member in the same nodegroups as the given nodes.
1113
1114     """
1115     ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
1116     return frozenset(member_name
1117                      for node_name in nodes
1118                      for member_name in
1119                        self._UnlockedGetNodeGroup(ngfn(node_name)).members)
1120
1121   @locking.ssynchronized(_config_lock)
1122   def AddInstance(self, instance, ec_id):
1123     """Add an instance to the config.
1124
1125     This should be used after creating a new instance.
1126
1127     @type instance: L{objects.Instance}
1128     @param instance: the instance object
1129
1130     """
1131     if not isinstance(instance, objects.Instance):
1132       raise errors.ProgrammerError("Invalid type passed to AddInstance")
1133
1134     if instance.disk_template != constants.DT_DISKLESS:
1135       all_lvs = instance.MapLVsByNode()
1136       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
1137
1138     all_macs = self._AllMACs()
1139     for nic in instance.nics:
1140       if nic.mac in all_macs:
1141         raise errors.ConfigurationError("Cannot add instance %s:"
1142                                         " MAC address '%s' already in use." %
1143                                         (instance.name, nic.mac))
1144
1145     self._EnsureUUID(instance, ec_id)
1146
1147     instance.serial_no = 1
1148     instance.ctime = instance.mtime = time.time()
1149     self._config_data.instances[instance.name] = instance
1150     self._config_data.cluster.serial_no += 1
1151     self._UnlockedReleaseDRBDMinors(instance.name)
1152     self._WriteConfig()
1153
1154   def _EnsureUUID(self, item, ec_id):
1155     """Ensures a given object has a valid UUID.
1156
1157     @param item: the instance or node to be checked
1158     @param ec_id: the execution context id for the uuid reservation
1159
1160     """
1161     if not item.uuid:
1162       item.uuid = self._GenerateUniqueID(ec_id)
1163     elif item.uuid in self._AllIDs(include_temporary=True):
1164       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
1165                                       " in use" % (item.name, item.uuid))
1166
1167   def _SetInstanceStatus(self, instance_name, status):
1168     """Set the instance's status to a given value.
1169
1170     """
1171     assert isinstance(status, bool), \
1172            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
1173
1174     if instance_name not in self._config_data.instances:
1175       raise errors.ConfigurationError("Unknown instance '%s'" %
1176                                       instance_name)
1177     instance = self._config_data.instances[instance_name]
1178     if instance.admin_up != status:
1179       instance.admin_up = status
1180       instance.serial_no += 1
1181       instance.mtime = time.time()
1182       self._WriteConfig()
1183
1184   @locking.ssynchronized(_config_lock)
1185   def MarkInstanceUp(self, instance_name):
1186     """Mark the instance status to up in the config.
1187
1188     """
1189     self._SetInstanceStatus(instance_name, True)
1190
1191   @locking.ssynchronized(_config_lock)
1192   def RemoveInstance(self, instance_name):
1193     """Remove the instance from the configuration.
1194
1195     """
1196     if instance_name not in self._config_data.instances:
1197       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1198     del self._config_data.instances[instance_name]
1199     self._config_data.cluster.serial_no += 1
1200     self._WriteConfig()
1201
1202   @locking.ssynchronized(_config_lock)
1203   def RenameInstance(self, old_name, new_name):
1204     """Rename an instance.
1205
1206     This needs to be done in ConfigWriter and not by RemoveInstance
1207     combined with AddInstance as only we can guarantee an atomic
1208     rename.
1209
1210     """
1211     if old_name not in self._config_data.instances:
1212       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
1213     inst = self._config_data.instances[old_name]
1214     del self._config_data.instances[old_name]
1215     inst.name = new_name
1216
1217     for disk in inst.disks:
1218       if disk.dev_type == constants.LD_FILE:
1219         # rename the file paths in logical and physical id
1220         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
1221         disk_fname = "disk%s" % disk.iv_name.split("/")[1]
1222         disk.physical_id = disk.logical_id = (disk.logical_id[0],
1223                                               utils.PathJoin(file_storage_dir,
1224                                                              inst.name,
1225                                                              disk_fname))
1226
1227     # Force update of ssconf files
1228     self._config_data.cluster.serial_no += 1
1229
1230     self._config_data.instances[inst.name] = inst
1231     self._WriteConfig()
1232
1233   @locking.ssynchronized(_config_lock)
1234   def MarkInstanceDown(self, instance_name):
1235     """Mark the status of an instance to down in the configuration.
1236
1237     """
1238     self._SetInstanceStatus(instance_name, False)
1239
1240   def _UnlockedGetInstanceList(self):
1241     """Get the list of instances.
1242
1243     This function is for internal use, when the config lock is already held.
1244
1245     """
1246     return self._config_data.instances.keys()
1247
1248   @locking.ssynchronized(_config_lock, shared=1)
1249   def GetInstanceList(self):
1250     """Get the list of instances.
1251
1252     @return: array of instances, ex. ['instance2.example.com',
1253         'instance1.example.com']
1254
1255     """
1256     return self._UnlockedGetInstanceList()
1257
1258   def ExpandInstanceName(self, short_name):
1259     """Attempt to expand an incomplete instance name.
1260
1261     """
1262     # Locking is done in L{ConfigWriter.GetInstanceList}
1263     return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())
1264
1265   def _UnlockedGetInstanceInfo(self, instance_name):
1266     """Returns information about an instance.
1267
1268     This function is for internal use, when the config lock is already held.
1269
1270     """
1271     if instance_name not in self._config_data.instances:
1272       return None
1273
1274     return self._config_data.instances[instance_name]
1275
1276   @locking.ssynchronized(_config_lock, shared=1)
1277   def GetInstanceInfo(self, instance_name):
1278     """Returns information about an instance.
1279
1280     It takes the information from the configuration file. Other information of
1281     an instance are taken from the live systems.
1282
1283     @param instance_name: name of the instance, e.g.
1284         I{instance1.example.com}
1285
1286     @rtype: L{objects.Instance}
1287     @return: the instance object
1288
1289     """
1290     return self._UnlockedGetInstanceInfo(instance_name)
1291
1292   @locking.ssynchronized(_config_lock, shared=1)
1293   def GetInstanceNodeGroups(self, instance_name, primary_only=False):
1294     """Returns set of node group UUIDs for instance's nodes.
1295
1296     @rtype: frozenset
1297
1298     """
1299     instance = self._UnlockedGetInstanceInfo(instance_name)
1300     if not instance:
1301       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1302
1303     if primary_only:
1304       nodes = [instance.primary_node]
1305     else:
1306       nodes = instance.all_nodes
1307
1308     return frozenset(self._UnlockedGetNodeInfo(node_name).group
1309                      for node_name in nodes)
1310
1311   @locking.ssynchronized(_config_lock, shared=1)
1312   def GetMultiInstanceInfo(self, instances):
1313     """Get the configuration of multiple instances.
1314
1315     @param instances: list of instance names
1316     @rtype: list
1317     @return: list of tuples (instance, instance_info), where
1318         instance_info is what would GetInstanceInfo return for the
1319         node, while keeping the original order
1320
1321     """
1322     return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]
1323
1324   @locking.ssynchronized(_config_lock, shared=1)
1325   def GetAllInstancesInfo(self):
1326     """Get the configuration of all instances.
1327
1328     @rtype: dict
1329     @return: dict of (instance, instance_info), where instance_info is what
1330               would GetInstanceInfo return for the node
1331
1332     """
1333     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1334                     for instance in self._UnlockedGetInstanceList()])
1335     return my_dict
1336
1337   @locking.ssynchronized(_config_lock)
1338   def AddNode(self, node, ec_id):
1339     """Add a node to the configuration.
1340
1341     @type node: L{objects.Node}
1342     @param node: a Node instance
1343
1344     """
1345     logging.info("Adding node %s to configuration", node.name)
1346
1347     self._EnsureUUID(node, ec_id)
1348
1349     node.serial_no = 1
1350     node.ctime = node.mtime = time.time()
1351     self._UnlockedAddNodeToGroup(node.name, node.group)
1352     self._config_data.nodes[node.name] = node
1353     self._config_data.cluster.serial_no += 1
1354     self._WriteConfig()
1355
1356   @locking.ssynchronized(_config_lock)
1357   def RemoveNode(self, node_name):
1358     """Remove a node from the configuration.
1359
1360     """
1361     logging.info("Removing node %s from configuration", node_name)
1362
1363     if node_name not in self._config_data.nodes:
1364       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1365
1366     self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1367     del self._config_data.nodes[node_name]
1368     self._config_data.cluster.serial_no += 1
1369     self._WriteConfig()
1370
1371   def ExpandNodeName(self, short_name):
1372     """Attempt to expand an incomplete node name.
1373
1374     """
1375     # Locking is done in L{ConfigWriter.GetNodeList}
1376     return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1377
1378   def _UnlockedGetNodeInfo(self, node_name):
1379     """Get the configuration of a node, as stored in the config.
1380
1381     This function is for internal use, when the config lock is already
1382     held.
1383
1384     @param node_name: the node name, e.g. I{node1.example.com}
1385
1386     @rtype: L{objects.Node}
1387     @return: the node object
1388
1389     """
1390     if node_name not in self._config_data.nodes:
1391       return None
1392
1393     return self._config_data.nodes[node_name]
1394
1395   @locking.ssynchronized(_config_lock, shared=1)
1396   def GetNodeInfo(self, node_name):
1397     """Get the configuration of a node, as stored in the config.
1398
1399     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1400
1401     @param node_name: the node name, e.g. I{node1.example.com}
1402
1403     @rtype: L{objects.Node}
1404     @return: the node object
1405
1406     """
1407     return self._UnlockedGetNodeInfo(node_name)
1408
1409   @locking.ssynchronized(_config_lock, shared=1)
1410   def GetNodeInstances(self, node_name):
1411     """Get the instances of a node, as stored in the config.
1412
1413     @param node_name: the node name, e.g. I{node1.example.com}
1414
1415     @rtype: (list, list)
1416     @return: a tuple with two lists: the primary and the secondary instances
1417
1418     """
1419     pri = []
1420     sec = []
1421     for inst in self._config_data.instances.values():
1422       if inst.primary_node == node_name:
1423         pri.append(inst.name)
1424       if node_name in inst.secondary_nodes:
1425         sec.append(inst.name)
1426     return (pri, sec)
1427
1428   @locking.ssynchronized(_config_lock, shared=1)
1429   def GetNodeGroupInstances(self, uuid, primary_only=False):
1430     """Get the instances of a node group.
1431
1432     @param uuid: Node group UUID
1433     @param primary_only: Whether to only consider primary nodes
1434     @rtype: frozenset
1435     @return: List of instance names in node group
1436
1437     """
1438     if primary_only:
1439       nodes_fn = lambda inst: [inst.primary_node]
1440     else:
1441       nodes_fn = lambda inst: inst.all_nodes
1442
1443     return frozenset(inst.name
1444                      for inst in self._config_data.instances.values()
1445                      for node_name in nodes_fn(inst)
1446                      if self._UnlockedGetNodeInfo(node_name).group == uuid)
1447
1448   def _UnlockedGetNodeList(self):
1449     """Return the list of nodes which are in the configuration.
1450
1451     This function is for internal use, when the config lock is already
1452     held.
1453
1454     @rtype: list
1455
1456     """
1457     return self._config_data.nodes.keys()
1458
1459   @locking.ssynchronized(_config_lock, shared=1)
1460   def GetNodeList(self):
1461     """Return the list of nodes which are in the configuration.
1462
1463     """
1464     return self._UnlockedGetNodeList()
1465
1466   def _UnlockedGetOnlineNodeList(self):
1467     """Return the list of nodes which are online.
1468
1469     """
1470     all_nodes = [self._UnlockedGetNodeInfo(node)
1471                  for node in self._UnlockedGetNodeList()]
1472     return [node.name for node in all_nodes if not node.offline]
1473
1474   @locking.ssynchronized(_config_lock, shared=1)
1475   def GetOnlineNodeList(self):
1476     """Return the list of nodes which are online.
1477
1478     """
1479     return self._UnlockedGetOnlineNodeList()
1480
1481   @locking.ssynchronized(_config_lock, shared=1)
1482   def GetVmCapableNodeList(self):
1483     """Return the list of nodes which are not vm capable.
1484
1485     """
1486     all_nodes = [self._UnlockedGetNodeInfo(node)
1487                  for node in self._UnlockedGetNodeList()]
1488     return [node.name for node in all_nodes if node.vm_capable]
1489
1490   @locking.ssynchronized(_config_lock, shared=1)
1491   def GetNonVmCapableNodeList(self):
1492     """Return the list of nodes which are not vm capable.
1493
1494     """
1495     all_nodes = [self._UnlockedGetNodeInfo(node)
1496                  for node in self._UnlockedGetNodeList()]
1497     return [node.name for node in all_nodes if not node.vm_capable]
1498
1499   @locking.ssynchronized(_config_lock, shared=1)
1500   def GetMultiNodeInfo(self, nodes):
1501     """Get the configuration of multiple nodes.
1502
1503     @param nodes: list of node names
1504     @rtype: list
1505     @return: list of tuples of (node, node_info), where node_info is
1506         what would GetNodeInfo return for the node, in the original
1507         order
1508
1509     """
1510     return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]
1511
1512   @locking.ssynchronized(_config_lock, shared=1)
1513   def GetAllNodesInfo(self):
1514     """Get the configuration of all nodes.
1515
1516     @rtype: dict
1517     @return: dict of (node, node_info), where node_info is what
1518               would GetNodeInfo return for the node
1519
1520     """
1521     return self._UnlockedGetAllNodesInfo()
1522
1523   def _UnlockedGetAllNodesInfo(self):
1524     """Gets configuration of all nodes.
1525
1526     @note: See L{GetAllNodesInfo}
1527
1528     """
1529     return dict([(node, self._UnlockedGetNodeInfo(node))
1530                  for node in self._UnlockedGetNodeList()])
1531
1532   @locking.ssynchronized(_config_lock, shared=1)
1533   def GetNodeGroupsFromNodes(self, nodes):
1534     """Returns groups for a list of nodes.
1535
1536     @type nodes: list of string
1537     @param nodes: List of node names
1538     @rtype: frozenset
1539
1540     """
1541     return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)
1542
1543   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1544     """Get the number of current and maximum desired and possible candidates.
1545
1546     @type exceptions: list
1547     @param exceptions: if passed, list of nodes that should be ignored
1548     @rtype: tuple
1549     @return: tuple of (current, desired and possible, possible)
1550
1551     """
1552     mc_now = mc_should = mc_max = 0
1553     for node in self._config_data.nodes.values():
1554       if exceptions and node.name in exceptions:
1555         continue
1556       if not (node.offline or node.drained) and node.master_capable:
1557         mc_max += 1
1558       if node.master_candidate:
1559         mc_now += 1
1560     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1561     return (mc_now, mc_should, mc_max)
1562
1563   @locking.ssynchronized(_config_lock, shared=1)
1564   def GetMasterCandidateStats(self, exceptions=None):
1565     """Get the number of current and maximum possible candidates.
1566
1567     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1568
1569     @type exceptions: list
1570     @param exceptions: if passed, list of nodes that should be ignored
1571     @rtype: tuple
1572     @return: tuple of (current, max)
1573
1574     """
1575     return self._UnlockedGetMasterCandidateStats(exceptions)
1576
1577   @locking.ssynchronized(_config_lock)
1578   def MaintainCandidatePool(self, exceptions):
1579     """Try to grow the candidate pool to the desired size.
1580
1581     @type exceptions: list
1582     @param exceptions: if passed, list of nodes that should be ignored
1583     @rtype: list
1584     @return: list with the adjusted nodes (L{objects.Node} instances)
1585
1586     """
1587     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1588     mod_list = []
1589     if mc_now < mc_max:
1590       node_list = self._config_data.nodes.keys()
1591       random.shuffle(node_list)
1592       for name in node_list:
1593         if mc_now >= mc_max:
1594           break
1595         node = self._config_data.nodes[name]
1596         if (node.master_candidate or node.offline or node.drained or
1597             node.name in exceptions or not node.master_capable):
1598           continue
1599         mod_list.append(node)
1600         node.master_candidate = True
1601         node.serial_no += 1
1602         mc_now += 1
1603       if mc_now != mc_max:
1604         # this should not happen
1605         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1606                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1607       if mod_list:
1608         self._config_data.cluster.serial_no += 1
1609         self._WriteConfig()
1610
1611     return mod_list
1612
1613   def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1614     """Add a given node to the specified group.
1615
1616     """
1617     if nodegroup_uuid not in self._config_data.nodegroups:
1618       # This can happen if a node group gets deleted between its lookup and
1619       # when we're adding the first node to it, since we don't keep a lock in
1620       # the meantime. It's ok though, as we'll fail cleanly if the node group
1621       # is not found anymore.
1622       raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1623     if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1624       self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1625
1626   def _UnlockedRemoveNodeFromGroup(self, node):
1627     """Remove a given node from its group.
1628
1629     """
1630     nodegroup = node.group
1631     if nodegroup not in self._config_data.nodegroups:
1632       logging.warning("Warning: node '%s' has unknown node group '%s'"
1633                       " (while being removed from it)", node.name, nodegroup)
1634     nodegroup_obj = self._config_data.nodegroups[nodegroup]
1635     if node.name not in nodegroup_obj.members:
1636       logging.warning("Warning: node '%s' not a member of its node group '%s'"
1637                       " (while being removed from it)", node.name, nodegroup)
1638     else:
1639       nodegroup_obj.members.remove(node.name)
1640
1641   def _BumpSerialNo(self):
1642     """Bump up the serial number of the config.
1643
1644     """
1645     self._config_data.serial_no += 1
1646     self._config_data.mtime = time.time()
1647
1648   def _AllUUIDObjects(self):
1649     """Returns all objects with uuid attributes.
1650
1651     """
1652     return (self._config_data.instances.values() +
1653             self._config_data.nodes.values() +
1654             self._config_data.nodegroups.values() +
1655             [self._config_data.cluster])
1656
1657   def _OpenConfig(self, accept_foreign):
1658     """Read the config data from disk.
1659
1660     """
1661     raw_data = utils.ReadFile(self._cfg_file)
1662
1663     try:
1664       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1665     except Exception, err:
1666       raise errors.ConfigurationError(err)
1667
1668     # Make sure the configuration has the right version
1669     _ValidateConfig(data)
1670
1671     if (not hasattr(data, 'cluster') or
1672         not hasattr(data.cluster, 'rsahostkeypub')):
1673       raise errors.ConfigurationError("Incomplete configuration"
1674                                       " (missing cluster.rsahostkeypub)")
1675
1676     if data.cluster.master_node != self._my_hostname and not accept_foreign:
1677       msg = ("The configuration denotes node %s as master, while my"
1678              " hostname is %s; opening a foreign configuration is only"
1679              " possible in accept_foreign mode" %
1680              (data.cluster.master_node, self._my_hostname))
1681       raise errors.ConfigurationError(msg)
1682
1683     # Upgrade configuration if needed
1684     data.UpgradeConfig()
1685
1686     self._config_data = data
1687     # reset the last serial as -1 so that the next write will cause
1688     # ssconf update
1689     self._last_cluster_serial = -1
1690
1691     # And finally run our (custom) config upgrade sequence
1692     self._UpgradeConfig()
1693
1694     self._cfg_id = utils.GetFileID(path=self._cfg_file)
1695
1696   def _UpgradeConfig(self):
1697     """Run upgrade steps that cannot be done purely in the objects.
1698
1699     This is because some data elements need uniqueness across the
1700     whole configuration, etc.
1701
1702     @warning: this function will call L{_WriteConfig()}, but also
1703         L{DropECReservations} so it needs to be called only from a
1704         "safe" place (the constructor). If one wanted to call it with
1705         the lock held, a DropECReservationUnlocked would need to be
1706         created first, to avoid causing deadlock.
1707
1708     """
1709     modified = False
1710     for item in self._AllUUIDObjects():
1711       if item.uuid is None:
1712         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1713         modified = True
1714     if not self._config_data.nodegroups:
1715       default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
1716       default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
1717                                             members=[])
1718       self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
1719       modified = True
1720     for node in self._config_data.nodes.values():
1721       if not node.group:
1722         node.group = self.LookupNodeGroup(None)
1723         modified = True
1724       # This is technically *not* an upgrade, but needs to be done both when
1725       # nodegroups are being added, and upon normally loading the config,
1726       # because the members list of a node group is discarded upon
1727       # serializing/deserializing the object.
1728       self._UnlockedAddNodeToGroup(node.name, node.group)
1729     if modified:
1730       self._WriteConfig()
1731       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1732       # only called at config init time, without the lock held
1733       self.DropECReservations(_UPGRADE_CONFIG_JID)
1734
1735   def _DistributeConfig(self, feedback_fn):
1736     """Distribute the configuration to the other nodes.
1737
1738     Currently, this only copies the configuration file. In the future,
1739     it could be used to encapsulate the 2/3-phase update mechanism.
1740
1741     """
1742     if self._offline:
1743       return True
1744
1745     bad = False
1746
1747     node_list = []
1748     addr_list = []
1749     myhostname = self._my_hostname
1750     # we can skip checking whether _UnlockedGetNodeInfo returns None
1751     # since the node list comes from _UnlocketGetNodeList, and we are
1752     # called with the lock held, so no modifications should take place
1753     # in between
1754     for node_name in self._UnlockedGetNodeList():
1755       if node_name == myhostname:
1756         continue
1757       node_info = self._UnlockedGetNodeInfo(node_name)
1758       if not node_info.master_candidate:
1759         continue
1760       node_list.append(node_info.name)
1761       addr_list.append(node_info.primary_ip)
1762
1763     # TODO: Use dedicated resolver talking to config writer for name resolution
1764     result = \
1765       self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
1766     for to_node, to_result in result.items():
1767       msg = to_result.fail_msg
1768       if msg:
1769         msg = ("Copy of file %s to node %s failed: %s" %
1770                (self._cfg_file, to_node, msg))
1771         logging.error(msg)
1772
1773         if feedback_fn:
1774           feedback_fn(msg)
1775
1776         bad = True
1777
1778     return not bad
1779
1780   def _WriteConfig(self, destination=None, feedback_fn=None):
1781     """Write the configuration data to persistent storage.
1782
1783     """
1784     assert feedback_fn is None or callable(feedback_fn)
1785
1786     # Warn on config errors, but don't abort the save - the
1787     # configuration has already been modified, and we can't revert;
1788     # the best we can do is to warn the user and save as is, leaving
1789     # recovery to the user
1790     config_errors = self._UnlockedVerifyConfig()
1791     if config_errors:
1792       errmsg = ("Configuration data is not consistent: %s" %
1793                 (utils.CommaJoin(config_errors)))
1794       logging.critical(errmsg)
1795       if feedback_fn:
1796         feedback_fn(errmsg)
1797
1798     if destination is None:
1799       destination = self._cfg_file
1800     self._BumpSerialNo()
1801     txt = serializer.Dump(self._config_data.ToDict())
1802
1803     getents = self._getents()
1804     try:
1805       fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
1806                                close=False, gid=getents.confd_gid, mode=0640)
1807     except errors.LockError:
1808       raise errors.ConfigurationError("The configuration file has been"
1809                                       " modified since the last write, cannot"
1810                                       " update")
1811     try:
1812       self._cfg_id = utils.GetFileID(fd=fd)
1813     finally:
1814       os.close(fd)
1815
1816     self.write_count += 1
1817
1818     # and redistribute the config file to master candidates
1819     self._DistributeConfig(feedback_fn)
1820
1821     # Write ssconf files on all nodes (including locally)
1822     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1823       if not self._offline:
1824         result = self._GetRpc(None).call_write_ssconf_files(
1825           self._UnlockedGetOnlineNodeList(),
1826           self._UnlockedGetSsconfValues())
1827
1828         for nname, nresu in result.items():
1829           msg = nresu.fail_msg
1830           if msg:
1831             errmsg = ("Error while uploading ssconf files to"
1832                       " node %s: %s" % (nname, msg))
1833             logging.warning(errmsg)
1834
1835             if feedback_fn:
1836               feedback_fn(errmsg)
1837
1838       self._last_cluster_serial = self._config_data.cluster.serial_no
1839
1840   def _UnlockedGetSsconfValues(self):
1841     """Return the values needed by ssconf.
1842
1843     @rtype: dict
1844     @return: a dictionary with keys the ssconf names and values their
1845         associated value
1846
1847     """
1848     fn = "\n".join
1849     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1850     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1851     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1852     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1853                     for ninfo in node_info]
1854     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1855                     for ninfo in node_info]
1856
1857     instance_data = fn(instance_names)
1858     off_data = fn(node.name for node in node_info if node.offline)
1859     on_data = fn(node.name for node in node_info if not node.offline)
1860     mc_data = fn(node.name for node in node_info if node.master_candidate)
1861     mc_ips_data = fn(node.primary_ip for node in node_info
1862                      if node.master_candidate)
1863     node_data = fn(node_names)
1864     node_pri_ips_data = fn(node_pri_ips)
1865     node_snd_ips_data = fn(node_snd_ips)
1866
1867     cluster = self._config_data.cluster
1868     cluster_tags = fn(cluster.GetTags())
1869
1870     hypervisor_list = fn(cluster.enabled_hypervisors)
1871
1872     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1873
1874     nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1875                   self._config_data.nodegroups.values()]
1876     nodegroups_data = fn(utils.NiceSort(nodegroups))
1877
1878     ssconf_values = {
1879       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1880       constants.SS_CLUSTER_TAGS: cluster_tags,
1881       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1882       constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
1883       constants.SS_MASTER_CANDIDATES: mc_data,
1884       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1885       constants.SS_MASTER_IP: cluster.master_ip,
1886       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1887       constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
1888       constants.SS_MASTER_NODE: cluster.master_node,
1889       constants.SS_NODE_LIST: node_data,
1890       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1891       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1892       constants.SS_OFFLINE_NODES: off_data,
1893       constants.SS_ONLINE_NODES: on_data,
1894       constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1895       constants.SS_INSTANCE_LIST: instance_data,
1896       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1897       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1898       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1899       constants.SS_UID_POOL: uid_pool,
1900       constants.SS_NODEGROUPS: nodegroups_data,
1901       }
1902     bad_values = [(k, v) for k, v in ssconf_values.items()
1903                   if not isinstance(v, (str, basestring))]
1904     if bad_values:
1905       err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
1906       raise errors.ConfigurationError("Some ssconf key(s) have non-string"
1907                                       " values: %s" % err)
1908     return ssconf_values
1909
1910   @locking.ssynchronized(_config_lock, shared=1)
1911   def GetSsconfValues(self):
1912     """Wrapper using lock around _UnlockedGetSsconf().
1913
1914     """
1915     return self._UnlockedGetSsconfValues()
1916
1917   @locking.ssynchronized(_config_lock, shared=1)
1918   def GetVGName(self):
1919     """Return the volume group name.
1920
1921     """
1922     return self._config_data.cluster.volume_group_name
1923
1924   @locking.ssynchronized(_config_lock)
1925   def SetVGName(self, vg_name):
1926     """Set the volume group name.
1927
1928     """
1929     self._config_data.cluster.volume_group_name = vg_name
1930     self._config_data.cluster.serial_no += 1
1931     self._WriteConfig()
1932
1933   @locking.ssynchronized(_config_lock, shared=1)
1934   def GetDRBDHelper(self):
1935     """Return DRBD usermode helper.
1936
1937     """
1938     return self._config_data.cluster.drbd_usermode_helper
1939
1940   @locking.ssynchronized(_config_lock)
1941   def SetDRBDHelper(self, drbd_helper):
1942     """Set DRBD usermode helper.
1943
1944     """
1945     self._config_data.cluster.drbd_usermode_helper = drbd_helper
1946     self._config_data.cluster.serial_no += 1
1947     self._WriteConfig()
1948
1949   @locking.ssynchronized(_config_lock, shared=1)
1950   def GetMACPrefix(self):
1951     """Return the mac prefix.
1952
1953     """
1954     return self._config_data.cluster.mac_prefix
1955
1956   @locking.ssynchronized(_config_lock, shared=1)
1957   def GetClusterInfo(self):
1958     """Returns information about the cluster
1959
1960     @rtype: L{objects.Cluster}
1961     @return: the cluster object
1962
1963     """
1964     return self._config_data.cluster
1965
1966   @locking.ssynchronized(_config_lock, shared=1)
1967   def HasAnyDiskOfType(self, dev_type):
1968     """Check if in there is at disk of the given type in the configuration.
1969
1970     """
1971     return self._config_data.HasAnyDiskOfType(dev_type)
1972
1973   @locking.ssynchronized(_config_lock)
1974   def Update(self, target, feedback_fn):
1975     """Notify function to be called after updates.
1976
1977     This function must be called when an object (as returned by
1978     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1979     caller wants the modifications saved to the backing store. Note
1980     that all modified objects will be saved, but the target argument
1981     is the one the caller wants to ensure that it's saved.
1982
1983     @param target: an instance of either L{objects.Cluster},
1984         L{objects.Node} or L{objects.Instance} which is existing in
1985         the cluster
1986     @param feedback_fn: Callable feedback function
1987
1988     """
1989     if self._config_data is None:
1990       raise errors.ProgrammerError("Configuration file not read,"
1991                                    " cannot save.")
1992     update_serial = False
1993     if isinstance(target, objects.Cluster):
1994       test = target == self._config_data.cluster
1995     elif isinstance(target, objects.Node):
1996       test = target in self._config_data.nodes.values()
1997       update_serial = True
1998     elif isinstance(target, objects.Instance):
1999       test = target in self._config_data.instances.values()
2000     elif isinstance(target, objects.NodeGroup):
2001       test = target in self._config_data.nodegroups.values()
2002     else:
2003       raise errors.ProgrammerError("Invalid object type (%s) passed to"
2004                                    " ConfigWriter.Update" % type(target))
2005     if not test:
2006       raise errors.ConfigurationError("Configuration updated since object"
2007                                       " has been read or unknown object")
2008     target.serial_no += 1
2009     target.mtime = now = time.time()
2010
2011     if update_serial:
2012       # for node updates, we need to increase the cluster serial too
2013       self._config_data.cluster.serial_no += 1
2014       self._config_data.cluster.mtime = now
2015
2016     if isinstance(target, objects.Instance):
2017       self._UnlockedReleaseDRBDMinors(target.name)
2018
2019     self._WriteConfig(feedback_fn=feedback_fn)
2020
2021   @locking.ssynchronized(_config_lock)
2022   def DropECReservations(self, ec_id):
2023     """Drop per-execution-context reservations
2024
2025     """
2026     for rm in self._all_rms:
2027       rm.DropECReservations(ec_id)