ConfigWriter: add checks for duplicate disk IDs
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     # Note: in order to prevent errors when resolving our name in
83     # _DistributeConfig, we compute it here once and reuse it; it's
84     # better to raise an error before starting to modify the config
85     # file than after it was modified
86     self._my_hostname = utils.HostInfo().name
87     self._last_cluster_serial = -1
88     self._OpenConfig()
89
90   # this method needs to be static, so that we can call it on the class
91   @staticmethod
92   def IsCluster():
93     """Check if the cluster is configured.
94
95     """
96     return os.path.exists(constants.CLUSTER_CONF_FILE)
97
98   @locking.ssynchronized(_config_lock, shared=1)
99   def GenerateMAC(self):
100     """Generate a MAC for an instance.
101
102     This should check the current instances for duplicates.
103
104     """
105     prefix = self._config_data.cluster.mac_prefix
106     all_macs = self._AllMACs()
107     retries = 64
108     while retries > 0:
109       byte1 = random.randrange(0, 256)
110       byte2 = random.randrange(0, 256)
111       byte3 = random.randrange(0, 256)
112       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
113       if mac not in all_macs:
114         break
115       retries -= 1
116     else:
117       raise errors.ConfigurationError("Can't generate unique MAC")
118     return mac
119
120   @locking.ssynchronized(_config_lock, shared=1)
121   def IsMacInUse(self, mac):
122     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
123
124     This only checks instances managed by this cluster, it does not
125     check for potential collisions elsewhere.
126
127     """
128     all_macs = self._AllMACs()
129     return mac in all_macs
130
131   @locking.ssynchronized(_config_lock, shared=1)
132   def GenerateDRBDSecret(self):
133     """Generate a DRBD secret.
134
135     This checks the current disks for duplicates.
136
137     """
138     all_secrets = self._AllDRBDSecrets()
139     retries = 64
140     while retries > 0:
141       secret = utils.GenerateSecret()
142       if secret not in all_secrets:
143         break
144       retries -= 1
145     else:
146       raise errors.ConfigurationError("Can't generate unique DRBD secret")
147     return secret
148
149   def _ComputeAllLVs(self):
150     """Compute the list of all LVs.
151
152     """
153     lvnames = set()
154     for instance in self._config_data.instances.values():
155       node_data = instance.MapLVsByNode()
156       for lv_list in node_data.values():
157         lvnames.update(lv_list)
158     return lvnames
159
160   @locking.ssynchronized(_config_lock, shared=1)
161   def GenerateUniqueID(self, exceptions=None):
162     """Generate an unique disk name.
163
164     This checks the current node, instances and disk names for
165     duplicates.
166
167     @param exceptions: a list with some other names which should be checked
168         for uniqueness (used for example when you want to get
169         more than one id at one time without adding each one in
170         turn to the config file)
171
172     @rtype: string
173     @return: the unique id
174
175     """
176     existing = set()
177     existing.update(self._temporary_ids)
178     existing.update(self._ComputeAllLVs())
179     existing.update(self._config_data.instances.keys())
180     existing.update(self._config_data.nodes.keys())
181     if exceptions is not None:
182       existing.update(exceptions)
183     retries = 64
184     while retries > 0:
185       unique_id = utils.NewUUID()
186       if unique_id not in existing and unique_id is not None:
187         break
188     else:
189       raise errors.ConfigurationError("Not able generate an unique ID"
190                                       " (last tried ID: %s" % unique_id)
191     self._temporary_ids.add(unique_id)
192     return unique_id
193
194   def _AllMACs(self):
195     """Return all MACs present in the config.
196
197     @rtype: list
198     @return: the list of all MACs
199
200     """
201     result = []
202     for instance in self._config_data.instances.values():
203       for nic in instance.nics:
204         result.append(nic.mac)
205
206     return result
207
208   def _AllDRBDSecrets(self):
209     """Return all DRBD secrets present in the config.
210
211     @rtype: list
212     @return: the list of all DRBD secrets
213
214     """
215     def helper(disk, result):
216       """Recursively gather secrets from this disk."""
217       if disk.dev_type == constants.DT_DRBD8:
218         result.append(disk.logical_id[5])
219       if disk.children:
220         for child in disk.children:
221           helper(child, result)
222
223     result = []
224     for instance in self._config_data.instances.values():
225       for disk in instance.disks:
226         helper(disk, result)
227
228     return result
229
230   def _CheckDiskIDs(self, disk, l_ids, p_ids):
231     """Compute duplicate disk IDs
232
233     @type disk: L{objects.Disk}
234     @param disk: the disk at which to start searching
235     @type l_ids: list
236     @param l_ids: list of current logical ids
237     @type p_ids: list
238     @param p_ids: list of current physical ids
239     @rtype: list
240     @return: a list of error messages
241
242     """
243     result = []
244     if disk.logical_id in l_ids:
245       result.append("duplicate logical id %s" % str(disk.logical_id))
246     else:
247       l_ids.append(disk.logical_id)
248     if disk.physical_id in p_ids:
249       result.append("duplicate physical id %s" % str(disk.physical_id))
250     else:
251       p_ids.append(disk.physical_id)
252
253     if disk.children:
254       for child in disk.children:
255         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
256     return result
257
258   def _UnlockedVerifyConfig(self):
259     """Verify function.
260
261     @rtype: list
262     @return: a list of error messages; a non-empty list signifies
263         configuration errors
264
265     """
266     result = []
267     seen_macs = []
268     ports = {}
269     data = self._config_data
270     seen_lids = []
271     seen_pids = []
272     for instance_name in data.instances:
273       instance = data.instances[instance_name]
274       if instance.primary_node not in data.nodes:
275         result.append("instance '%s' has invalid primary node '%s'" %
276                       (instance_name, instance.primary_node))
277       for snode in instance.secondary_nodes:
278         if snode not in data.nodes:
279           result.append("instance '%s' has invalid secondary node '%s'" %
280                         (instance_name, snode))
281       for idx, nic in enumerate(instance.nics):
282         if nic.mac in seen_macs:
283           result.append("instance '%s' has NIC %d mac %s duplicate" %
284                         (instance_name, idx, nic.mac))
285         else:
286           seen_macs.append(nic.mac)
287
288       # gather the drbd ports for duplicate checks
289       for dsk in instance.disks:
290         if dsk.dev_type in constants.LDS_DRBD:
291           tcp_port = dsk.logical_id[2]
292           if tcp_port not in ports:
293             ports[tcp_port] = []
294           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
295       # gather network port reservation
296       net_port = getattr(instance, "network_port", None)
297       if net_port is not None:
298         if net_port not in ports:
299           ports[net_port] = []
300         ports[net_port].append((instance.name, "network port"))
301
302       # instance disk verify
303       for idx, disk in enumerate(instance.disks):
304         result.extend(["instance '%s' disk %d error: %s" %
305                        (instance.name, idx, msg) for msg in disk.Verify()])
306         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
307
308     # cluster-wide pool of free ports
309     for free_port in data.cluster.tcpudp_port_pool:
310       if free_port not in ports:
311         ports[free_port] = []
312       ports[free_port].append(("cluster", "port marked as free"))
313
314     # compute tcp/udp duplicate ports
315     keys = ports.keys()
316     keys.sort()
317     for pnum in keys:
318       pdata = ports[pnum]
319       if len(pdata) > 1:
320         txt = ", ".join(["%s/%s" % val for val in pdata])
321         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
322
323     # highest used tcp port check
324     if keys:
325       if keys[-1] > data.cluster.highest_used_port:
326         result.append("Highest used port mismatch, saved %s, computed %s" %
327                       (data.cluster.highest_used_port, keys[-1]))
328
329     if not data.nodes[data.cluster.master_node].master_candidate:
330       result.append("Master node is not a master candidate")
331
332     # master candidate checks
333     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
334     if mc_now < mc_max:
335       result.append("Not enough master candidates: actual %d, target %d" %
336                     (mc_now, mc_max))
337
338     # node checks
339     for node in data.nodes.values():
340       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
341         result.append("Node %s state is invalid: master_candidate=%s,"
342                       " drain=%s, offline=%s" %
343                       (node.name, node.master_candidate, node.drain,
344                        node.offline))
345
346     # drbd minors check
347     d_map, duplicates = self._UnlockedComputeDRBDMap()
348     for node, minor, instance_a, instance_b in duplicates:
349       result.append("DRBD minor %d on node %s is assigned twice to instances"
350                     " %s and %s" % (minor, node, instance_a, instance_b))
351
352     return result
353
354   @locking.ssynchronized(_config_lock, shared=1)
355   def VerifyConfig(self):
356     """Verify function.
357
358     This is just a wrapper over L{_UnlockedVerifyConfig}.
359
360     @rtype: list
361     @return: a list of error messages; a non-empty list signifies
362         configuration errors
363
364     """
365     return self._UnlockedVerifyConfig()
366
367   def _UnlockedSetDiskID(self, disk, node_name):
368     """Convert the unique ID to the ID needed on the target nodes.
369
370     This is used only for drbd, which needs ip/port configuration.
371
372     The routine descends down and updates its children also, because
373     this helps when the only the top device is passed to the remote
374     node.
375
376     This function is for internal use, when the config lock is already held.
377
378     """
379     if disk.children:
380       for child in disk.children:
381         self._UnlockedSetDiskID(child, node_name)
382
383     if disk.logical_id is None and disk.physical_id is not None:
384       return
385     if disk.dev_type == constants.LD_DRBD8:
386       pnode, snode, port, pminor, sminor, secret = disk.logical_id
387       if node_name not in (pnode, snode):
388         raise errors.ConfigurationError("DRBD device not knowing node %s" %
389                                         node_name)
390       pnode_info = self._UnlockedGetNodeInfo(pnode)
391       snode_info = self._UnlockedGetNodeInfo(snode)
392       if pnode_info is None or snode_info is None:
393         raise errors.ConfigurationError("Can't find primary or secondary node"
394                                         " for %s" % str(disk))
395       p_data = (pnode_info.secondary_ip, port)
396       s_data = (snode_info.secondary_ip, port)
397       if pnode == node_name:
398         disk.physical_id = p_data + s_data + (pminor, secret)
399       else: # it must be secondary, we tested above
400         disk.physical_id = s_data + p_data + (sminor, secret)
401     else:
402       disk.physical_id = disk.logical_id
403     return
404
405   @locking.ssynchronized(_config_lock)
406   def SetDiskID(self, disk, node_name):
407     """Convert the unique ID to the ID needed on the target nodes.
408
409     This is used only for drbd, which needs ip/port configuration.
410
411     The routine descends down and updates its children also, because
412     this helps when the only the top device is passed to the remote
413     node.
414
415     """
416     return self._UnlockedSetDiskID(disk, node_name)
417
418   @locking.ssynchronized(_config_lock)
419   def AddTcpUdpPort(self, port):
420     """Adds a new port to the available port pool.
421
422     """
423     if not isinstance(port, int):
424       raise errors.ProgrammerError("Invalid type passed for port")
425
426     self._config_data.cluster.tcpudp_port_pool.add(port)
427     self._WriteConfig()
428
429   @locking.ssynchronized(_config_lock, shared=1)
430   def GetPortList(self):
431     """Returns a copy of the current port list.
432
433     """
434     return self._config_data.cluster.tcpudp_port_pool.copy()
435
436   @locking.ssynchronized(_config_lock)
437   def AllocatePort(self):
438     """Allocate a port.
439
440     The port will be taken from the available port pool or from the
441     default port range (and in this case we increase
442     highest_used_port).
443
444     """
445     # If there are TCP/IP ports configured, we use them first.
446     if self._config_data.cluster.tcpudp_port_pool:
447       port = self._config_data.cluster.tcpudp_port_pool.pop()
448     else:
449       port = self._config_data.cluster.highest_used_port + 1
450       if port >= constants.LAST_DRBD_PORT:
451         raise errors.ConfigurationError("The highest used port is greater"
452                                         " than %s. Aborting." %
453                                         constants.LAST_DRBD_PORT)
454       self._config_data.cluster.highest_used_port = port
455
456     self._WriteConfig()
457     return port
458
459   def _UnlockedComputeDRBDMap(self):
460     """Compute the used DRBD minor/nodes.
461
462     @rtype: (dict, list)
463     @return: dictionary of node_name: dict of minor: instance_name;
464         the returned dict will have all the nodes in it (even if with
465         an empty list), and a list of duplicates; if the duplicates
466         list is not empty, the configuration is corrupted and its caller
467         should raise an exception
468
469     """
470     def _AppendUsedPorts(instance_name, disk, used):
471       duplicates = []
472       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
473         nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
474         for node, port in ((nodeA, minorA), (nodeB, minorB)):
475           assert node in used, ("Node '%s' of instance '%s' not found"
476                                 " in node list" % (node, instance_name))
477           if port in used[node]:
478             duplicates.append((node, port, instance_name, used[node][port]))
479           else:
480             used[node][port] = instance_name
481       if disk.children:
482         for child in disk.children:
483           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
484       return duplicates
485
486     duplicates = []
487     my_dict = dict((node, {}) for node in self._config_data.nodes)
488     for instance in self._config_data.instances.itervalues():
489       for disk in instance.disks:
490         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
491     for (node, minor), instance in self._temporary_drbds.iteritems():
492       if minor in my_dict[node] and my_dict[node][minor] != instance:
493         duplicates.append((node, minor, instance, my_dict[node][minor]))
494       else:
495         my_dict[node][minor] = instance
496     return my_dict, duplicates
497
498   @locking.ssynchronized(_config_lock)
499   def ComputeDRBDMap(self):
500     """Compute the used DRBD minor/nodes.
501
502     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
503
504     @return: dictionary of node_name: dict of minor: instance_name;
505         the returned dict will have all the nodes in it (even if with
506         an empty list).
507
508     """
509     d_map, duplicates = self._UnlockedComputeDRBDMap()
510     if duplicates:
511       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
512                                       str(duplicates))
513     return d_map
514
515   @locking.ssynchronized(_config_lock)
516   def AllocateDRBDMinor(self, nodes, instance):
517     """Allocate a drbd minor.
518
519     The free minor will be automatically computed from the existing
520     devices. A node can be given multiple times in order to allocate
521     multiple minors. The result is the list of minors, in the same
522     order as the passed nodes.
523
524     @type instance: string
525     @param instance: the instance for which we allocate minors
526
527     """
528     assert isinstance(instance, basestring), \
529            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
530
531     d_map, duplicates = self._UnlockedComputeDRBDMap()
532     if duplicates:
533       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
534                                       str(duplicates))
535     result = []
536     for nname in nodes:
537       ndata = d_map[nname]
538       if not ndata:
539         # no minors used, we can start at 0
540         result.append(0)
541         ndata[0] = instance
542         self._temporary_drbds[(nname, 0)] = instance
543         continue
544       keys = ndata.keys()
545       keys.sort()
546       ffree = utils.FirstFree(keys)
547       if ffree is None:
548         # return the next minor
549         # TODO: implement high-limit check
550         minor = keys[-1] + 1
551       else:
552         minor = ffree
553       # double-check minor against current instances
554       assert minor not in d_map[nname], \
555              ("Attempt to reuse allocated DRBD minor %d on node %s,"
556               " already allocated to instance %s" %
557               (minor, nname, d_map[nname][minor]))
558       ndata[minor] = instance
559       # double-check minor against reservation
560       r_key = (nname, minor)
561       assert r_key not in self._temporary_drbds, \
562              ("Attempt to reuse reserved DRBD minor %d on node %s,"
563               " reserved for instance %s" %
564               (minor, nname, self._temporary_drbds[r_key]))
565       self._temporary_drbds[r_key] = instance
566       result.append(minor)
567     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
568                   nodes, result)
569     return result
570
571   def _UnlockedReleaseDRBDMinors(self, instance):
572     """Release temporary drbd minors allocated for a given instance.
573
574     @type instance: string
575     @param instance: the instance for which temporary minors should be
576                      released
577
578     """
579     assert isinstance(instance, basestring), \
580            "Invalid argument passed to ReleaseDRBDMinors"
581     for key, name in self._temporary_drbds.items():
582       if name == instance:
583         del self._temporary_drbds[key]
584
585   @locking.ssynchronized(_config_lock)
586   def ReleaseDRBDMinors(self, instance):
587     """Release temporary drbd minors allocated for a given instance.
588
589     This should be called on the error paths, on the success paths
590     it's automatically called by the ConfigWriter add and update
591     functions.
592
593     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
594
595     @type instance: string
596     @param instance: the instance for which temporary minors should be
597                      released
598
599     """
600     self._UnlockedReleaseDRBDMinors(instance)
601
602   @locking.ssynchronized(_config_lock, shared=1)
603   def GetConfigVersion(self):
604     """Get the configuration version.
605
606     @return: Config version
607
608     """
609     return self._config_data.version
610
611   @locking.ssynchronized(_config_lock, shared=1)
612   def GetClusterName(self):
613     """Get cluster name.
614
615     @return: Cluster name
616
617     """
618     return self._config_data.cluster.cluster_name
619
620   @locking.ssynchronized(_config_lock, shared=1)
621   def GetMasterNode(self):
622     """Get the hostname of the master node for this cluster.
623
624     @return: Master hostname
625
626     """
627     return self._config_data.cluster.master_node
628
629   @locking.ssynchronized(_config_lock, shared=1)
630   def GetMasterIP(self):
631     """Get the IP of the master node for this cluster.
632
633     @return: Master IP
634
635     """
636     return self._config_data.cluster.master_ip
637
638   @locking.ssynchronized(_config_lock, shared=1)
639   def GetMasterNetdev(self):
640     """Get the master network device for this cluster.
641
642     """
643     return self._config_data.cluster.master_netdev
644
645   @locking.ssynchronized(_config_lock, shared=1)
646   def GetFileStorageDir(self):
647     """Get the file storage dir for this cluster.
648
649     """
650     return self._config_data.cluster.file_storage_dir
651
652   @locking.ssynchronized(_config_lock, shared=1)
653   def GetHypervisorType(self):
654     """Get the hypervisor type for this cluster.
655
656     """
657     return self._config_data.cluster.default_hypervisor
658
659   @locking.ssynchronized(_config_lock, shared=1)
660   def GetHostKey(self):
661     """Return the rsa hostkey from the config.
662
663     @rtype: string
664     @return: the rsa hostkey
665
666     """
667     return self._config_data.cluster.rsahostkeypub
668
669   @locking.ssynchronized(_config_lock)
670   def AddInstance(self, instance):
671     """Add an instance to the config.
672
673     This should be used after creating a new instance.
674
675     @type instance: L{objects.Instance}
676     @param instance: the instance object
677
678     """
679     if not isinstance(instance, objects.Instance):
680       raise errors.ProgrammerError("Invalid type passed to AddInstance")
681
682     if instance.disk_template != constants.DT_DISKLESS:
683       all_lvs = instance.MapLVsByNode()
684       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
685
686     instance.serial_no = 1
687     self._config_data.instances[instance.name] = instance
688     self._config_data.cluster.serial_no += 1
689     self._UnlockedReleaseDRBDMinors(instance.name)
690     self._WriteConfig()
691
692   def _SetInstanceStatus(self, instance_name, status):
693     """Set the instance's status to a given value.
694
695     """
696     assert isinstance(status, bool), \
697            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
698
699     if instance_name not in self._config_data.instances:
700       raise errors.ConfigurationError("Unknown instance '%s'" %
701                                       instance_name)
702     instance = self._config_data.instances[instance_name]
703     if instance.admin_up != status:
704       instance.admin_up = status
705       instance.serial_no += 1
706       self._WriteConfig()
707
708   @locking.ssynchronized(_config_lock)
709   def MarkInstanceUp(self, instance_name):
710     """Mark the instance status to up in the config.
711
712     """
713     self._SetInstanceStatus(instance_name, True)
714
715   @locking.ssynchronized(_config_lock)
716   def RemoveInstance(self, instance_name):
717     """Remove the instance from the configuration.
718
719     """
720     if instance_name not in self._config_data.instances:
721       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
722     del self._config_data.instances[instance_name]
723     self._config_data.cluster.serial_no += 1
724     self._WriteConfig()
725
726   @locking.ssynchronized(_config_lock)
727   def RenameInstance(self, old_name, new_name):
728     """Rename an instance.
729
730     This needs to be done in ConfigWriter and not by RemoveInstance
731     combined with AddInstance as only we can guarantee an atomic
732     rename.
733
734     """
735     if old_name not in self._config_data.instances:
736       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
737     inst = self._config_data.instances[old_name]
738     del self._config_data.instances[old_name]
739     inst.name = new_name
740
741     for disk in inst.disks:
742       if disk.dev_type == constants.LD_FILE:
743         # rename the file paths in logical and physical id
744         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
745         disk.physical_id = disk.logical_id = (disk.logical_id[0],
746                                               os.path.join(file_storage_dir,
747                                                            inst.name,
748                                                            disk.iv_name))
749
750     self._config_data.instances[inst.name] = inst
751     self._WriteConfig()
752
753   @locking.ssynchronized(_config_lock)
754   def MarkInstanceDown(self, instance_name):
755     """Mark the status of an instance to down in the configuration.
756
757     """
758     self._SetInstanceStatus(instance_name, False)
759
760   def _UnlockedGetInstanceList(self):
761     """Get the list of instances.
762
763     This function is for internal use, when the config lock is already held.
764
765     """
766     return self._config_data.instances.keys()
767
768   @locking.ssynchronized(_config_lock, shared=1)
769   def GetInstanceList(self):
770     """Get the list of instances.
771
772     @return: array of instances, ex. ['instance2.example.com',
773         'instance1.example.com']
774
775     """
776     return self._UnlockedGetInstanceList()
777
778   @locking.ssynchronized(_config_lock, shared=1)
779   def ExpandInstanceName(self, short_name):
780     """Attempt to expand an incomplete instance name.
781
782     """
783     return utils.MatchNameComponent(short_name,
784                                     self._config_data.instances.keys())
785
786   def _UnlockedGetInstanceInfo(self, instance_name):
787     """Returns informations about an instance.
788
789     This function is for internal use, when the config lock is already held.
790
791     """
792     if instance_name not in self._config_data.instances:
793       return None
794
795     return self._config_data.instances[instance_name]
796
797   @locking.ssynchronized(_config_lock, shared=1)
798   def GetInstanceInfo(self, instance_name):
799     """Returns informations about an instance.
800
801     It takes the information from the configuration file. Other informations of
802     an instance are taken from the live systems.
803
804     @param instance_name: name of the instance, e.g.
805         I{instance1.example.com}
806
807     @rtype: L{objects.Instance}
808     @return: the instance object
809
810     """
811     return self._UnlockedGetInstanceInfo(instance_name)
812
813   @locking.ssynchronized(_config_lock, shared=1)
814   def GetAllInstancesInfo(self):
815     """Get the configuration of all instances.
816
817     @rtype: dict
818     @returns: dict of (instance, instance_info), where instance_info is what
819               would GetInstanceInfo return for the node
820
821     """
822     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
823                     for instance in self._UnlockedGetInstanceList()])
824     return my_dict
825
826   @locking.ssynchronized(_config_lock)
827   def AddNode(self, node):
828     """Add a node to the configuration.
829
830     @type node: L{objects.Node}
831     @param node: a Node instance
832
833     """
834     logging.info("Adding node %s to configuration" % node.name)
835
836     node.serial_no = 1
837     self._config_data.nodes[node.name] = node
838     self._config_data.cluster.serial_no += 1
839     self._WriteConfig()
840
841   @locking.ssynchronized(_config_lock)
842   def RemoveNode(self, node_name):
843     """Remove a node from the configuration.
844
845     """
846     logging.info("Removing node %s from configuration" % node_name)
847
848     if node_name not in self._config_data.nodes:
849       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
850
851     del self._config_data.nodes[node_name]
852     self._config_data.cluster.serial_no += 1
853     self._WriteConfig()
854
855   @locking.ssynchronized(_config_lock, shared=1)
856   def ExpandNodeName(self, short_name):
857     """Attempt to expand an incomplete instance name.
858
859     """
860     return utils.MatchNameComponent(short_name,
861                                     self._config_data.nodes.keys())
862
863   def _UnlockedGetNodeInfo(self, node_name):
864     """Get the configuration of a node, as stored in the config.
865
866     This function is for internal use, when the config lock is already
867     held.
868
869     @param node_name: the node name, e.g. I{node1.example.com}
870
871     @rtype: L{objects.Node}
872     @return: the node object
873
874     """
875     if node_name not in self._config_data.nodes:
876       return None
877
878     return self._config_data.nodes[node_name]
879
880
881   @locking.ssynchronized(_config_lock, shared=1)
882   def GetNodeInfo(self, node_name):
883     """Get the configuration of a node, as stored in the config.
884
885     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
886
887     @param node_name: the node name, e.g. I{node1.example.com}
888
889     @rtype: L{objects.Node}
890     @return: the node object
891
892     """
893     return self._UnlockedGetNodeInfo(node_name)
894
895   def _UnlockedGetNodeList(self):
896     """Return the list of nodes which are in the configuration.
897
898     This function is for internal use, when the config lock is already
899     held.
900
901     @rtype: list
902
903     """
904     return self._config_data.nodes.keys()
905
906
907   @locking.ssynchronized(_config_lock, shared=1)
908   def GetNodeList(self):
909     """Return the list of nodes which are in the configuration.
910
911     """
912     return self._UnlockedGetNodeList()
913
914   @locking.ssynchronized(_config_lock, shared=1)
915   def GetOnlineNodeList(self):
916     """Return the list of nodes which are online.
917
918     """
919     all_nodes = [self._UnlockedGetNodeInfo(node)
920                  for node in self._UnlockedGetNodeList()]
921     return [node.name for node in all_nodes if not node.offline]
922
923   @locking.ssynchronized(_config_lock, shared=1)
924   def GetAllNodesInfo(self):
925     """Get the configuration of all nodes.
926
927     @rtype: dict
928     @return: dict of (node, node_info), where node_info is what
929               would GetNodeInfo return for the node
930
931     """
932     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
933                     for node in self._UnlockedGetNodeList()])
934     return my_dict
935
936   def _UnlockedGetMasterCandidateStats(self):
937     """Get the number of current and maximum desired and possible candidates.
938
939     @rtype: tuple
940     @return: tuple of (current, desired and possible)
941
942     """
943     mc_now = mc_max = 0
944     for node in self._config_data.nodes.itervalues():
945       if not (node.offline or node.drained):
946         mc_max += 1
947       if node.master_candidate:
948         mc_now += 1
949     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
950     return (mc_now, mc_max)
951
952   @locking.ssynchronized(_config_lock, shared=1)
953   def GetMasterCandidateStats(self):
954     """Get the number of current and maximum possible candidates.
955
956     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
957
958     @rtype: tuple
959     @return: tuple of (current, max)
960
961     """
962     return self._UnlockedGetMasterCandidateStats()
963
964   @locking.ssynchronized(_config_lock)
965   def MaintainCandidatePool(self):
966     """Try to grow the candidate pool to the desired size.
967
968     @rtype: list
969     @return: list with the adjusted nodes (L{objects.Node} instances)
970
971     """
972     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
973     mod_list = []
974     if mc_now < mc_max:
975       node_list = self._config_data.nodes.keys()
976       random.shuffle(node_list)
977       for name in node_list:
978         if mc_now >= mc_max:
979           break
980         node = self._config_data.nodes[name]
981         if node.master_candidate or node.offline or node.drained:
982           continue
983         mod_list.append(node)
984         node.master_candidate = True
985         node.serial_no += 1
986         mc_now += 1
987       if mc_now != mc_max:
988         # this should not happen
989         logging.warning("Warning: MaintainCandidatePool didn't manage to"
990                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
991       if mod_list:
992         self._config_data.cluster.serial_no += 1
993         self._WriteConfig()
994
995     return mod_list
996
997   def _BumpSerialNo(self):
998     """Bump up the serial number of the config.
999
1000     """
1001     self._config_data.serial_no += 1
1002
1003   def _OpenConfig(self):
1004     """Read the config data from disk.
1005
1006     """
1007     f = open(self._cfg_file, 'r')
1008     try:
1009       try:
1010         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1011       except Exception, err:
1012         raise errors.ConfigurationError(err)
1013     finally:
1014       f.close()
1015
1016     # Make sure the configuration has the right version
1017     _ValidateConfig(data)
1018
1019     if (not hasattr(data, 'cluster') or
1020         not hasattr(data.cluster, 'rsahostkeypub')):
1021       raise errors.ConfigurationError("Incomplete configuration"
1022                                       " (missing cluster.rsahostkeypub)")
1023     self._config_data = data
1024     # reset the last serial as -1 so that the next write will cause
1025     # ssconf update
1026     self._last_cluster_serial = -1
1027
1028   def _DistributeConfig(self):
1029     """Distribute the configuration to the other nodes.
1030
1031     Currently, this only copies the configuration file. In the future,
1032     it could be used to encapsulate the 2/3-phase update mechanism.
1033
1034     """
1035     if self._offline:
1036       return True
1037     bad = False
1038
1039     node_list = []
1040     addr_list = []
1041     myhostname = self._my_hostname
1042     # we can skip checking whether _UnlockedGetNodeInfo returns None
1043     # since the node list comes from _UnlocketGetNodeList, and we are
1044     # called with the lock held, so no modifications should take place
1045     # in between
1046     for node_name in self._UnlockedGetNodeList():
1047       if node_name == myhostname:
1048         continue
1049       node_info = self._UnlockedGetNodeInfo(node_name)
1050       if not node_info.master_candidate:
1051         continue
1052       node_list.append(node_info.name)
1053       addr_list.append(node_info.primary_ip)
1054
1055     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1056                                             address_list=addr_list)
1057     for node in node_list:
1058       if not result[node]:
1059         logging.error("copy of file %s to node %s failed",
1060                       self._cfg_file, node)
1061         bad = True
1062     return not bad
1063
1064   def _WriteConfig(self, destination=None):
1065     """Write the configuration data to persistent storage.
1066
1067     """
1068     config_errors = self._UnlockedVerifyConfig()
1069     if config_errors:
1070       raise errors.ConfigurationError("Configuration data is not"
1071                                       " consistent: %s" %
1072                                       (", ".join(config_errors)))
1073     if destination is None:
1074       destination = self._cfg_file
1075     self._BumpSerialNo()
1076     txt = serializer.Dump(self._config_data.ToDict())
1077     dir_name, file_name = os.path.split(destination)
1078     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1079     f = os.fdopen(fd, 'w')
1080     try:
1081       f.write(txt)
1082       os.fsync(f.fileno())
1083     finally:
1084       f.close()
1085     # we don't need to do os.close(fd) as f.close() did it
1086     os.rename(name, destination)
1087     self.write_count += 1
1088
1089     # and redistribute the config file to master candidates
1090     self._DistributeConfig()
1091
1092     # Write ssconf files on all nodes (including locally)
1093     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1094       if not self._offline:
1095         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1096                                               self._UnlockedGetSsconfValues())
1097       self._last_cluster_serial = self._config_data.cluster.serial_no
1098
1099   def _UnlockedGetSsconfValues(self):
1100     """Return the values needed by ssconf.
1101
1102     @rtype: dict
1103     @return: a dictionary with keys the ssconf names and values their
1104         associated value
1105
1106     """
1107     fn = "\n".join
1108     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1109     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1110     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1111
1112     instance_data = fn(instance_names)
1113     off_data = fn(node.name for node in node_info if node.offline)
1114     on_data = fn(node.name for node in node_info if not node.offline)
1115     mc_data = fn(node.name for node in node_info if node.master_candidate)
1116     node_data = fn(node_names)
1117
1118     cluster = self._config_data.cluster
1119     return {
1120       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1121       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1122       constants.SS_MASTER_CANDIDATES: mc_data,
1123       constants.SS_MASTER_IP: cluster.master_ip,
1124       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1125       constants.SS_MASTER_NODE: cluster.master_node,
1126       constants.SS_NODE_LIST: node_data,
1127       constants.SS_OFFLINE_NODES: off_data,
1128       constants.SS_ONLINE_NODES: on_data,
1129       constants.SS_INSTANCE_LIST: instance_data,
1130       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1131       }
1132
1133   @locking.ssynchronized(_config_lock)
1134   def InitConfig(self, version, cluster_config, master_node_config):
1135     """Create the initial cluster configuration.
1136
1137     It will contain the current node, which will also be the master
1138     node, and no instances.
1139
1140     @type version: int
1141     @param version: Configuration version
1142     @type cluster_config: objects.Cluster
1143     @param cluster_config: Cluster configuration
1144     @type master_node_config: objects.Node
1145     @param master_node_config: Master node configuration
1146
1147     """
1148     nodes = {
1149       master_node_config.name: master_node_config,
1150       }
1151
1152     self._config_data = objects.ConfigData(version=version,
1153                                            cluster=cluster_config,
1154                                            nodes=nodes,
1155                                            instances={},
1156                                            serial_no=1)
1157     self._WriteConfig()
1158
1159   @locking.ssynchronized(_config_lock, shared=1)
1160   def GetVGName(self):
1161     """Return the volume group name.
1162
1163     """
1164     return self._config_data.cluster.volume_group_name
1165
1166   @locking.ssynchronized(_config_lock)
1167   def SetVGName(self, vg_name):
1168     """Set the volume group name.
1169
1170     """
1171     self._config_data.cluster.volume_group_name = vg_name
1172     self._config_data.cluster.serial_no += 1
1173     self._WriteConfig()
1174
1175   @locking.ssynchronized(_config_lock, shared=1)
1176   def GetDefBridge(self):
1177     """Return the default bridge.
1178
1179     """
1180     return self._config_data.cluster.default_bridge
1181
1182   @locking.ssynchronized(_config_lock, shared=1)
1183   def GetMACPrefix(self):
1184     """Return the mac prefix.
1185
1186     """
1187     return self._config_data.cluster.mac_prefix
1188
1189   @locking.ssynchronized(_config_lock, shared=1)
1190   def GetClusterInfo(self):
1191     """Returns informations about the cluster
1192
1193     @rtype: L{objects.Cluster}
1194     @return: the cluster object
1195
1196     """
1197     return self._config_data.cluster
1198
1199   @locking.ssynchronized(_config_lock)
1200   def Update(self, target):
1201     """Notify function to be called after updates.
1202
1203     This function must be called when an object (as returned by
1204     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1205     caller wants the modifications saved to the backing store. Note
1206     that all modified objects will be saved, but the target argument
1207     is the one the caller wants to ensure that it's saved.
1208
1209     @param target: an instance of either L{objects.Cluster},
1210         L{objects.Node} or L{objects.Instance} which is existing in
1211         the cluster
1212
1213     """
1214     if self._config_data is None:
1215       raise errors.ProgrammerError("Configuration file not read,"
1216                                    " cannot save.")
1217     update_serial = False
1218     if isinstance(target, objects.Cluster):
1219       test = target == self._config_data.cluster
1220     elif isinstance(target, objects.Node):
1221       test = target in self._config_data.nodes.values()
1222       update_serial = True
1223     elif isinstance(target, objects.Instance):
1224       test = target in self._config_data.instances.values()
1225     else:
1226       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1227                                    " ConfigWriter.Update" % type(target))
1228     if not test:
1229       raise errors.ConfigurationError("Configuration updated since object"
1230                                       " has been read or unknown object")
1231     target.serial_no += 1
1232
1233     if update_serial:
1234       # for node updates, we need to increase the cluster serial too
1235       self._config_data.cluster.serial_no += 1
1236
1237     if isinstance(target, objects.Instance):
1238       self._UnlockedReleaseDRBDMinors(target.name)
1239
1240     self._WriteConfig()