Implement {Add,Readd,Remove}Node in GanetiContext
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import logger
42 from ganeti import utils
43 from ganeti import constants
44 from ganeti import rpc
45 from ganeti import objects
46 from ganeti import serializer
47 from ganeti import ssconf
48
49
50 _config_lock = locking.SharedLock()
51
52
53 def ValidateConfig():
54   sstore = ssconf.SimpleStore()
55
56   if sstore.GetConfigVersion() != constants.CONFIG_VERSION:
57     raise errors.ConfigurationError("Cluster configuration version"
58                                     " mismatch, got %s instead of %s" %
59                                     (sstore.GetConfigVersion(),
60                                      constants.CONFIG_VERSION))
61
62
63 class ConfigWriter:
64   """The interface to the cluster configuration.
65
66   """
67   def __init__(self, cfg_file=None, offline=False):
68     self.write_count = 0
69     self._lock = _config_lock
70     self._config_data = None
71     self._config_time = None
72     self._config_size = None
73     self._config_inode = None
74     self._offline = offline
75     if cfg_file is None:
76       self._cfg_file = constants.CLUSTER_CONF_FILE
77     else:
78       self._cfg_file = cfg_file
79     self._temporary_ids = set()
80     # Note: in order to prevent errors when resolving our name in
81     # _DistributeConfig, we compute it here once and reuse it; it's
82     # better to raise an error before starting to modify the config
83     # file than after it was modified
84     self._my_hostname = utils.HostInfo().name
85
86   # this method needs to be static, so that we can call it on the class
87   @staticmethod
88   def IsCluster():
89     """Check if the cluster is configured.
90
91     """
92     return os.path.exists(constants.CLUSTER_CONF_FILE)
93
94   @locking.ssynchronized(_config_lock, shared=1)
95   def GenerateMAC(self):
96     """Generate a MAC for an instance.
97
98     This should check the current instances for duplicates.
99
100     """
101     self._OpenConfig()
102     prefix = self._config_data.cluster.mac_prefix
103     all_macs = self._AllMACs()
104     retries = 64
105     while retries > 0:
106       byte1 = random.randrange(0, 256)
107       byte2 = random.randrange(0, 256)
108       byte3 = random.randrange(0, 256)
109       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
110       if mac not in all_macs:
111         break
112       retries -= 1
113     else:
114       raise errors.ConfigurationError("Can't generate unique MAC")
115     return mac
116
117   @locking.ssynchronized(_config_lock, shared=1)
118   def IsMacInUse(self, mac):
119     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
120
121     This only checks instances managed by this cluster, it does not
122     check for potential collisions elsewhere.
123
124     """
125     self._OpenConfig()
126     all_macs = self._AllMACs()
127     return mac in all_macs
128
129   def _ComputeAllLVs(self):
130     """Compute the list of all LVs.
131
132     """
133     self._OpenConfig()
134     lvnames = set()
135     for instance in self._config_data.instances.values():
136       node_data = instance.MapLVsByNode()
137       for lv_list in node_data.values():
138         lvnames.update(lv_list)
139     return lvnames
140
141   @locking.ssynchronized(_config_lock, shared=1)
142   def GenerateUniqueID(self, exceptions=None):
143     """Generate an unique disk name.
144
145     This checks the current node, instances and disk names for
146     duplicates.
147
148     Args:
149       - exceptions: a list with some other names which should be checked
150                     for uniqueness (used for example when you want to get
151                     more than one id at one time without adding each one in
152                     turn to the config file
153
154     Returns: the unique id as a string
155
156     """
157     existing = set()
158     existing.update(self._temporary_ids)
159     existing.update(self._ComputeAllLVs())
160     existing.update(self._config_data.instances.keys())
161     existing.update(self._config_data.nodes.keys())
162     if exceptions is not None:
163       existing.update(exceptions)
164     retries = 64
165     while retries > 0:
166       unique_id = utils.NewUUID()
167       if unique_id not in existing and unique_id is not None:
168         break
169     else:
170       raise errors.ConfigurationError("Not able generate an unique ID"
171                                       " (last tried ID: %s" % unique_id)
172     self._temporary_ids.add(unique_id)
173     return unique_id
174
175   def _AllMACs(self):
176     """Return all MACs present in the config.
177
178     """
179     self._OpenConfig()
180
181     result = []
182     for instance in self._config_data.instances.values():
183       for nic in instance.nics:
184         result.append(nic.mac)
185
186     return result
187
188   @locking.ssynchronized(_config_lock, shared=1)
189   def VerifyConfig(self):
190     """Stub verify function.
191     """
192     self._OpenConfig()
193
194     result = []
195     seen_macs = []
196     data = self._config_data
197     for instance_name in data.instances:
198       instance = data.instances[instance_name]
199       if instance.primary_node not in data.nodes:
200         result.append("instance '%s' has invalid primary node '%s'" %
201                       (instance_name, instance.primary_node))
202       for snode in instance.secondary_nodes:
203         if snode not in data.nodes:
204           result.append("instance '%s' has invalid secondary node '%s'" %
205                         (instance_name, snode))
206       for idx, nic in enumerate(instance.nics):
207         if nic.mac in seen_macs:
208           result.append("instance '%s' has NIC %d mac %s duplicate" %
209                         (instance_name, idx, nic.mac))
210         else:
211           seen_macs.append(nic.mac)
212     return result
213
214   def _UnlockedSetDiskID(self, disk, node_name):
215     """Convert the unique ID to the ID needed on the target nodes.
216
217     This is used only for drbd, which needs ip/port configuration.
218
219     The routine descends down and updates its children also, because
220     this helps when the only the top device is passed to the remote
221     node.
222
223     This function is for internal use, when the config lock is already held.
224
225     """
226     if disk.children:
227       for child in disk.children:
228         self._UnlockedSetDiskID(child, node_name)
229
230     if disk.logical_id is None and disk.physical_id is not None:
231       return
232     if disk.dev_type in constants.LDS_DRBD:
233       pnode, snode, port = disk.logical_id
234       if node_name not in (pnode, snode):
235         raise errors.ConfigurationError("DRBD device not knowing node %s" %
236                                         node_name)
237       pnode_info = self._UnlockedGetNodeInfo(pnode)
238       snode_info = self._UnlockedGetNodeInfo(snode)
239       if pnode_info is None or snode_info is None:
240         raise errors.ConfigurationError("Can't find primary or secondary node"
241                                         " for %s" % str(disk))
242       if pnode == node_name:
243         disk.physical_id = (pnode_info.secondary_ip, port,
244                             snode_info.secondary_ip, port)
245       else: # it must be secondary, we tested above
246         disk.physical_id = (snode_info.secondary_ip, port,
247                             pnode_info.secondary_ip, port)
248     else:
249       disk.physical_id = disk.logical_id
250     return
251
252   @locking.ssynchronized(_config_lock)
253   def SetDiskID(self, disk, node_name):
254     """Convert the unique ID to the ID needed on the target nodes.
255
256     This is used only for drbd, which needs ip/port configuration.
257
258     The routine descends down and updates its children also, because
259     this helps when the only the top device is passed to the remote
260     node.
261
262     """
263     return self._UnlockedSetDiskID(disk, node_name)
264
265   @locking.ssynchronized(_config_lock)
266   def AddTcpUdpPort(self, port):
267     """Adds a new port to the available port pool.
268
269     """
270     if not isinstance(port, int):
271       raise errors.ProgrammerError("Invalid type passed for port")
272
273     self._OpenConfig()
274     self._config_data.cluster.tcpudp_port_pool.add(port)
275     self._WriteConfig()
276
277   @locking.ssynchronized(_config_lock, shared=1)
278   def GetPortList(self):
279     """Returns a copy of the current port list.
280
281     """
282     self._OpenConfig()
283     return self._config_data.cluster.tcpudp_port_pool.copy()
284
285   @locking.ssynchronized(_config_lock)
286   def AllocatePort(self):
287     """Allocate a port.
288
289     The port will be taken from the available port pool or from the
290     default port range (and in this case we increase
291     highest_used_port).
292
293     """
294     self._OpenConfig()
295
296     # If there are TCP/IP ports configured, we use them first.
297     if self._config_data.cluster.tcpudp_port_pool:
298       port = self._config_data.cluster.tcpudp_port_pool.pop()
299     else:
300       port = self._config_data.cluster.highest_used_port + 1
301       if port >= constants.LAST_DRBD_PORT:
302         raise errors.ConfigurationError("The highest used port is greater"
303                                         " than %s. Aborting." %
304                                         constants.LAST_DRBD_PORT)
305       self._config_data.cluster.highest_used_port = port
306
307     self._WriteConfig()
308     return port
309
310   @locking.ssynchronized(_config_lock, shared=1)
311   def GetHostKey(self):
312     """Return the rsa hostkey from the config.
313
314     Args: None
315
316     Returns: rsa hostkey
317     """
318     self._OpenConfig()
319     return self._config_data.cluster.rsahostkeypub
320
321   @locking.ssynchronized(_config_lock)
322   def AddInstance(self, instance):
323     """Add an instance to the config.
324
325     This should be used after creating a new instance.
326
327     Args:
328       instance: the instance object
329     """
330     if not isinstance(instance, objects.Instance):
331       raise errors.ProgrammerError("Invalid type passed to AddInstance")
332
333     if instance.disk_template != constants.DT_DISKLESS:
334       all_lvs = instance.MapLVsByNode()
335       logger.Info("Instance '%s' DISK_LAYOUT: %s" % (instance.name, all_lvs))
336
337     self._OpenConfig()
338     self._config_data.instances[instance.name] = instance
339     self._WriteConfig()
340
341   def _SetInstanceStatus(self, instance_name, status):
342     """Set the instance's status to a given value.
343
344     """
345     if status not in ("up", "down"):
346       raise errors.ProgrammerError("Invalid status '%s' passed to"
347                                    " ConfigWriter._SetInstanceStatus()" %
348                                    status)
349     self._OpenConfig()
350
351     if instance_name not in self._config_data.instances:
352       raise errors.ConfigurationError("Unknown instance '%s'" %
353                                       instance_name)
354     instance = self._config_data.instances[instance_name]
355     if instance.status != status:
356       instance.status = status
357       self._WriteConfig()
358
359   @locking.ssynchronized(_config_lock)
360   def MarkInstanceUp(self, instance_name):
361     """Mark the instance status to up in the config.
362
363     """
364     self._SetInstanceStatus(instance_name, "up")
365
366   @locking.ssynchronized(_config_lock)
367   def RemoveInstance(self, instance_name):
368     """Remove the instance from the configuration.
369
370     """
371     self._OpenConfig()
372
373     if instance_name not in self._config_data.instances:
374       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
375     del self._config_data.instances[instance_name]
376     self._WriteConfig()
377
378   @locking.ssynchronized(_config_lock)
379   def RenameInstance(self, old_name, new_name):
380     """Rename an instance.
381
382     This needs to be done in ConfigWriter and not by RemoveInstance
383     combined with AddInstance as only we can guarantee an atomic
384     rename.
385
386     """
387     self._OpenConfig()
388     if old_name not in self._config_data.instances:
389       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
390     inst = self._config_data.instances[old_name]
391     del self._config_data.instances[old_name]
392     inst.name = new_name
393
394     for disk in inst.disks:
395       if disk.dev_type == constants.LD_FILE:
396         # rename the file paths in logical and physical id
397         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
398         disk.physical_id = disk.logical_id = (disk.logical_id[0],
399                                               os.path.join(file_storage_dir,
400                                                            inst.name,
401                                                            disk.iv_name))
402
403     self._config_data.instances[inst.name] = inst
404     self._WriteConfig()
405
406   @locking.ssynchronized(_config_lock)
407   def MarkInstanceDown(self, instance_name):
408     """Mark the status of an instance to down in the configuration.
409
410     """
411     self._SetInstanceStatus(instance_name, "down")
412
413   @locking.ssynchronized(_config_lock, shared=1)
414   def GetInstanceList(self):
415     """Get the list of instances.
416
417     Returns:
418       array of instances, ex. ['instance2.example.com','instance1.example.com']
419       these contains all the instances, also the ones in Admin_down state
420
421     """
422     self._OpenConfig()
423
424     return self._config_data.instances.keys()
425
426   @locking.ssynchronized(_config_lock, shared=1)
427   def ExpandInstanceName(self, short_name):
428     """Attempt to expand an incomplete instance name.
429
430     """
431     self._OpenConfig()
432
433     return utils.MatchNameComponent(short_name,
434                                     self._config_data.instances.keys())
435
436   @locking.ssynchronized(_config_lock, shared=1)
437   def GetInstanceInfo(self, instance_name):
438     """Returns informations about an instance.
439
440     It takes the information from the configuration file. Other informations of
441     an instance are taken from the live systems.
442
443     Args:
444       instance: name of the instance, ex instance1.example.com
445
446     Returns:
447       the instance object
448
449     """
450     self._OpenConfig()
451
452     if instance_name not in self._config_data.instances:
453       return None
454
455     return self._config_data.instances[instance_name]
456
457   @locking.ssynchronized(_config_lock)
458   def AddNode(self, node):
459     """Add a node to the configuration.
460
461     Args:
462       node: an object.Node instance
463
464     """
465     logging.info("Adding node %s to configuration" % node.name)
466
467     self._OpenConfig()
468     self._config_data.nodes[node.name] = node
469     self._WriteConfig()
470
471   @locking.ssynchronized(_config_lock)
472   def RemoveNode(self, node_name):
473     """Remove a node from the configuration.
474
475     """
476     logging.info("Removing node %s from configuration" % node_name)
477
478     self._OpenConfig()
479     if node_name not in self._config_data.nodes:
480       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
481
482     del self._config_data.nodes[node_name]
483     self._WriteConfig()
484
485   @locking.ssynchronized(_config_lock, shared=1)
486   def ExpandNodeName(self, short_name):
487     """Attempt to expand an incomplete instance name.
488
489     """
490     self._OpenConfig()
491
492     return utils.MatchNameComponent(short_name,
493                                     self._config_data.nodes.keys())
494
495   def _UnlockedGetNodeInfo(self, node_name):
496     """Get the configuration of a node, as stored in the config.
497
498     This function is for internal use, when the config lock is already held.
499
500     Args: node: nodename (tuple) of the node
501
502     Returns: the node object
503
504     """
505     self._OpenConfig()
506
507     if node_name not in self._config_data.nodes:
508       return None
509
510     return self._config_data.nodes[node_name]
511
512
513   @locking.ssynchronized(_config_lock, shared=1)
514   def GetNodeInfo(self, node_name):
515     """Get the configuration of a node, as stored in the config.
516
517     Args: node: nodename (tuple) of the node
518
519     Returns: the node object
520
521     """
522     return self._UnlockedGetNodeInfo(node_name)
523
524   def _UnlockedGetNodeList(self):
525     """Return the list of nodes which are in the configuration.
526
527     This function is for internal use, when the config lock is already held.
528
529     """
530     self._OpenConfig()
531     return self._config_data.nodes.keys()
532
533
534   @locking.ssynchronized(_config_lock, shared=1)
535   def GetNodeList(self):
536     """Return the list of nodes which are in the configuration.
537
538     """
539     return self._UnlockedGetNodeList()
540
541   @locking.ssynchronized(_config_lock, shared=1)
542   def DumpConfig(self):
543     """Return the entire configuration of the cluster.
544     """
545     self._OpenConfig()
546     return self._config_data
547
548   def _BumpSerialNo(self):
549     """Bump up the serial number of the config.
550
551     """
552     self._config_data.cluster.serial_no += 1
553
554   def _OpenConfig(self):
555     """Read the config data from disk.
556
557     In case we already have configuration data and the config file has
558     the same mtime as when we read it, we skip the parsing of the
559     file, since de-serialisation could be slow.
560
561     """
562     try:
563       st = os.stat(self._cfg_file)
564     except OSError, err:
565       raise errors.ConfigurationError("Can't stat config file: %s" % err)
566     if (self._config_data is not None and
567         self._config_time is not None and
568         self._config_time == st.st_mtime and
569         self._config_size == st.st_size and
570         self._config_inode == st.st_ino):
571       # data is current, so skip loading of config file
572       return
573
574     # Make sure the configuration has the right version
575     ValidateConfig()
576
577     f = open(self._cfg_file, 'r')
578     try:
579       try:
580         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
581       except Exception, err:
582         raise errors.ConfigurationError(err)
583     finally:
584       f.close()
585     if (not hasattr(data, 'cluster') or
586         not hasattr(data.cluster, 'rsahostkeypub')):
587       raise errors.ConfigurationError("Incomplete configuration"
588                                       " (missing cluster.rsahostkeypub)")
589     self._config_data = data
590     self._config_time = st.st_mtime
591     self._config_size = st.st_size
592     self._config_inode = st.st_ino
593
594   def _DistributeConfig(self):
595     """Distribute the configuration to the other nodes.
596
597     Currently, this only copies the configuration file. In the future,
598     it could be used to encapsulate the 2/3-phase update mechanism.
599
600     """
601     if self._offline:
602       return True
603     bad = False
604     nodelist = self._UnlockedGetNodeList()
605     myhostname = self._my_hostname
606
607     try:
608       nodelist.remove(myhostname)
609     except ValueError:
610       pass
611
612     result = rpc.call_upload_file(nodelist, self._cfg_file)
613     for node in nodelist:
614       if not result[node]:
615         logger.Error("copy of file %s to node %s failed" %
616                      (self._cfg_file, node))
617         bad = True
618     return not bad
619
620   def _WriteConfig(self, destination=None):
621     """Write the configuration data to persistent storage.
622
623     """
624     if destination is None:
625       destination = self._cfg_file
626     self._BumpSerialNo()
627     txt = serializer.Dump(self._config_data.ToDict())
628     dir_name, file_name = os.path.split(destination)
629     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
630     f = os.fdopen(fd, 'w')
631     try:
632       f.write(txt)
633       os.fsync(f.fileno())
634     finally:
635       f.close()
636     # we don't need to do os.close(fd) as f.close() did it
637     os.rename(name, destination)
638     self.write_count += 1
639     # re-set our cache as not to re-read the config file
640     try:
641       st = os.stat(destination)
642     except OSError, err:
643       raise errors.ConfigurationError("Can't stat config file: %s" % err)
644     self._config_time = st.st_mtime
645     self._config_size = st.st_size
646     self._config_inode = st.st_ino
647     # and redistribute the config file
648     self._DistributeConfig()
649
650   @locking.ssynchronized(_config_lock)
651   def InitConfig(self, node, primary_ip, secondary_ip,
652                  hostkeypub, mac_prefix, vg_name, def_bridge):
653     """Create the initial cluster configuration.
654
655     It will contain the current node, which will also be the master
656     node, and no instances or operating systmes.
657
658     Args:
659       node: the nodename of the initial node
660       primary_ip: the IP address of the current host
661       secondary_ip: the secondary IP of the current host or None
662       hostkeypub: the public hostkey of this host
663
664     """
665     hu_port = constants.FIRST_DRBD_PORT - 1
666     globalconfig = objects.Cluster(serial_no=1,
667                                    rsahostkeypub=hostkeypub,
668                                    highest_used_port=hu_port,
669                                    mac_prefix=mac_prefix,
670                                    volume_group_name=vg_name,
671                                    default_bridge=def_bridge,
672                                    tcpudp_port_pool=set())
673     if secondary_ip is None:
674       secondary_ip = primary_ip
675     nodeconfig = objects.Node(name=node, primary_ip=primary_ip,
676                               secondary_ip=secondary_ip)
677
678     self._config_data = objects.ConfigData(nodes={node: nodeconfig},
679                                            instances={},
680                                            cluster=globalconfig)
681     self._WriteConfig()
682
683   @locking.ssynchronized(_config_lock, shared=1)
684   def GetVGName(self):
685     """Return the volume group name.
686
687     """
688     self._OpenConfig()
689     return self._config_data.cluster.volume_group_name
690
691   @locking.ssynchronized(_config_lock)
692   def SetVGName(self, vg_name):
693     """Set the volume group name.
694
695     """
696     self._OpenConfig()
697     self._config_data.cluster.volume_group_name = vg_name
698     self._WriteConfig()
699
700   @locking.ssynchronized(_config_lock, shared=1)
701   def GetDefBridge(self):
702     """Return the default bridge.
703
704     """
705     self._OpenConfig()
706     return self._config_data.cluster.default_bridge
707
708   @locking.ssynchronized(_config_lock, shared=1)
709   def GetMACPrefix(self):
710     """Return the mac prefix.
711
712     """
713     self._OpenConfig()
714     return self._config_data.cluster.mac_prefix
715
716   @locking.ssynchronized(_config_lock, shared=1)
717   def GetClusterInfo(self):
718     """Returns informations about the cluster
719
720     Returns:
721       the cluster object
722
723     """
724     self._OpenConfig()
725
726     return self._config_data.cluster
727
728   @locking.ssynchronized(_config_lock)
729   def Update(self, target):
730     """Notify function to be called after updates.
731
732     This function must be called when an object (as returned by
733     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
734     caller wants the modifications saved to the backing store. Note
735     that all modified objects will be saved, but the target argument
736     is the one the caller wants to ensure that it's saved.
737
738     """
739     if self._config_data is None:
740       raise errors.ProgrammerError("Configuration file not read,"
741                                    " cannot save.")
742     if isinstance(target, objects.Cluster):
743       test = target == self._config_data.cluster
744     elif isinstance(target, objects.Node):
745       test = target in self._config_data.nodes.values()
746     elif isinstance(target, objects.Instance):
747       test = target in self._config_data.instances.values()
748     else:
749       raise errors.ProgrammerError("Invalid object type (%s) passed to"
750                                    " ConfigWriter.Update" % type(target))
751     if not test:
752       raise errors.ConfigurationError("Configuration updated since object"
753                                       " has been read or unknown object")
754     self._WriteConfig()