Add a few more checks to verify config
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     self._temporary_macs = set()
83     # Note: in order to prevent errors when resolving our name in
84     # _DistributeConfig, we compute it here once and reuse it; it's
85     # better to raise an error before starting to modify the config
86     # file than after it was modified
87     self._my_hostname = utils.HostInfo().name
88     self._last_cluster_serial = -1
89     self._OpenConfig()
90
91   # this method needs to be static, so that we can call it on the class
92   @staticmethod
93   def IsCluster():
94     """Check if the cluster is configured.
95
96     """
97     return os.path.exists(constants.CLUSTER_CONF_FILE)
98
99   @locking.ssynchronized(_config_lock, shared=1)
100   def GenerateMAC(self):
101     """Generate a MAC for an instance.
102
103     This should check the current instances for duplicates.
104
105     """
106     prefix = self._config_data.cluster.mac_prefix
107     all_macs = self._AllMACs()
108     retries = 64
109     while retries > 0:
110       byte1 = random.randrange(0, 256)
111       byte2 = random.randrange(0, 256)
112       byte3 = random.randrange(0, 256)
113       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
114       if mac not in all_macs and mac not in self._temporary_macs:
115         break
116       retries -= 1
117     else:
118       raise errors.ConfigurationError("Can't generate unique MAC")
119     self._temporary_macs.add(mac)
120     return mac
121
122   @locking.ssynchronized(_config_lock, shared=1)
123   def IsMacInUse(self, mac):
124     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
125
126     This only checks instances managed by this cluster, it does not
127     check for potential collisions elsewhere.
128
129     """
130     all_macs = self._AllMACs()
131     return mac in all_macs or mac in self._temporary_macs
132
133   @locking.ssynchronized(_config_lock, shared=1)
134   def GenerateDRBDSecret(self):
135     """Generate a DRBD secret.
136
137     This checks the current disks for duplicates.
138
139     """
140     all_secrets = self._AllDRBDSecrets()
141     retries = 64
142     while retries > 0:
143       secret = utils.GenerateSecret()
144       if secret not in all_secrets:
145         break
146       retries -= 1
147     else:
148       raise errors.ConfigurationError("Can't generate unique DRBD secret")
149     return secret
150
151   def _ComputeAllLVs(self):
152     """Compute the list of all LVs.
153
154     """
155     lvnames = set()
156     for instance in self._config_data.instances.values():
157       node_data = instance.MapLVsByNode()
158       for lv_list in node_data.values():
159         lvnames.update(lv_list)
160     return lvnames
161
162   @locking.ssynchronized(_config_lock, shared=1)
163   def GenerateUniqueID(self, exceptions=None):
164     """Generate an unique disk name.
165
166     This checks the current node, instances and disk names for
167     duplicates.
168
169     @param exceptions: a list with some other names which should be checked
170         for uniqueness (used for example when you want to get
171         more than one id at one time without adding each one in
172         turn to the config file)
173
174     @rtype: string
175     @return: the unique id
176
177     """
178     existing = set()
179     existing.update(self._temporary_ids)
180     existing.update(self._ComputeAllLVs())
181     existing.update(self._config_data.instances.keys())
182     existing.update(self._config_data.nodes.keys())
183     if exceptions is not None:
184       existing.update(exceptions)
185     retries = 64
186     while retries > 0:
187       unique_id = utils.NewUUID()
188       if unique_id not in existing and unique_id is not None:
189         break
190     else:
191       raise errors.ConfigurationError("Not able generate an unique ID"
192                                       " (last tried ID: %s" % unique_id)
193     self._temporary_ids.add(unique_id)
194     return unique_id
195
196   def _AllMACs(self):
197     """Return all MACs present in the config.
198
199     @rtype: list
200     @return: the list of all MACs
201
202     """
203     result = []
204     for instance in self._config_data.instances.values():
205       for nic in instance.nics:
206         result.append(nic.mac)
207
208     return result
209
210   def _AllDRBDSecrets(self):
211     """Return all DRBD secrets present in the config.
212
213     @rtype: list
214     @return: the list of all DRBD secrets
215
216     """
217     def helper(disk, result):
218       """Recursively gather secrets from this disk."""
219       if disk.dev_type == constants.DT_DRBD8:
220         result.append(disk.logical_id[5])
221       if disk.children:
222         for child in disk.children:
223           helper(child, result)
224
225     result = []
226     for instance in self._config_data.instances.values():
227       for disk in instance.disks:
228         helper(disk, result)
229
230     return result
231
232   def _CheckDiskIDs(self, disk, l_ids, p_ids):
233     """Compute duplicate disk IDs
234
235     @type disk: L{objects.Disk}
236     @param disk: the disk at which to start searching
237     @type l_ids: list
238     @param l_ids: list of current logical ids
239     @type p_ids: list
240     @param p_ids: list of current physical ids
241     @rtype: list
242     @return: a list of error messages
243
244     """
245     result = []
246     if disk.logical_id is not None:
247       if disk.logical_id in l_ids:
248         result.append("duplicate logical id %s" % str(disk.logical_id))
249       else:
250         l_ids.append(disk.logical_id)
251     if disk.physical_id is not None:
252       if disk.physical_id in p_ids:
253         result.append("duplicate physical id %s" % str(disk.physical_id))
254       else:
255         p_ids.append(disk.physical_id)
256
257     if disk.children:
258       for child in disk.children:
259         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
260     return result
261
262   def _UnlockedVerifyConfig(self):
263     """Verify function.
264
265     @rtype: list
266     @return: a list of error messages; a non-empty list signifies
267         configuration errors
268
269     """
270     result = []
271     seen_macs = []
272     ports = {}
273     data = self._config_data
274     seen_lids = []
275     seen_pids = []
276
277     # global cluster checks
278     if not data.cluster.enabled_hypervisors:
279       result.append("enabled hypervisors list doesn't have any entries")
280     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
281     if invalid_hvs:
282       result.append("enabled hypervisors contains invalid entries: %s" %
283                     invalid_hvs)
284
285     if data.cluster.master_node not in data.nodes:
286       result.append("cluster has invalid primary node '%s'" %
287                     data.cluster.master_node)
288
289     # per-instance checks
290     for instance_name in data.instances:
291       instance = data.instances[instance_name]
292       if instance.primary_node not in data.nodes:
293         result.append("instance '%s' has invalid primary node '%s'" %
294                       (instance_name, instance.primary_node))
295       for snode in instance.secondary_nodes:
296         if snode not in data.nodes:
297           result.append("instance '%s' has invalid secondary node '%s'" %
298                         (instance_name, snode))
299       for idx, nic in enumerate(instance.nics):
300         if nic.mac in seen_macs:
301           result.append("instance '%s' has NIC %d mac %s duplicate" %
302                         (instance_name, idx, nic.mac))
303         else:
304           seen_macs.append(nic.mac)
305
306       # gather the drbd ports for duplicate checks
307       for dsk in instance.disks:
308         if dsk.dev_type in constants.LDS_DRBD:
309           tcp_port = dsk.logical_id[2]
310           if tcp_port not in ports:
311             ports[tcp_port] = []
312           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
313       # gather network port reservation
314       net_port = getattr(instance, "network_port", None)
315       if net_port is not None:
316         if net_port not in ports:
317           ports[net_port] = []
318         ports[net_port].append((instance.name, "network port"))
319
320       # instance disk verify
321       for idx, disk in enumerate(instance.disks):
322         result.extend(["instance '%s' disk %d error: %s" %
323                        (instance.name, idx, msg) for msg in disk.Verify()])
324         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
325
326     # cluster-wide pool of free ports
327     for free_port in data.cluster.tcpudp_port_pool:
328       if free_port not in ports:
329         ports[free_port] = []
330       ports[free_port].append(("cluster", "port marked as free"))
331
332     # compute tcp/udp duplicate ports
333     keys = ports.keys()
334     keys.sort()
335     for pnum in keys:
336       pdata = ports[pnum]
337       if len(pdata) > 1:
338         txt = ", ".join(["%s/%s" % val for val in pdata])
339         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
340
341     # highest used tcp port check
342     if keys:
343       if keys[-1] > data.cluster.highest_used_port:
344         result.append("Highest used port mismatch, saved %s, computed %s" %
345                       (data.cluster.highest_used_port, keys[-1]))
346
347     if not data.nodes[data.cluster.master_node].master_candidate:
348       result.append("Master node is not a master candidate")
349
350     # master candidate checks
351     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
352     if mc_now < mc_max:
353       result.append("Not enough master candidates: actual %d, target %d" %
354                     (mc_now, mc_max))
355
356     # node checks
357     for node in data.nodes.values():
358       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
359         result.append("Node %s state is invalid: master_candidate=%s,"
360                       " drain=%s, offline=%s" %
361                       (node.name, node.master_candidate, node.drain,
362                        node.offline))
363
364     # drbd minors check
365     d_map, duplicates = self._UnlockedComputeDRBDMap()
366     for node, minor, instance_a, instance_b in duplicates:
367       result.append("DRBD minor %d on node %s is assigned twice to instances"
368                     " %s and %s" % (minor, node, instance_a, instance_b))
369
370     return result
371
372   @locking.ssynchronized(_config_lock, shared=1)
373   def VerifyConfig(self):
374     """Verify function.
375
376     This is just a wrapper over L{_UnlockedVerifyConfig}.
377
378     @rtype: list
379     @return: a list of error messages; a non-empty list signifies
380         configuration errors
381
382     """
383     return self._UnlockedVerifyConfig()
384
385   def _UnlockedSetDiskID(self, disk, node_name):
386     """Convert the unique ID to the ID needed on the target nodes.
387
388     This is used only for drbd, which needs ip/port configuration.
389
390     The routine descends down and updates its children also, because
391     this helps when the only the top device is passed to the remote
392     node.
393
394     This function is for internal use, when the config lock is already held.
395
396     """
397     if disk.children:
398       for child in disk.children:
399         self._UnlockedSetDiskID(child, node_name)
400
401     if disk.logical_id is None and disk.physical_id is not None:
402       return
403     if disk.dev_type == constants.LD_DRBD8:
404       pnode, snode, port, pminor, sminor, secret = disk.logical_id
405       if node_name not in (pnode, snode):
406         raise errors.ConfigurationError("DRBD device not knowing node %s" %
407                                         node_name)
408       pnode_info = self._UnlockedGetNodeInfo(pnode)
409       snode_info = self._UnlockedGetNodeInfo(snode)
410       if pnode_info is None or snode_info is None:
411         raise errors.ConfigurationError("Can't find primary or secondary node"
412                                         " for %s" % str(disk))
413       p_data = (pnode_info.secondary_ip, port)
414       s_data = (snode_info.secondary_ip, port)
415       if pnode == node_name:
416         disk.physical_id = p_data + s_data + (pminor, secret)
417       else: # it must be secondary, we tested above
418         disk.physical_id = s_data + p_data + (sminor, secret)
419     else:
420       disk.physical_id = disk.logical_id
421     return
422
423   @locking.ssynchronized(_config_lock)
424   def SetDiskID(self, disk, node_name):
425     """Convert the unique ID to the ID needed on the target nodes.
426
427     This is used only for drbd, which needs ip/port configuration.
428
429     The routine descends down and updates its children also, because
430     this helps when the only the top device is passed to the remote
431     node.
432
433     """
434     return self._UnlockedSetDiskID(disk, node_name)
435
436   @locking.ssynchronized(_config_lock)
437   def AddTcpUdpPort(self, port):
438     """Adds a new port to the available port pool.
439
440     """
441     if not isinstance(port, int):
442       raise errors.ProgrammerError("Invalid type passed for port")
443
444     self._config_data.cluster.tcpudp_port_pool.add(port)
445     self._WriteConfig()
446
447   @locking.ssynchronized(_config_lock, shared=1)
448   def GetPortList(self):
449     """Returns a copy of the current port list.
450
451     """
452     return self._config_data.cluster.tcpudp_port_pool.copy()
453
454   @locking.ssynchronized(_config_lock)
455   def AllocatePort(self):
456     """Allocate a port.
457
458     The port will be taken from the available port pool or from the
459     default port range (and in this case we increase
460     highest_used_port).
461
462     """
463     # If there are TCP/IP ports configured, we use them first.
464     if self._config_data.cluster.tcpudp_port_pool:
465       port = self._config_data.cluster.tcpudp_port_pool.pop()
466     else:
467       port = self._config_data.cluster.highest_used_port + 1
468       if port >= constants.LAST_DRBD_PORT:
469         raise errors.ConfigurationError("The highest used port is greater"
470                                         " than %s. Aborting." %
471                                         constants.LAST_DRBD_PORT)
472       self._config_data.cluster.highest_used_port = port
473
474     self._WriteConfig()
475     return port
476
477   def _UnlockedComputeDRBDMap(self):
478     """Compute the used DRBD minor/nodes.
479
480     @rtype: (dict, list)
481     @return: dictionary of node_name: dict of minor: instance_name;
482         the returned dict will have all the nodes in it (even if with
483         an empty list), and a list of duplicates; if the duplicates
484         list is not empty, the configuration is corrupted and its caller
485         should raise an exception
486
487     """
488     def _AppendUsedPorts(instance_name, disk, used):
489       duplicates = []
490       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
491         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
492         for node, port in ((node_a, minor_a), (node_b, minor_b)):
493           assert node in used, ("Node '%s' of instance '%s' not found"
494                                 " in node list" % (node, instance_name))
495           if port in used[node]:
496             duplicates.append((node, port, instance_name, used[node][port]))
497           else:
498             used[node][port] = instance_name
499       if disk.children:
500         for child in disk.children:
501           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
502       return duplicates
503
504     duplicates = []
505     my_dict = dict((node, {}) for node in self._config_data.nodes)
506     for instance in self._config_data.instances.itervalues():
507       for disk in instance.disks:
508         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
509     for (node, minor), instance in self._temporary_drbds.iteritems():
510       if minor in my_dict[node] and my_dict[node][minor] != instance:
511         duplicates.append((node, minor, instance, my_dict[node][minor]))
512       else:
513         my_dict[node][minor] = instance
514     return my_dict, duplicates
515
516   @locking.ssynchronized(_config_lock)
517   def ComputeDRBDMap(self):
518     """Compute the used DRBD minor/nodes.
519
520     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
521
522     @return: dictionary of node_name: dict of minor: instance_name;
523         the returned dict will have all the nodes in it (even if with
524         an empty list).
525
526     """
527     d_map, duplicates = self._UnlockedComputeDRBDMap()
528     if duplicates:
529       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
530                                       str(duplicates))
531     return d_map
532
533   @locking.ssynchronized(_config_lock)
534   def AllocateDRBDMinor(self, nodes, instance):
535     """Allocate a drbd minor.
536
537     The free minor will be automatically computed from the existing
538     devices. A node can be given multiple times in order to allocate
539     multiple minors. The result is the list of minors, in the same
540     order as the passed nodes.
541
542     @type instance: string
543     @param instance: the instance for which we allocate minors
544
545     """
546     assert isinstance(instance, basestring), \
547            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
548
549     d_map, duplicates = self._UnlockedComputeDRBDMap()
550     if duplicates:
551       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
552                                       str(duplicates))
553     result = []
554     for nname in nodes:
555       ndata = d_map[nname]
556       if not ndata:
557         # no minors used, we can start at 0
558         result.append(0)
559         ndata[0] = instance
560         self._temporary_drbds[(nname, 0)] = instance
561         continue
562       keys = ndata.keys()
563       keys.sort()
564       ffree = utils.FirstFree(keys)
565       if ffree is None:
566         # return the next minor
567         # TODO: implement high-limit check
568         minor = keys[-1] + 1
569       else:
570         minor = ffree
571       # double-check minor against current instances
572       assert minor not in d_map[nname], \
573              ("Attempt to reuse allocated DRBD minor %d on node %s,"
574               " already allocated to instance %s" %
575               (minor, nname, d_map[nname][minor]))
576       ndata[minor] = instance
577       # double-check minor against reservation
578       r_key = (nname, minor)
579       assert r_key not in self._temporary_drbds, \
580              ("Attempt to reuse reserved DRBD minor %d on node %s,"
581               " reserved for instance %s" %
582               (minor, nname, self._temporary_drbds[r_key]))
583       self._temporary_drbds[r_key] = instance
584       result.append(minor)
585     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
586                   nodes, result)
587     return result
588
589   def _UnlockedReleaseDRBDMinors(self, instance):
590     """Release temporary drbd minors allocated for a given instance.
591
592     @type instance: string
593     @param instance: the instance for which temporary minors should be
594                      released
595
596     """
597     assert isinstance(instance, basestring), \
598            "Invalid argument passed to ReleaseDRBDMinors"
599     for key, name in self._temporary_drbds.items():
600       if name == instance:
601         del self._temporary_drbds[key]
602
603   @locking.ssynchronized(_config_lock)
604   def ReleaseDRBDMinors(self, instance):
605     """Release temporary drbd minors allocated for a given instance.
606
607     This should be called on the error paths, on the success paths
608     it's automatically called by the ConfigWriter add and update
609     functions.
610
611     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
612
613     @type instance: string
614     @param instance: the instance for which temporary minors should be
615                      released
616
617     """
618     self._UnlockedReleaseDRBDMinors(instance)
619
620   @locking.ssynchronized(_config_lock, shared=1)
621   def GetConfigVersion(self):
622     """Get the configuration version.
623
624     @return: Config version
625
626     """
627     return self._config_data.version
628
629   @locking.ssynchronized(_config_lock, shared=1)
630   def GetClusterName(self):
631     """Get cluster name.
632
633     @return: Cluster name
634
635     """
636     return self._config_data.cluster.cluster_name
637
638   @locking.ssynchronized(_config_lock, shared=1)
639   def GetMasterNode(self):
640     """Get the hostname of the master node for this cluster.
641
642     @return: Master hostname
643
644     """
645     return self._config_data.cluster.master_node
646
647   @locking.ssynchronized(_config_lock, shared=1)
648   def GetMasterIP(self):
649     """Get the IP of the master node for this cluster.
650
651     @return: Master IP
652
653     """
654     return self._config_data.cluster.master_ip
655
656   @locking.ssynchronized(_config_lock, shared=1)
657   def GetMasterNetdev(self):
658     """Get the master network device for this cluster.
659
660     """
661     return self._config_data.cluster.master_netdev
662
663   @locking.ssynchronized(_config_lock, shared=1)
664   def GetFileStorageDir(self):
665     """Get the file storage dir for this cluster.
666
667     """
668     return self._config_data.cluster.file_storage_dir
669
670   @locking.ssynchronized(_config_lock, shared=1)
671   def GetHypervisorType(self):
672     """Get the hypervisor type for this cluster.
673
674     """
675     return self._config_data.cluster.default_hypervisor
676
677   @locking.ssynchronized(_config_lock, shared=1)
678   def GetHostKey(self):
679     """Return the rsa hostkey from the config.
680
681     @rtype: string
682     @return: the rsa hostkey
683
684     """
685     return self._config_data.cluster.rsahostkeypub
686
687   @locking.ssynchronized(_config_lock)
688   def AddInstance(self, instance):
689     """Add an instance to the config.
690
691     This should be used after creating a new instance.
692
693     @type instance: L{objects.Instance}
694     @param instance: the instance object
695
696     """
697     if not isinstance(instance, objects.Instance):
698       raise errors.ProgrammerError("Invalid type passed to AddInstance")
699
700     if instance.disk_template != constants.DT_DISKLESS:
701       all_lvs = instance.MapLVsByNode()
702       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
703
704     all_macs = self._AllMACs()
705     for nic in instance.nics:
706       if nic.mac in all_macs:
707         raise errors.ConfigurationError("Cannot add instance %s:"
708           " MAC address '%s' already in use." % (instance.name, nic.mac))
709
710     instance.serial_no = 1
711     self._config_data.instances[instance.name] = instance
712     self._config_data.cluster.serial_no += 1
713     self._UnlockedReleaseDRBDMinors(instance.name)
714     for nic in instance.nics:
715       self._temporary_macs.discard(nic.mac)
716     self._WriteConfig()
717
718   def _SetInstanceStatus(self, instance_name, status):
719     """Set the instance's status to a given value.
720
721     """
722     assert isinstance(status, bool), \
723            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
724
725     if instance_name not in self._config_data.instances:
726       raise errors.ConfigurationError("Unknown instance '%s'" %
727                                       instance_name)
728     instance = self._config_data.instances[instance_name]
729     if instance.admin_up != status:
730       instance.admin_up = status
731       instance.serial_no += 1
732       self._WriteConfig()
733
734   @locking.ssynchronized(_config_lock)
735   def MarkInstanceUp(self, instance_name):
736     """Mark the instance status to up in the config.
737
738     """
739     self._SetInstanceStatus(instance_name, True)
740
741   @locking.ssynchronized(_config_lock)
742   def RemoveInstance(self, instance_name):
743     """Remove the instance from the configuration.
744
745     """
746     if instance_name not in self._config_data.instances:
747       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
748     del self._config_data.instances[instance_name]
749     self._config_data.cluster.serial_no += 1
750     self._WriteConfig()
751
752   @locking.ssynchronized(_config_lock)
753   def RenameInstance(self, old_name, new_name):
754     """Rename an instance.
755
756     This needs to be done in ConfigWriter and not by RemoveInstance
757     combined with AddInstance as only we can guarantee an atomic
758     rename.
759
760     """
761     if old_name not in self._config_data.instances:
762       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
763     inst = self._config_data.instances[old_name]
764     del self._config_data.instances[old_name]
765     inst.name = new_name
766
767     for disk in inst.disks:
768       if disk.dev_type == constants.LD_FILE:
769         # rename the file paths in logical and physical id
770         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
771         disk.physical_id = disk.logical_id = (disk.logical_id[0],
772                                               os.path.join(file_storage_dir,
773                                                            inst.name,
774                                                            disk.iv_name))
775
776     self._config_data.instances[inst.name] = inst
777     self._WriteConfig()
778
779   @locking.ssynchronized(_config_lock)
780   def MarkInstanceDown(self, instance_name):
781     """Mark the status of an instance to down in the configuration.
782
783     """
784     self._SetInstanceStatus(instance_name, False)
785
786   def _UnlockedGetInstanceList(self):
787     """Get the list of instances.
788
789     This function is for internal use, when the config lock is already held.
790
791     """
792     return self._config_data.instances.keys()
793
794   @locking.ssynchronized(_config_lock, shared=1)
795   def GetInstanceList(self):
796     """Get the list of instances.
797
798     @return: array of instances, ex. ['instance2.example.com',
799         'instance1.example.com']
800
801     """
802     return self._UnlockedGetInstanceList()
803
804   @locking.ssynchronized(_config_lock, shared=1)
805   def ExpandInstanceName(self, short_name):
806     """Attempt to expand an incomplete instance name.
807
808     """
809     return utils.MatchNameComponent(short_name,
810                                     self._config_data.instances.keys())
811
812   def _UnlockedGetInstanceInfo(self, instance_name):
813     """Returns information about an instance.
814
815     This function is for internal use, when the config lock is already held.
816
817     """
818     if instance_name not in self._config_data.instances:
819       return None
820
821     return self._config_data.instances[instance_name]
822
823   @locking.ssynchronized(_config_lock, shared=1)
824   def GetInstanceInfo(self, instance_name):
825     """Returns information about an instance.
826
827     It takes the information from the configuration file. Other information of
828     an instance are taken from the live systems.
829
830     @param instance_name: name of the instance, e.g.
831         I{instance1.example.com}
832
833     @rtype: L{objects.Instance}
834     @return: the instance object
835
836     """
837     return self._UnlockedGetInstanceInfo(instance_name)
838
839   @locking.ssynchronized(_config_lock, shared=1)
840   def GetAllInstancesInfo(self):
841     """Get the configuration of all instances.
842
843     @rtype: dict
844     @return: dict of (instance, instance_info), where instance_info is what
845               would GetInstanceInfo return for the node
846
847     """
848     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
849                     for instance in self._UnlockedGetInstanceList()])
850     return my_dict
851
852   @locking.ssynchronized(_config_lock)
853   def AddNode(self, node):
854     """Add a node to the configuration.
855
856     @type node: L{objects.Node}
857     @param node: a Node instance
858
859     """
860     logging.info("Adding node %s to configuration" % node.name)
861
862     node.serial_no = 1
863     self._config_data.nodes[node.name] = node
864     self._config_data.cluster.serial_no += 1
865     self._WriteConfig()
866
867   @locking.ssynchronized(_config_lock)
868   def RemoveNode(self, node_name):
869     """Remove a node from the configuration.
870
871     """
872     logging.info("Removing node %s from configuration" % node_name)
873
874     if node_name not in self._config_data.nodes:
875       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
876
877     del self._config_data.nodes[node_name]
878     self._config_data.cluster.serial_no += 1
879     self._WriteConfig()
880
881   @locking.ssynchronized(_config_lock, shared=1)
882   def ExpandNodeName(self, short_name):
883     """Attempt to expand an incomplete instance name.
884
885     """
886     return utils.MatchNameComponent(short_name,
887                                     self._config_data.nodes.keys())
888
889   def _UnlockedGetNodeInfo(self, node_name):
890     """Get the configuration of a node, as stored in the config.
891
892     This function is for internal use, when the config lock is already
893     held.
894
895     @param node_name: the node name, e.g. I{node1.example.com}
896
897     @rtype: L{objects.Node}
898     @return: the node object
899
900     """
901     if node_name not in self._config_data.nodes:
902       return None
903
904     return self._config_data.nodes[node_name]
905
906
907   @locking.ssynchronized(_config_lock, shared=1)
908   def GetNodeInfo(self, node_name):
909     """Get the configuration of a node, as stored in the config.
910
911     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
912
913     @param node_name: the node name, e.g. I{node1.example.com}
914
915     @rtype: L{objects.Node}
916     @return: the node object
917
918     """
919     return self._UnlockedGetNodeInfo(node_name)
920
921   def _UnlockedGetNodeList(self):
922     """Return the list of nodes which are in the configuration.
923
924     This function is for internal use, when the config lock is already
925     held.
926
927     @rtype: list
928
929     """
930     return self._config_data.nodes.keys()
931
932
933   @locking.ssynchronized(_config_lock, shared=1)
934   def GetNodeList(self):
935     """Return the list of nodes which are in the configuration.
936
937     """
938     return self._UnlockedGetNodeList()
939
940   @locking.ssynchronized(_config_lock, shared=1)
941   def GetOnlineNodeList(self):
942     """Return the list of nodes which are online.
943
944     """
945     all_nodes = [self._UnlockedGetNodeInfo(node)
946                  for node in self._UnlockedGetNodeList()]
947     return [node.name for node in all_nodes if not node.offline]
948
949   @locking.ssynchronized(_config_lock, shared=1)
950   def GetAllNodesInfo(self):
951     """Get the configuration of all nodes.
952
953     @rtype: dict
954     @return: dict of (node, node_info), where node_info is what
955               would GetNodeInfo return for the node
956
957     """
958     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
959                     for node in self._UnlockedGetNodeList()])
960     return my_dict
961
962   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
963     """Get the number of current and maximum desired and possible candidates.
964
965     @type exceptions: list
966     @param exceptions: if passed, list of nodes that should be ignored
967     @rtype: tuple
968     @return: tuple of (current, desired and possible)
969
970     """
971     mc_now = mc_max = 0
972     for node in self._config_data.nodes.values():
973       if exceptions and node.name in exceptions:
974         continue
975       if not (node.offline or node.drained):
976         mc_max += 1
977       if node.master_candidate:
978         mc_now += 1
979     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
980     return (mc_now, mc_max)
981
982   @locking.ssynchronized(_config_lock, shared=1)
983   def GetMasterCandidateStats(self, exceptions=None):
984     """Get the number of current and maximum possible candidates.
985
986     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
987
988     @type exceptions: list
989     @param exceptions: if passed, list of nodes that should be ignored
990     @rtype: tuple
991     @return: tuple of (current, max)
992
993     """
994     return self._UnlockedGetMasterCandidateStats(exceptions)
995
996   @locking.ssynchronized(_config_lock)
997   def MaintainCandidatePool(self):
998     """Try to grow the candidate pool to the desired size.
999
1000     @rtype: list
1001     @return: list with the adjusted nodes (L{objects.Node} instances)
1002
1003     """
1004     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
1005     mod_list = []
1006     if mc_now < mc_max:
1007       node_list = self._config_data.nodes.keys()
1008       random.shuffle(node_list)
1009       for name in node_list:
1010         if mc_now >= mc_max:
1011           break
1012         node = self._config_data.nodes[name]
1013         if node.master_candidate or node.offline or node.drained:
1014           continue
1015         mod_list.append(node)
1016         node.master_candidate = True
1017         node.serial_no += 1
1018         mc_now += 1
1019       if mc_now != mc_max:
1020         # this should not happen
1021         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1022                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1023       if mod_list:
1024         self._config_data.cluster.serial_no += 1
1025         self._WriteConfig()
1026
1027     return mod_list
1028
1029   def _BumpSerialNo(self):
1030     """Bump up the serial number of the config.
1031
1032     """
1033     self._config_data.serial_no += 1
1034
1035   def _OpenConfig(self):
1036     """Read the config data from disk.
1037
1038     """
1039     f = open(self._cfg_file, 'r')
1040     try:
1041       try:
1042         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1043       except Exception, err:
1044         raise errors.ConfigurationError(err)
1045     finally:
1046       f.close()
1047
1048     # Make sure the configuration has the right version
1049     _ValidateConfig(data)
1050
1051     if (not hasattr(data, 'cluster') or
1052         not hasattr(data.cluster, 'rsahostkeypub')):
1053       raise errors.ConfigurationError("Incomplete configuration"
1054                                       " (missing cluster.rsahostkeypub)")
1055     self._config_data = data
1056     # reset the last serial as -1 so that the next write will cause
1057     # ssconf update
1058     self._last_cluster_serial = -1
1059
1060   def _DistributeConfig(self):
1061     """Distribute the configuration to the other nodes.
1062
1063     Currently, this only copies the configuration file. In the future,
1064     it could be used to encapsulate the 2/3-phase update mechanism.
1065
1066     """
1067     if self._offline:
1068       return True
1069     bad = False
1070
1071     node_list = []
1072     addr_list = []
1073     myhostname = self._my_hostname
1074     # we can skip checking whether _UnlockedGetNodeInfo returns None
1075     # since the node list comes from _UnlocketGetNodeList, and we are
1076     # called with the lock held, so no modifications should take place
1077     # in between
1078     for node_name in self._UnlockedGetNodeList():
1079       if node_name == myhostname:
1080         continue
1081       node_info = self._UnlockedGetNodeInfo(node_name)
1082       if not node_info.master_candidate:
1083         continue
1084       node_list.append(node_info.name)
1085       addr_list.append(node_info.primary_ip)
1086
1087     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1088                                             address_list=addr_list)
1089     for node in node_list:
1090       if not result[node]:
1091         logging.error("copy of file %s to node %s failed",
1092                       self._cfg_file, node)
1093         bad = True
1094     return not bad
1095
1096   def _WriteConfig(self, destination=None):
1097     """Write the configuration data to persistent storage.
1098
1099     """
1100     config_errors = self._UnlockedVerifyConfig()
1101     if config_errors:
1102       raise errors.ConfigurationError("Configuration data is not"
1103                                       " consistent: %s" %
1104                                       (", ".join(config_errors)))
1105     if destination is None:
1106       destination = self._cfg_file
1107     self._BumpSerialNo()
1108     txt = serializer.Dump(self._config_data.ToDict())
1109     dir_name, file_name = os.path.split(destination)
1110     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1111     f = os.fdopen(fd, 'w')
1112     try:
1113       f.write(txt)
1114       os.fsync(f.fileno())
1115     finally:
1116       f.close()
1117     # we don't need to do os.close(fd) as f.close() did it
1118     os.rename(name, destination)
1119     self.write_count += 1
1120
1121     # and redistribute the config file to master candidates
1122     self._DistributeConfig()
1123
1124     # Write ssconf files on all nodes (including locally)
1125     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1126       if not self._offline:
1127         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1128                                               self._UnlockedGetSsconfValues())
1129       self._last_cluster_serial = self._config_data.cluster.serial_no
1130
1131   def _UnlockedGetSsconfValues(self):
1132     """Return the values needed by ssconf.
1133
1134     @rtype: dict
1135     @return: a dictionary with keys the ssconf names and values their
1136         associated value
1137
1138     """
1139     fn = "\n".join
1140     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1141     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1142     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1143
1144     instance_data = fn(instance_names)
1145     off_data = fn(node.name for node in node_info if node.offline)
1146     on_data = fn(node.name for node in node_info if not node.offline)
1147     mc_data = fn(node.name for node in node_info if node.master_candidate)
1148     node_data = fn(node_names)
1149
1150     cluster = self._config_data.cluster
1151     cluster_tags = fn(cluster.GetTags())
1152     return {
1153       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1154       constants.SS_CLUSTER_TAGS: cluster_tags,
1155       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1156       constants.SS_MASTER_CANDIDATES: mc_data,
1157       constants.SS_MASTER_IP: cluster.master_ip,
1158       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1159       constants.SS_MASTER_NODE: cluster.master_node,
1160       constants.SS_NODE_LIST: node_data,
1161       constants.SS_OFFLINE_NODES: off_data,
1162       constants.SS_ONLINE_NODES: on_data,
1163       constants.SS_INSTANCE_LIST: instance_data,
1164       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1165       }
1166
1167   @locking.ssynchronized(_config_lock, shared=1)
1168   def GetVGName(self):
1169     """Return the volume group name.
1170
1171     """
1172     return self._config_data.cluster.volume_group_name
1173
1174   @locking.ssynchronized(_config_lock)
1175   def SetVGName(self, vg_name):
1176     """Set the volume group name.
1177
1178     """
1179     self._config_data.cluster.volume_group_name = vg_name
1180     self._config_data.cluster.serial_no += 1
1181     self._WriteConfig()
1182
1183   @locking.ssynchronized(_config_lock, shared=1)
1184   def GetDefBridge(self):
1185     """Return the default bridge.
1186
1187     """
1188     return self._config_data.cluster.default_bridge
1189
1190   @locking.ssynchronized(_config_lock, shared=1)
1191   def GetMACPrefix(self):
1192     """Return the mac prefix.
1193
1194     """
1195     return self._config_data.cluster.mac_prefix
1196
1197   @locking.ssynchronized(_config_lock, shared=1)
1198   def GetClusterInfo(self):
1199     """Returns information about the cluster
1200
1201     @rtype: L{objects.Cluster}
1202     @return: the cluster object
1203
1204     """
1205     return self._config_data.cluster
1206
1207   @locking.ssynchronized(_config_lock)
1208   def Update(self, target):
1209     """Notify function to be called after updates.
1210
1211     This function must be called when an object (as returned by
1212     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1213     caller wants the modifications saved to the backing store. Note
1214     that all modified objects will be saved, but the target argument
1215     is the one the caller wants to ensure that it's saved.
1216
1217     @param target: an instance of either L{objects.Cluster},
1218         L{objects.Node} or L{objects.Instance} which is existing in
1219         the cluster
1220
1221     """
1222     if self._config_data is None:
1223       raise errors.ProgrammerError("Configuration file not read,"
1224                                    " cannot save.")
1225     update_serial = False
1226     if isinstance(target, objects.Cluster):
1227       test = target == self._config_data.cluster
1228     elif isinstance(target, objects.Node):
1229       test = target in self._config_data.nodes.values()
1230       update_serial = True
1231     elif isinstance(target, objects.Instance):
1232       test = target in self._config_data.instances.values()
1233     else:
1234       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1235                                    " ConfigWriter.Update" % type(target))
1236     if not test:
1237       raise errors.ConfigurationError("Configuration updated since object"
1238                                       " has been read or unknown object")
1239     target.serial_no += 1
1240
1241     if update_serial:
1242       # for node updates, we need to increase the cluster serial too
1243       self._config_data.cluster.serial_no += 1
1244
1245     if isinstance(target, objects.Instance):
1246       self._UnlockedReleaseDRBDMinors(target.name)
1247       for nic in target.nics:
1248         self._temporary_macs.discard(nic.mac)
1249
1250     self._WriteConfig()