Enable lockless node queries
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     # Note: in order to prevent errors when resolving our name in
83     # _DistributeConfig, we compute it here once and reuse it; it's
84     # better to raise an error before starting to modify the config
85     # file than after it was modified
86     self._my_hostname = utils.HostInfo().name
87     self._last_cluster_serial = -1
88     self._OpenConfig()
89
90   # this method needs to be static, so that we can call it on the class
91   @staticmethod
92   def IsCluster():
93     """Check if the cluster is configured.
94
95     """
96     return os.path.exists(constants.CLUSTER_CONF_FILE)
97
98   @locking.ssynchronized(_config_lock, shared=1)
99   def GenerateMAC(self):
100     """Generate a MAC for an instance.
101
102     This should check the current instances for duplicates.
103
104     """
105     prefix = self._config_data.cluster.mac_prefix
106     all_macs = self._AllMACs()
107     retries = 64
108     while retries > 0:
109       byte1 = random.randrange(0, 256)
110       byte2 = random.randrange(0, 256)
111       byte3 = random.randrange(0, 256)
112       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
113       if mac not in all_macs:
114         break
115       retries -= 1
116     else:
117       raise errors.ConfigurationError("Can't generate unique MAC")
118     return mac
119
120   @locking.ssynchronized(_config_lock, shared=1)
121   def IsMacInUse(self, mac):
122     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
123
124     This only checks instances managed by this cluster, it does not
125     check for potential collisions elsewhere.
126
127     """
128     all_macs = self._AllMACs()
129     return mac in all_macs
130
131   @locking.ssynchronized(_config_lock, shared=1)
132   def GenerateDRBDSecret(self):
133     """Generate a DRBD secret.
134
135     This checks the current disks for duplicates.
136
137     """
138     all_secrets = self._AllDRBDSecrets()
139     retries = 64
140     while retries > 0:
141       secret = utils.GenerateSecret()
142       if secret not in all_secrets:
143         break
144       retries -= 1
145     else:
146       raise errors.ConfigurationError("Can't generate unique DRBD secret")
147     return secret
148
149   def _ComputeAllLVs(self):
150     """Compute the list of all LVs.
151
152     """
153     lvnames = set()
154     for instance in self._config_data.instances.values():
155       node_data = instance.MapLVsByNode()
156       for lv_list in node_data.values():
157         lvnames.update(lv_list)
158     return lvnames
159
160   @locking.ssynchronized(_config_lock, shared=1)
161   def GenerateUniqueID(self, exceptions=None):
162     """Generate an unique disk name.
163
164     This checks the current node, instances and disk names for
165     duplicates.
166
167     @param exceptions: a list with some other names which should be checked
168         for uniqueness (used for example when you want to get
169         more than one id at one time without adding each one in
170         turn to the config file)
171
172     @rtype: string
173     @return: the unique id
174
175     """
176     existing = set()
177     existing.update(self._temporary_ids)
178     existing.update(self._ComputeAllLVs())
179     existing.update(self._config_data.instances.keys())
180     existing.update(self._config_data.nodes.keys())
181     if exceptions is not None:
182       existing.update(exceptions)
183     retries = 64
184     while retries > 0:
185       unique_id = utils.NewUUID()
186       if unique_id not in existing and unique_id is not None:
187         break
188     else:
189       raise errors.ConfigurationError("Not able generate an unique ID"
190                                       " (last tried ID: %s" % unique_id)
191     self._temporary_ids.add(unique_id)
192     return unique_id
193
194   def _AllMACs(self):
195     """Return all MACs present in the config.
196
197     @rtype: list
198     @return: the list of all MACs
199
200     """
201     result = []
202     for instance in self._config_data.instances.values():
203       for nic in instance.nics:
204         result.append(nic.mac)
205
206     return result
207
208   def _AllDRBDSecrets(self):
209     """Return all DRBD secrets present in the config.
210
211     @rtype: list
212     @return: the list of all DRBD secrets
213
214     """
215     def helper(disk, result):
216       """Recursively gather secrets from this disk."""
217       if disk.dev_type == constants.DT_DRBD8:
218         result.append(disk.logical_id[5])
219       if disk.children:
220         for child in disk.children:
221           helper(child, result)
222
223     result = []
224     for instance in self._config_data.instances.values():
225       for disk in instance.disks:
226         helper(disk, result)
227
228     return result
229
230   def _UnlockedVerifyConfig(self):
231     """Verify function.
232
233     @rtype: list
234     @return: a list of error messages; a non-empty list signifies
235         configuration errors
236
237     """
238     result = []
239     seen_macs = []
240     ports = {}
241     data = self._config_data
242     for instance_name in data.instances:
243       instance = data.instances[instance_name]
244       if instance.primary_node not in data.nodes:
245         result.append("instance '%s' has invalid primary node '%s'" %
246                       (instance_name, instance.primary_node))
247       for snode in instance.secondary_nodes:
248         if snode not in data.nodes:
249           result.append("instance '%s' has invalid secondary node '%s'" %
250                         (instance_name, snode))
251       for idx, nic in enumerate(instance.nics):
252         if nic.mac in seen_macs:
253           result.append("instance '%s' has NIC %d mac %s duplicate" %
254                         (instance_name, idx, nic.mac))
255         else:
256           seen_macs.append(nic.mac)
257
258       # gather the drbd ports for duplicate checks
259       for dsk in instance.disks:
260         if dsk.dev_type in constants.LDS_DRBD:
261           tcp_port = dsk.logical_id[2]
262           if tcp_port not in ports:
263             ports[tcp_port] = []
264           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
265       # gather network port reservation
266       net_port = getattr(instance, "network_port", None)
267       if net_port is not None:
268         if net_port not in ports:
269           ports[net_port] = []
270         ports[net_port].append((instance.name, "network port"))
271
272       # instance disk verify
273       for idx, disk in enumerate(instance.disks):
274         result.extend(["instance '%s' disk %d error: %s" %
275                        (instance.name, idx, msg) for msg in disk.Verify()])
276
277     # cluster-wide pool of free ports
278     for free_port in data.cluster.tcpudp_port_pool:
279       if free_port not in ports:
280         ports[free_port] = []
281       ports[free_port].append(("cluster", "port marked as free"))
282
283     # compute tcp/udp duplicate ports
284     keys = ports.keys()
285     keys.sort()
286     for pnum in keys:
287       pdata = ports[pnum]
288       if len(pdata) > 1:
289         txt = ", ".join(["%s/%s" % val for val in pdata])
290         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
291
292     # highest used tcp port check
293     if keys:
294       if keys[-1] > data.cluster.highest_used_port:
295         result.append("Highest used port mismatch, saved %s, computed %s" %
296                       (data.cluster.highest_used_port, keys[-1]))
297
298     if not data.nodes[data.cluster.master_node].master_candidate:
299       result.append("Master node is not a master candidate")
300
301     # master candidate checks
302     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
303     if mc_now < mc_max:
304       result.append("Not enough master candidates: actual %d, target %d" %
305                     (mc_now, mc_max))
306
307     # drbd minors check
308     d_map, duplicates = self._UnlockedComputeDRBDMap()
309     for node, minor, instance_a, instance_b in duplicates:
310       result.append("DRBD minor %d on node %s is assigned twice to instances"
311                     " %s and %s" % (minor, node, instance_a, instance_b))
312
313     return result
314
315   @locking.ssynchronized(_config_lock, shared=1)
316   def VerifyConfig(self):
317     """Verify function.
318
319     This is just a wrapper over L{_UnlockedVerifyConfig}.
320
321     @rtype: list
322     @return: a list of error messages; a non-empty list signifies
323         configuration errors
324
325     """
326     return self._UnlockedVerifyConfig()
327
328   def _UnlockedSetDiskID(self, disk, node_name):
329     """Convert the unique ID to the ID needed on the target nodes.
330
331     This is used only for drbd, which needs ip/port configuration.
332
333     The routine descends down and updates its children also, because
334     this helps when the only the top device is passed to the remote
335     node.
336
337     This function is for internal use, when the config lock is already held.
338
339     """
340     if disk.children:
341       for child in disk.children:
342         self._UnlockedSetDiskID(child, node_name)
343
344     if disk.logical_id is None and disk.physical_id is not None:
345       return
346     if disk.dev_type == constants.LD_DRBD8:
347       pnode, snode, port, pminor, sminor, secret = disk.logical_id
348       if node_name not in (pnode, snode):
349         raise errors.ConfigurationError("DRBD device not knowing node %s" %
350                                         node_name)
351       pnode_info = self._UnlockedGetNodeInfo(pnode)
352       snode_info = self._UnlockedGetNodeInfo(snode)
353       if pnode_info is None or snode_info is None:
354         raise errors.ConfigurationError("Can't find primary or secondary node"
355                                         " for %s" % str(disk))
356       p_data = (pnode_info.secondary_ip, port)
357       s_data = (snode_info.secondary_ip, port)
358       if pnode == node_name:
359         disk.physical_id = p_data + s_data + (pminor, secret)
360       else: # it must be secondary, we tested above
361         disk.physical_id = s_data + p_data + (sminor, secret)
362     else:
363       disk.physical_id = disk.logical_id
364     return
365
366   @locking.ssynchronized(_config_lock)
367   def SetDiskID(self, disk, node_name):
368     """Convert the unique ID to the ID needed on the target nodes.
369
370     This is used only for drbd, which needs ip/port configuration.
371
372     The routine descends down and updates its children also, because
373     this helps when the only the top device is passed to the remote
374     node.
375
376     """
377     return self._UnlockedSetDiskID(disk, node_name)
378
379   @locking.ssynchronized(_config_lock)
380   def AddTcpUdpPort(self, port):
381     """Adds a new port to the available port pool.
382
383     """
384     if not isinstance(port, int):
385       raise errors.ProgrammerError("Invalid type passed for port")
386
387     self._config_data.cluster.tcpudp_port_pool.add(port)
388     self._WriteConfig()
389
390   @locking.ssynchronized(_config_lock, shared=1)
391   def GetPortList(self):
392     """Returns a copy of the current port list.
393
394     """
395     return self._config_data.cluster.tcpudp_port_pool.copy()
396
397   @locking.ssynchronized(_config_lock)
398   def AllocatePort(self):
399     """Allocate a port.
400
401     The port will be taken from the available port pool or from the
402     default port range (and in this case we increase
403     highest_used_port).
404
405     """
406     # If there are TCP/IP ports configured, we use them first.
407     if self._config_data.cluster.tcpudp_port_pool:
408       port = self._config_data.cluster.tcpudp_port_pool.pop()
409     else:
410       port = self._config_data.cluster.highest_used_port + 1
411       if port >= constants.LAST_DRBD_PORT:
412         raise errors.ConfigurationError("The highest used port is greater"
413                                         " than %s. Aborting." %
414                                         constants.LAST_DRBD_PORT)
415       self._config_data.cluster.highest_used_port = port
416
417     self._WriteConfig()
418     return port
419
420   def _UnlockedComputeDRBDMap(self):
421     """Compute the used DRBD minor/nodes.
422
423     @rtype: (dict, list)
424     @return: dictionary of node_name: dict of minor: instance_name;
425         the returned dict will have all the nodes in it (even if with
426         an empty list), and a list of duplicates; if the duplicates
427         list is not empty, the configuration is corrupted and its caller
428         should raise an exception
429
430     """
431     def _AppendUsedPorts(instance_name, disk, used):
432       duplicates = []
433       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
434         nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
435         for node, port in ((nodeA, minorA), (nodeB, minorB)):
436           assert node in used, ("Node '%s' of instance '%s' not found"
437                                 " in node list" % (node, instance_name))
438           if port in used[node]:
439             duplicates.append((node, port, instance_name, used[node][port]))
440           else:
441             used[node][port] = instance_name
442       if disk.children:
443         for child in disk.children:
444           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
445       return duplicates
446
447     duplicates = []
448     my_dict = dict((node, {}) for node in self._config_data.nodes)
449     for instance in self._config_data.instances.itervalues():
450       for disk in instance.disks:
451         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
452     for (node, minor), instance in self._temporary_drbds.iteritems():
453       if minor in my_dict[node] and my_dict[node][minor] != instance:
454         duplicates.append((node, minor, instance, my_dict[node][minor]))
455       else:
456         my_dict[node][minor] = instance
457     return my_dict, duplicates
458
459   @locking.ssynchronized(_config_lock)
460   def ComputeDRBDMap(self):
461     """Compute the used DRBD minor/nodes.
462
463     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
464
465     @return: dictionary of node_name: dict of minor: instance_name;
466         the returned dict will have all the nodes in it (even if with
467         an empty list).
468
469     """
470     d_map, duplicates = self._UnlockedComputeDRBDMap()
471     if duplicates:
472       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
473                                       str(duplicates))
474     return d_map
475
476   @locking.ssynchronized(_config_lock)
477   def AllocateDRBDMinor(self, nodes, instance):
478     """Allocate a drbd minor.
479
480     The free minor will be automatically computed from the existing
481     devices. A node can be given multiple times in order to allocate
482     multiple minors. The result is the list of minors, in the same
483     order as the passed nodes.
484
485     @type instance: string
486     @param instance: the instance for which we allocate minors
487
488     """
489     assert isinstance(instance, basestring), \
490            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
491
492     d_map, duplicates = self._UnlockedComputeDRBDMap()
493     if duplicates:
494       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
495                                       str(duplicates))
496     result = []
497     for nname in nodes:
498       ndata = d_map[nname]
499       if not ndata:
500         # no minors used, we can start at 0
501         result.append(0)
502         ndata[0] = instance
503         self._temporary_drbds[(nname, 0)] = instance
504         continue
505       keys = ndata.keys()
506       keys.sort()
507       ffree = utils.FirstFree(keys)
508       if ffree is None:
509         # return the next minor
510         # TODO: implement high-limit check
511         minor = keys[-1] + 1
512       else:
513         minor = ffree
514       # double-check minor against current instances
515       assert minor not in d_map[nname], \
516              ("Attempt to reuse allocated DRBD minor %d on node %s,"
517               " already allocated to instance %s" %
518               (minor, nname, d_map[nname][minor]))
519       ndata[minor] = instance
520       # double-check minor against reservation
521       r_key = (nname, minor)
522       assert r_key not in self._temporary_drbds, \
523              ("Attempt to reuse reserved DRBD minor %d on node %s,"
524               " reserved for instance %s" %
525               (minor, nname, self._temporary_drbds[r_key]))
526       self._temporary_drbds[r_key] = instance
527       result.append(minor)
528     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
529                   nodes, result)
530     return result
531
532   def _UnlockedReleaseDRBDMinors(self, instance):
533     """Release temporary drbd minors allocated for a given instance.
534
535     @type instance: string
536     @param instance: the instance for which temporary minors should be
537                      released
538
539     """
540     assert isinstance(instance, basestring), \
541            "Invalid argument passed to ReleaseDRBDMinors"
542     for key, name in self._temporary_drbds.items():
543       if name == instance:
544         del self._temporary_drbds[key]
545
546   @locking.ssynchronized(_config_lock)
547   def ReleaseDRBDMinors(self, instance):
548     """Release temporary drbd minors allocated for a given instance.
549
550     This should be called on the error paths, on the success paths
551     it's automatically called by the ConfigWriter add and update
552     functions.
553
554     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
555
556     @type instance: string
557     @param instance: the instance for which temporary minors should be
558                      released
559
560     """
561     self._UnlockedReleaseDRBDMinors(instance)
562
563   @locking.ssynchronized(_config_lock, shared=1)
564   def GetConfigVersion(self):
565     """Get the configuration version.
566
567     @return: Config version
568
569     """
570     return self._config_data.version
571
572   @locking.ssynchronized(_config_lock, shared=1)
573   def GetClusterName(self):
574     """Get cluster name.
575
576     @return: Cluster name
577
578     """
579     return self._config_data.cluster.cluster_name
580
581   @locking.ssynchronized(_config_lock, shared=1)
582   def GetMasterNode(self):
583     """Get the hostname of the master node for this cluster.
584
585     @return: Master hostname
586
587     """
588     return self._config_data.cluster.master_node
589
590   @locking.ssynchronized(_config_lock, shared=1)
591   def GetMasterIP(self):
592     """Get the IP of the master node for this cluster.
593
594     @return: Master IP
595
596     """
597     return self._config_data.cluster.master_ip
598
599   @locking.ssynchronized(_config_lock, shared=1)
600   def GetMasterNetdev(self):
601     """Get the master network device for this cluster.
602
603     """
604     return self._config_data.cluster.master_netdev
605
606   @locking.ssynchronized(_config_lock, shared=1)
607   def GetFileStorageDir(self):
608     """Get the file storage dir for this cluster.
609
610     """
611     return self._config_data.cluster.file_storage_dir
612
613   @locking.ssynchronized(_config_lock, shared=1)
614   def GetHypervisorType(self):
615     """Get the hypervisor type for this cluster.
616
617     """
618     return self._config_data.cluster.default_hypervisor
619
620   @locking.ssynchronized(_config_lock, shared=1)
621   def GetHostKey(self):
622     """Return the rsa hostkey from the config.
623
624     @rtype: string
625     @return: the rsa hostkey
626
627     """
628     return self._config_data.cluster.rsahostkeypub
629
630   @locking.ssynchronized(_config_lock)
631   def AddInstance(self, instance):
632     """Add an instance to the config.
633
634     This should be used after creating a new instance.
635
636     @type instance: L{objects.Instance}
637     @param instance: the instance object
638
639     """
640     if not isinstance(instance, objects.Instance):
641       raise errors.ProgrammerError("Invalid type passed to AddInstance")
642
643     if instance.disk_template != constants.DT_DISKLESS:
644       all_lvs = instance.MapLVsByNode()
645       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
646
647     instance.serial_no = 1
648     self._config_data.instances[instance.name] = instance
649     self._config_data.cluster.serial_no += 1
650     self._UnlockedReleaseDRBDMinors(instance.name)
651     self._WriteConfig()
652
653   def _SetInstanceStatus(self, instance_name, status):
654     """Set the instance's status to a given value.
655
656     """
657     assert isinstance(status, bool), \
658            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
659
660     if instance_name not in self._config_data.instances:
661       raise errors.ConfigurationError("Unknown instance '%s'" %
662                                       instance_name)
663     instance = self._config_data.instances[instance_name]
664     if instance.admin_up != status:
665       instance.admin_up = status
666       instance.serial_no += 1
667       self._WriteConfig()
668
669   @locking.ssynchronized(_config_lock)
670   def MarkInstanceUp(self, instance_name):
671     """Mark the instance status to up in the config.
672
673     """
674     self._SetInstanceStatus(instance_name, True)
675
676   @locking.ssynchronized(_config_lock)
677   def RemoveInstance(self, instance_name):
678     """Remove the instance from the configuration.
679
680     """
681     if instance_name not in self._config_data.instances:
682       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
683     del self._config_data.instances[instance_name]
684     self._config_data.cluster.serial_no += 1
685     self._WriteConfig()
686
687   @locking.ssynchronized(_config_lock)
688   def RenameInstance(self, old_name, new_name):
689     """Rename an instance.
690
691     This needs to be done in ConfigWriter and not by RemoveInstance
692     combined with AddInstance as only we can guarantee an atomic
693     rename.
694
695     """
696     if old_name not in self._config_data.instances:
697       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
698     inst = self._config_data.instances[old_name]
699     del self._config_data.instances[old_name]
700     inst.name = new_name
701
702     for disk in inst.disks:
703       if disk.dev_type == constants.LD_FILE:
704         # rename the file paths in logical and physical id
705         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
706         disk.physical_id = disk.logical_id = (disk.logical_id[0],
707                                               os.path.join(file_storage_dir,
708                                                            inst.name,
709                                                            disk.iv_name))
710
711     self._config_data.instances[inst.name] = inst
712     self._WriteConfig()
713
714   @locking.ssynchronized(_config_lock)
715   def MarkInstanceDown(self, instance_name):
716     """Mark the status of an instance to down in the configuration.
717
718     """
719     self._SetInstanceStatus(instance_name, False)
720
721   def _UnlockedGetInstanceList(self):
722     """Get the list of instances.
723
724     This function is for internal use, when the config lock is already held.
725
726     """
727     return self._config_data.instances.keys()
728
729   @locking.ssynchronized(_config_lock, shared=1)
730   def GetInstanceList(self):
731     """Get the list of instances.
732
733     @return: array of instances, ex. ['instance2.example.com',
734         'instance1.example.com']
735
736     """
737     return self._UnlockedGetInstanceList()
738
739   @locking.ssynchronized(_config_lock, shared=1)
740   def ExpandInstanceName(self, short_name):
741     """Attempt to expand an incomplete instance name.
742
743     """
744     return utils.MatchNameComponent(short_name,
745                                     self._config_data.instances.keys())
746
747   def _UnlockedGetInstanceInfo(self, instance_name):
748     """Returns informations about an instance.
749
750     This function is for internal use, when the config lock is already held.
751
752     """
753     if instance_name not in self._config_data.instances:
754       return None
755
756     return self._config_data.instances[instance_name]
757
758   @locking.ssynchronized(_config_lock, shared=1)
759   def GetInstanceInfo(self, instance_name):
760     """Returns informations about an instance.
761
762     It takes the information from the configuration file. Other informations of
763     an instance are taken from the live systems.
764
765     @param instance_name: name of the instance, e.g.
766         I{instance1.example.com}
767
768     @rtype: L{objects.Instance}
769     @return: the instance object
770
771     """
772     return self._UnlockedGetInstanceInfo(instance_name)
773
774   @locking.ssynchronized(_config_lock, shared=1)
775   def GetAllInstancesInfo(self):
776     """Get the configuration of all instances.
777
778     @rtype: dict
779     @returns: dict of (instance, instance_info), where instance_info is what
780               would GetInstanceInfo return for the node
781
782     """
783     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
784                     for instance in self._UnlockedGetInstanceList()])
785     return my_dict
786
787   @locking.ssynchronized(_config_lock)
788   def AddNode(self, node):
789     """Add a node to the configuration.
790
791     @type node: L{objects.Node}
792     @param node: a Node instance
793
794     """
795     logging.info("Adding node %s to configuration" % node.name)
796
797     node.serial_no = 1
798     self._config_data.nodes[node.name] = node
799     self._config_data.cluster.serial_no += 1
800     self._WriteConfig()
801
802   @locking.ssynchronized(_config_lock)
803   def RemoveNode(self, node_name):
804     """Remove a node from the configuration.
805
806     """
807     logging.info("Removing node %s from configuration" % node_name)
808
809     if node_name not in self._config_data.nodes:
810       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
811
812     del self._config_data.nodes[node_name]
813     self._config_data.cluster.serial_no += 1
814     self._WriteConfig()
815
816   @locking.ssynchronized(_config_lock, shared=1)
817   def ExpandNodeName(self, short_name):
818     """Attempt to expand an incomplete instance name.
819
820     """
821     return utils.MatchNameComponent(short_name,
822                                     self._config_data.nodes.keys())
823
824   def _UnlockedGetNodeInfo(self, node_name):
825     """Get the configuration of a node, as stored in the config.
826
827     This function is for internal use, when the config lock is already
828     held.
829
830     @param node_name: the node name, e.g. I{node1.example.com}
831
832     @rtype: L{objects.Node}
833     @return: the node object
834
835     """
836     if node_name not in self._config_data.nodes:
837       return None
838
839     return self._config_data.nodes[node_name]
840
841
842   @locking.ssynchronized(_config_lock, shared=1)
843   def GetNodeInfo(self, node_name):
844     """Get the configuration of a node, as stored in the config.
845
846     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
847
848     @param node_name: the node name, e.g. I{node1.example.com}
849
850     @rtype: L{objects.Node}
851     @return: the node object
852
853     """
854     return self._UnlockedGetNodeInfo(node_name)
855
856   def _UnlockedGetNodeList(self):
857     """Return the list of nodes which are in the configuration.
858
859     This function is for internal use, when the config lock is already
860     held.
861
862     @rtype: list
863
864     """
865     return self._config_data.nodes.keys()
866
867
868   @locking.ssynchronized(_config_lock, shared=1)
869   def GetNodeList(self):
870     """Return the list of nodes which are in the configuration.
871
872     """
873     return self._UnlockedGetNodeList()
874
875   @locking.ssynchronized(_config_lock, shared=1)
876   def GetOnlineNodeList(self):
877     """Return the list of nodes which are online.
878
879     """
880     all_nodes = [self._UnlockedGetNodeInfo(node)
881                  for node in self._UnlockedGetNodeList()]
882     return [node.name for node in all_nodes if not node.offline]
883
884   @locking.ssynchronized(_config_lock, shared=1)
885   def GetAllNodesInfo(self):
886     """Get the configuration of all nodes.
887
888     @rtype: dict
889     @return: dict of (node, node_info), where node_info is what
890               would GetNodeInfo return for the node
891
892     """
893     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
894                     for node in self._UnlockedGetNodeList()])
895     return my_dict
896
897   def _UnlockedGetMasterCandidateStats(self):
898     """Get the number of current and maximum desired and possible candidates.
899
900     @rtype: tuple
901     @return: tuple of (current, desired and possible)
902
903     """
904     mc_now = mc_max = 0
905     for node in self._config_data.nodes.itervalues():
906       if not node.offline:
907         mc_max += 1
908       if node.master_candidate:
909         mc_now += 1
910     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
911     return (mc_now, mc_max)
912
913   @locking.ssynchronized(_config_lock, shared=1)
914   def GetMasterCandidateStats(self):
915     """Get the number of current and maximum possible candidates.
916
917     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
918
919     @rtype: tuple
920     @return: tuple of (current, max)
921
922     """
923     return self._UnlockedGetMasterCandidateStats()
924
925   @locking.ssynchronized(_config_lock)
926   def MaintainCandidatePool(self):
927     """Try to grow the candidate pool to the desired size.
928
929     @rtype: list
930     @return: list with the adjusted nodes (L{objects.Node} instances)
931
932     """
933     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
934     mod_list = []
935     if mc_now < mc_max:
936       node_list = self._config_data.nodes.keys()
937       random.shuffle(node_list)
938       for name in node_list:
939         if mc_now >= mc_max:
940           break
941         node = self._config_data.nodes[name]
942         if node.master_candidate or node.offline:
943           continue
944         mod_list.append(node)
945         node.master_candidate = True
946         node.serial_no += 1
947         mc_now += 1
948       if mc_now != mc_max:
949         # this should not happen
950         logging.warning("Warning: MaintainCandidatePool didn't manage to"
951                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
952       if mod_list:
953         self._config_data.cluster.serial_no += 1
954         self._WriteConfig()
955
956     return mod_list
957
958   def _BumpSerialNo(self):
959     """Bump up the serial number of the config.
960
961     """
962     self._config_data.serial_no += 1
963
964   def _OpenConfig(self):
965     """Read the config data from disk.
966
967     """
968     f = open(self._cfg_file, 'r')
969     try:
970       try:
971         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
972       except Exception, err:
973         raise errors.ConfigurationError(err)
974     finally:
975       f.close()
976
977     # Make sure the configuration has the right version
978     _ValidateConfig(data)
979
980     if (not hasattr(data, 'cluster') or
981         not hasattr(data.cluster, 'rsahostkeypub')):
982       raise errors.ConfigurationError("Incomplete configuration"
983                                       " (missing cluster.rsahostkeypub)")
984     self._config_data = data
985     # reset the last serial as -1 so that the next write will cause
986     # ssconf update
987     self._last_cluster_serial = -1
988
989   def _DistributeConfig(self):
990     """Distribute the configuration to the other nodes.
991
992     Currently, this only copies the configuration file. In the future,
993     it could be used to encapsulate the 2/3-phase update mechanism.
994
995     """
996     if self._offline:
997       return True
998     bad = False
999
1000     node_list = []
1001     addr_list = []
1002     myhostname = self._my_hostname
1003     # we can skip checking whether _UnlockedGetNodeInfo returns None
1004     # since the node list comes from _UnlocketGetNodeList, and we are
1005     # called with the lock held, so no modifications should take place
1006     # in between
1007     for node_name in self._UnlockedGetNodeList():
1008       if node_name == myhostname:
1009         continue
1010       node_info = self._UnlockedGetNodeInfo(node_name)
1011       if not node_info.master_candidate:
1012         continue
1013       node_list.append(node_info.name)
1014       addr_list.append(node_info.primary_ip)
1015
1016     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1017                                             address_list=addr_list)
1018     for node in node_list:
1019       if not result[node]:
1020         logging.error("copy of file %s to node %s failed",
1021                       self._cfg_file, node)
1022         bad = True
1023     return not bad
1024
1025   def _WriteConfig(self, destination=None):
1026     """Write the configuration data to persistent storage.
1027
1028     """
1029     config_errors = self._UnlockedVerifyConfig()
1030     if config_errors:
1031       raise errors.ConfigurationError("Configuration data is not"
1032                                       " consistent: %s" %
1033                                       (", ".join(config_errors)))
1034     if destination is None:
1035       destination = self._cfg_file
1036     self._BumpSerialNo()
1037     txt = serializer.Dump(self._config_data.ToDict())
1038     dir_name, file_name = os.path.split(destination)
1039     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1040     f = os.fdopen(fd, 'w')
1041     try:
1042       f.write(txt)
1043       os.fsync(f.fileno())
1044     finally:
1045       f.close()
1046     # we don't need to do os.close(fd) as f.close() did it
1047     os.rename(name, destination)
1048     self.write_count += 1
1049
1050     # and redistribute the config file to master candidates
1051     self._DistributeConfig()
1052
1053     # Write ssconf files on all nodes (including locally)
1054     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1055       if not self._offline:
1056         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1057                                               self._UnlockedGetSsconfValues())
1058       self._last_cluster_serial = self._config_data.cluster.serial_no
1059
1060   def _UnlockedGetSsconfValues(self):
1061     """Return the values needed by ssconf.
1062
1063     @rtype: dict
1064     @return: a dictionary with keys the ssconf names and values their
1065         associated value
1066
1067     """
1068     fn = "\n".join
1069     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1070     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1071     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1072
1073     instance_data = fn(instance_names)
1074     off_data = fn(node.name for node in node_info if node.offline)
1075     on_data = fn(node.name for node in node_info if not node.offline)
1076     mc_data = fn(node.name for node in node_info if node.master_candidate)
1077     node_data = fn(node_names)
1078
1079     cluster = self._config_data.cluster
1080     return {
1081       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1082       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1083       constants.SS_MASTER_CANDIDATES: mc_data,
1084       constants.SS_MASTER_IP: cluster.master_ip,
1085       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1086       constants.SS_MASTER_NODE: cluster.master_node,
1087       constants.SS_NODE_LIST: node_data,
1088       constants.SS_OFFLINE_NODES: off_data,
1089       constants.SS_ONLINE_NODES: on_data,
1090       constants.SS_INSTANCE_LIST: instance_data,
1091       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1092       }
1093
1094   @locking.ssynchronized(_config_lock)
1095   def InitConfig(self, version, cluster_config, master_node_config):
1096     """Create the initial cluster configuration.
1097
1098     It will contain the current node, which will also be the master
1099     node, and no instances.
1100
1101     @type version: int
1102     @param version: Configuration version
1103     @type cluster_config: objects.Cluster
1104     @param cluster_config: Cluster configuration
1105     @type master_node_config: objects.Node
1106     @param master_node_config: Master node configuration
1107
1108     """
1109     nodes = {
1110       master_node_config.name: master_node_config,
1111       }
1112
1113     self._config_data = objects.ConfigData(version=version,
1114                                            cluster=cluster_config,
1115                                            nodes=nodes,
1116                                            instances={},
1117                                            serial_no=1)
1118     self._WriteConfig()
1119
1120   @locking.ssynchronized(_config_lock, shared=1)
1121   def GetVGName(self):
1122     """Return the volume group name.
1123
1124     """
1125     return self._config_data.cluster.volume_group_name
1126
1127   @locking.ssynchronized(_config_lock)
1128   def SetVGName(self, vg_name):
1129     """Set the volume group name.
1130
1131     """
1132     self._config_data.cluster.volume_group_name = vg_name
1133     self._config_data.cluster.serial_no += 1
1134     self._WriteConfig()
1135
1136   @locking.ssynchronized(_config_lock, shared=1)
1137   def GetDefBridge(self):
1138     """Return the default bridge.
1139
1140     """
1141     return self._config_data.cluster.default_bridge
1142
1143   @locking.ssynchronized(_config_lock, shared=1)
1144   def GetMACPrefix(self):
1145     """Return the mac prefix.
1146
1147     """
1148     return self._config_data.cluster.mac_prefix
1149
1150   @locking.ssynchronized(_config_lock, shared=1)
1151   def GetClusterInfo(self):
1152     """Returns informations about the cluster
1153
1154     @rtype: L{objects.Cluster}
1155     @return: the cluster object
1156
1157     """
1158     return self._config_data.cluster
1159
1160   @locking.ssynchronized(_config_lock)
1161   def Update(self, target):
1162     """Notify function to be called after updates.
1163
1164     This function must be called when an object (as returned by
1165     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1166     caller wants the modifications saved to the backing store. Note
1167     that all modified objects will be saved, but the target argument
1168     is the one the caller wants to ensure that it's saved.
1169
1170     @param target: an instance of either L{objects.Cluster},
1171         L{objects.Node} or L{objects.Instance} which is existing in
1172         the cluster
1173
1174     """
1175     if self._config_data is None:
1176       raise errors.ProgrammerError("Configuration file not read,"
1177                                    " cannot save.")
1178     update_serial = False
1179     if isinstance(target, objects.Cluster):
1180       test = target == self._config_data.cluster
1181     elif isinstance(target, objects.Node):
1182       test = target in self._config_data.nodes.values()
1183       update_serial = True
1184     elif isinstance(target, objects.Instance):
1185       test = target in self._config_data.instances.values()
1186     else:
1187       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1188                                    " ConfigWriter.Update" % type(target))
1189     if not test:
1190       raise errors.ConfigurationError("Configuration updated since object"
1191                                       " has been read or unknown object")
1192     target.serial_no += 1
1193
1194     if update_serial:
1195       # for node updates, we need to increase the cluster serial too
1196       self._config_data.cluster.serial_no += 1
1197
1198     if isinstance(target, objects.Instance):
1199       self._UnlockedReleaseDRBDMinors(target.name)
1200
1201     self._WriteConfig()