Implement disk verify checks in config verify
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     # Note: in order to prevent errors when resolving our name in
83     # _DistributeConfig, we compute it here once and reuse it; it's
84     # better to raise an error before starting to modify the config
85     # file than after it was modified
86     self._my_hostname = utils.HostInfo().name
87     self._last_cluster_serial = -1
88     self._OpenConfig()
89
90   # this method needs to be static, so that we can call it on the class
91   @staticmethod
92   def IsCluster():
93     """Check if the cluster is configured.
94
95     """
96     return os.path.exists(constants.CLUSTER_CONF_FILE)
97
98   @locking.ssynchronized(_config_lock, shared=1)
99   def GenerateMAC(self):
100     """Generate a MAC for an instance.
101
102     This should check the current instances for duplicates.
103
104     """
105     prefix = self._config_data.cluster.mac_prefix
106     all_macs = self._AllMACs()
107     retries = 64
108     while retries > 0:
109       byte1 = random.randrange(0, 256)
110       byte2 = random.randrange(0, 256)
111       byte3 = random.randrange(0, 256)
112       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
113       if mac not in all_macs:
114         break
115       retries -= 1
116     else:
117       raise errors.ConfigurationError("Can't generate unique MAC")
118     return mac
119
120   @locking.ssynchronized(_config_lock, shared=1)
121   def IsMacInUse(self, mac):
122     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
123
124     This only checks instances managed by this cluster, it does not
125     check for potential collisions elsewhere.
126
127     """
128     all_macs = self._AllMACs()
129     return mac in all_macs
130
131   @locking.ssynchronized(_config_lock, shared=1)
132   def GenerateDRBDSecret(self):
133     """Generate a DRBD secret.
134
135     This checks the current disks for duplicates.
136
137     """
138     all_secrets = self._AllDRBDSecrets()
139     retries = 64
140     while retries > 0:
141       secret = utils.GenerateSecret()
142       if secret not in all_secrets:
143         break
144       retries -= 1
145     else:
146       raise errors.ConfigurationError("Can't generate unique DRBD secret")
147     return secret
148
149   def _ComputeAllLVs(self):
150     """Compute the list of all LVs.
151
152     """
153     lvnames = set()
154     for instance in self._config_data.instances.values():
155       node_data = instance.MapLVsByNode()
156       for lv_list in node_data.values():
157         lvnames.update(lv_list)
158     return lvnames
159
160   @locking.ssynchronized(_config_lock, shared=1)
161   def GenerateUniqueID(self, exceptions=None):
162     """Generate an unique disk name.
163
164     This checks the current node, instances and disk names for
165     duplicates.
166
167     @param exceptions: a list with some other names which should be checked
168         for uniqueness (used for example when you want to get
169         more than one id at one time without adding each one in
170         turn to the config file)
171
172     @rtype: string
173     @return: the unique id
174
175     """
176     existing = set()
177     existing.update(self._temporary_ids)
178     existing.update(self._ComputeAllLVs())
179     existing.update(self._config_data.instances.keys())
180     existing.update(self._config_data.nodes.keys())
181     if exceptions is not None:
182       existing.update(exceptions)
183     retries = 64
184     while retries > 0:
185       unique_id = utils.NewUUID()
186       if unique_id not in existing and unique_id is not None:
187         break
188     else:
189       raise errors.ConfigurationError("Not able generate an unique ID"
190                                       " (last tried ID: %s" % unique_id)
191     self._temporary_ids.add(unique_id)
192     return unique_id
193
194   def _AllMACs(self):
195     """Return all MACs present in the config.
196
197     @rtype: list
198     @return: the list of all MACs
199
200     """
201     result = []
202     for instance in self._config_data.instances.values():
203       for nic in instance.nics:
204         result.append(nic.mac)
205
206     return result
207
208   def _AllDRBDSecrets(self):
209     """Return all DRBD secrets present in the config.
210
211     @rtype: list
212     @return: the list of all DRBD secrets
213
214     """
215     def helper(disk, result):
216       """Recursively gather secrets from this disk."""
217       if disk.dev_type == constants.DT_DRBD8:
218         result.append(disk.logical_id[5])
219       if disk.children:
220         for child in disk.children:
221           helper(child, result)
222
223     result = []
224     for instance in self._config_data.instances.values():
225       for disk in instance.disks:
226         helper(disk, result)
227
228     return result
229
230   def _UnlockedVerifyConfig(self):
231     """Verify function.
232
233     @rtype: list
234     @return: a list of error messages; a non-empty list signifies
235         configuration errors
236
237     """
238     result = []
239     seen_macs = []
240     ports = {}
241     data = self._config_data
242     for instance_name in data.instances:
243       instance = data.instances[instance_name]
244       if instance.primary_node not in data.nodes:
245         result.append("instance '%s' has invalid primary node '%s'" %
246                       (instance_name, instance.primary_node))
247       for snode in instance.secondary_nodes:
248         if snode not in data.nodes:
249           result.append("instance '%s' has invalid secondary node '%s'" %
250                         (instance_name, snode))
251       for idx, nic in enumerate(instance.nics):
252         if nic.mac in seen_macs:
253           result.append("instance '%s' has NIC %d mac %s duplicate" %
254                         (instance_name, idx, nic.mac))
255         else:
256           seen_macs.append(nic.mac)
257
258       # gather the drbd ports for duplicate checks
259       for dsk in instance.disks:
260         if dsk.dev_type in constants.LDS_DRBD:
261           tcp_port = dsk.logical_id[2]
262           if tcp_port not in ports:
263             ports[tcp_port] = []
264           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
265       # gather network port reservation
266       net_port = getattr(instance, "network_port", None)
267       if net_port is not None:
268         if net_port not in ports:
269           ports[net_port] = []
270         ports[net_port].append((instance.name, "network port"))
271
272       # instance disk verify
273       for idx, disk in enumerate(instance.disks):
274         result.extend(["instance '%s' disk %d error: %s" %
275                        (instance.name, idx, msg) for msg in disk.Verify()])
276
277     # cluster-wide pool of free ports
278     for free_port in data.cluster.tcpudp_port_pool:
279       if free_port not in ports:
280         ports[free_port] = []
281       ports[free_port].append(("cluster", "port marked as free"))
282
283     # compute tcp/udp duplicate ports
284     keys = ports.keys()
285     keys.sort()
286     for pnum in keys:
287       pdata = ports[pnum]
288       if len(pdata) > 1:
289         txt = ", ".join(["%s/%s" % val for val in pdata])
290         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
291
292     # highest used tcp port check
293     if keys:
294       if keys[-1] > data.cluster.highest_used_port:
295         result.append("Highest used port mismatch, saved %s, computed %s" %
296                       (data.cluster.highest_used_port, keys[-1]))
297
298     if not data.nodes[data.cluster.master_node].master_candidate:
299       result.append("Master node is not a master candidate")
300
301     # master candidate checks
302     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
303     if mc_now < mc_max:
304       result.append("Not enough master candidates: actual %d, target %d" %
305                     (mc_now, mc_max))
306
307     # drbd minors check
308     d_map, duplicates = self._UnlockedComputeDRBDMap()
309     for node, minor, instance_a, instance_b in duplicates:
310       result.append("DRBD minor %d on node %s is assigned twice to instances"
311                     " %s and %s" % (minor, node, instance_a, instance_b))
312
313     return result
314
315   @locking.ssynchronized(_config_lock, shared=1)
316   def VerifyConfig(self):
317     """Verify function.
318
319     This is just a wrapper over L{_UnlockedVerifyConfig}.
320
321     @rtype: list
322     @return: a list of error messages; a non-empty list signifies
323         configuration errors
324
325     """
326     return self._UnlockedVerifyConfig()
327
328   def _UnlockedSetDiskID(self, disk, node_name):
329     """Convert the unique ID to the ID needed on the target nodes.
330
331     This is used only for drbd, which needs ip/port configuration.
332
333     The routine descends down and updates its children also, because
334     this helps when the only the top device is passed to the remote
335     node.
336
337     This function is for internal use, when the config lock is already held.
338
339     """
340     if disk.children:
341       for child in disk.children:
342         self._UnlockedSetDiskID(child, node_name)
343
344     if disk.logical_id is None and disk.physical_id is not None:
345       return
346     if disk.dev_type == constants.LD_DRBD8:
347       pnode, snode, port, pminor, sminor, secret = disk.logical_id
348       if node_name not in (pnode, snode):
349         raise errors.ConfigurationError("DRBD device not knowing node %s" %
350                                         node_name)
351       pnode_info = self._UnlockedGetNodeInfo(pnode)
352       snode_info = self._UnlockedGetNodeInfo(snode)
353       if pnode_info is None or snode_info is None:
354         raise errors.ConfigurationError("Can't find primary or secondary node"
355                                         " for %s" % str(disk))
356       p_data = (pnode_info.secondary_ip, port)
357       s_data = (snode_info.secondary_ip, port)
358       if pnode == node_name:
359         disk.physical_id = p_data + s_data + (pminor, secret)
360       else: # it must be secondary, we tested above
361         disk.physical_id = s_data + p_data + (sminor, secret)
362     else:
363       disk.physical_id = disk.logical_id
364     return
365
366   @locking.ssynchronized(_config_lock)
367   def SetDiskID(self, disk, node_name):
368     """Convert the unique ID to the ID needed on the target nodes.
369
370     This is used only for drbd, which needs ip/port configuration.
371
372     The routine descends down and updates its children also, because
373     this helps when the only the top device is passed to the remote
374     node.
375
376     """
377     return self._UnlockedSetDiskID(disk, node_name)
378
379   @locking.ssynchronized(_config_lock)
380   def AddTcpUdpPort(self, port):
381     """Adds a new port to the available port pool.
382
383     """
384     if not isinstance(port, int):
385       raise errors.ProgrammerError("Invalid type passed for port")
386
387     self._config_data.cluster.tcpudp_port_pool.add(port)
388     self._WriteConfig()
389
390   @locking.ssynchronized(_config_lock, shared=1)
391   def GetPortList(self):
392     """Returns a copy of the current port list.
393
394     """
395     return self._config_data.cluster.tcpudp_port_pool.copy()
396
397   @locking.ssynchronized(_config_lock)
398   def AllocatePort(self):
399     """Allocate a port.
400
401     The port will be taken from the available port pool or from the
402     default port range (and in this case we increase
403     highest_used_port).
404
405     """
406     # If there are TCP/IP ports configured, we use them first.
407     if self._config_data.cluster.tcpudp_port_pool:
408       port = self._config_data.cluster.tcpudp_port_pool.pop()
409     else:
410       port = self._config_data.cluster.highest_used_port + 1
411       if port >= constants.LAST_DRBD_PORT:
412         raise errors.ConfigurationError("The highest used port is greater"
413                                         " than %s. Aborting." %
414                                         constants.LAST_DRBD_PORT)
415       self._config_data.cluster.highest_used_port = port
416
417     self._WriteConfig()
418     return port
419
420   def _UnlockedComputeDRBDMap(self):
421     """Compute the used DRBD minor/nodes.
422
423     @rtype: (dict, list)
424     @return: dictionary of node_name: dict of minor: instance_name;
425         the returned dict will have all the nodes in it (even if with
426         an empty list), and a list of duplicates; if the duplicates
427         list is not empty, the configuration is corrupted and its caller
428         should raise an exception
429
430     """
431     def _AppendUsedPorts(instance_name, disk, used):
432       duplicates = []
433       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
434         nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
435         for node, port in ((nodeA, minorA), (nodeB, minorB)):
436           assert node in used, ("Node '%s' of instance '%s' not found"
437                                 " in node list" % (node, instance_name))
438           if port in used[node]:
439             duplicates.append((node, port, instance_name, used[node][port]))
440           else:
441             used[node][port] = instance_name
442       if disk.children:
443         for child in disk.children:
444           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
445       return duplicates
446
447     duplicates = []
448     my_dict = dict((node, {}) for node in self._config_data.nodes)
449     for instance in self._config_data.instances.itervalues():
450       for disk in instance.disks:
451         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
452     for (node, minor), instance in self._temporary_drbds.iteritems():
453       if minor in my_dict[node] and my_dict[node][minor] != instance:
454         duplicates.append((node, minor, instance, my_dict[node][minor]))
455       else:
456         my_dict[node][minor] = instance
457     return my_dict, duplicates
458
459   @locking.ssynchronized(_config_lock)
460   def ComputeDRBDMap(self):
461     """Compute the used DRBD minor/nodes.
462
463     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
464
465     @return: dictionary of node_name: dict of minor: instance_name;
466         the returned dict will have all the nodes in it (even if with
467         an empty list).
468
469     """
470     d_map, duplicates = self._UnlockedComputeDRBDMap()
471     if duplicates:
472       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
473                                       str(duplicates))
474     return d_map
475
476   @locking.ssynchronized(_config_lock)
477   def AllocateDRBDMinor(self, nodes, instance):
478     """Allocate a drbd minor.
479
480     The free minor will be automatically computed from the existing
481     devices. A node can be given multiple times in order to allocate
482     multiple minors. The result is the list of minors, in the same
483     order as the passed nodes.
484
485     @type instance: string
486     @param instance: the instance for which we allocate minors
487
488     """
489     assert isinstance(instance, basestring), \
490            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
491
492     d_map, duplicates = self._UnlockedComputeDRBDMap()
493     if duplicates:
494       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
495                                       str(duplicates))
496     result = []
497     for nname in nodes:
498       ndata = d_map[nname]
499       if not ndata:
500         # no minors used, we can start at 0
501         result.append(0)
502         ndata[0] = instance
503         self._temporary_drbds[(nname, 0)] = instance
504         continue
505       keys = ndata.keys()
506       keys.sort()
507       ffree = utils.FirstFree(keys)
508       if ffree is None:
509         # return the next minor
510         # TODO: implement high-limit check
511         minor = keys[-1] + 1
512       else:
513         minor = ffree
514       # double-check minor against current instances
515       assert minor not in d_map[nname], \
516              ("Attempt to reuse allocated DRBD minor %d on node %s,"
517               " already allocated to instance %s" %
518               (minor, nname, d_map[nname][minor]))
519       ndata[minor] = instance
520       # double-check minor against reservation
521       r_key = (nname, minor)
522       assert r_key not in self._temporary_drbds, \
523              ("Attempt to reuse reserved DRBD minor %d on node %s,"
524               " reserved for instance %s" %
525               (minor, nname, self._temporary_drbds[r_key]))
526       self._temporary_drbds[r_key] = instance
527       result.append(minor)
528     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
529                   nodes, result)
530     return result
531
532   def _UnlockedReleaseDRBDMinors(self, instance):
533     """Release temporary drbd minors allocated for a given instance.
534
535     @type instance: string
536     @param instance: the instance for which temporary minors should be
537                      released
538
539     """
540     assert isinstance(instance, basestring), \
541            "Invalid argument passed to ReleaseDRBDMinors"
542     for key, name in self._temporary_drbds.items():
543       if name == instance:
544         del self._temporary_drbds[key]
545
546   @locking.ssynchronized(_config_lock)
547   def ReleaseDRBDMinors(self, instance):
548     """Release temporary drbd minors allocated for a given instance.
549
550     This should be called on the error paths, on the success paths
551     it's automatically called by the ConfigWriter add and update
552     functions.
553
554     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
555
556     @type instance: string
557     @param instance: the instance for which temporary minors should be
558                      released
559
560     """
561     self._UnlockedReleaseDRBDMinors(instance)
562
563   @locking.ssynchronized(_config_lock, shared=1)
564   def GetConfigVersion(self):
565     """Get the configuration version.
566
567     @return: Config version
568
569     """
570     return self._config_data.version
571
572   @locking.ssynchronized(_config_lock, shared=1)
573   def GetClusterName(self):
574     """Get cluster name.
575
576     @return: Cluster name
577
578     """
579     return self._config_data.cluster.cluster_name
580
581   @locking.ssynchronized(_config_lock, shared=1)
582   def GetMasterNode(self):
583     """Get the hostname of the master node for this cluster.
584
585     @return: Master hostname
586
587     """
588     return self._config_data.cluster.master_node
589
590   @locking.ssynchronized(_config_lock, shared=1)
591   def GetMasterIP(self):
592     """Get the IP of the master node for this cluster.
593
594     @return: Master IP
595
596     """
597     return self._config_data.cluster.master_ip
598
599   @locking.ssynchronized(_config_lock, shared=1)
600   def GetMasterNetdev(self):
601     """Get the master network device for this cluster.
602
603     """
604     return self._config_data.cluster.master_netdev
605
606   @locking.ssynchronized(_config_lock, shared=1)
607   def GetFileStorageDir(self):
608     """Get the file storage dir for this cluster.
609
610     """
611     return self._config_data.cluster.file_storage_dir
612
613   @locking.ssynchronized(_config_lock, shared=1)
614   def GetHypervisorType(self):
615     """Get the hypervisor type for this cluster.
616
617     """
618     return self._config_data.cluster.default_hypervisor
619
620   @locking.ssynchronized(_config_lock, shared=1)
621   def GetHostKey(self):
622     """Return the rsa hostkey from the config.
623
624     @rtype: string
625     @return: the rsa hostkey
626
627     """
628     return self._config_data.cluster.rsahostkeypub
629
630   @locking.ssynchronized(_config_lock)
631   def AddInstance(self, instance):
632     """Add an instance to the config.
633
634     This should be used after creating a new instance.
635
636     @type instance: L{objects.Instance}
637     @param instance: the instance object
638
639     """
640     if not isinstance(instance, objects.Instance):
641       raise errors.ProgrammerError("Invalid type passed to AddInstance")
642
643     if instance.disk_template != constants.DT_DISKLESS:
644       all_lvs = instance.MapLVsByNode()
645       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
646
647     instance.serial_no = 1
648     self._config_data.instances[instance.name] = instance
649     self._UnlockedReleaseDRBDMinors(instance.name)
650     self._WriteConfig()
651
652   def _SetInstanceStatus(self, instance_name, status):
653     """Set the instance's status to a given value.
654
655     """
656     assert isinstance(status, bool), \
657            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
658
659     if instance_name not in self._config_data.instances:
660       raise errors.ConfigurationError("Unknown instance '%s'" %
661                                       instance_name)
662     instance = self._config_data.instances[instance_name]
663     if instance.admin_up != status:
664       instance.admin_up = status
665       instance.serial_no += 1
666       self._WriteConfig()
667
668   @locking.ssynchronized(_config_lock)
669   def MarkInstanceUp(self, instance_name):
670     """Mark the instance status to up in the config.
671
672     """
673     self._SetInstanceStatus(instance_name, True)
674
675   @locking.ssynchronized(_config_lock)
676   def RemoveInstance(self, instance_name):
677     """Remove the instance from the configuration.
678
679     """
680     if instance_name not in self._config_data.instances:
681       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
682     del self._config_data.instances[instance_name]
683     self._WriteConfig()
684
685   @locking.ssynchronized(_config_lock)
686   def RenameInstance(self, old_name, new_name):
687     """Rename an instance.
688
689     This needs to be done in ConfigWriter and not by RemoveInstance
690     combined with AddInstance as only we can guarantee an atomic
691     rename.
692
693     """
694     if old_name not in self._config_data.instances:
695       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
696     inst = self._config_data.instances[old_name]
697     del self._config_data.instances[old_name]
698     inst.name = new_name
699
700     for disk in inst.disks:
701       if disk.dev_type == constants.LD_FILE:
702         # rename the file paths in logical and physical id
703         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
704         disk.physical_id = disk.logical_id = (disk.logical_id[0],
705                                               os.path.join(file_storage_dir,
706                                                            inst.name,
707                                                            disk.iv_name))
708
709     self._config_data.instances[inst.name] = inst
710     self._WriteConfig()
711
712   @locking.ssynchronized(_config_lock)
713   def MarkInstanceDown(self, instance_name):
714     """Mark the status of an instance to down in the configuration.
715
716     """
717     self._SetInstanceStatus(instance_name, False)
718
719   def _UnlockedGetInstanceList(self):
720     """Get the list of instances.
721
722     This function is for internal use, when the config lock is already held.
723
724     """
725     return self._config_data.instances.keys()
726
727   @locking.ssynchronized(_config_lock, shared=1)
728   def GetInstanceList(self):
729     """Get the list of instances.
730
731     @return: array of instances, ex. ['instance2.example.com',
732         'instance1.example.com']
733
734     """
735     return self._UnlockedGetInstanceList()
736
737   @locking.ssynchronized(_config_lock, shared=1)
738   def ExpandInstanceName(self, short_name):
739     """Attempt to expand an incomplete instance name.
740
741     """
742     return utils.MatchNameComponent(short_name,
743                                     self._config_data.instances.keys())
744
745   def _UnlockedGetInstanceInfo(self, instance_name):
746     """Returns informations about an instance.
747
748     This function is for internal use, when the config lock is already held.
749
750     """
751     if instance_name not in self._config_data.instances:
752       return None
753
754     return self._config_data.instances[instance_name]
755
756   @locking.ssynchronized(_config_lock, shared=1)
757   def GetInstanceInfo(self, instance_name):
758     """Returns informations about an instance.
759
760     It takes the information from the configuration file. Other informations of
761     an instance are taken from the live systems.
762
763     @param instance_name: name of the instance, e.g.
764         I{instance1.example.com}
765
766     @rtype: L{objects.Instance}
767     @return: the instance object
768
769     """
770     return self._UnlockedGetInstanceInfo(instance_name)
771
772   @locking.ssynchronized(_config_lock, shared=1)
773   def GetAllInstancesInfo(self):
774     """Get the configuration of all instances.
775
776     @rtype: dict
777     @returns: dict of (instance, instance_info), where instance_info is what
778               would GetInstanceInfo return for the node
779
780     """
781     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
782                     for instance in self._UnlockedGetInstanceList()])
783     return my_dict
784
785   @locking.ssynchronized(_config_lock)
786   def AddNode(self, node):
787     """Add a node to the configuration.
788
789     @type node: L{objects.Node}
790     @param node: a Node instance
791
792     """
793     logging.info("Adding node %s to configuration" % node.name)
794
795     node.serial_no = 1
796     self._config_data.nodes[node.name] = node
797     self._config_data.cluster.serial_no += 1
798     self._WriteConfig()
799
800   @locking.ssynchronized(_config_lock)
801   def RemoveNode(self, node_name):
802     """Remove a node from the configuration.
803
804     """
805     logging.info("Removing node %s from configuration" % node_name)
806
807     if node_name not in self._config_data.nodes:
808       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
809
810     del self._config_data.nodes[node_name]
811     self._config_data.cluster.serial_no += 1
812     self._WriteConfig()
813
814   @locking.ssynchronized(_config_lock, shared=1)
815   def ExpandNodeName(self, short_name):
816     """Attempt to expand an incomplete instance name.
817
818     """
819     return utils.MatchNameComponent(short_name,
820                                     self._config_data.nodes.keys())
821
822   def _UnlockedGetNodeInfo(self, node_name):
823     """Get the configuration of a node, as stored in the config.
824
825     This function is for internal use, when the config lock is already
826     held.
827
828     @param node_name: the node name, e.g. I{node1.example.com}
829
830     @rtype: L{objects.Node}
831     @return: the node object
832
833     """
834     if node_name not in self._config_data.nodes:
835       return None
836
837     return self._config_data.nodes[node_name]
838
839
840   @locking.ssynchronized(_config_lock, shared=1)
841   def GetNodeInfo(self, node_name):
842     """Get the configuration of a node, as stored in the config.
843
844     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
845
846     @param node_name: the node name, e.g. I{node1.example.com}
847
848     @rtype: L{objects.Node}
849     @return: the node object
850
851     """
852     return self._UnlockedGetNodeInfo(node_name)
853
854   def _UnlockedGetNodeList(self):
855     """Return the list of nodes which are in the configuration.
856
857     This function is for internal use, when the config lock is already
858     held.
859
860     @rtype: list
861
862     """
863     return self._config_data.nodes.keys()
864
865
866   @locking.ssynchronized(_config_lock, shared=1)
867   def GetNodeList(self):
868     """Return the list of nodes which are in the configuration.
869
870     """
871     return self._UnlockedGetNodeList()
872
873   @locking.ssynchronized(_config_lock, shared=1)
874   def GetOnlineNodeList(self):
875     """Return the list of nodes which are online.
876
877     """
878     all_nodes = [self._UnlockedGetNodeInfo(node)
879                  for node in self._UnlockedGetNodeList()]
880     return [node.name for node in all_nodes if not node.offline]
881
882   @locking.ssynchronized(_config_lock, shared=1)
883   def GetAllNodesInfo(self):
884     """Get the configuration of all nodes.
885
886     @rtype: dict
887     @return: dict of (node, node_info), where node_info is what
888               would GetNodeInfo return for the node
889
890     """
891     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
892                     for node in self._UnlockedGetNodeList()])
893     return my_dict
894
895   def _UnlockedGetMasterCandidateStats(self):
896     """Get the number of current and maximum desired and possible candidates.
897
898     @rtype: tuple
899     @return: tuple of (current, desired and possible)
900
901     """
902     mc_now = mc_max = 0
903     for node in self._config_data.nodes.itervalues():
904       if not node.offline:
905         mc_max += 1
906       if node.master_candidate:
907         mc_now += 1
908     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
909     return (mc_now, mc_max)
910
911   @locking.ssynchronized(_config_lock, shared=1)
912   def GetMasterCandidateStats(self):
913     """Get the number of current and maximum possible candidates.
914
915     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
916
917     @rtype: tuple
918     @return: tuple of (current, max)
919
920     """
921     return self._UnlockedGetMasterCandidateStats()
922
923   @locking.ssynchronized(_config_lock)
924   def MaintainCandidatePool(self):
925     """Try to grow the candidate pool to the desired size.
926
927     @rtype: list
928     @return: list with the adjusted nodes (L{objects.Node} instances)
929
930     """
931     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
932     mod_list = []
933     if mc_now < mc_max:
934       node_list = self._config_data.nodes.keys()
935       random.shuffle(node_list)
936       for name in node_list:
937         if mc_now >= mc_max:
938           break
939         node = self._config_data.nodes[name]
940         if node.master_candidate or node.offline:
941           continue
942         mod_list.append(node)
943         node.master_candidate = True
944         node.serial_no += 1
945         mc_now += 1
946       if mc_now != mc_max:
947         # this should not happen
948         logging.warning("Warning: MaintainCandidatePool didn't manage to"
949                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
950       if mod_list:
951         self._config_data.cluster.serial_no += 1
952         self._WriteConfig()
953
954     return mod_list
955
956   def _BumpSerialNo(self):
957     """Bump up the serial number of the config.
958
959     """
960     self._config_data.serial_no += 1
961
962   def _OpenConfig(self):
963     """Read the config data from disk.
964
965     """
966     f = open(self._cfg_file, 'r')
967     try:
968       try:
969         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
970       except Exception, err:
971         raise errors.ConfigurationError(err)
972     finally:
973       f.close()
974
975     # Make sure the configuration has the right version
976     _ValidateConfig(data)
977
978     if (not hasattr(data, 'cluster') or
979         not hasattr(data.cluster, 'rsahostkeypub')):
980       raise errors.ConfigurationError("Incomplete configuration"
981                                       " (missing cluster.rsahostkeypub)")
982     self._config_data = data
983     # reset the last serial as -1 so that the next write will cause
984     # ssconf update
985     self._last_cluster_serial = -1
986
987   def _DistributeConfig(self):
988     """Distribute the configuration to the other nodes.
989
990     Currently, this only copies the configuration file. In the future,
991     it could be used to encapsulate the 2/3-phase update mechanism.
992
993     """
994     if self._offline:
995       return True
996     bad = False
997
998     node_list = []
999     addr_list = []
1000     myhostname = self._my_hostname
1001     # we can skip checking whether _UnlockedGetNodeInfo returns None
1002     # since the node list comes from _UnlocketGetNodeList, and we are
1003     # called with the lock held, so no modifications should take place
1004     # in between
1005     for node_name in self._UnlockedGetNodeList():
1006       if node_name == myhostname:
1007         continue
1008       node_info = self._UnlockedGetNodeInfo(node_name)
1009       if not node_info.master_candidate:
1010         continue
1011       node_list.append(node_info.name)
1012       addr_list.append(node_info.primary_ip)
1013
1014     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1015                                             address_list=addr_list)
1016     for node in node_list:
1017       if not result[node]:
1018         logging.error("copy of file %s to node %s failed",
1019                       self._cfg_file, node)
1020         bad = True
1021     return not bad
1022
1023   def _WriteConfig(self, destination=None):
1024     """Write the configuration data to persistent storage.
1025
1026     """
1027     config_errors = self._UnlockedVerifyConfig()
1028     if config_errors:
1029       raise errors.ConfigurationError("Configuration data is not"
1030                                       " consistent: %s" %
1031                                       (", ".join(config_errors)))
1032     if destination is None:
1033       destination = self._cfg_file
1034     self._BumpSerialNo()
1035     txt = serializer.Dump(self._config_data.ToDict())
1036     dir_name, file_name = os.path.split(destination)
1037     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1038     f = os.fdopen(fd, 'w')
1039     try:
1040       f.write(txt)
1041       os.fsync(f.fileno())
1042     finally:
1043       f.close()
1044     # we don't need to do os.close(fd) as f.close() did it
1045     os.rename(name, destination)
1046     self.write_count += 1
1047
1048     # and redistribute the config file to master candidates
1049     self._DistributeConfig()
1050
1051     # Write ssconf files on all nodes (including locally)
1052     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1053       if not self._offline:
1054         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1055                                               self._UnlockedGetSsconfValues())
1056       self._last_cluster_serial = self._config_data.cluster.serial_no
1057
1058   def _UnlockedGetSsconfValues(self):
1059     """Return the values needed by ssconf.
1060
1061     @rtype: dict
1062     @return: a dictionary with keys the ssconf names and values their
1063         associated value
1064
1065     """
1066     fn = "\n".join
1067     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1068     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1069
1070     off_data = fn(node.name for node in node_info if node.offline)
1071     mc_data = fn(node.name for node in node_info if node.master_candidate)
1072     node_data = fn(node_names)
1073
1074     cluster = self._config_data.cluster
1075     return {
1076       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1077       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1078       constants.SS_MASTER_CANDIDATES: mc_data,
1079       constants.SS_MASTER_IP: cluster.master_ip,
1080       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1081       constants.SS_MASTER_NODE: cluster.master_node,
1082       constants.SS_NODE_LIST: node_data,
1083       constants.SS_OFFLINE_NODES: off_data,
1084       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1085       }
1086
1087   @locking.ssynchronized(_config_lock)
1088   def InitConfig(self, version, cluster_config, master_node_config):
1089     """Create the initial cluster configuration.
1090
1091     It will contain the current node, which will also be the master
1092     node, and no instances.
1093
1094     @type version: int
1095     @param version: Configuration version
1096     @type cluster_config: objects.Cluster
1097     @param cluster_config: Cluster configuration
1098     @type master_node_config: objects.Node
1099     @param master_node_config: Master node configuration
1100
1101     """
1102     nodes = {
1103       master_node_config.name: master_node_config,
1104       }
1105
1106     self._config_data = objects.ConfigData(version=version,
1107                                            cluster=cluster_config,
1108                                            nodes=nodes,
1109                                            instances={},
1110                                            serial_no=1)
1111     self._WriteConfig()
1112
1113   @locking.ssynchronized(_config_lock, shared=1)
1114   def GetVGName(self):
1115     """Return the volume group name.
1116
1117     """
1118     return self._config_data.cluster.volume_group_name
1119
1120   @locking.ssynchronized(_config_lock)
1121   def SetVGName(self, vg_name):
1122     """Set the volume group name.
1123
1124     """
1125     self._config_data.cluster.volume_group_name = vg_name
1126     self._config_data.cluster.serial_no += 1
1127     self._WriteConfig()
1128
1129   @locking.ssynchronized(_config_lock, shared=1)
1130   def GetDefBridge(self):
1131     """Return the default bridge.
1132
1133     """
1134     return self._config_data.cluster.default_bridge
1135
1136   @locking.ssynchronized(_config_lock, shared=1)
1137   def GetMACPrefix(self):
1138     """Return the mac prefix.
1139
1140     """
1141     return self._config_data.cluster.mac_prefix
1142
1143   @locking.ssynchronized(_config_lock, shared=1)
1144   def GetClusterInfo(self):
1145     """Returns informations about the cluster
1146
1147     @rtype: L{objects.Cluster}
1148     @return: the cluster object
1149
1150     """
1151     return self._config_data.cluster
1152
1153   @locking.ssynchronized(_config_lock)
1154   def Update(self, target):
1155     """Notify function to be called after updates.
1156
1157     This function must be called when an object (as returned by
1158     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1159     caller wants the modifications saved to the backing store. Note
1160     that all modified objects will be saved, but the target argument
1161     is the one the caller wants to ensure that it's saved.
1162
1163     @param target: an instance of either L{objects.Cluster},
1164         L{objects.Node} or L{objects.Instance} which is existing in
1165         the cluster
1166
1167     """
1168     if self._config_data is None:
1169       raise errors.ProgrammerError("Configuration file not read,"
1170                                    " cannot save.")
1171     update_serial = False
1172     if isinstance(target, objects.Cluster):
1173       test = target == self._config_data.cluster
1174     elif isinstance(target, objects.Node):
1175       test = target in self._config_data.nodes.values()
1176       update_serial = True
1177     elif isinstance(target, objects.Instance):
1178       test = target in self._config_data.instances.values()
1179     else:
1180       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1181                                    " ConfigWriter.Update" % type(target))
1182     if not test:
1183       raise errors.ConfigurationError("Configuration updated since object"
1184                                       " has been read or unknown object")
1185     target.serial_no += 1
1186
1187     if update_serial:
1188       # for node updates, we need to increase the cluster serial too
1189       self._config_data.cluster.serial_no += 1
1190
1191     if isinstance(target, objects.Instance):
1192       self._UnlockedReleaseDRBDMinors(target.name)
1193
1194     self._WriteConfig()