Add cluster options from ssconf to configuration
[ganeti-local] / test / ganeti.config_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the config module"""
23
24
25 import unittest
26 import os
27 import time
28 import tempfile
29 import os.path
30 import socket
31
32 from ganeti import errors
33 from ganeti import constants
34 from ganeti import config
35 from ganeti import objects
36 from ganeti import utils
37
38
39 class TestConfigRunner(unittest.TestCase):
40   """Testing case for HooksRunner"""
41   def setUp(self):
42     fd, self.cfg_file = tempfile.mkstemp()
43     os.close(fd)
44
45   def tearDown(self):
46     try:
47       os.unlink(self.cfg_file)
48     except OSError:
49       pass
50
51   def _get_object(self):
52     """Returns a instance of ConfigWriter"""
53     cfg = config.ConfigWriter(cfg_file=self.cfg_file, offline=True)
54     return cfg
55
56   def _init_cluster(self, cfg):
57     """Initializes the cfg object"""
58     me = utils.HostInfo()
59     ip = constants.LOCALHOST_IP_ADDRESS
60
61     cluster_config = objects.Cluster(
62       serial_no=1,
63       rsahostkeypub="",
64       highest_used_port=(constants.FIRST_DRBD_PORT - 1),
65       mac_prefix="aa:00:00",
66       volume_group_name="xenvg",
67       default_bridge=constants.DEFAULT_BRIDGE,
68       tcpudp_port_pool=set(),
69       hypervisor=constants.HT_FAKE,
70       master_node=me.name,
71       master_ip="127.0.0.1",
72       master_netdev=constants.DEFAULT_BRIDGE,
73       cluster_name="cluster.local",
74       file_storage_dir="/tmp",
75       )
76
77     master_node_config = objects.Node(name=me.name,
78                                       primary_ip=me.ip,
79                                       secondary_ip=ip)
80
81     cfg.InitConfig(constants.CONFIG_VERSION,
82                    cluster_config, master_node_config)
83
84   def _create_instance(self):
85     """Create and return an instance object"""
86     inst = objects.Instance(name="test.example.com", disks=[],
87                             disk_template=constants.DT_DISKLESS)
88     return inst
89
90   def testEmpty(self):
91     """Test instantiate config object"""
92     self._get_object()
93
94   def testInit(self):
95     """Test initialize the config file"""
96     cfg = self._get_object()
97     self._init_cluster(cfg)
98     self.failUnlessEqual(1, len(cfg.GetNodeList()))
99     self.failUnlessEqual(0, len(cfg.GetInstanceList()))
100
101   def testUpdateCluster(self):
102     """Test updates on the cluster object"""
103     cfg = self._get_object()
104     # construct a fake cluster object
105     fake_cl = objects.Cluster()
106     # fail if we didn't read the config
107     self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_cl)
108
109     self._init_cluster(cfg)
110     cl = cfg.GetClusterInfo()
111     # first pass, must not fail
112     cfg.Update(cl)
113     # second pass, also must not fail (after the config has been written)
114     cfg.Update(cl)
115     # but the fake_cl update should still fail
116     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
117
118   def testUpdateNode(self):
119     """Test updates on one node object"""
120     cfg = self._get_object()
121     # construct a fake node
122     fake_node = objects.Node()
123     # fail if we didn't read the config
124     self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_node)
125
126     self._init_cluster(cfg)
127     node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
128     # first pass, must not fail
129     cfg.Update(node)
130     # second pass, also must not fail (after the config has been written)
131     cfg.Update(node)
132     # but the fake_node update should still fail
133     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
134
135   def testUpdateInstance(self):
136     """Test updates on one instance object"""
137     cfg = self._get_object()
138     # construct a fake instance
139     inst = self._create_instance()
140     fake_instance = objects.Instance()
141     # fail if we didn't read the config
142     self.failUnlessRaises(errors.ProgrammerError, cfg.Update, fake_instance)
143
144     self._init_cluster(cfg)
145     cfg.AddInstance(inst)
146     instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0])
147     # first pass, must not fail
148     cfg.Update(instance)
149     # second pass, also must not fail (after the config has been written)
150     cfg.Update(instance)
151     # but the fake_instance update should still fail
152     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
153
154
155 if __name__ == '__main__':
156   unittest.main()