Add use_external_mip_script cluster parameter
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 # pylint: disable=R0904
35 # R0904: Too many public methods
36
37 import os
38 import random
39 import logging
40 import time
41
42 from ganeti import errors
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import constants
46 from ganeti import rpc
47 from ganeti import objects
48 from ganeti import serializer
49 from ganeti import uidpool
50 from ganeti import netutils
51 from ganeti import runtime
52
53
54 _config_lock = locking.SharedLock("ConfigWriter")
55
56 # job id used for resource management at config upgrade time
57 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
58
59
60 def _ValidateConfig(data):
61   """Verifies that a configuration objects looks valid.
62
63   This only verifies the version of the configuration.
64
65   @raise errors.ConfigurationError: if the version differs from what
66       we expect
67
68   """
69   if data.version != constants.CONFIG_VERSION:
70     raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)
71
72
73 class TemporaryReservationManager:
74   """A temporary resource reservation manager.
75
76   This is used to reserve resources in a job, before using them, making sure
77   other jobs cannot get them in the meantime.
78
79   """
80   def __init__(self):
81     self._ec_reserved = {}
82
83   def Reserved(self, resource):
84     for holder_reserved in self._ec_reserved.values():
85       if resource in holder_reserved:
86         return True
87     return False
88
89   def Reserve(self, ec_id, resource):
90     if self.Reserved(resource):
91       raise errors.ReservationError("Duplicate reservation for resource '%s'"
92                                     % str(resource))
93     if ec_id not in self._ec_reserved:
94       self._ec_reserved[ec_id] = set([resource])
95     else:
96       self._ec_reserved[ec_id].add(resource)
97
98   def DropECReservations(self, ec_id):
99     if ec_id in self._ec_reserved:
100       del self._ec_reserved[ec_id]
101
102   def GetReserved(self):
103     all_reserved = set()
104     for holder_reserved in self._ec_reserved.values():
105       all_reserved.update(holder_reserved)
106     return all_reserved
107
108   def Generate(self, existing, generate_one_fn, ec_id):
109     """Generate a new resource of this type
110
111     """
112     assert callable(generate_one_fn)
113
114     all_elems = self.GetReserved()
115     all_elems.update(existing)
116     retries = 64
117     while retries > 0:
118       new_resource = generate_one_fn()
119       if new_resource is not None and new_resource not in all_elems:
120         break
121     else:
122       raise errors.ConfigurationError("Not able generate new resource"
123                                       " (last tried: %s)" % new_resource)
124     self.Reserve(ec_id, new_resource)
125     return new_resource
126
127
128 def _MatchNameComponentIgnoreCase(short_name, names):
129   """Wrapper around L{utils.text.MatchNameComponent}.
130
131   """
132   return utils.MatchNameComponent(short_name, names, case_sensitive=False)
133
134
135 class ConfigWriter:
136   """The interface to the cluster configuration.
137
138   @ivar _temporary_lvs: reservation manager for temporary LVs
139   @ivar _all_rms: a list of all temporary reservation managers
140
141   """
142   def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
143                accept_foreign=False):
144     self.write_count = 0
145     self._lock = _config_lock
146     self._config_data = None
147     self._offline = offline
148     if cfg_file is None:
149       self._cfg_file = constants.CLUSTER_CONF_FILE
150     else:
151       self._cfg_file = cfg_file
152     self._getents = _getents
153     self._temporary_ids = TemporaryReservationManager()
154     self._temporary_drbds = {}
155     self._temporary_macs = TemporaryReservationManager()
156     self._temporary_secrets = TemporaryReservationManager()
157     self._temporary_lvs = TemporaryReservationManager()
158     self._all_rms = [self._temporary_ids, self._temporary_macs,
159                      self._temporary_secrets, self._temporary_lvs]
160     # Note: in order to prevent errors when resolving our name in
161     # _DistributeConfig, we compute it here once and reuse it; it's
162     # better to raise an error before starting to modify the config
163     # file than after it was modified
164     self._my_hostname = netutils.Hostname.GetSysName()
165     self._last_cluster_serial = -1
166     self._cfg_id = None
167     self._context = None
168     self._OpenConfig(accept_foreign)
169
170   def _GetRpc(self, address_list):
171     """Returns RPC runner for configuration.
172
173     """
174     return rpc.ConfigRunner(self._context, address_list)
175
176   def SetContext(self, context):
177     """Sets Ganeti context.
178
179     """
180     self._context = context
181
182   # this method needs to be static, so that we can call it on the class
183   @staticmethod
184   def IsCluster():
185     """Check if the cluster is configured.
186
187     """
188     return os.path.exists(constants.CLUSTER_CONF_FILE)
189
190   def _GenerateOneMAC(self):
191     """Generate one mac address
192
193     """
194     prefix = self._config_data.cluster.mac_prefix
195     byte1 = random.randrange(0, 256)
196     byte2 = random.randrange(0, 256)
197     byte3 = random.randrange(0, 256)
198     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
199     return mac
200
201   @locking.ssynchronized(_config_lock, shared=1)
202   def GetNdParams(self, node):
203     """Get the node params populated with cluster defaults.
204
205     @type node: L{objects.Node}
206     @param node: The node we want to know the params for
207     @return: A dict with the filled in node params
208
209     """
210     nodegroup = self._UnlockedGetNodeGroup(node.group)
211     return self._config_data.cluster.FillND(node, nodegroup)
212
213   @locking.ssynchronized(_config_lock, shared=1)
214   def GenerateMAC(self, ec_id):
215     """Generate a MAC for an instance.
216
217     This should check the current instances for duplicates.
218
219     """
220     existing = self._AllMACs()
221     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
222
223   @locking.ssynchronized(_config_lock, shared=1)
224   def ReserveMAC(self, mac, ec_id):
225     """Reserve a MAC for an instance.
226
227     This only checks instances managed by this cluster, it does not
228     check for potential collisions elsewhere.
229
230     """
231     all_macs = self._AllMACs()
232     if mac in all_macs:
233       raise errors.ReservationError("mac already in use")
234     else:
235       self._temporary_macs.Reserve(ec_id, mac)
236
237   @locking.ssynchronized(_config_lock, shared=1)
238   def ReserveLV(self, lv_name, ec_id):
239     """Reserve an VG/LV pair for an instance.
240
241     @type lv_name: string
242     @param lv_name: the logical volume name to reserve
243
244     """
245     all_lvs = self._AllLVs()
246     if lv_name in all_lvs:
247       raise errors.ReservationError("LV already in use")
248     else:
249       self._temporary_lvs.Reserve(ec_id, lv_name)
250
251   @locking.ssynchronized(_config_lock, shared=1)
252   def GenerateDRBDSecret(self, ec_id):
253     """Generate a DRBD secret.
254
255     This checks the current disks for duplicates.
256
257     """
258     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
259                                             utils.GenerateSecret,
260                                             ec_id)
261
262   def _AllLVs(self):
263     """Compute the list of all LVs.
264
265     """
266     lvnames = set()
267     for instance in self._config_data.instances.values():
268       node_data = instance.MapLVsByNode()
269       for lv_list in node_data.values():
270         lvnames.update(lv_list)
271     return lvnames
272
273   def _AllIDs(self, include_temporary):
274     """Compute the list of all UUIDs and names we have.
275
276     @type include_temporary: boolean
277     @param include_temporary: whether to include the _temporary_ids set
278     @rtype: set
279     @return: a set of IDs
280
281     """
282     existing = set()
283     if include_temporary:
284       existing.update(self._temporary_ids.GetReserved())
285     existing.update(self._AllLVs())
286     existing.update(self._config_data.instances.keys())
287     existing.update(self._config_data.nodes.keys())
288     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
289     return existing
290
291   def _GenerateUniqueID(self, ec_id):
292     """Generate an unique UUID.
293
294     This checks the current node, instances and disk names for
295     duplicates.
296
297     @rtype: string
298     @return: the unique id
299
300     """
301     existing = self._AllIDs(include_temporary=False)
302     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
303
304   @locking.ssynchronized(_config_lock, shared=1)
305   def GenerateUniqueID(self, ec_id):
306     """Generate an unique ID.
307
308     This is just a wrapper over the unlocked version.
309
310     @type ec_id: string
311     @param ec_id: unique id for the job to reserve the id to
312
313     """
314     return self._GenerateUniqueID(ec_id)
315
316   def _AllMACs(self):
317     """Return all MACs present in the config.
318
319     @rtype: list
320     @return: the list of all MACs
321
322     """
323     result = []
324     for instance in self._config_data.instances.values():
325       for nic in instance.nics:
326         result.append(nic.mac)
327
328     return result
329
330   def _AllDRBDSecrets(self):
331     """Return all DRBD secrets present in the config.
332
333     @rtype: list
334     @return: the list of all DRBD secrets
335
336     """
337     def helper(disk, result):
338       """Recursively gather secrets from this disk."""
339       if disk.dev_type == constants.DT_DRBD8:
340         result.append(disk.logical_id[5])
341       if disk.children:
342         for child in disk.children:
343           helper(child, result)
344
345     result = []
346     for instance in self._config_data.instances.values():
347       for disk in instance.disks:
348         helper(disk, result)
349
350     return result
351
352   def _CheckDiskIDs(self, disk, l_ids, p_ids):
353     """Compute duplicate disk IDs
354
355     @type disk: L{objects.Disk}
356     @param disk: the disk at which to start searching
357     @type l_ids: list
358     @param l_ids: list of current logical ids
359     @type p_ids: list
360     @param p_ids: list of current physical ids
361     @rtype: list
362     @return: a list of error messages
363
364     """
365     result = []
366     if disk.logical_id is not None:
367       if disk.logical_id in l_ids:
368         result.append("duplicate logical id %s" % str(disk.logical_id))
369       else:
370         l_ids.append(disk.logical_id)
371     if disk.physical_id is not None:
372       if disk.physical_id in p_ids:
373         result.append("duplicate physical id %s" % str(disk.physical_id))
374       else:
375         p_ids.append(disk.physical_id)
376
377     if disk.children:
378       for child in disk.children:
379         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
380     return result
381
382   def _UnlockedVerifyConfig(self):
383     """Verify function.
384
385     @rtype: list
386     @return: a list of error messages; a non-empty list signifies
387         configuration errors
388
389     """
390     # pylint: disable=R0914
391     result = []
392     seen_macs = []
393     ports = {}
394     data = self._config_data
395     cluster = data.cluster
396     seen_lids = []
397     seen_pids = []
398
399     # global cluster checks
400     if not cluster.enabled_hypervisors:
401       result.append("enabled hypervisors list doesn't have any entries")
402     invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
403     if invalid_hvs:
404       result.append("enabled hypervisors contains invalid entries: %s" %
405                     invalid_hvs)
406     missing_hvp = (set(cluster.enabled_hypervisors) -
407                    set(cluster.hvparams.keys()))
408     if missing_hvp:
409       result.append("hypervisor parameters missing for the enabled"
410                     " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
411
412     if cluster.master_node not in data.nodes:
413       result.append("cluster has invalid primary node '%s'" %
414                     cluster.master_node)
415
416     def _helper(owner, attr, value, template):
417       try:
418         utils.ForceDictType(value, template)
419       except errors.GenericError, err:
420         result.append("%s has invalid %s: %s" % (owner, attr, err))
421
422     def _helper_nic(owner, params):
423       try:
424         objects.NIC.CheckParameterSyntax(params)
425       except errors.ConfigurationError, err:
426         result.append("%s has invalid nicparams: %s" % (owner, err))
427
428     # check cluster parameters
429     _helper("cluster", "beparams", cluster.SimpleFillBE({}),
430             constants.BES_PARAMETER_TYPES)
431     _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
432             constants.NICS_PARAMETER_TYPES)
433     _helper_nic("cluster", cluster.SimpleFillNIC({}))
434     _helper("cluster", "ndparams", cluster.SimpleFillND({}),
435             constants.NDS_PARAMETER_TYPES)
436
437     # per-instance checks
438     for instance_name in data.instances:
439       instance = data.instances[instance_name]
440       if instance.name != instance_name:
441         result.append("instance '%s' is indexed by wrong name '%s'" %
442                       (instance.name, instance_name))
443       if instance.primary_node not in data.nodes:
444         result.append("instance '%s' has invalid primary node '%s'" %
445                       (instance_name, instance.primary_node))
446       for snode in instance.secondary_nodes:
447         if snode not in data.nodes:
448           result.append("instance '%s' has invalid secondary node '%s'" %
449                         (instance_name, snode))
450       for idx, nic in enumerate(instance.nics):
451         if nic.mac in seen_macs:
452           result.append("instance '%s' has NIC %d mac %s duplicate" %
453                         (instance_name, idx, nic.mac))
454         else:
455           seen_macs.append(nic.mac)
456         if nic.nicparams:
457           filled = cluster.SimpleFillNIC(nic.nicparams)
458           owner = "instance %s nic %d" % (instance.name, idx)
459           _helper(owner, "nicparams",
460                   filled, constants.NICS_PARAMETER_TYPES)
461           _helper_nic(owner, filled)
462
463       # parameter checks
464       if instance.beparams:
465         _helper("instance %s" % instance.name, "beparams",
466                 cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)
467
468       # gather the drbd ports for duplicate checks
469       for dsk in instance.disks:
470         if dsk.dev_type in constants.LDS_DRBD:
471           tcp_port = dsk.logical_id[2]
472           if tcp_port not in ports:
473             ports[tcp_port] = []
474           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
475       # gather network port reservation
476       net_port = getattr(instance, "network_port", None)
477       if net_port is not None:
478         if net_port not in ports:
479           ports[net_port] = []
480         ports[net_port].append((instance.name, "network port"))
481
482       # instance disk verify
483       for idx, disk in enumerate(instance.disks):
484         result.extend(["instance '%s' disk %d error: %s" %
485                        (instance.name, idx, msg) for msg in disk.Verify()])
486         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
487
488     # cluster-wide pool of free ports
489     for free_port in cluster.tcpudp_port_pool:
490       if free_port not in ports:
491         ports[free_port] = []
492       ports[free_port].append(("cluster", "port marked as free"))
493
494     # compute tcp/udp duplicate ports
495     keys = ports.keys()
496     keys.sort()
497     for pnum in keys:
498       pdata = ports[pnum]
499       if len(pdata) > 1:
500         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
501         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
502
503     # highest used tcp port check
504     if keys:
505       if keys[-1] > cluster.highest_used_port:
506         result.append("Highest used port mismatch, saved %s, computed %s" %
507                       (cluster.highest_used_port, keys[-1]))
508
509     if not data.nodes[cluster.master_node].master_candidate:
510       result.append("Master node is not a master candidate")
511
512     # master candidate checks
513     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
514     if mc_now < mc_max:
515       result.append("Not enough master candidates: actual %d, target %d" %
516                     (mc_now, mc_max))
517
518     # node checks
519     for node_name, node in data.nodes.items():
520       if node.name != node_name:
521         result.append("Node '%s' is indexed by wrong name '%s'" %
522                       (node.name, node_name))
523       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
524         result.append("Node %s state is invalid: master_candidate=%s,"
525                       " drain=%s, offline=%s" %
526                       (node.name, node.master_candidate, node.drained,
527                        node.offline))
528       if node.group not in data.nodegroups:
529         result.append("Node '%s' has invalid group '%s'" %
530                       (node.name, node.group))
531       else:
532         _helper("node %s" % node.name, "ndparams",
533                 cluster.FillND(node, data.nodegroups[node.group]),
534                 constants.NDS_PARAMETER_TYPES)
535
536     # nodegroups checks
537     nodegroups_names = set()
538     for nodegroup_uuid in data.nodegroups:
539       nodegroup = data.nodegroups[nodegroup_uuid]
540       if nodegroup.uuid != nodegroup_uuid:
541         result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
542                       % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
543       if utils.UUID_RE.match(nodegroup.name.lower()):
544         result.append("node group '%s' (uuid: '%s') has uuid-like name" %
545                       (nodegroup.name, nodegroup.uuid))
546       if nodegroup.name in nodegroups_names:
547         result.append("duplicate node group name '%s'" % nodegroup.name)
548       else:
549         nodegroups_names.add(nodegroup.name)
550       if nodegroup.ndparams:
551         _helper("group %s" % nodegroup.name, "ndparams",
552                 cluster.SimpleFillND(nodegroup.ndparams),
553                 constants.NDS_PARAMETER_TYPES)
554
555     # drbd minors check
556     _, duplicates = self._UnlockedComputeDRBDMap()
557     for node, minor, instance_a, instance_b in duplicates:
558       result.append("DRBD minor %d on node %s is assigned twice to instances"
559                     " %s and %s" % (minor, node, instance_a, instance_b))
560
561     # IP checks
562     default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
563     ips = {}
564
565     def _AddIpAddress(ip, name):
566       ips.setdefault(ip, []).append(name)
567
568     _AddIpAddress(cluster.master_ip, "cluster_ip")
569
570     for node in data.nodes.values():
571       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
572       if node.secondary_ip != node.primary_ip:
573         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
574
575     for instance in data.instances.values():
576       for idx, nic in enumerate(instance.nics):
577         if nic.ip is None:
578           continue
579
580         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
581         nic_mode = nicparams[constants.NIC_MODE]
582         nic_link = nicparams[constants.NIC_LINK]
583
584         if nic_mode == constants.NIC_MODE_BRIDGED:
585           link = "bridge:%s" % nic_link
586         elif nic_mode == constants.NIC_MODE_ROUTED:
587           link = "route:%s" % nic_link
588         else:
589           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
590
591         _AddIpAddress("%s/%s" % (link, nic.ip),
592                       "instance:%s/nic:%d" % (instance.name, idx))
593
594     for ip, owners in ips.items():
595       if len(owners) > 1:
596         result.append("IP address %s is used by multiple owners: %s" %
597                       (ip, utils.CommaJoin(owners)))
598
599     return result
600
601   @locking.ssynchronized(_config_lock, shared=1)
602   def VerifyConfig(self):
603     """Verify function.
604
605     This is just a wrapper over L{_UnlockedVerifyConfig}.
606
607     @rtype: list
608     @return: a list of error messages; a non-empty list signifies
609         configuration errors
610
611     """
612     return self._UnlockedVerifyConfig()
613
614   def _UnlockedSetDiskID(self, disk, node_name):
615     """Convert the unique ID to the ID needed on the target nodes.
616
617     This is used only for drbd, which needs ip/port configuration.
618
619     The routine descends down and updates its children also, because
620     this helps when the only the top device is passed to the remote
621     node.
622
623     This function is for internal use, when the config lock is already held.
624
625     """
626     if disk.children:
627       for child in disk.children:
628         self._UnlockedSetDiskID(child, node_name)
629
630     if disk.logical_id is None and disk.physical_id is not None:
631       return
632     if disk.dev_type == constants.LD_DRBD8:
633       pnode, snode, port, pminor, sminor, secret = disk.logical_id
634       if node_name not in (pnode, snode):
635         raise errors.ConfigurationError("DRBD device not knowing node %s" %
636                                         node_name)
637       pnode_info = self._UnlockedGetNodeInfo(pnode)
638       snode_info = self._UnlockedGetNodeInfo(snode)
639       if pnode_info is None or snode_info is None:
640         raise errors.ConfigurationError("Can't find primary or secondary node"
641                                         " for %s" % str(disk))
642       p_data = (pnode_info.secondary_ip, port)
643       s_data = (snode_info.secondary_ip, port)
644       if pnode == node_name:
645         disk.physical_id = p_data + s_data + (pminor, secret)
646       else: # it must be secondary, we tested above
647         disk.physical_id = s_data + p_data + (sminor, secret)
648     else:
649       disk.physical_id = disk.logical_id
650     return
651
652   @locking.ssynchronized(_config_lock)
653   def SetDiskID(self, disk, node_name):
654     """Convert the unique ID to the ID needed on the target nodes.
655
656     This is used only for drbd, which needs ip/port configuration.
657
658     The routine descends down and updates its children also, because
659     this helps when the only the top device is passed to the remote
660     node.
661
662     """
663     return self._UnlockedSetDiskID(disk, node_name)
664
665   @locking.ssynchronized(_config_lock)
666   def AddTcpUdpPort(self, port):
667     """Adds a new port to the available port pool.
668
669     """
670     if not isinstance(port, int):
671       raise errors.ProgrammerError("Invalid type passed for port")
672
673     self._config_data.cluster.tcpudp_port_pool.add(port)
674     self._WriteConfig()
675
676   @locking.ssynchronized(_config_lock, shared=1)
677   def GetPortList(self):
678     """Returns a copy of the current port list.
679
680     """
681     return self._config_data.cluster.tcpudp_port_pool.copy()
682
683   @locking.ssynchronized(_config_lock)
684   def AllocatePort(self):
685     """Allocate a port.
686
687     The port will be taken from the available port pool or from the
688     default port range (and in this case we increase
689     highest_used_port).
690
691     """
692     # If there are TCP/IP ports configured, we use them first.
693     if self._config_data.cluster.tcpudp_port_pool:
694       port = self._config_data.cluster.tcpudp_port_pool.pop()
695     else:
696       port = self._config_data.cluster.highest_used_port + 1
697       if port >= constants.LAST_DRBD_PORT:
698         raise errors.ConfigurationError("The highest used port is greater"
699                                         " than %s. Aborting." %
700                                         constants.LAST_DRBD_PORT)
701       self._config_data.cluster.highest_used_port = port
702
703     self._WriteConfig()
704     return port
705
706   def _UnlockedComputeDRBDMap(self):
707     """Compute the used DRBD minor/nodes.
708
709     @rtype: (dict, list)
710     @return: dictionary of node_name: dict of minor: instance_name;
711         the returned dict will have all the nodes in it (even if with
712         an empty list), and a list of duplicates; if the duplicates
713         list is not empty, the configuration is corrupted and its caller
714         should raise an exception
715
716     """
717     def _AppendUsedPorts(instance_name, disk, used):
718       duplicates = []
719       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
720         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
721         for node, port in ((node_a, minor_a), (node_b, minor_b)):
722           assert node in used, ("Node '%s' of instance '%s' not found"
723                                 " in node list" % (node, instance_name))
724           if port in used[node]:
725             duplicates.append((node, port, instance_name, used[node][port]))
726           else:
727             used[node][port] = instance_name
728       if disk.children:
729         for child in disk.children:
730           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
731       return duplicates
732
733     duplicates = []
734     my_dict = dict((node, {}) for node in self._config_data.nodes)
735     for instance in self._config_data.instances.itervalues():
736       for disk in instance.disks:
737         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
738     for (node, minor), instance in self._temporary_drbds.iteritems():
739       if minor in my_dict[node] and my_dict[node][minor] != instance:
740         duplicates.append((node, minor, instance, my_dict[node][minor]))
741       else:
742         my_dict[node][minor] = instance
743     return my_dict, duplicates
744
745   @locking.ssynchronized(_config_lock)
746   def ComputeDRBDMap(self):
747     """Compute the used DRBD minor/nodes.
748
749     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
750
751     @return: dictionary of node_name: dict of minor: instance_name;
752         the returned dict will have all the nodes in it (even if with
753         an empty list).
754
755     """
756     d_map, duplicates = self._UnlockedComputeDRBDMap()
757     if duplicates:
758       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
759                                       str(duplicates))
760     return d_map
761
762   @locking.ssynchronized(_config_lock)
763   def AllocateDRBDMinor(self, nodes, instance):
764     """Allocate a drbd minor.
765
766     The free minor will be automatically computed from the existing
767     devices. A node can be given multiple times in order to allocate
768     multiple minors. The result is the list of minors, in the same
769     order as the passed nodes.
770
771     @type instance: string
772     @param instance: the instance for which we allocate minors
773
774     """
775     assert isinstance(instance, basestring), \
776            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
777
778     d_map, duplicates = self._UnlockedComputeDRBDMap()
779     if duplicates:
780       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
781                                       str(duplicates))
782     result = []
783     for nname in nodes:
784       ndata = d_map[nname]
785       if not ndata:
786         # no minors used, we can start at 0
787         result.append(0)
788         ndata[0] = instance
789         self._temporary_drbds[(nname, 0)] = instance
790         continue
791       keys = ndata.keys()
792       keys.sort()
793       ffree = utils.FirstFree(keys)
794       if ffree is None:
795         # return the next minor
796         # TODO: implement high-limit check
797         minor = keys[-1] + 1
798       else:
799         minor = ffree
800       # double-check minor against current instances
801       assert minor not in d_map[nname], \
802              ("Attempt to reuse allocated DRBD minor %d on node %s,"
803               " already allocated to instance %s" %
804               (minor, nname, d_map[nname][minor]))
805       ndata[minor] = instance
806       # double-check minor against reservation
807       r_key = (nname, minor)
808       assert r_key not in self._temporary_drbds, \
809              ("Attempt to reuse reserved DRBD minor %d on node %s,"
810               " reserved for instance %s" %
811               (minor, nname, self._temporary_drbds[r_key]))
812       self._temporary_drbds[r_key] = instance
813       result.append(minor)
814     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
815                   nodes, result)
816     return result
817
818   def _UnlockedReleaseDRBDMinors(self, instance):
819     """Release temporary drbd minors allocated for a given instance.
820
821     @type instance: string
822     @param instance: the instance for which temporary minors should be
823                      released
824
825     """
826     assert isinstance(instance, basestring), \
827            "Invalid argument passed to ReleaseDRBDMinors"
828     for key, name in self._temporary_drbds.items():
829       if name == instance:
830         del self._temporary_drbds[key]
831
832   @locking.ssynchronized(_config_lock)
833   def ReleaseDRBDMinors(self, instance):
834     """Release temporary drbd minors allocated for a given instance.
835
836     This should be called on the error paths, on the success paths
837     it's automatically called by the ConfigWriter add and update
838     functions.
839
840     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
841
842     @type instance: string
843     @param instance: the instance for which temporary minors should be
844                      released
845
846     """
847     self._UnlockedReleaseDRBDMinors(instance)
848
849   @locking.ssynchronized(_config_lock, shared=1)
850   def GetConfigVersion(self):
851     """Get the configuration version.
852
853     @return: Config version
854
855     """
856     return self._config_data.version
857
858   @locking.ssynchronized(_config_lock, shared=1)
859   def GetClusterName(self):
860     """Get cluster name.
861
862     @return: Cluster name
863
864     """
865     return self._config_data.cluster.cluster_name
866
867   @locking.ssynchronized(_config_lock, shared=1)
868   def GetMasterNode(self):
869     """Get the hostname of the master node for this cluster.
870
871     @return: Master hostname
872
873     """
874     return self._config_data.cluster.master_node
875
876   @locking.ssynchronized(_config_lock, shared=1)
877   def GetMasterIP(self):
878     """Get the IP of the master node for this cluster.
879
880     @return: Master IP
881
882     """
883     return self._config_data.cluster.master_ip
884
885   @locking.ssynchronized(_config_lock, shared=1)
886   def GetMasterNetdev(self):
887     """Get the master network device for this cluster.
888
889     """
890     return self._config_data.cluster.master_netdev
891
892   @locking.ssynchronized(_config_lock, shared=1)
893   def GetMasterNetmask(self):
894     """Get the netmask of the master node for this cluster.
895
896     """
897     return self._config_data.cluster.master_netmask
898
899   @locking.ssynchronized(_config_lock, shared=1)
900   def GetUseExternalMipScript(self):
901     """Get flag representing whether to use the external master IP setup script.
902
903     """
904     return self._config_data.cluster.use_external_mip_script
905
906   @locking.ssynchronized(_config_lock, shared=1)
907   def GetFileStorageDir(self):
908     """Get the file storage dir for this cluster.
909
910     """
911     return self._config_data.cluster.file_storage_dir
912
913   @locking.ssynchronized(_config_lock, shared=1)
914   def GetSharedFileStorageDir(self):
915     """Get the shared file storage dir for this cluster.
916
917     """
918     return self._config_data.cluster.shared_file_storage_dir
919
920   @locking.ssynchronized(_config_lock, shared=1)
921   def GetHypervisorType(self):
922     """Get the hypervisor type for this cluster.
923
924     """
925     return self._config_data.cluster.enabled_hypervisors[0]
926
927   @locking.ssynchronized(_config_lock, shared=1)
928   def GetHostKey(self):
929     """Return the rsa hostkey from the config.
930
931     @rtype: string
932     @return: the rsa hostkey
933
934     """
935     return self._config_data.cluster.rsahostkeypub
936
937   @locking.ssynchronized(_config_lock, shared=1)
938   def GetDefaultIAllocator(self):
939     """Get the default instance allocator for this cluster.
940
941     """
942     return self._config_data.cluster.default_iallocator
943
944   @locking.ssynchronized(_config_lock, shared=1)
945   def GetPrimaryIPFamily(self):
946     """Get cluster primary ip family.
947
948     @return: primary ip family
949
950     """
951     return self._config_data.cluster.primary_ip_family
952
953   @locking.ssynchronized(_config_lock, shared=1)
954   def GetMasterNetworkParameters(self):
955     """Get network parameters of the master node.
956
957     @rtype: L{object.MasterNetworkParameters}
958     @return: network parameters of the master node
959
960     """
961     cluster = self._config_data.cluster
962     result = objects.MasterNetworkParameters(name=cluster.master_node,
963       ip=cluster.master_ip,
964       netmask=cluster.master_netmask,
965       netdev=cluster.master_netdev,
966       ip_family=cluster.primary_ip_family)
967
968     return result
969
970   @locking.ssynchronized(_config_lock)
971   def AddNodeGroup(self, group, ec_id, check_uuid=True):
972     """Add a node group to the configuration.
973
974     This method calls group.UpgradeConfig() to fill any missing attributes
975     according to their default values.
976
977     @type group: L{objects.NodeGroup}
978     @param group: the NodeGroup object to add
979     @type ec_id: string
980     @param ec_id: unique id for the job to use when creating a missing UUID
981     @type check_uuid: bool
982     @param check_uuid: add an UUID to the group if it doesn't have one or, if
983                        it does, ensure that it does not exist in the
984                        configuration already
985
986     """
987     self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
988     self._WriteConfig()
989
990   def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
991     """Add a node group to the configuration.
992
993     """
994     logging.info("Adding node group %s to configuration", group.name)
995
996     # Some code might need to add a node group with a pre-populated UUID
997     # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
998     # the "does this UUID" exist already check.
999     if check_uuid:
1000       self._EnsureUUID(group, ec_id)
1001
1002     try:
1003       existing_uuid = self._UnlockedLookupNodeGroup(group.name)
1004     except errors.OpPrereqError:
1005       pass
1006     else:
1007       raise errors.OpPrereqError("Desired group name '%s' already exists as a"
1008                                  " node group (UUID: %s)" %
1009                                  (group.name, existing_uuid),
1010                                  errors.ECODE_EXISTS)
1011
1012     group.serial_no = 1
1013     group.ctime = group.mtime = time.time()
1014     group.UpgradeConfig()
1015
1016     self._config_data.nodegroups[group.uuid] = group
1017     self._config_data.cluster.serial_no += 1
1018
1019   @locking.ssynchronized(_config_lock)
1020   def RemoveNodeGroup(self, group_uuid):
1021     """Remove a node group from the configuration.
1022
1023     @type group_uuid: string
1024     @param group_uuid: the UUID of the node group to remove
1025
1026     """
1027     logging.info("Removing node group %s from configuration", group_uuid)
1028
1029     if group_uuid not in self._config_data.nodegroups:
1030       raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)
1031
1032     assert len(self._config_data.nodegroups) != 1, \
1033             "Group '%s' is the only group, cannot be removed" % group_uuid
1034
1035     del self._config_data.nodegroups[group_uuid]
1036     self._config_data.cluster.serial_no += 1
1037     self._WriteConfig()
1038
1039   def _UnlockedLookupNodeGroup(self, target):
1040     """Lookup a node group's UUID.
1041
1042     @type target: string or None
1043     @param target: group name or UUID or None to look for the default
1044     @rtype: string
1045     @return: nodegroup UUID
1046     @raises errors.OpPrereqError: when the target group cannot be found
1047
1048     """
1049     if target is None:
1050       if len(self._config_data.nodegroups) != 1:
1051         raise errors.OpPrereqError("More than one node group exists. Target"
1052                                    " group must be specified explicitely.")
1053       else:
1054         return self._config_data.nodegroups.keys()[0]
1055     if target in self._config_data.nodegroups:
1056       return target
1057     for nodegroup in self._config_data.nodegroups.values():
1058       if nodegroup.name == target:
1059         return nodegroup.uuid
1060     raise errors.OpPrereqError("Node group '%s' not found" % target,
1061                                errors.ECODE_NOENT)
1062
1063   @locking.ssynchronized(_config_lock, shared=1)
1064   def LookupNodeGroup(self, target):
1065     """Lookup a node group's UUID.
1066
1067     This function is just a wrapper over L{_UnlockedLookupNodeGroup}.
1068
1069     @type target: string or None
1070     @param target: group name or UUID or None to look for the default
1071     @rtype: string
1072     @return: nodegroup UUID
1073
1074     """
1075     return self._UnlockedLookupNodeGroup(target)
1076
1077   def _UnlockedGetNodeGroup(self, uuid):
1078     """Lookup a node group.
1079
1080     @type uuid: string
1081     @param uuid: group UUID
1082     @rtype: L{objects.NodeGroup} or None
1083     @return: nodegroup object, or None if not found
1084
1085     """
1086     if uuid not in self._config_data.nodegroups:
1087       return None
1088
1089     return self._config_data.nodegroups[uuid]
1090
1091   @locking.ssynchronized(_config_lock, shared=1)
1092   def GetNodeGroup(self, uuid):
1093     """Lookup a node group.
1094
1095     @type uuid: string
1096     @param uuid: group UUID
1097     @rtype: L{objects.NodeGroup} or None
1098     @return: nodegroup object, or None if not found
1099
1100     """
1101     return self._UnlockedGetNodeGroup(uuid)
1102
1103   @locking.ssynchronized(_config_lock, shared=1)
1104   def GetAllNodeGroupsInfo(self):
1105     """Get the configuration of all node groups.
1106
1107     """
1108     return dict(self._config_data.nodegroups)
1109
1110   @locking.ssynchronized(_config_lock, shared=1)
1111   def GetNodeGroupList(self):
1112     """Get a list of node groups.
1113
1114     """
1115     return self._config_data.nodegroups.keys()
1116
1117   @locking.ssynchronized(_config_lock, shared=1)
1118   def GetNodeGroupMembersByNodes(self, nodes):
1119     """Get nodes which are member in the same nodegroups as the given nodes.
1120
1121     """
1122     ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
1123     return frozenset(member_name
1124                      for node_name in nodes
1125                      for member_name in
1126                        self._UnlockedGetNodeGroup(ngfn(node_name)).members)
1127
1128   @locking.ssynchronized(_config_lock)
1129   def AddInstance(self, instance, ec_id):
1130     """Add an instance to the config.
1131
1132     This should be used after creating a new instance.
1133
1134     @type instance: L{objects.Instance}
1135     @param instance: the instance object
1136
1137     """
1138     if not isinstance(instance, objects.Instance):
1139       raise errors.ProgrammerError("Invalid type passed to AddInstance")
1140
1141     if instance.disk_template != constants.DT_DISKLESS:
1142       all_lvs = instance.MapLVsByNode()
1143       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
1144
1145     all_macs = self._AllMACs()
1146     for nic in instance.nics:
1147       if nic.mac in all_macs:
1148         raise errors.ConfigurationError("Cannot add instance %s:"
1149                                         " MAC address '%s' already in use." %
1150                                         (instance.name, nic.mac))
1151
1152     self._EnsureUUID(instance, ec_id)
1153
1154     instance.serial_no = 1
1155     instance.ctime = instance.mtime = time.time()
1156     self._config_data.instances[instance.name] = instance
1157     self._config_data.cluster.serial_no += 1
1158     self._UnlockedReleaseDRBDMinors(instance.name)
1159     self._WriteConfig()
1160
1161   def _EnsureUUID(self, item, ec_id):
1162     """Ensures a given object has a valid UUID.
1163
1164     @param item: the instance or node to be checked
1165     @param ec_id: the execution context id for the uuid reservation
1166
1167     """
1168     if not item.uuid:
1169       item.uuid = self._GenerateUniqueID(ec_id)
1170     elif item.uuid in self._AllIDs(include_temporary=True):
1171       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
1172                                       " in use" % (item.name, item.uuid))
1173
1174   def _SetInstanceStatus(self, instance_name, status):
1175     """Set the instance's status to a given value.
1176
1177     """
1178     assert isinstance(status, bool), \
1179            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
1180
1181     if instance_name not in self._config_data.instances:
1182       raise errors.ConfigurationError("Unknown instance '%s'" %
1183                                       instance_name)
1184     instance = self._config_data.instances[instance_name]
1185     if instance.admin_up != status:
1186       instance.admin_up = status
1187       instance.serial_no += 1
1188       instance.mtime = time.time()
1189       self._WriteConfig()
1190
1191   @locking.ssynchronized(_config_lock)
1192   def MarkInstanceUp(self, instance_name):
1193     """Mark the instance status to up in the config.
1194
1195     """
1196     self._SetInstanceStatus(instance_name, True)
1197
1198   @locking.ssynchronized(_config_lock)
1199   def RemoveInstance(self, instance_name):
1200     """Remove the instance from the configuration.
1201
1202     """
1203     if instance_name not in self._config_data.instances:
1204       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1205     del self._config_data.instances[instance_name]
1206     self._config_data.cluster.serial_no += 1
1207     self._WriteConfig()
1208
1209   @locking.ssynchronized(_config_lock)
1210   def RenameInstance(self, old_name, new_name):
1211     """Rename an instance.
1212
1213     This needs to be done in ConfigWriter and not by RemoveInstance
1214     combined with AddInstance as only we can guarantee an atomic
1215     rename.
1216
1217     """
1218     if old_name not in self._config_data.instances:
1219       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
1220     inst = self._config_data.instances[old_name]
1221     del self._config_data.instances[old_name]
1222     inst.name = new_name
1223
1224     for disk in inst.disks:
1225       if disk.dev_type == constants.LD_FILE:
1226         # rename the file paths in logical and physical id
1227         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
1228         disk_fname = "disk%s" % disk.iv_name.split("/")[1]
1229         disk.physical_id = disk.logical_id = (disk.logical_id[0],
1230                                               utils.PathJoin(file_storage_dir,
1231                                                              inst.name,
1232                                                              disk_fname))
1233
1234     # Force update of ssconf files
1235     self._config_data.cluster.serial_no += 1
1236
1237     self._config_data.instances[inst.name] = inst
1238     self._WriteConfig()
1239
1240   @locking.ssynchronized(_config_lock)
1241   def MarkInstanceDown(self, instance_name):
1242     """Mark the status of an instance to down in the configuration.
1243
1244     """
1245     self._SetInstanceStatus(instance_name, False)
1246
1247   def _UnlockedGetInstanceList(self):
1248     """Get the list of instances.
1249
1250     This function is for internal use, when the config lock is already held.
1251
1252     """
1253     return self._config_data.instances.keys()
1254
1255   @locking.ssynchronized(_config_lock, shared=1)
1256   def GetInstanceList(self):
1257     """Get the list of instances.
1258
1259     @return: array of instances, ex. ['instance2.example.com',
1260         'instance1.example.com']
1261
1262     """
1263     return self._UnlockedGetInstanceList()
1264
1265   def ExpandInstanceName(self, short_name):
1266     """Attempt to expand an incomplete instance name.
1267
1268     """
1269     # Locking is done in L{ConfigWriter.GetInstanceList}
1270     return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())
1271
1272   def _UnlockedGetInstanceInfo(self, instance_name):
1273     """Returns information about an instance.
1274
1275     This function is for internal use, when the config lock is already held.
1276
1277     """
1278     if instance_name not in self._config_data.instances:
1279       return None
1280
1281     return self._config_data.instances[instance_name]
1282
1283   @locking.ssynchronized(_config_lock, shared=1)
1284   def GetInstanceInfo(self, instance_name):
1285     """Returns information about an instance.
1286
1287     It takes the information from the configuration file. Other information of
1288     an instance are taken from the live systems.
1289
1290     @param instance_name: name of the instance, e.g.
1291         I{instance1.example.com}
1292
1293     @rtype: L{objects.Instance}
1294     @return: the instance object
1295
1296     """
1297     return self._UnlockedGetInstanceInfo(instance_name)
1298
1299   @locking.ssynchronized(_config_lock, shared=1)
1300   def GetInstanceNodeGroups(self, instance_name, primary_only=False):
1301     """Returns set of node group UUIDs for instance's nodes.
1302
1303     @rtype: frozenset
1304
1305     """
1306     instance = self._UnlockedGetInstanceInfo(instance_name)
1307     if not instance:
1308       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
1309
1310     if primary_only:
1311       nodes = [instance.primary_node]
1312     else:
1313       nodes = instance.all_nodes
1314
1315     return frozenset(self._UnlockedGetNodeInfo(node_name).group
1316                      for node_name in nodes)
1317
1318   @locking.ssynchronized(_config_lock, shared=1)
1319   def GetMultiInstanceInfo(self, instances):
1320     """Get the configuration of multiple instances.
1321
1322     @param instances: list of instance names
1323     @rtype: list
1324     @return: list of tuples (instance, instance_info), where
1325         instance_info is what would GetInstanceInfo return for the
1326         node, while keeping the original order
1327
1328     """
1329     return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]
1330
1331   @locking.ssynchronized(_config_lock, shared=1)
1332   def GetAllInstancesInfo(self):
1333     """Get the configuration of all instances.
1334
1335     @rtype: dict
1336     @return: dict of (instance, instance_info), where instance_info is what
1337               would GetInstanceInfo return for the node
1338
1339     """
1340     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1341                     for instance in self._UnlockedGetInstanceList()])
1342     return my_dict
1343
1344   @locking.ssynchronized(_config_lock, shared=1)
1345   def GetInstancesInfoByFilter(self, filter_fn):
1346     """Get instance configuration with a filter.
1347
1348     @type filter_fn: callable
1349     @param filter_fn: Filter function receiving instance object as parameter,
1350       returning boolean. Important: this function is called while the
1351       configuration locks is held. It must not do any complex work or call
1352       functions potentially leading to a deadlock. Ideally it doesn't call any
1353       other functions and just compares instance attributes.
1354
1355     """
1356     return dict((name, inst)
1357                 for (name, inst) in self._config_data.instances.items()
1358                 if filter_fn(inst))
1359
1360   @locking.ssynchronized(_config_lock)
1361   def AddNode(self, node, ec_id):
1362     """Add a node to the configuration.
1363
1364     @type node: L{objects.Node}
1365     @param node: a Node instance
1366
1367     """
1368     logging.info("Adding node %s to configuration", node.name)
1369
1370     self._EnsureUUID(node, ec_id)
1371
1372     node.serial_no = 1
1373     node.ctime = node.mtime = time.time()
1374     self._UnlockedAddNodeToGroup(node.name, node.group)
1375     self._config_data.nodes[node.name] = node
1376     self._config_data.cluster.serial_no += 1
1377     self._WriteConfig()
1378
1379   @locking.ssynchronized(_config_lock)
1380   def RemoveNode(self, node_name):
1381     """Remove a node from the configuration.
1382
1383     """
1384     logging.info("Removing node %s from configuration", node_name)
1385
1386     if node_name not in self._config_data.nodes:
1387       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1388
1389     self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1390     del self._config_data.nodes[node_name]
1391     self._config_data.cluster.serial_no += 1
1392     self._WriteConfig()
1393
1394   def ExpandNodeName(self, short_name):
1395     """Attempt to expand an incomplete node name.
1396
1397     """
1398     # Locking is done in L{ConfigWriter.GetNodeList}
1399     return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1400
1401   def _UnlockedGetNodeInfo(self, node_name):
1402     """Get the configuration of a node, as stored in the config.
1403
1404     This function is for internal use, when the config lock is already
1405     held.
1406
1407     @param node_name: the node name, e.g. I{node1.example.com}
1408
1409     @rtype: L{objects.Node}
1410     @return: the node object
1411
1412     """
1413     if node_name not in self._config_data.nodes:
1414       return None
1415
1416     return self._config_data.nodes[node_name]
1417
1418   @locking.ssynchronized(_config_lock, shared=1)
1419   def GetNodeInfo(self, node_name):
1420     """Get the configuration of a node, as stored in the config.
1421
1422     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1423
1424     @param node_name: the node name, e.g. I{node1.example.com}
1425
1426     @rtype: L{objects.Node}
1427     @return: the node object
1428
1429     """
1430     return self._UnlockedGetNodeInfo(node_name)
1431
1432   @locking.ssynchronized(_config_lock, shared=1)
1433   def GetNodeInstances(self, node_name):
1434     """Get the instances of a node, as stored in the config.
1435
1436     @param node_name: the node name, e.g. I{node1.example.com}
1437
1438     @rtype: (list, list)
1439     @return: a tuple with two lists: the primary and the secondary instances
1440
1441     """
1442     pri = []
1443     sec = []
1444     for inst in self._config_data.instances.values():
1445       if inst.primary_node == node_name:
1446         pri.append(inst.name)
1447       if node_name in inst.secondary_nodes:
1448         sec.append(inst.name)
1449     return (pri, sec)
1450
1451   @locking.ssynchronized(_config_lock, shared=1)
1452   def GetNodeGroupInstances(self, uuid, primary_only=False):
1453     """Get the instances of a node group.
1454
1455     @param uuid: Node group UUID
1456     @param primary_only: Whether to only consider primary nodes
1457     @rtype: frozenset
1458     @return: List of instance names in node group
1459
1460     """
1461     if primary_only:
1462       nodes_fn = lambda inst: [inst.primary_node]
1463     else:
1464       nodes_fn = lambda inst: inst.all_nodes
1465
1466     return frozenset(inst.name
1467                      for inst in self._config_data.instances.values()
1468                      for node_name in nodes_fn(inst)
1469                      if self._UnlockedGetNodeInfo(node_name).group == uuid)
1470
1471   def _UnlockedGetNodeList(self):
1472     """Return the list of nodes which are in the configuration.
1473
1474     This function is for internal use, when the config lock is already
1475     held.
1476
1477     @rtype: list
1478
1479     """
1480     return self._config_data.nodes.keys()
1481
1482   @locking.ssynchronized(_config_lock, shared=1)
1483   def GetNodeList(self):
1484     """Return the list of nodes which are in the configuration.
1485
1486     """
1487     return self._UnlockedGetNodeList()
1488
1489   def _UnlockedGetOnlineNodeList(self):
1490     """Return the list of nodes which are online.
1491
1492     """
1493     all_nodes = [self._UnlockedGetNodeInfo(node)
1494                  for node in self._UnlockedGetNodeList()]
1495     return [node.name for node in all_nodes if not node.offline]
1496
1497   @locking.ssynchronized(_config_lock, shared=1)
1498   def GetOnlineNodeList(self):
1499     """Return the list of nodes which are online.
1500
1501     """
1502     return self._UnlockedGetOnlineNodeList()
1503
1504   @locking.ssynchronized(_config_lock, shared=1)
1505   def GetVmCapableNodeList(self):
1506     """Return the list of nodes which are not vm capable.
1507
1508     """
1509     all_nodes = [self._UnlockedGetNodeInfo(node)
1510                  for node in self._UnlockedGetNodeList()]
1511     return [node.name for node in all_nodes if node.vm_capable]
1512
1513   @locking.ssynchronized(_config_lock, shared=1)
1514   def GetNonVmCapableNodeList(self):
1515     """Return the list of nodes which are not vm capable.
1516
1517     """
1518     all_nodes = [self._UnlockedGetNodeInfo(node)
1519                  for node in self._UnlockedGetNodeList()]
1520     return [node.name for node in all_nodes if not node.vm_capable]
1521
1522   @locking.ssynchronized(_config_lock, shared=1)
1523   def GetMultiNodeInfo(self, nodes):
1524     """Get the configuration of multiple nodes.
1525
1526     @param nodes: list of node names
1527     @rtype: list
1528     @return: list of tuples of (node, node_info), where node_info is
1529         what would GetNodeInfo return for the node, in the original
1530         order
1531
1532     """
1533     return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]
1534
1535   @locking.ssynchronized(_config_lock, shared=1)
1536   def GetAllNodesInfo(self):
1537     """Get the configuration of all nodes.
1538
1539     @rtype: dict
1540     @return: dict of (node, node_info), where node_info is what
1541               would GetNodeInfo return for the node
1542
1543     """
1544     return self._UnlockedGetAllNodesInfo()
1545
1546   def _UnlockedGetAllNodesInfo(self):
1547     """Gets configuration of all nodes.
1548
1549     @note: See L{GetAllNodesInfo}
1550
1551     """
1552     return dict([(node, self._UnlockedGetNodeInfo(node))
1553                  for node in self._UnlockedGetNodeList()])
1554
1555   @locking.ssynchronized(_config_lock, shared=1)
1556   def GetNodeGroupsFromNodes(self, nodes):
1557     """Returns groups for a list of nodes.
1558
1559     @type nodes: list of string
1560     @param nodes: List of node names
1561     @rtype: frozenset
1562
1563     """
1564     return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)
1565
1566   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1567     """Get the number of current and maximum desired and possible candidates.
1568
1569     @type exceptions: list
1570     @param exceptions: if passed, list of nodes that should be ignored
1571     @rtype: tuple
1572     @return: tuple of (current, desired and possible, possible)
1573
1574     """
1575     mc_now = mc_should = mc_max = 0
1576     for node in self._config_data.nodes.values():
1577       if exceptions and node.name in exceptions:
1578         continue
1579       if not (node.offline or node.drained) and node.master_capable:
1580         mc_max += 1
1581       if node.master_candidate:
1582         mc_now += 1
1583     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1584     return (mc_now, mc_should, mc_max)
1585
1586   @locking.ssynchronized(_config_lock, shared=1)
1587   def GetMasterCandidateStats(self, exceptions=None):
1588     """Get the number of current and maximum possible candidates.
1589
1590     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1591
1592     @type exceptions: list
1593     @param exceptions: if passed, list of nodes that should be ignored
1594     @rtype: tuple
1595     @return: tuple of (current, max)
1596
1597     """
1598     return self._UnlockedGetMasterCandidateStats(exceptions)
1599
1600   @locking.ssynchronized(_config_lock)
1601   def MaintainCandidatePool(self, exceptions):
1602     """Try to grow the candidate pool to the desired size.
1603
1604     @type exceptions: list
1605     @param exceptions: if passed, list of nodes that should be ignored
1606     @rtype: list
1607     @return: list with the adjusted nodes (L{objects.Node} instances)
1608
1609     """
1610     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1611     mod_list = []
1612     if mc_now < mc_max:
1613       node_list = self._config_data.nodes.keys()
1614       random.shuffle(node_list)
1615       for name in node_list:
1616         if mc_now >= mc_max:
1617           break
1618         node = self._config_data.nodes[name]
1619         if (node.master_candidate or node.offline or node.drained or
1620             node.name in exceptions or not node.master_capable):
1621           continue
1622         mod_list.append(node)
1623         node.master_candidate = True
1624         node.serial_no += 1
1625         mc_now += 1
1626       if mc_now != mc_max:
1627         # this should not happen
1628         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1629                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1630       if mod_list:
1631         self._config_data.cluster.serial_no += 1
1632         self._WriteConfig()
1633
1634     return mod_list
1635
1636   def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1637     """Add a given node to the specified group.
1638
1639     """
1640     if nodegroup_uuid not in self._config_data.nodegroups:
1641       # This can happen if a node group gets deleted between its lookup and
1642       # when we're adding the first node to it, since we don't keep a lock in
1643       # the meantime. It's ok though, as we'll fail cleanly if the node group
1644       # is not found anymore.
1645       raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
1646     if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1647       self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1648
1649   def _UnlockedRemoveNodeFromGroup(self, node):
1650     """Remove a given node from its group.
1651
1652     """
1653     nodegroup = node.group
1654     if nodegroup not in self._config_data.nodegroups:
1655       logging.warning("Warning: node '%s' has unknown node group '%s'"
1656                       " (while being removed from it)", node.name, nodegroup)
1657     nodegroup_obj = self._config_data.nodegroups[nodegroup]
1658     if node.name not in nodegroup_obj.members:
1659       logging.warning("Warning: node '%s' not a member of its node group '%s'"
1660                       " (while being removed from it)", node.name, nodegroup)
1661     else:
1662       nodegroup_obj.members.remove(node.name)
1663
1664   def _BumpSerialNo(self):
1665     """Bump up the serial number of the config.
1666
1667     """
1668     self._config_data.serial_no += 1
1669     self._config_data.mtime = time.time()
1670
1671   def _AllUUIDObjects(self):
1672     """Returns all objects with uuid attributes.
1673
1674     """
1675     return (self._config_data.instances.values() +
1676             self._config_data.nodes.values() +
1677             self._config_data.nodegroups.values() +
1678             [self._config_data.cluster])
1679
1680   def _OpenConfig(self, accept_foreign):
1681     """Read the config data from disk.
1682
1683     """
1684     raw_data = utils.ReadFile(self._cfg_file)
1685
1686     try:
1687       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1688     except Exception, err:
1689       raise errors.ConfigurationError(err)
1690
1691     # Make sure the configuration has the right version
1692     _ValidateConfig(data)
1693
1694     if (not hasattr(data, 'cluster') or
1695         not hasattr(data.cluster, 'rsahostkeypub')):
1696       raise errors.ConfigurationError("Incomplete configuration"
1697                                       " (missing cluster.rsahostkeypub)")
1698
1699     if data.cluster.master_node != self._my_hostname and not accept_foreign:
1700       msg = ("The configuration denotes node %s as master, while my"
1701              " hostname is %s; opening a foreign configuration is only"
1702              " possible in accept_foreign mode" %
1703              (data.cluster.master_node, self._my_hostname))
1704       raise errors.ConfigurationError(msg)
1705
1706     # Upgrade configuration if needed
1707     data.UpgradeConfig()
1708
1709     self._config_data = data
1710     # reset the last serial as -1 so that the next write will cause
1711     # ssconf update
1712     self._last_cluster_serial = -1
1713
1714     # And finally run our (custom) config upgrade sequence
1715     self._UpgradeConfig()
1716
1717     self._cfg_id = utils.GetFileID(path=self._cfg_file)
1718
1719   def _UpgradeConfig(self):
1720     """Run upgrade steps that cannot be done purely in the objects.
1721
1722     This is because some data elements need uniqueness across the
1723     whole configuration, etc.
1724
1725     @warning: this function will call L{_WriteConfig()}, but also
1726         L{DropECReservations} so it needs to be called only from a
1727         "safe" place (the constructor). If one wanted to call it with
1728         the lock held, a DropECReservationUnlocked would need to be
1729         created first, to avoid causing deadlock.
1730
1731     """
1732     modified = False
1733     for item in self._AllUUIDObjects():
1734       if item.uuid is None:
1735         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1736         modified = True
1737     if not self._config_data.nodegroups:
1738       default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
1739       default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
1740                                             members=[])
1741       self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
1742       modified = True
1743     for node in self._config_data.nodes.values():
1744       if not node.group:
1745         node.group = self.LookupNodeGroup(None)
1746         modified = True
1747       # This is technically *not* an upgrade, but needs to be done both when
1748       # nodegroups are being added, and upon normally loading the config,
1749       # because the members list of a node group is discarded upon
1750       # serializing/deserializing the object.
1751       self._UnlockedAddNodeToGroup(node.name, node.group)
1752     if modified:
1753       self._WriteConfig()
1754       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1755       # only called at config init time, without the lock held
1756       self.DropECReservations(_UPGRADE_CONFIG_JID)
1757
1758   def _DistributeConfig(self, feedback_fn):
1759     """Distribute the configuration to the other nodes.
1760
1761     Currently, this only copies the configuration file. In the future,
1762     it could be used to encapsulate the 2/3-phase update mechanism.
1763
1764     """
1765     if self._offline:
1766       return True
1767
1768     bad = False
1769
1770     node_list = []
1771     addr_list = []
1772     myhostname = self._my_hostname
1773     # we can skip checking whether _UnlockedGetNodeInfo returns None
1774     # since the node list comes from _UnlocketGetNodeList, and we are
1775     # called with the lock held, so no modifications should take place
1776     # in between
1777     for node_name in self._UnlockedGetNodeList():
1778       if node_name == myhostname:
1779         continue
1780       node_info = self._UnlockedGetNodeInfo(node_name)
1781       if not node_info.master_candidate:
1782         continue
1783       node_list.append(node_info.name)
1784       addr_list.append(node_info.primary_ip)
1785
1786     # TODO: Use dedicated resolver talking to config writer for name resolution
1787     result = \
1788       self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
1789     for to_node, to_result in result.items():
1790       msg = to_result.fail_msg
1791       if msg:
1792         msg = ("Copy of file %s to node %s failed: %s" %
1793                (self._cfg_file, to_node, msg))
1794         logging.error(msg)
1795
1796         if feedback_fn:
1797           feedback_fn(msg)
1798
1799         bad = True
1800
1801     return not bad
1802
1803   def _WriteConfig(self, destination=None, feedback_fn=None):
1804     """Write the configuration data to persistent storage.
1805
1806     """
1807     assert feedback_fn is None or callable(feedback_fn)
1808
1809     # Warn on config errors, but don't abort the save - the
1810     # configuration has already been modified, and we can't revert;
1811     # the best we can do is to warn the user and save as is, leaving
1812     # recovery to the user
1813     config_errors = self._UnlockedVerifyConfig()
1814     if config_errors:
1815       errmsg = ("Configuration data is not consistent: %s" %
1816                 (utils.CommaJoin(config_errors)))
1817       logging.critical(errmsg)
1818       if feedback_fn:
1819         feedback_fn(errmsg)
1820
1821     if destination is None:
1822       destination = self._cfg_file
1823     self._BumpSerialNo()
1824     txt = serializer.Dump(self._config_data.ToDict())
1825
1826     getents = self._getents()
1827     try:
1828       fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
1829                                close=False, gid=getents.confd_gid, mode=0640)
1830     except errors.LockError:
1831       raise errors.ConfigurationError("The configuration file has been"
1832                                       " modified since the last write, cannot"
1833                                       " update")
1834     try:
1835       self._cfg_id = utils.GetFileID(fd=fd)
1836     finally:
1837       os.close(fd)
1838
1839     self.write_count += 1
1840
1841     # and redistribute the config file to master candidates
1842     self._DistributeConfig(feedback_fn)
1843
1844     # Write ssconf files on all nodes (including locally)
1845     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1846       if not self._offline:
1847         result = self._GetRpc(None).call_write_ssconf_files(
1848           self._UnlockedGetOnlineNodeList(),
1849           self._UnlockedGetSsconfValues())
1850
1851         for nname, nresu in result.items():
1852           msg = nresu.fail_msg
1853           if msg:
1854             errmsg = ("Error while uploading ssconf files to"
1855                       " node %s: %s" % (nname, msg))
1856             logging.warning(errmsg)
1857
1858             if feedback_fn:
1859               feedback_fn(errmsg)
1860
1861       self._last_cluster_serial = self._config_data.cluster.serial_no
1862
1863   def _UnlockedGetSsconfValues(self):
1864     """Return the values needed by ssconf.
1865
1866     @rtype: dict
1867     @return: a dictionary with keys the ssconf names and values their
1868         associated value
1869
1870     """
1871     fn = "\n".join
1872     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1873     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1874     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1875     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1876                     for ninfo in node_info]
1877     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1878                     for ninfo in node_info]
1879
1880     instance_data = fn(instance_names)
1881     off_data = fn(node.name for node in node_info if node.offline)
1882     on_data = fn(node.name for node in node_info if not node.offline)
1883     mc_data = fn(node.name for node in node_info if node.master_candidate)
1884     mc_ips_data = fn(node.primary_ip for node in node_info
1885                      if node.master_candidate)
1886     node_data = fn(node_names)
1887     node_pri_ips_data = fn(node_pri_ips)
1888     node_snd_ips_data = fn(node_snd_ips)
1889
1890     cluster = self._config_data.cluster
1891     cluster_tags = fn(cluster.GetTags())
1892
1893     hypervisor_list = fn(cluster.enabled_hypervisors)
1894
1895     uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1896
1897     nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1898                   self._config_data.nodegroups.values()]
1899     nodegroups_data = fn(utils.NiceSort(nodegroups))
1900
1901     ssconf_values = {
1902       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1903       constants.SS_CLUSTER_TAGS: cluster_tags,
1904       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1905       constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
1906       constants.SS_MASTER_CANDIDATES: mc_data,
1907       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1908       constants.SS_MASTER_IP: cluster.master_ip,
1909       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1910       constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
1911       constants.SS_MASTER_NODE: cluster.master_node,
1912       constants.SS_NODE_LIST: node_data,
1913       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1914       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1915       constants.SS_OFFLINE_NODES: off_data,
1916       constants.SS_ONLINE_NODES: on_data,
1917       constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1918       constants.SS_INSTANCE_LIST: instance_data,
1919       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1920       constants.SS_HYPERVISOR_LIST: hypervisor_list,
1921       constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1922       constants.SS_UID_POOL: uid_pool,
1923       constants.SS_NODEGROUPS: nodegroups_data,
1924       }
1925     bad_values = [(k, v) for k, v in ssconf_values.items()
1926                   if not isinstance(v, (str, basestring))]
1927     if bad_values:
1928       err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
1929       raise errors.ConfigurationError("Some ssconf key(s) have non-string"
1930                                       " values: %s" % err)
1931     return ssconf_values
1932
1933   @locking.ssynchronized(_config_lock, shared=1)
1934   def GetSsconfValues(self):
1935     """Wrapper using lock around _UnlockedGetSsconf().
1936
1937     """
1938     return self._UnlockedGetSsconfValues()
1939
1940   @locking.ssynchronized(_config_lock, shared=1)
1941   def GetVGName(self):
1942     """Return the volume group name.
1943
1944     """
1945     return self._config_data.cluster.volume_group_name
1946
1947   @locking.ssynchronized(_config_lock)
1948   def SetVGName(self, vg_name):
1949     """Set the volume group name.
1950
1951     """
1952     self._config_data.cluster.volume_group_name = vg_name
1953     self._config_data.cluster.serial_no += 1
1954     self._WriteConfig()
1955
1956   @locking.ssynchronized(_config_lock, shared=1)
1957   def GetDRBDHelper(self):
1958     """Return DRBD usermode helper.
1959
1960     """
1961     return self._config_data.cluster.drbd_usermode_helper
1962
1963   @locking.ssynchronized(_config_lock)
1964   def SetDRBDHelper(self, drbd_helper):
1965     """Set DRBD usermode helper.
1966
1967     """
1968     self._config_data.cluster.drbd_usermode_helper = drbd_helper
1969     self._config_data.cluster.serial_no += 1
1970     self._WriteConfig()
1971
1972   @locking.ssynchronized(_config_lock, shared=1)
1973   def GetMACPrefix(self):
1974     """Return the mac prefix.
1975
1976     """
1977     return self._config_data.cluster.mac_prefix
1978
1979   @locking.ssynchronized(_config_lock, shared=1)
1980   def GetClusterInfo(self):
1981     """Returns information about the cluster
1982
1983     @rtype: L{objects.Cluster}
1984     @return: the cluster object
1985
1986     """
1987     return self._config_data.cluster
1988
1989   @locking.ssynchronized(_config_lock, shared=1)
1990   def HasAnyDiskOfType(self, dev_type):
1991     """Check if in there is at disk of the given type in the configuration.
1992
1993     """
1994     return self._config_data.HasAnyDiskOfType(dev_type)
1995
1996   @locking.ssynchronized(_config_lock)
1997   def Update(self, target, feedback_fn):
1998     """Notify function to be called after updates.
1999
2000     This function must be called when an object (as returned by
2001     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
2002     caller wants the modifications saved to the backing store. Note
2003     that all modified objects will be saved, but the target argument
2004     is the one the caller wants to ensure that it's saved.
2005
2006     @param target: an instance of either L{objects.Cluster},
2007         L{objects.Node} or L{objects.Instance} which is existing in
2008         the cluster
2009     @param feedback_fn: Callable feedback function
2010
2011     """
2012     if self._config_data is None:
2013       raise errors.ProgrammerError("Configuration file not read,"
2014                                    " cannot save.")
2015     update_serial = False
2016     if isinstance(target, objects.Cluster):
2017       test = target == self._config_data.cluster
2018     elif isinstance(target, objects.Node):
2019       test = target in self._config_data.nodes.values()
2020       update_serial = True
2021     elif isinstance(target, objects.Instance):
2022       test = target in self._config_data.instances.values()
2023     elif isinstance(target, objects.NodeGroup):
2024       test = target in self._config_data.nodegroups.values()
2025     else:
2026       raise errors.ProgrammerError("Invalid object type (%s) passed to"
2027                                    " ConfigWriter.Update" % type(target))
2028     if not test:
2029       raise errors.ConfigurationError("Configuration updated since object"
2030                                       " has been read or unknown object")
2031     target.serial_no += 1
2032     target.mtime = now = time.time()
2033
2034     if update_serial:
2035       # for node updates, we need to increase the cluster serial too
2036       self._config_data.cluster.serial_no += 1
2037       self._config_data.cluster.mtime = now
2038
2039     if isinstance(target, objects.Instance):
2040       self._UnlockedReleaseDRBDMinors(target.name)
2041
2042     self._WriteConfig(feedback_fn=feedback_fn)
2043
2044   @locking.ssynchronized(_config_lock)
2045   def DropECReservations(self, ec_id):
2046     """Drop per-execution-context reservations
2047
2048     """
2049     for rm in self._all_rms:
2050       rm.DropECReservations(ec_id)