Refactor hotplug implementation
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 # pylint: disable=R0904
35 # R0904: Too many public methods
36
37 import os
38 import random
39 import logging
40 import time
41 import itertools
42
43 from ganeti import errors
44 from ganeti import locking
45 from ganeti import utils
46 from ganeti import constants
47 from ganeti import rpc
48 from ganeti import objects
49 from ganeti import serializer
50 from ganeti import uidpool
51 from ganeti import netutils
52 from ganeti import runtime
53
54
55 _config_lock = locking.SharedLock("ConfigWriter")
56
57 # job id used for resource management at config upgrade time
58 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
59
60
61 def _ValidateConfig(data):
62   """Verifies that a configuration objects looks valid.
63
64   This only verifies the version of the configuration.
65
66   @raise errors.ConfigurationError: if the version differs from what
67       we expect
68
69   """
70   if data.version != constants.CONFIG_VERSION:
71     raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
72
73
74 class TemporaryReservationManager:
75   """A temporary resource reservation manager.
76
77   This is used to reserve resources in a job, before using them, making sure
78   other jobs cannot get them in the meantime.
79
80   """
81   def __init__(self):
82     self._ec_reserved = {}
83
84   def Reserved(self, resource):
85     for holder_reserved in self._ec_reserved.values():
86       if resource in holder_reserved:
87         return True
88     return False
89
90   def Reserve(self, ec_id, resource):
91     if self.Reserved(resource):
92       raise errors.ReservationError("Duplicate reservation for resource '%s'"
93                                     % str(resource))
94     if ec_id not in self._ec_reserved:
95       self._ec_reserved[ec_id] = set([resource])
96     else:
97       self._ec_reserved[ec_id].add(resource)
98
99   def DropECReservations(self, ec_id):
100     if ec_id in self._ec_reserved:
101       del self._ec_reserved[ec_id]
102
103   def GetReserved(self):
104     all_reserved = set()
105     for holder_reserved in self._ec_reserved.values():
106       all_reserved.update(holder_reserved)
107     return all_reserved
108
109   def Generate(self, existing, generate_one_fn, ec_id):
110     """Generate a new resource of this type
111
112     """
113     assert callable(generate_one_fn)
114
115     all_elems = self.GetReserved()
116     all_elems.update(existing)
117     retries = 64
118     while retries > 0:
119       new_resource = generate_one_fn()
120       if new_resource is not None and new_resource not in all_elems:
121         break
122     else:
123       raise errors.ConfigurationError("Not able generate new resource"
124                                       " (last tried: %s)" % new_resource)
125     self.Reserve(ec_id, new_resource)
126     return new_resource
127
128
129 def _MatchNameComponentIgnoreCase(short_name, names):
130   """Wrapper around L{utils.text.MatchNameComponent}.
131
132   """
133   return utils.MatchNameComponent(short_name, names, case_sensitive=False)
134
135
136 def _CheckInstanceDiskIvNames(disks):
137   """Checks if instance's disks' C{iv_name} attributes are in order.
138
139   @type disks: list of L{objects.Disk}
140   @param disks: List of disks
141   @rtype: list of tuples; (int, string, string)
142   @return: List of wrongly named disks, each tuple contains disk index,
143     expected and actual name
144
145   """
146   result = []
147
148   for (idx, disk) in enumerate(disks):
149     exp_iv_name = "disk/%s" % idx
150     if disk.iv_name != exp_iv_name:
151       result.append((idx, exp_iv_name, disk.iv_name))
152
153   return result
154
155
156 class ConfigWriter:
157   """The interface to the cluster configuration.
158
159   @ivar _temporary_lvs: reservation manager for temporary LVs
160   @ivar _all_rms: a list of all temporary reservation managers
161
162   """
163   def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
164                accept_foreign=False):
165     self.write_count = 0
166     self._lock = _config_lock
167     self._config_data = None
168     self._offline = offline
169     if cfg_file is None:
170       self._cfg_file = constants.CLUSTER_CONF_FILE
171     else:
172       self._cfg_file = cfg_file
173     self._getents = _getents
174     self._temporary_ids = TemporaryReservationManager()
175     self._temporary_drbds = {}
176     self._temporary_macs = TemporaryReservationManager()
177     self._temporary_secrets = TemporaryReservationManager()
178     self._temporary_lvs = TemporaryReservationManager()
179     self._all_rms = [self._temporary_ids, self._temporary_macs,
180                      self._temporary_secrets, self._temporary_lvs]
181     # Note: in order to prevent errors when resolving our name in
182     # _DistributeConfig, we compute it here once and reuse it; it's
183     # better to raise an error before starting to modify the config
184     # file than after it was modified
185     self._my_hostname = netutils.Hostname.GetSysName()
186     self._last_cluster_serial = -1
187     self._cfg_id = None
188     self._context = None
189     self._OpenConfig(accept_foreign)
190
191   def _GetRpc(self, address_list):
192     """Returns RPC runner for configuration.
193
194     """
195     return rpc.ConfigRunner(self._context, address_list)
196
197   def SetContext(self, context):
198     """Sets Ganeti context.
199
200     """
201     self._context = context
202
203   # this method needs to be static, so that we can call it on the class
204   @staticmethod
205   def IsCluster():
206     """Check if the cluster is configured.
207
208     """
209     return os.path.exists(constants.CLUSTER_CONF_FILE)
210
211   def _GenerateOneMAC(self):
212     """Generate one mac address
213
214     """
215     prefix = self._config_data.cluster.mac_prefix
216     byte1 = random.randrange(0, 256)
217     byte2 = random.randrange(0, 256)
218     byte3 = random.randrange(0, 256)
219     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
220     return mac
221
222   @locking.ssynchronized(_config_lock, shared=1)
223   def GetNdParams(self, node):
224     """Get the node params populated with cluster defaults.
225
226     @type node: L{objects.Node}
227     @param node: The node we want to know the params for
228     @return: A dict with the filled in node params
229
230     """
231     nodegroup = self._UnlockedGetNodeGroup(node.group)
232     return self._config_data.cluster.FillND(node, nodegroup)
233
234   @locking.ssynchronized(_config_lock, shared=1)
235   def GetInstanceDiskParams(self, instance):
236     """Get the disk params populated with inherit chain.
237
238     @type instance: L{objects.Instance}
239     @param instance: The instance we want to know the params for
240     @return: A dict with the filled in disk params
241
242     """
243     node = self._UnlockedGetNodeInfo(instance.primary_node)
244     nodegroup = self._UnlockedGetNodeGroup(node.group)
245     return self._UnlockedGetGroupDiskParams(nodegroup)
246
247   @locking.ssynchronized(_config_lock, shared=1)
248   def GetGroupDiskParams(self, group):
249     """Get the disk params populated with inherit chain.
250
251     @type group: L{objects.NodeGroup}
252     @param group: The group we want to know the params for
253     @return: A dict with the filled in disk params
254
255     """
256     return self._UnlockedGetGroupDiskParams(group)
257
258   def _UnlockedGetGroupDiskParams(self, group):
259     """Get the disk params populated with inherit chain down to node-group.
260
261     @type group: L{objects.NodeGroup}
262     @param group: The group we want to know the params for
263     @return: A dict with the filled in disk params
264
265     """
266     return self._config_data.cluster.SimpleFillDP(group.diskparams)
267
268   @locking.ssynchronized(_config_lock, shared=1)
269   def GenerateMAC(self, ec_id):
270     """Generate a MAC for an instance.
271
272     This should check the current instances for duplicates.
273
274     """
275     existing = self._AllMACs()
276     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
277
278   @locking.ssynchronized(_config_lock, shared=1)
279   def ReserveMAC(self, mac, ec_id):
280     """Reserve a MAC for an instance.
281
282     This only checks instances managed by this cluster, it does not
283     check for potential collisions elsewhere.
284
285     """
286     all_macs = self._AllMACs()
287     if mac in all_macs:
288       raise errors.ReservationError("mac already in use")
289     else:
290       self._temporary_macs.Reserve(ec_id, mac)
291
292   @locking.ssynchronized(_config_lock)
293   def GetHotplugIndex(self, instance, dev_type):
294
295     idx = getattr(instance.hotplug_info, dev_type)
296     setattr(instance.hotplug_info, dev_type, idx + 1)
297     self._WriteConfig()
298
299     return idx
300
301   @locking.ssynchronized(_config_lock, shared=1)
302   def ReserveLV(self, lv_name, ec_id):
303     """Reserve an VG/LV pair for an instance.
304
305     @type lv_name: string
306     @param lv_name: the logical volume name to reserve
307
308     """
309     all_lvs = self._AllLVs()
310     if lv_name in all_lvs:
311       raise errors.ReservationError("LV already in use")
312     else:
313       self._temporary_lvs.Reserve(ec_id, lv_name)
314
315   @locking.ssynchronized(_config_lock, shared=1)
316   def GenerateDRBDSecret(self, ec_id):
317     """Generate a DRBD secret.
318
319     This checks the current disks for duplicates.
320
321     """
322     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
323                                             utils.GenerateSecret,
324                                             ec_id)
325
326   def _AllLVs(self):
327     """Compute the list of all LVs.
328
329     """
330     lvnames = set()
331     for instance in self._config_data.instances.values():
332       node_data = instance.MapLVsByNode()
333       for lv_list in node_data.values():
334         lvnames.update(lv_list)
335     return lvnames
336
337   def _AllIDs(self, include_temporary):
338     """Compute the list of all UUIDs and names we have.
339
340     @type include_temporary: boolean
341     @param include_temporary: whether to include the _temporary_ids set
342     @rtype: set
343     @return: a set of IDs
344
345     """
346     existing = set()
347     if include_temporary:
348       existing.update(self._temporary_ids.GetReserved())
349     existing.update(self._AllLVs())
350     existing.update(self._config_data.instances.keys())
351     existing.update(self._config_data.nodes.keys())
352     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
353     return existing
354
355   def _GenerateUniqueID(self, ec_id):
356     """Generate an unique UUID.
357
358     This checks the current node, instances and disk names for
359     duplicates.
360
361     @rtype: string
362     @return: the unique id
363
364     """
365     existing = self._AllIDs(include_temporary=False)
366     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
367
368   @locking.ssynchronized(_config_lock, shared=1)
369   def GenerateUniqueID(self, ec_id):
370     """Generate an unique ID.
371
372     This is just a wrapper over the unlocked version.
373
374     @type ec_id: string
375     @param ec_id: unique id for the job to reserve the id to
376
377     """
378     return self._GenerateUniqueID(ec_id)
379
380   def _AllMACs(self):
381     """Return all MACs present in the config.
382
383     @rtype: list
384     @return: the list of all MACs
385
386     """
387     result = []
388     for instance in self._config_data.instances.values():
389       for nic in instance.nics:
390         result.append(nic.mac)
391
392     return result
393
394   def _AllDRBDSecrets(self):
395     """Return all DRBD secrets present in the config.
396
397     @rtype: list
398     @return: the list of all DRBD secrets
399
400     """
401     def helper(disk, result):
402       """Recursively gather secrets from this disk."""
403       if disk.dev_type == constants.DT_DRBD8:
404         result.append(disk.logical_id[5])
405       if disk.children:
406         for child in disk.children:
407           helper(child, result)
408
409     result = []
410     for instance in self._config_data.instances.values():
411       for disk in instance.disks:
412         helper(disk, result)
413
414     return result
415
416   def _CheckDiskIDs(self, disk, l_ids, p_ids):
417     """Compute duplicate disk IDs
418
419     @type disk: L{objects.Disk}
420     @param disk: the disk at which to start searching
421     @type l_ids: list
422     @param l_ids: list of current logical ids
423     @type p_ids: list
424     @param p_ids: list of current physical ids
425     @rtype: list
426     @return: a list of error messages
427
428     """
429     result = []
430     if disk.logical_id is not None:
431       if disk.logical_id in l_ids:
432         result.append("duplicate logical id %s" % str(disk.logical_id))
433       else:
434         l_ids.append(disk.logical_id)
435     if disk.physical_id is not None:
436       if disk.physical_id in p_ids:
437         result.append("duplicate physical id %s" % str(disk.physical_id))
438       else:
439         p_ids.append(disk.physical_id)
440
441     if disk.children:
442       for child in disk.children:
443         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
444     return result
445
446   def _UnlockedVerifyConfig(self):
447     """Verify function.
448
449     @rtype: list
450     @return: a list of error messages; a non-empty list signifies
451         configuration errors
452
453     """
454     # pylint: disable=R0914
455     result = []
456     seen_macs = []
457     ports = {}
458     data = self._config_data
459     cluster = data.cluster
460     seen_lids = []
461     seen_pids = []
462
463     # global cluster checks
464     if not cluster.enabled_hypervisors:
465       result.append("enabled hypervisors list doesn't have any entries")
466     invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
467     if invalid_hvs:
468       result.append("enabled hypervisors contains invalid entries: %s" %
469                     invalid_hvs)
470     missing_hvp = (set(cluster.enabled_hypervisors) -
471                    set(cluster.hvparams.keys()))
472     if missing_hvp:
473       result.append("hypervisor parameters missing for the enabled"
474                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
475
476     if cluster.master_node not in data.nodes:
477       result.append("cluster has invalid primary node '%s'" %
478                     cluster.master_node)
479
480     def _helper(owner, attr, value, template):
481       try:
482         utils.ForceDictType(value, template)
483       except errors.GenericError, err:
484         result.append("%s has invalid %s: %s" % (owner, attr, err))
485
486     def _helper_nic(owner, params):
487       try:
488         objects.NIC.CheckParameterSyntax(params)
489       except errors.ConfigurationError, err:
490         result.append("%s has invalid nicparams: %s" % (owner, err))
491
492     def _helper_ipolicy(owner, params, check_std):
493       try:
494         objects.InstancePolicy.CheckParameterSyntax(params, check_std)
495       except errors.ConfigurationError, err:
496         result.append("%s has invalid instance policy: %s" % (owner, err))
497
498     def _helper_ispecs(owner, params):
499       for key, value in params.items():
500         if key in constants.IPOLICY_ISPECS:
501           fullkey = "ipolicy/" + key
502           _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)
503         else:
504           # FIXME: assuming list type
505           if key in constants.IPOLICY_PARAMETERS:
506             exp_type = float
507           else:
508             exp_type = list
509           if not isinstance(value, exp_type):
510             result.append("%s has invalid instance policy: for %s,"
511                           " expecting %s, got %s" %
512                           (owner, key, exp_type.__name__, type(value)))
513
514     # check cluster parameters
515     _helper("cluster", "beparams", cluster.SimpleFillBE({}),
516             constants.BES_PARAMETER_TYPES)
517     _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
518             constants.NICS_PARAMETER_TYPES)
519     _helper_nic("cluster", cluster.SimpleFillNIC({}))
520     _helper("cluster", "ndparams", cluster.SimpleFillND({}),
521             constants.NDS_PARAMETER_TYPES)
522     _helper_ipolicy("cluster", cluster.SimpleFillIPolicy({}), True)
523     _helper_ispecs("cluster", cluster.SimpleFillIPolicy({}))
524
525     # per-instance checks
526     for instance_name in data.instances:
527       instance = data.instances[instance_name]
528       if instance.name != instance_name:
529         result.append("instance '%s' is indexed by wrong name '%s'" %
530                       (instance.name, instance_name))
531       if instance.primary_node not in data.nodes:
532         result.append("instance '%s' has invalid primary node '%s'" %
533                       (instance_name, instance.primary_node))
534       for snode in instance.secondary_nodes:
535         if snode not in data.nodes:
536           result.append("instance '%s' has invalid secondary node '%s'" %
537                         (instance_name, snode))
538       for idx, nic in enumerate(instance.nics):
539         if nic.mac in seen_macs:
540           result.append("instance '%s' has NIC %d mac %s duplicate" %
541                         (instance_name, idx, nic.mac))
542         else:
543           seen_macs.append(nic.mac)
544         if nic.nicparams:
545           filled = cluster.SimpleFillNIC(nic.nicparams)
546           owner = "instance %s nic %d" % (instance.name, idx)
547           _helper(owner, "nicparams",
548                   filled, constants.NICS_PARAMETER_TYPES)
549           _helper_nic(owner, filled)
550
551       # parameter checks
552       if instance.beparams:
553         _helper("instance %s" % instance.name, "beparams",
554                 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)
555
556       # gather the drbd ports for duplicate checks
557       for (idx, dsk) in enumerate(instance.disks):
558         if dsk.dev_type in constants.LDS_DRBD:
559           tcp_port = dsk.logical_id[2]
560           if tcp_port not in ports:
561             ports[tcp_port] = []
562           ports[tcp_port].append((instance.name, "drbd disk %s" % idx))
563       # gather network port reservation
564       net_port = getattr(instance, "network_port", None)
565       if net_port is not None:
566         if net_port not in ports:
567           ports[net_port] = []
568         ports[net_port].append((instance.name, "network port"))
569
570       # instance disk verify
571       for idx, disk in enumerate(instance.disks):
572         result.extend(["instance '%s' disk %d error: %s" %
573                        (instance.name, idx, msg) for msg in disk.Verify()])
574         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
575
576       wrong_names = _CheckInstanceDiskIvNames(instance.disks)
577       if wrong_names:
578         tmp = "; ".join(("name of disk %s should be '%s', but is '%s'" %
579                          (idx, exp_name, actual_name))
580                         for (idx, exp_name, actual_name) in wrong_names)
581
582         result.append("Instance '%s' has wrongly named disks: %s" %
583                       (instance.name, tmp))
584
585     # cluster-wide pool of free ports
586     for free_port in cluster.tcpudp_port_pool:
587       if free_port not in ports:
588         ports[free_port] = []
589       ports[free_port].append(("cluster", "port marked as free"))
590
591     # compute tcp/udp duplicate ports
592     keys = ports.keys()
593     keys.sort()
594     for pnum in keys:
595       pdata = ports[pnum]
596       if len(pdata) > 1:
597         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
598         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
599
600     # highest used tcp port check
601     if keys:
602       if keys[-1] > cluster.highest_used_port:
603         result.append("Highest used port mismatch, saved %s, computed %s" %
604                       (cluster.highest_used_port, keys[-1]))
605
606     if not data.nodes[cluster.master_node].master_candidate:
607       result.append("Master node is not a master candidate")
608
609     # master candidate checks
610     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
611     if mc_now < mc_max:
612       result.append("Not enough master candidates: actual %d, target %d" %
613                     (mc_now, mc_max))
614
615     # node checks
616     for node_name, node in data.nodes.items():
617       if node.name != node_name:
618         result.append("Node '%s' is indexed by wrong name '%s'" %
619                       (node.name, node_name))
620       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
621         result.append("Node %s state is invalid: master_candidate=%s,"
622                       " drain=%s, offline=%s" %
623                       (node.name, node.master_candidate, node.drained,
624                        node.offline))
625       if node.group not in data.nodegroups:
626         result.append("Node '%s' has invalid group '%s'" %
627                       (node.name, node.group))
628       else:
629         _helper("node %s" % node.name, "ndparams",
630                 cluster.FillND(node, data.nodegroups[node.group]),
631                 constants.NDS_PARAMETER_TYPES)
632
633     # nodegroups checks
634     nodegroups_names = set()
635     for nodegroup_uuid in data.nodegroups:
636       nodegroup = data.nodegroups[nodegroup_uuid]
637       if nodegroup.uuid != nodegroup_uuid:
638         result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
639                       % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
640       if utils.UUID_RE.match(nodegroup.name.lower()):
641         result.append("node group '%s' (uuid: '%s') has uuid-like name" %
642                       (nodegroup.name, nodegroup.uuid))
643       if nodegroup.name in nodegroups_names:
644         result.append("duplicate node group name '%s'" % nodegroup.name)
645       else:
646         nodegroups_names.add(nodegroup.name)
647       group_name = "group %s" % nodegroup.name
648       _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy),
649                       False)
650       _helper_ispecs(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
651       if nodegroup.ndparams:
652         _helper(group_name, "ndparams",
653                 cluster.SimpleFillND(nodegroup.ndparams),
654                 constants.NDS_PARAMETER_TYPES)
655
656     # drbd minors check
657     _, duplicates = self._UnlockedComputeDRBDMap()
658     for node, minor, instance_a, instance_b in duplicates:
659       result.append("DRBD minor %d on node %s is assigned twice to instances"
660                     " %s and %s" % (minor, node, instance_a, instance_b))
661
662     # IP checks
663     default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
664     ips = {}
665
666     def _AddIpAddress(ip, name):
667       ips.setdefault(ip, []).append(name)
668
669     _AddIpAddress(cluster.master_ip, "cluster_ip")
670
671     for node in data.nodes.values():
672       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
673       if node.secondary_ip != node.primary_ip:
674         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
675
676     for instance in data.instances.values():
677       for idx, nic in enumerate(instance.nics):
678         if nic.ip is None:
679           continue
680
681         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
682         nic_mode = nicparams[constants.NIC_MODE]
683         nic_link = nicparams[constants.NIC_LINK]
684
685         if nic_mode == constants.NIC_MODE_BRIDGED:
686           link = "bridge:%s" % nic_link
687         elif nic_mode == constants.NIC_MODE_ROUTED:
688           link = "route:%s" % nic_link
689         else:
690           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
691
692         _AddIpAddress("%s/%s" % (link, nic.ip),
693                       "instance:%s/nic:%d" % (instance.name, idx))
694
695     for ip, owners in ips.items():
696       if len(owners) > 1:
697         result.append("IP address %s is used by multiple owners: %s" %
698                       (ip, utils.CommaJoin(owners)))
699
700     return result
701
702   @locking.ssynchronized(_config_lock, shared=1)
703   def VerifyConfig(self):
704     """Verify function.
705
706     This is just a wrapper over L{_UnlockedVerifyConfig}.
707
708     @rtype: list
709     @return: a list of error messages; a non-empty list signifies
710         configuration errors
711
712     """
713     return self._UnlockedVerifyConfig()
714
715   def _UnlockedSetDiskID(self, disk, node_name):
716     """Convert the unique ID to the ID needed on the target nodes.
717
718     This is used only for drbd, which needs ip/port configuration.
719
720     The routine descends down and updates its children also, because
721     this helps when the only the top device is passed to the remote
722     node.
723
724     This function is for internal use, when the config lock is already held.
725
726     """
727     if disk.children:
728       for child in disk.children:
729         self._UnlockedSetDiskID(child, node_name)
730
731     if disk.logical_id is None and disk.physical_id is not None:
732       return
733     if disk.dev_type == constants.LD_DRBD8:
734       pnode, snode, port, pminor, sminor, secret = disk.logical_id
735       if node_name not in (pnode, snode):
736         raise errors.ConfigurationError("DRBD device not knowing node %s" %
737                                         node_name)
738       pnode_info = self._UnlockedGetNodeInfo(pnode)
739       snode_info = self._UnlockedGetNodeInfo(snode)
740       if pnode_info is None or snode_info is None:
741         raise errors.ConfigurationError("Can't find primary or secondary node"
742                                         " for %s" % str(disk))
743       p_data = (pnode_info.secondary_ip, port)
744       s_data = (snode_info.secondary_ip, port)
745       if pnode == node_name:
746         disk.physical_id = p_data + s_data + (pminor, secret)
747       else: # it must be secondary, we tested above
748         disk.physical_id = s_data + p_data + (sminor, secret)
749     else:
750       disk.physical_id = disk.logical_id
751     return
752
753   @locking.ssynchronized(_config_lock)
754   def SetDiskID(self, disk, node_name):
755     """Convert the unique ID to the ID needed on the target nodes.
756
757     This is used only for drbd, which needs ip/port configuration.
758
759     The routine descends down and updates its children also, because
760     this helps when the only the top device is passed to the remote
761     node.
762
763     """
764     return self._UnlockedSetDiskID(disk, node_name)
765
766   @locking.ssynchronized(_config_lock)
767   def AddTcpUdpPort(self, port):
768     """Adds a new port to the available port pool.
769
770     @warning: this method does not "flush" the configuration (via
771         L{_WriteConfig}); callers should do that themselves once the
772         configuration is stable
773
774     """
775     if not isinstance(port, int):
776       raise errors.ProgrammerError("Invalid type passed for port")
777
778     self._config_data.cluster.tcpudp_port_pool.add(port)
779
780   @locking.ssynchronized(_config_lock, shared=1)
781   def GetPortList(self):
782     """Returns a copy of the current port list.
783
784     """
785     return self._config_data.cluster.tcpudp_port_pool.copy()
786
787   @locking.ssynchronized(_config_lock)
788   def AllocatePort(self):
789     """Allocate a port.
790
791     The port will be taken from the available port pool or from the
792     default port range (and in this case we increase
793     highest_used_port).
794
795     """
796     # If there are TCP/IP ports configured, we use them first.
797     if self._config_data.cluster.tcpudp_port_pool:
798       port = self._config_data.cluster.tcpudp_port_pool.pop()
799     else:
800       port = self._config_data.cluster.highest_used_port + 1
801       if port >= constants.LAST_DRBD_PORT:
802         raise errors.ConfigurationError("The highest used port is greater"
803                                         " than %s. Aborting." %
804                                         constants.LAST_DRBD_PORT)
805       self._config_data.cluster.highest_used_port = port
806
807     self._WriteConfig()
808     return port
809
810   def _UnlockedComputeDRBDMap(self):
811     """Compute the used DRBD minor/nodes.
812
813     @rtype: (dict, list)
814     @return: dictionary of node_name: dict of minor: instance_name;
815         the returned dict will have all the nodes in it (even if with
816         an empty list), and a list of duplicates; if the duplicates
817         list is not empty, the configuration is corrupted and its caller
818         should raise an exception
819
820     """
821     def _AppendUsedPorts(instance_name, disk, used):
822       duplicates = []
823       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
824         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
825         for node, port in ((node_a, minor_a), (node_b, minor_b)):
826           assert node in used, ("Node '%s' of instance '%s' not found"
827                                 " in node list" % (node, instance_name))
828           if port in used[node]:
829             duplicates.append((node, port, instance_name, used[node][port]))
830           else:
831             used[node][port] = instance_name
832       if disk.children:
833         for child in disk.children:
834           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
835       return duplicates
836
837     duplicates = []
838     my_dict = dict((node, {}) for node in self._config_data.nodes)
839     for instance in self._config_data.instances.itervalues():
840       for disk in instance.disks:
841         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
842     for (node, minor), instance in self._temporary_drbds.iteritems():
843       if minor in my_dict[node] and my_dict[node][minor] != instance:
844         duplicates.append((node, minor, instance, my_dict[node][minor]))
845       else:
846         my_dict[node][minor] = instance
847     return my_dict, duplicates
848
849   @locking.ssynchronized(_config_lock)
850   def ComputeDRBDMap(self):
851     """Compute the used DRBD minor/nodes.
852
853     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
854
855     @return: dictionary of node_name: dict of minor: instance_name;
856         the returned dict will have all the nodes in it (even if with
857         an empty list).
858
859     """
860     d_map, duplicates = self._UnlockedComputeDRBDMap()
861     if duplicates:
862       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
863                                       str(duplicates))
864     return d_map
865
866   @locking.ssynchronized(_config_lock)
867   def AllocateDRBDMinor(self, nodes, instance):
868     """Allocate a drbd minor.
869
870     The free minor will be automatically computed from the existing
871     devices. A node can be given multiple times in order to allocate
872     multiple minors. The result is the list of minors, in the same
873     order as the passed nodes.
874
875     @type instance: string
876     @param instance: the instance for which we allocate minors
877
878     """
879     assert isinstance(instance, basestring), \
880            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
881
882     d_map, duplicates = self._UnlockedComputeDRBDMap()
883     if duplicates:
884       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
885                                       str(duplicates))
886     result = []
887     for nname in nodes:
888       ndata = d_map[nname]
889       if not ndata:
890         # no minors used, we can start at 0
891         result.append(0)
892         ndata[0] = instance
893         self._temporary_drbds[(nname, 0)] = instance
894         continue
895       keys = ndata.keys()
896       keys.sort()
897       ffree = utils.FirstFree(keys)
898       if ffree is None:
899         # return the next minor
900         # TODO: implement high-limit check
901         minor = keys[-1] + 1
902       else:
903         minor = ffree
904       # double-check minor against current instances
905       assert minor not in d_map[nname], \
906              ("Attempt to reuse allocated DRBD minor %d on node %s,"
907               " already allocated to instance %s" %
908               (minor, nname, d_map[nname][minor]))
909       ndata[minor] = instance
910       # double-check minor against reservation
911       r_key = (nname, minor)
912       assert r_key not in self._temporary_drbds, \
913              ("Attempt to reuse reserved DRBD minor %d on node %s,"
914               " reserved for instance %s" %
915               (minor, nname, self._temporary_drbds[r_key]))
916       self._temporary_drbds[r_key] = instance
917       result.append(minor)
918     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
919                   nodes, result)
920     return result
921
922   def _UnlockedReleaseDRBDMinors(self, instance):
923     """Release temporary drbd minors allocated for a given instance.
924
925     @type instance: string
926     @param instance: the instance for which temporary minors should be
927                      released
928
929     """
930     assert isinstance(instance, basestring), \
931            "Invalid argument passed to ReleaseDRBDMinors"
932     for key, name in self._temporary_drbds.items():
933       if name == instance:
934         del self._temporary_drbds[key]
935
936   @locking.ssynchronized(_config_lock)
937   def ReleaseDRBDMinors(self, instance):
938     """Release temporary drbd minors allocated for a given instance.
939
940     This should be called on the error paths, on the success paths
941     it's automatically called by the ConfigWriter add and update
942     functions.
943
944     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
945
946     @type instance: string
947     @param instance: the instance for which temporary minors should be
948                      released
949
950     """
951     self._UnlockedReleaseDRBDMinors(instance)
952
953   @locking.ssynchronized(_config_lock, shared=1)
954   def GetConfigVersion(self):
955     """Get the configuration version.
956
957     @return: Config version
958
959     """
960     return self._config_data.version
961
962   @locking.ssynchronized(_config_lock, shared=1)
963   def GetClusterName(self):
964     """Get cluster name.
965
966     @return: Cluster name
967
968     """
969     return self._config_data.cluster.cluster_name
970
971   @locking.ssynchronized(_config_lock, shared=1)
972   def GetMasterNode(self):
973     """Get the hostname of the master node for this cluster.
974
975     @return: Master hostname
976
977     """
978     return self._config_data.cluster.master_node
979
980   @locking.ssynchronized(_config_lock, shared=1)
981   def GetMasterIP(self):
982     """Get the IP of the master node for this cluster.
983
984     @return: Master IP
985
986     """
987     return self._config_data.cluster.master_ip
988
989   @locking.ssynchronized(_config_lock, shared=1)
990   def GetMasterNetdev(self):
991     """Get the master network device for this cluster.
992
993     """
994     return self._config_data.cluster.master_netdev
995
996   @locking.ssynchronized(_config_lock, shared=1)
997   def GetMasterNetmask(self):
998     """Get the netmask of the master node for this cluster.
999
1000     """
1001     return self._config_data.cluster.master_netmask
1002
1003   @locking.ssynchronized(_config_lock, shared=1)
1004   def GetUseExternalMipScript(self):
1005     """Get flag representing whether to use the external master IP setup script.
1006
1007     """
1008     return self._config_data.cluster.use_external_mip_script
1009
1010   @locking.ssynchronized(_config_lock, shared=1)
1011   def GetFileStorageDir(self):
1012     """Get the file storage dir for this cluster.
1013
1014     """
1015     return self._config_data.cluster.file_storage_dir
1016
1017   @locking.ssynchronized(_config_lock, shared=1)
1018   def GetSharedFileStorageDir(self):
1019     """Get the shared file storage dir for this cluster.
1020
1021     """
1022     return self._config_data.cluster.shared_file_storage_dir
1023
1024   @locking.ssynchronized(_config_lock, shared=1)
1025   def GetHypervisorType(self):
1026     """Get the hypervisor type for this cluster.
1027
1028     """
1029     return self._config_data.cluster.enabled_hypervisors[0]
1030
1031   @locking.ssynchronized(_config_lock, shared=1)
1032   def GetHostKey(self):
1033     """Return the rsa hostkey from the config.
1034
1035     @rtype: string
1036     @return: the rsa hostkey
1037
1038     """
1039     return self._config_data.cluster.rsahostkeypub
1040
1041   @locking.ssynchronized(_config_lock, shared=1)
1042   def GetDefaultIAllocator(self):
1043     """Get the default instance allocator for this cluster.
1044
1045     """
1046     return self._config_data.cluster.default_iallocator
1047
1048   @locking.ssynchronized(_config_lock, shared=1)
1049   def GetPrimaryIPFamily(self):
1050     """Get cluster primary ip family.
1051
1052     @return: primary ip family
1053
1054     """
1055     return self._config_data.cluster.primary_ip_family
1056
1057   @locking.ssynchronized(_config_lock, shared=1)
1058   def GetMasterNetworkParameters(self):
1059     """Get network parameters of the master node.
1060
1061     @rtype: L{object.MasterNetworkParameters}
1062     @return: network parameters of the master node
1063
1064     """
1065     cluster = self._config_data.cluster
1066     result = objects.MasterNetworkParameters(name=cluster.master_node,
1067       ip=cluster.master_ip,
1068       netmask=cluster.master_netmask,
1069       netdev=cluster.master_netdev,
1070       ip_family=cluster.primary_ip_family)
1071
1072     return result
1073
1074   @locking.ssynchronized(_config_lock)
1075   def AddNodeGroup(self, group, ec_id, check_uuid=True):
1076     """Add a node group to the configuration.
1077
1078     This method calls group.UpgradeConfig() to fill any missing attributes
1079     according to their default values.
1080
1081     @type group: L{objects.NodeGroup}
1082     @param group: the NodeGroup object to add
1083     @type ec_id: string
1084     @param ec_id: unique id for the job to use when creating a missing UUID
1085     @type check_uuid: bool
1086     @param check_uuid: add an UUID to the group if it doesn't have one or, if
1087                        it does, ensure that it does not exist in the
1088                        configuration already
1089
1090     """
1091     self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
1092     self._WriteConfig()
1093
1094   def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
1095     """Add a node group to the configuration.
1096
1097     """
1098     logging.info("Adding node group %s to configuration", group.name)
1099
1100     # Some code might need to add a node group with a pre-populated UUID
1101     # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
1102     # the "does this UUID" exist already check.
1103     if check_uuid:
1104       self._EnsureUUID(group, ec_id)
1105
1106     try:
1107       existing_uuid = self._UnlockedLookupNodeGroup(group.name)
1108     except errors.OpPrereqError:
1109       pass
1110     else:
1111       raise errors.OpPrereqError("Desired group name '%s' already exists as a"
1112                                  " node group (UUID: %s)" %
1113                                  (group.name, existing_uuid),
1114                                  errors.ECODE_EXISTS)
1115
1116     group.serial_no = 1
1117     group.ctime = group.mtime = time.time()
1118     group.UpgradeConfig()
1119
1120     self._config_data.nodegroups[group.uuid] = group
1121     self._config_data.cluster.serial_no += 1
1122
1123   @locking.ssynchronized(_config_lock)
1124   def RemoveNodeGroup(self, group_uuid):
1125     """Remove a node group from the configuration.
1126
1127     @type group_uuid: string
1128     @param group_uuid: the UUID of the node group to remove
1129
1130     """
1131     logging.info("Removing node group %s from configuration", group_uuid)
1132
1133     if group_uuid not in self._config_data.nodegroups:
1134       raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)
1135
1136     assert len(self._config_data.nodegroups) != 1, \
1137             "Group '%s' is the only group, cannot be removed" % group_uuid
1138
1139     del self._config_data.nodegroups[group_uuid]
1140     self._config_data.cluster.serial_no += 1
1141     self._WriteConfig()
1142
1143   def _UnlockedLookupNodeGroup(self, target):
1144     """Lookup a node group's UUID.
1145
1146     @type target: string or None
1147     @param target: group name or UUID or None to look for the default
1148     @rtype: string
1149     @return: nodegroup UUID
1150     @raises errors.OpPrereqError: when the target group cannot be found
1151
1152     """
1153     if target is None:
1154       if len(self._config_data.nodegroups) != 1:
1155         raise errors.OpPrereqError("More than one node group exists. Target"
1156                                    " group must be specified explicitly.")
1157       else:
1158         return self._config_data.nodegroups.keys()[0]
1159     if target in self._config_data.nodegroups:
1160       return target
1161     for nodegroup in self._config_data.nodegroups.values():
1162       if nodegroup.name == target:
1163         return nodegroup.uuid
1164     raise errors.OpPrereqError("Node group '%s' not found" % target,
1165                                errors.ECODE_NOENT)
1166
1167   @locking.ssynchronized(_config_lock, shared=1)
1168   def LookupNodeGroup(self, target):
1169     """Lookup a node group's UUID.
1170
1171     This function is just a wrapper over L{_UnlockedLookupNodeGroup}.
1172
1173     @type target: string or None
1174     @param target: group name or UUID or None to look for the default
1175     @rtype: string
1176     @return: nodegroup UUID
1177
1178     """
1179     return self._UnlockedLookupNodeGroup(target)
1180
1181   def _UnlockedGetNodeGroup(self, uuid):
1182     """Lookup a node group.
1183
1184     @type uuid: string
1185     @param uuid: group UUID
1186     @rtype: L{objects.NodeGroup} or None
1187     @return: nodegroup object, or None if not found
1188
1189     """
1190     if uuid not in self._config_data.nodegroups:
1191       return None
1192
1193     return self._config_data.nodegroups[uuid]
1194
1195   @locking.ssynchronized(_config_lock, shared=1)
1196   def GetNodeGroup(self, uuid):
1197     """Lookup a node group.
1198
1199     @type uuid: string
1200     @param uuid: group UUID
1201     @rtype: L{objects.NodeGroup} or None
1202     @return: nodegroup object, or None if not found
1203
1204     """
1205     return self._UnlockedGetNodeGroup(uuid)
1206
1207   @locking.ssynchronized(_config_lock, shared=1)
1208   def GetAllNodeGroupsInfo(self):
1209     """Get the configuration of all node groups.
1210
1211     """
1212     return dict(self._config_data.nodegroups)
1213
1214   @locking.ssynchronized(_config_lock, shared=1)
1215   def GetNodeGroupList(self):
1216     """Get a list of node groups.
1217
1218     """
1219     return self._config_data.nodegroups.keys()
1220
1221   @locking.ssynchronized(_config_lock, shared=1)
1222   def GetNodeGroupMembersByNodes(self, nodes):
1223     """Get nodes which are member in the same nodegroups as the given nodes.
1224
1225     """
1226     ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
1227     return frozenset(member_name
1228                      for node_name in nodes
1229                      for member_name in
1230                        self._UnlockedGetNodeGroup(ngfn(node_name)).members)
1231
1232   @locking.ssynchronized(_config_lock, shared=1)
1233   def GetMultiNodeGroupInfo(self, group_uuids):
1234     """Get the configuration of multiple node groups.
1235
1236     @param group_uuids: List of node group UUIDs
1237     @rtype: list
1238     @return: List of tuples of (group_uuid, group_info)
1239
1240     """
1241     return [(uuid, self._UnlockedGetNodeGroup(uuid)) for uuid in group_uuids]
1242
1243   @locking.ssynchronized(_config_lock)
1244   def AddInstance(self, instance, ec_id):
1245     """Add an instance to the config.
1246
1247     This should be used after creating a new instance.
1248
1249     @type instance: L{objects.Instance}
1250     @param instance: the instance object
1251
1252     """
1253     if not isinstance(instance, objects.Instance):
1254       raise errors.ProgrammerError("Invalid type passed to AddInstance")
1255
1256     if instance.disk_template != constants.DT_DISKLESS:
1257       all_lvs = instance.MapLVsByNode()
1258       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
1259
1260     all_macs = self._AllMACs()
1261     for nic in instance.nics:
1262       if nic.mac in all_macs:
1263         raise errors.ConfigurationError("Cannot add instance %s:"
1264                                         " MAC address '%s' already in use." %
1265                                         (instance.name, nic.mac))
1266
1267     self._EnsureUUID(instance, ec_id)
1268
1269     instance.serial_no = 1
1270     instance.ctime = instance.mtime = time.time()
1271     self._config_data.instances[instance.name] = instance
1272     self._config_data.cluster.serial_no += 1
1273     self._UnlockedReleaseDRBDMinors(instance.name)
1274     self._WriteConfig()
1275
1276   def _EnsureUUID(self, item, ec_id):
1277     """Ensures a given object has a valid UUID.
1278
1279     @param item: the instance or node to be checked
1280     @param ec_id: the execution context id for the uuid reservation
1281
1282     """
1283     if not item.uuid:
1284       item.uuid = self._GenerateUniqueID(ec_id)
1285     elif item.uuid in self._AllIDs(include_temporary=True):
1286       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
1287                                       " in use" % (item.name, item.uuid))
1288
1289   def _SetInstanceStatus(self, instance_name, status):
1290     """Set the instance's status to a given value.
1291
1292     """
1293     assert status in constants.ADMINST_ALL, \
1294            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
1295
1296     if instance_name not in self._config_data.instances:
1297       raise errors.ConfigurationError("Unknown instance '%s'" %
1298                                       instance_name)
1299     instance = self._config_data.instances[instance_name]
1300     if instance.admin_state != status:
1301       instance.admin_state = status
1302       instance.serial_no += 1
1303       instance.mtime = time.time()
1304       self._WriteConfig()
1305
1306   @locking.ssynchronized(_config_lock)
1307   def MarkInstanceUp(self, instance_name):
1308     """Mark the instance status to up in the config.
1309
1310     """
1311     self._SetInstanceStatus(instance_name, constants.ADMINST_UP)
1312
1313   @locking.ssynchronized(_config_lock)
1314   def MarkInstanceOffline(self, instance_name):
1315     """Mark the instance status to down in the config.
1316
1317     """
1318     self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)
1319
1320   @locking.ssynchronized(_config_lock)
1321   def RemoveInstance(self, instance_name):
1322     """Remove the instance from the configuration.
1323
1324     """
1325     if instance_name not in self._config_data.instances:
1326       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1327
1328     # If a network port has been allocated to the instance,
1329     # return it to the pool of free ports.
1330     inst = self._config_data.instances[instance_name]
1331     network_port = getattr(inst, "network_port", None)
1332     if network_port is not None:
1333       self._config_data.cluster.tcpudp_port_pool.add(network_port)
1334
1335     del self._config_data.instances[instance_name]
1336     self._config_data.cluster.serial_no += 1
1337     self._WriteConfig()
1338
1339   @locking.ssynchronized(_config_lock)
1340   def RenameInstance(self, old_name, new_name):
1341     """Rename an instance.
1342
1343     This needs to be done in ConfigWriter and not by RemoveInstance
1344     combined with AddInstance as only we can guarantee an atomic
1345     rename.
1346
1347     """
1348     if old_name not in self._config_data.instances:
1349       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
1350
1351     # Operate on a copy to not loose instance object in case of a failure
1352     inst = self._config_data.instances[old_name].Copy()
1353     inst.name = new_name
1354
1355     for (idx, disk) in enumerate(inst.disks):
1356       if disk.dev_type == constants.LD_FILE:
1357         # rename the file paths in logical and physical id
1358         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
1359         disk.logical_id = (disk.logical_id[0],
1360                            utils.PathJoin(file_storage_dir, inst.name,
1361                                           "disk%s" % idx))
1362         disk.physical_id = disk.logical_id
1363
1364     # Actually replace instance object
1365     del self._config_data.instances[old_name]
1366     self._config_data.instances[inst.name] = inst
1367
1368     # Force update of ssconf files
1369     self._config_data.cluster.serial_no += 1
1370
1371     self._WriteConfig()
1372
1373   @locking.ssynchronized(_config_lock)
1374   def MarkInstanceDown(self, instance_name):
1375     """Mark the status of an instance to down in the configuration.
1376
1377     """
1378     self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)
1379
1380   def _UnlockedGetInstanceList(self):
1381     """Get the list of instances.
1382
1383     This function is for internal use, when the config lock is already held.
1384
1385     """
1386     return self._config_data.instances.keys()
1387
1388   @locking.ssynchronized(_config_lock, shared=1)
1389   def GetInstanceList(self):
1390     """Get the list of instances.
1391
1392     @return: array of instances, ex. ['instance2.example.com',
1393         'instance1.example.com']
1394
1395     """
1396     return self._UnlockedGetInstanceList()
1397
1398   def ExpandInstanceName(self, short_name):
1399     """Attempt to expand an incomplete instance name.
1400
1401     """
1402     # Locking is done in L{ConfigWriter.GetInstanceList}
1403     return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())
1404
1405   def _UnlockedGetInstanceInfo(self, instance_name):
1406     """Returns information about an instance.
1407
1408     This function is for internal use, when the config lock is already held.
1409
1410     """
1411     if instance_name not in self._config_data.instances:
1412       return None
1413
1414     return self._config_data.instances[instance_name]
1415
1416   @locking.ssynchronized(_config_lock, shared=1)
1417   def GetInstanceInfo(self, instance_name):
1418     """Returns information about an instance.
1419
1420     It takes the information from the configuration file. Other information of
1421     an instance are taken from the live systems.
1422
1423     @param instance_name: name of the instance, e.g.
1424         I{instance1.example.com}
1425
1426     @rtype: L{objects.Instance}
1427     @return: the instance object
1428
1429     """
1430     return self._UnlockedGetInstanceInfo(instance_name)
1431
1432   @locking.ssynchronized(_config_lock, shared=1)
1433   def GetInstanceNodeGroups(self, instance_name, primary_only=False):
1434     """Returns set of node group UUIDs for instance's nodes.
1435
1436     @rtype: frozenset
1437
1438     """
1439     instance = self._UnlockedGetInstanceInfo(instance_name)
1440     if not instance:
1441       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1442
1443     if primary_only:
1444       nodes = [instance.primary_node]
1445     else:
1446       nodes = instance.all_nodes
1447
1448     return frozenset(self._UnlockedGetNodeInfo(node_name).group
1449                      for node_name in nodes)
1450
1451   @locking.ssynchronized(_config_lock, shared=1)
1452   def GetMultiInstanceInfo(self, instances):
1453     """Get the configuration of multiple instances.
1454
1455     @param instances: list of instance names
1456     @rtype: list
1457     @return: list of tuples (instance, instance_info), where
1458         instance_info is what would GetInstanceInfo return for the
1459         node, while keeping the original order
1460
1461     """
1462     return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]
1463
1464   @locking.ssynchronized(_config_lock, shared=1)
1465   def GetAllInstancesInfo(self):
1466     """Get the configuration of all instances.
1467
1468     @rtype: dict
1469     @return: dict of (instance, instance_info), where instance_info is what
1470               would GetInstanceInfo return for the node
1471
1472     """
1473     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1474                     for instance in self._UnlockedGetInstanceList()])
1475     return my_dict
1476
1477   @locking.ssynchronized(_config_lock, shared=1)
1478   def GetInstancesInfoByFilter(self, filter_fn):
1479     """Get instance configuration with a filter.
1480
1481     @type filter_fn: callable
1482     @param filter_fn: Filter function receiving instance object as parameter,
1483       returning boolean. Important: this function is called while the
1484       configuration locks is held. It must not do any complex work or call
1485       functions potentially leading to a deadlock. Ideally it doesn't call any
1486       other functions and just compares instance attributes.
1487
1488     """
1489     return dict((name, inst)
1490                 for (name, inst) in self._config_data.instances.items()
1491                 if filter_fn(inst))
1492
1493   @locking.ssynchronized(_config_lock)
1494   def AddNode(self, node, ec_id):
1495     """Add a node to the configuration.
1496
1497     @type node: L{objects.Node}
1498     @param node: a Node instance
1499
1500     """
1501     logging.info("Adding node %s to configuration", node.name)
1502
1503     self._EnsureUUID(node, ec_id)
1504
1505     node.serial_no = 1
1506     node.ctime = node.mtime = time.time()
1507     self._UnlockedAddNodeToGroup(node.name, node.group)
1508     self._config_data.nodes[node.name] = node
1509     self._config_data.cluster.serial_no += 1
1510     self._WriteConfig()
1511
1512   @locking.ssynchronized(_config_lock)
1513   def RemoveNode(self, node_name):
1514     """Remove a node from the configuration.
1515
1516     """
1517     logging.info("Removing node %s from configuration", node_name)
1518
1519     if node_name not in self._config_data.nodes:
1520       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1521
1522     self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1523     del self._config_data.nodes[node_name]
1524     self._config_data.cluster.serial_no += 1
1525     self._WriteConfig()
1526
1527   def ExpandNodeName(self, short_name):
1528     """Attempt to expand an incomplete node name.
1529
1530     """
1531     # Locking is done in L{ConfigWriter.GetNodeList}
1532     return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1533
1534   def _UnlockedGetNodeInfo(self, node_name):
1535     """Get the configuration of a node, as stored in the config.
1536
1537     This function is for internal use, when the config lock is already
1538     held.
1539
1540     @param node_name: the node name, e.g. I{node1.example.com}
1541
1542     @rtype: L{objects.Node}
1543     @return: the node object
1544
1545     """
1546     if node_name not in self._config_data.nodes:
1547       return None
1548
1549     return self._config_data.nodes[node_name]
1550
1551   @locking.ssynchronized(_config_lock, shared=1)
1552   def GetNodeInfo(self, node_name):
1553     """Get the configuration of a node, as stored in the config.
1554
1555     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1556
1557     @param node_name: the node name, e.g. I{node1.example.com}
1558
1559     @rtype: L{objects.Node}
1560     @return: the node object
1561
1562     """
1563     return self._UnlockedGetNodeInfo(node_name)
1564
1565   @locking.ssynchronized(_config_lock, shared=1)
1566   def GetNodeInstances(self, node_name):
1567     """Get the instances of a node, as stored in the config.
1568
1569     @param node_name: the node name, e.g. I{node1.example.com}
1570
1571     @rtype: (list, list)
1572     @return: a tuple with two lists: the primary and the secondary instances
1573
1574     """
1575     pri = []
1576     sec = []
1577     for inst in self._config_data.instances.values():
1578       if inst.primary_node == node_name:
1579         pri.append(inst.name)
1580       if node_name in inst.secondary_nodes:
1581         sec.append(inst.name)
1582     return (pri, sec)
1583
1584   @locking.ssynchronized(_config_lock, shared=1)
1585   def GetNodeGroupInstances(self, uuid, primary_only=False):
1586     """Get the instances of a node group.
1587
1588     @param uuid: Node group UUID
1589     @param primary_only: Whether to only consider primary nodes
1590     @rtype: frozenset
1591     @return: List of instance names in node group
1592
1593     """
1594     if primary_only:
1595       nodes_fn = lambda inst: [inst.primary_node]
1596     else:
1597       nodes_fn = lambda inst: inst.all_nodes
1598
1599     return frozenset(inst.name
1600                      for inst in self._config_data.instances.values()
1601                      for node_name in nodes_fn(inst)
1602                      if self._UnlockedGetNodeInfo(node_name).group == uuid)
1603
1604   def _UnlockedGetNodeList(self):
1605     """Return the list of nodes which are in the configuration.
1606
1607     This function is for internal use, when the config lock is already
1608     held.
1609
1610     @rtype: list
1611
1612     """
1613     return self._config_data.nodes.keys()
1614
1615   @locking.ssynchronized(_config_lock, shared=1)
1616   def GetNodeList(self):
1617     """Return the list of nodes which are in the configuration.
1618
1619     """
1620     return self._UnlockedGetNodeList()
1621
1622   def _UnlockedGetOnlineNodeList(self):
1623     """Return the list of nodes which are online.
1624
1625     """
1626     all_nodes = [self._UnlockedGetNodeInfo(node)
1627                  for node in self._UnlockedGetNodeList()]
1628     return [node.name for node in all_nodes if not node.offline]
1629
1630   @locking.ssynchronized(_config_lock, shared=1)
1631   def GetOnlineNodeList(self):
1632     """Return the list of nodes which are online.
1633
1634     """
1635     return self._UnlockedGetOnlineNodeList()
1636
1637   @locking.ssynchronized(_config_lock, shared=1)
1638   def GetVmCapableNodeList(self):
1639     """Return the list of nodes which are not vm capable.
1640
1641     """
1642     all_nodes = [self._UnlockedGetNodeInfo(node)
1643                  for node in self._UnlockedGetNodeList()]
1644     return [node.name for node in all_nodes if node.vm_capable]
1645
1646   @locking.ssynchronized(_config_lock, shared=1)
1647   def GetNonVmCapableNodeList(self):
1648     """Return the list of nodes which are not vm capable.
1649
1650     """
1651     all_nodes = [self._UnlockedGetNodeInfo(node)
1652                  for node in self._UnlockedGetNodeList()]
1653     return [node.name for node in all_nodes if not node.vm_capable]
1654
1655   @locking.ssynchronized(_config_lock, shared=1)
1656   def GetMultiNodeInfo(self, nodes):
1657     """Get the configuration of multiple nodes.
1658
1659     @param nodes: list of node names
1660     @rtype: list
1661     @return: list of tuples of (node, node_info), where node_info is
1662         what would GetNodeInfo return for the node, in the original
1663         order
1664
1665     """
1666     return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]
1667
1668   @locking.ssynchronized(_config_lock, shared=1)
1669   def GetAllNodesInfo(self):
1670     """Get the configuration of all nodes.
1671
1672     @rtype: dict
1673     @return: dict of (node, node_info), where node_info is what
1674               would GetNodeInfo return for the node
1675
1676     """
1677     return self._UnlockedGetAllNodesInfo()
1678
1679   def _UnlockedGetAllNodesInfo(self):
1680     """Gets configuration of all nodes.
1681
1682     @note: See L{GetAllNodesInfo}
1683
1684     """
1685     return dict([(node, self._UnlockedGetNodeInfo(node))
1686                  for node in self._UnlockedGetNodeList()])
1687
1688   @locking.ssynchronized(_config_lock, shared=1)
1689   def GetNodeGroupsFromNodes(self, nodes):
1690     """Returns groups for a list of nodes.
1691
1692     @type nodes: list of string
1693     @param nodes: List of node names
1694     @rtype: frozenset
1695
1696     """
1697     return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)
1698
1699   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1700     """Get the number of current and maximum desired and possible candidates.
1701
1702     @type exceptions: list
1703     @param exceptions: if passed, list of nodes that should be ignored
1704     @rtype: tuple
1705     @return: tuple of (current, desired and possible, possible)
1706
1707     """
1708     mc_now = mc_should = mc_max = 0
1709     for node in self._config_data.nodes.values():
1710       if exceptions and node.name in exceptions:
1711         continue
1712       if not (node.offline or node.drained) and node.master_capable:
1713         mc_max += 1
1714       if node.master_candidate:
1715         mc_now += 1
1716     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1717     return (mc_now, mc_should, mc_max)
1718
1719   @locking.ssynchronized(_config_lock, shared=1)
1720   def GetMasterCandidateStats(self, exceptions=None):
1721     """Get the number of current and maximum possible candidates.
1722
1723     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1724
1725     @type exceptions: list
1726     @param exceptions: if passed, list of nodes that should be ignored
1727     @rtype: tuple
1728     @return: tuple of (current, max)
1729
1730     """
1731     return self._UnlockedGetMasterCandidateStats(exceptions)
1732
1733   @locking.ssynchronized(_config_lock)
1734   def MaintainCandidatePool(self, exceptions):
1735     """Try to grow the candidate pool to the desired size.
1736
1737     @type exceptions: list
1738     @param exceptions: if passed, list of nodes that should be ignored
1739     @rtype: list
1740     @return: list with the adjusted nodes (L{objects.Node} instances)
1741
1742     """
1743     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1744     mod_list = []
1745     if mc_now < mc_max:
1746       node_list = self._config_data.nodes.keys()
1747       random.shuffle(node_list)
1748       for name in node_list:
1749         if mc_now >= mc_max:
1750           break
1751         node = self._config_data.nodes[name]
1752         if (node.master_candidate or node.offline or node.drained or
1753             node.name in exceptions or not node.master_capable):
1754           continue
1755         mod_list.append(node)
1756         node.master_candidate = True
1757         node.serial_no += 1
1758         mc_now += 1
1759       if mc_now != mc_max:
1760         # this should not happen
1761         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1762                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1763       if mod_list:
1764         self._config_data.cluster.serial_no += 1
1765         self._WriteConfig()
1766
1767     return mod_list
1768
1769   def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1770     """Add a given node to the specified group.
1771
1772     """
1773     if nodegroup_uuid not in self._config_data.nodegroups:
1774       # This can happen if a node group gets deleted between its lookup and
1775       # when we're adding the first node to it, since we don't keep a lock in
1776       # the meantime. It's ok though, as we'll fail cleanly if the node group
1777       # is not found anymore.
1778       raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1779     if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1780       self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1781
1782   def _UnlockedRemoveNodeFromGroup(self, node):
1783     """Remove a given node from its group.
1784
1785     """
1786     nodegroup = node.group
1787     if nodegroup not in self._config_data.nodegroups:
1788       logging.warning("Warning: node '%s' has unknown node group '%s'"
1789                       " (while being removed from it)", node.name, nodegroup)
1790     nodegroup_obj = self._config_data.nodegroups[nodegroup]
1791     if node.name not in nodegroup_obj.members:
1792       logging.warning("Warning: node '%s' not a member of its node group '%s'"
1793                       " (while being removed from it)", node.name, nodegroup)
1794     else:
1795       nodegroup_obj.members.remove(node.name)
1796
1797   @locking.ssynchronized(_config_lock)
1798   def AssignGroupNodes(self, mods):
1799     """Changes the group of a number of nodes.
1800
1801     @type mods: list of tuples; (node name, new group UUID)
1802     @param mods: Node membership modifications
1803
1804     """
1805     groups = self._config_data.nodegroups
1806     nodes = self._config_data.nodes
1807
1808     resmod = []
1809
1810     # Try to resolve names/UUIDs first
1811     for (node_name, new_group_uuid) in mods:
1812       try:
1813         node = nodes[node_name]
1814       except KeyError:
1815         raise errors.ConfigurationError("Unable to find node '%s'" % node_name)
1816
1817       if node.group == new_group_uuid:
1818         # Node is being assigned to its current group
1819         logging.debug("Node '%s' was assigned to its current group (%s)",
1820                       node_name, node.group)
1821         continue
1822
1823       # Try to find current group of node
1824       try:
1825         old_group = groups[node.group]
1826       except KeyError:
1827         raise errors.ConfigurationError("Unable to find old group '%s'" %
1828                                         node.group)
1829
1830       # Try to find new group for node
1831       try:
1832         new_group = groups[new_group_uuid]
1833       except KeyError:
1834         raise errors.ConfigurationError("Unable to find new group '%s'" %
1835                                         new_group_uuid)
1836
1837       assert node.name in old_group.members, \
1838         ("Inconsistent configuration: node '%s' not listed in members for its"
1839          " old group '%s'" % (node.name, old_group.uuid))
1840       assert node.name not in new_group.members, \
1841         ("Inconsistent configuration: node '%s' already listed in members for"
1842          " its new group '%s'" % (node.name, new_group.uuid))
1843
1844       resmod.append((node, old_group, new_group))
1845
1846     # Apply changes
1847     for (node, old_group, new_group) in resmod:
1848       assert node.uuid != new_group.uuid and old_group.uuid != new_group.uuid, \
1849         "Assigning to current group is not possible"
1850
1851       node.group = new_group.uuid
1852
1853       # Update members of involved groups
1854       if node.name in old_group.members:
1855         old_group.members.remove(node.name)
1856       if node.name not in new_group.members:
1857         new_group.members.append(node.name)
1858
1859     # Update timestamps and serials (only once per node/group object)
1860     now = time.time()
1861     for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
1862       obj.serial_no += 1
1863       obj.mtime = now
1864
1865     # Force ssconf update
1866     self._config_data.cluster.serial_no += 1
1867
1868     self._WriteConfig()
1869
1870   def _BumpSerialNo(self):
1871     """Bump up the serial number of the config.
1872
1873     """
1874     self._config_data.serial_no += 1
1875     self._config_data.mtime = time.time()
1876
1877   def _AllUUIDObjects(self):
1878     """Returns all objects with uuid attributes.
1879
1880     """
1881     return (self._config_data.instances.values() +
1882             self._config_data.nodes.values() +
1883             self._config_data.nodegroups.values() +
1884             [self._config_data.cluster])
1885
1886   def _OpenConfig(self, accept_foreign):
1887     """Read the config data from disk.
1888
1889     """
1890     raw_data = utils.ReadFile(self._cfg_file)
1891
1892     try:
1893       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1894     except Exception, err:
1895       raise errors.ConfigurationError(err)
1896
1897     # Make sure the configuration has the right version
1898     _ValidateConfig(data)
1899
1900     if (not hasattr(data, "cluster") or
1901         not hasattr(data.cluster, "rsahostkeypub")):
1902       raise errors.ConfigurationError("Incomplete configuration"
1903                                       " (missing cluster.rsahostkeypub)")
1904
1905     if data.cluster.master_node != self._my_hostname and not accept_foreign:
1906       msg = ("The configuration denotes node %s as master, while my"
1907              " hostname is %s; opening a foreign configuration is only"
1908              " possible in accept_foreign mode" %
1909              (data.cluster.master_node, self._my_hostname))
1910       raise errors.ConfigurationError(msg)
1911
1912     # Upgrade configuration if needed
1913     data.UpgradeConfig()
1914
1915     self._config_data = data
1916     # reset the last serial as -1 so that the next write will cause
1917     # ssconf update
1918     self._last_cluster_serial = -1
1919
1920     # And finally run our (custom) config upgrade sequence
1921     self._UpgradeConfig()
1922
1923     self._cfg_id = utils.GetFileID(path=self._cfg_file)
1924
1925   def _UpgradeConfig(self):
1926     """Run upgrade steps that cannot be done purely in the objects.
1927
1928     This is because some data elements need uniqueness across the
1929     whole configuration, etc.
1930
1931     @warning: this function will call L{_WriteConfig()}, but also
1932         L{DropECReservations} so it needs to be called only from a
1933         "safe" place (the constructor). If one wanted to call it with
1934         the lock held, a DropECReservationUnlocked would need to be
1935         created first, to avoid causing deadlock.
1936
1937     """
1938     modified = False
1939     for item in self._AllUUIDObjects():
1940       if item.uuid is None:
1941         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1942         modified = True
1943     if not self._config_data.nodegroups:
1944       default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
1945       default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
1946                                             members=[])
1947       self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
1948       modified = True
1949     for node in self._config_data.nodes.values():
1950       if not node.group:
1951         node.group = self.LookupNodeGroup(None)
1952         modified = True
1953       # This is technically *not* an upgrade, but needs to be done both when
1954       # nodegroups are being added, and upon normally loading the config,
1955       # because the members list of a node group is discarded upon
1956       # serializing/deserializing the object.
1957       self._UnlockedAddNodeToGroup(node.name, node.group)
1958     if modified:
1959       self._WriteConfig()
1960       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1961       # only called at config init time, without the lock held
1962       self.DropECReservations(_UPGRADE_CONFIG_JID)
1963
1964   def _DistributeConfig(self, feedback_fn):
1965     """Distribute the configuration to the other nodes.
1966
1967     Currently, this only copies the configuration file. In the future,
1968     it could be used to encapsulate the 2/3-phase update mechanism.
1969
1970     """
1971     if self._offline:
1972       return True
1973
1974     bad = False
1975
1976     node_list = []
1977     addr_list = []
1978     myhostname = self._my_hostname
1979     # we can skip checking whether _UnlockedGetNodeInfo returns None
1980     # since the node list comes from _UnlocketGetNodeList, and we are
1981     # called with the lock held, so no modifications should take place
1982     # in between
1983     for node_name in self._UnlockedGetNodeList():
1984       if node_name == myhostname:
1985         continue
1986       node_info = self._UnlockedGetNodeInfo(node_name)
1987       if not node_info.master_candidate:
1988         continue
1989       node_list.append(node_info.name)
1990       addr_list.append(node_info.primary_ip)
1991
1992     # TODO: Use dedicated resolver talking to config writer for name resolution
1993     result = \
1994       self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
1995     for to_node, to_result in result.items():
1996       msg = to_result.fail_msg
1997       if msg:
1998         msg = ("Copy of file %s to node %s failed: %s" %
1999                (self._cfg_file, to_node, msg))
2000         logging.error(msg)
2001
2002         if feedback_fn:
2003           feedback_fn(msg)
2004
2005         bad = True
2006
2007     return not bad
2008
2009   def _WriteConfig(self, destination=None, feedback_fn=None):
2010     """Write the configuration data to persistent storage.
2011
2012     """
2013     assert feedback_fn is None or callable(feedback_fn)
2014
2015     # Warn on config errors, but don't abort the save - the
2016     # configuration has already been modified, and we can't revert;
2017     # the best we can do is to warn the user and save as is, leaving
2018     # recovery to the user
2019     config_errors = self._UnlockedVerifyConfig()
2020     if config_errors:
2021       errmsg = ("Configuration data is not consistent: %s" %
2022                 (utils.CommaJoin(config_errors)))
2023       logging.critical(errmsg)
2024       if feedback_fn:
2025         feedback_fn(errmsg)
2026
2027     if destination is None:
2028       destination = self._cfg_file
2029     self._BumpSerialNo()
2030     txt = serializer.Dump(self._config_data.ToDict())
2031
2032     getents = self._getents()
2033     try:
2034       fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
2035                                close=False, gid=getents.confd_gid, mode=0640)
2036     except errors.LockError:
2037       raise errors.ConfigurationError("The configuration file has been"
2038                                       " modified since the last write, cannot"
2039                                       " update")
2040     try:
2041       self._cfg_id = utils.GetFileID(fd=fd)
2042     finally:
2043       os.close(fd)
2044
2045     self.write_count += 1
2046
2047     # and redistribute the config file to master candidates
2048     self._DistributeConfig(feedback_fn)
2049
2050     # Write ssconf files on all nodes (including locally)
2051     if self._last_cluster_serial < self._config_data.cluster.serial_no:
2052       if not self._offline:
2053         result = self._GetRpc(None).call_write_ssconf_files(
2054           self._UnlockedGetOnlineNodeList(),
2055           self._UnlockedGetSsconfValues())
2056
2057         for nname, nresu in result.items():
2058           msg = nresu.fail_msg
2059           if msg:
2060             errmsg = ("Error while uploading ssconf files to"
2061                       " node %s: %s" % (nname, msg))
2062             logging.warning(errmsg)
2063
2064             if feedback_fn:
2065               feedback_fn(errmsg)
2066
2067       self._last_cluster_serial = self._config_data.cluster.serial_no
2068
2069   def _UnlockedGetSsconfValues(self):
2070     """Return the values needed by ssconf.
2071
2072     @rtype: dict
2073     @return: a dictionary with keys the ssconf names and values their
2074         associated value
2075
2076     """
2077     fn = "\n".join
2078     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
2079     node_names = utils.NiceSort(self._UnlockedGetNodeList())
2080     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
2081     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
2082                     for ninfo in node_info]
2083     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
2084                     for ninfo in node_info]
2085
2086     instance_data = fn(instance_names)
2087     off_data = fn(node.name for node in node_info if node.offline)
2088     on_data = fn(node.name for node in node_info if not node.offline)
2089     mc_data = fn(node.name for node in node_info if node.master_candidate)
2090     mc_ips_data = fn(node.primary_ip for node in node_info
2091                      if node.master_candidate)
2092     node_data = fn(node_names)
2093     node_pri_ips_data = fn(node_pri_ips)
2094     node_snd_ips_data = fn(node_snd_ips)
2095
2096     cluster = self._config_data.cluster
2097     cluster_tags = fn(cluster.GetTags())
2098
2099     hypervisor_list = fn(cluster.enabled_hypervisors)
2100
2101     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
2102
2103     nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
2104                   self._config_data.nodegroups.values()]
2105     nodegroups_data = fn(utils.NiceSort(nodegroups))
2106
2107     ssconf_values = {
2108       constants.SS_CLUSTER_NAME: cluster.cluster_name,
2109       constants.SS_CLUSTER_TAGS: cluster_tags,
2110       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
2111       constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
2112       constants.SS_MASTER_CANDIDATES: mc_data,
2113       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
2114       constants.SS_MASTER_IP: cluster.master_ip,
2115       constants.SS_MASTER_NETDEV: cluster.master_netdev,
2116       constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
2117       constants.SS_MASTER_NODE: cluster.master_node,
2118       constants.SS_NODE_LIST: node_data,
2119       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
2120       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
2121       constants.SS_OFFLINE_NODES: off_data,
2122       constants.SS_ONLINE_NODES: on_data,
2123       constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
2124       constants.SS_INSTANCE_LIST: instance_data,
2125       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
2126       constants.SS_HYPERVISOR_LIST: hypervisor_list,
2127       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
2128       constants.SS_UID_POOL: uid_pool,
2129       constants.SS_NODEGROUPS: nodegroups_data,
2130       }
2131     bad_values = [(k, v) for k, v in ssconf_values.items()
2132                   if not isinstance(v, (str, basestring))]
2133     if bad_values:
2134       err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
2135       raise errors.ConfigurationError("Some ssconf key(s) have non-string"
2136                                       " values: %s" % err)
2137     return ssconf_values
2138
2139   @locking.ssynchronized(_config_lock, shared=1)
2140   def GetSsconfValues(self):
2141     """Wrapper using lock around _UnlockedGetSsconf().
2142
2143     """
2144     return self._UnlockedGetSsconfValues()
2145
2146   @locking.ssynchronized(_config_lock, shared=1)
2147   def GetVGName(self):
2148     """Return the volume group name.
2149
2150     """
2151     return self._config_data.cluster.volume_group_name
2152
2153   @locking.ssynchronized(_config_lock)
2154   def SetVGName(self, vg_name):
2155     """Set the volume group name.
2156
2157     """
2158     self._config_data.cluster.volume_group_name = vg_name
2159     self._config_data.cluster.serial_no += 1
2160     self._WriteConfig()
2161
2162   @locking.ssynchronized(_config_lock, shared=1)
2163   def GetDRBDHelper(self):
2164     """Return DRBD usermode helper.
2165
2166     """
2167     return self._config_data.cluster.drbd_usermode_helper
2168
2169   @locking.ssynchronized(_config_lock)
2170   def SetDRBDHelper(self, drbd_helper):
2171     """Set DRBD usermode helper.
2172
2173     """
2174     self._config_data.cluster.drbd_usermode_helper = drbd_helper
2175     self._config_data.cluster.serial_no += 1
2176     self._WriteConfig()
2177
2178   @locking.ssynchronized(_config_lock, shared=1)
2179   def GetMACPrefix(self):
2180     """Return the mac prefix.
2181
2182     """
2183     return self._config_data.cluster.mac_prefix
2184
2185   @locking.ssynchronized(_config_lock, shared=1)
2186   def GetClusterInfo(self):
2187     """Returns information about the cluster
2188
2189     @rtype: L{objects.Cluster}
2190     @return: the cluster object
2191
2192     """
2193     return self._config_data.cluster
2194
2195   @locking.ssynchronized(_config_lock, shared=1)
2196   def HasAnyDiskOfType(self, dev_type):
2197     """Check if in there is at disk of the given type in the configuration.
2198
2199     """
2200     return self._config_data.HasAnyDiskOfType(dev_type)
2201
2202   @locking.ssynchronized(_config_lock)
2203   def Update(self, target, feedback_fn):
2204     """Notify function to be called after updates.
2205
2206     This function must be called when an object (as returned by
2207     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
2208     caller wants the modifications saved to the backing store. Note
2209     that all modified objects will be saved, but the target argument
2210     is the one the caller wants to ensure that it's saved.
2211
2212     @param target: an instance of either L{objects.Cluster},
2213         L{objects.Node} or L{objects.Instance} which is existing in
2214         the cluster
2215     @param feedback_fn: Callable feedback function
2216
2217     """
2218     if self._config_data is None:
2219       raise errors.ProgrammerError("Configuration file not read,"
2220                                    " cannot save.")
2221     update_serial = False
2222     if isinstance(target, objects.Cluster):
2223       test = target == self._config_data.cluster
2224     elif isinstance(target, objects.Node):
2225       test = target in self._config_data.nodes.values()
2226       update_serial = True
2227     elif isinstance(target, objects.Instance):
2228       test = target in self._config_data.instances.values()
2229     elif isinstance(target, objects.NodeGroup):
2230       test = target in self._config_data.nodegroups.values()
2231     else:
2232       raise errors.ProgrammerError("Invalid object type (%s) passed to"
2233                                    " ConfigWriter.Update" % type(target))
2234     if not test:
2235       raise errors.ConfigurationError("Configuration updated since object"
2236                                       " has been read or unknown object")
2237     target.serial_no += 1
2238     target.mtime = now = time.time()
2239
2240     if update_serial:
2241       # for node updates, we need to increase the cluster serial too
2242       self._config_data.cluster.serial_no += 1
2243       self._config_data.cluster.mtime = now
2244
2245     if isinstance(target, objects.Instance):
2246       self._UnlockedReleaseDRBDMinors(target.name)
2247
2248     self._WriteConfig(feedback_fn=feedback_fn)
2249
2250   @locking.ssynchronized(_config_lock)
2251   def DropECReservations(self, ec_id):
2252     """Drop per-execution-context reservations
2253
2254     """
2255     for rm in self._all_rms:
2256       rm.DropECReservations(ec_id)