cmdlib: Sort list of fields for QueryFields
[ganeti-local] / test / ganeti.config_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the config module"""
23
24
25 import unittest
26 import os
27 import time
28 import tempfile
29 import os.path
30 import socket
31
32 from ganeti import bootstrap
33 from ganeti import config
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import objects
37 from ganeti import utils
38 from ganeti import netutils
39
40 from ganeti.config import TemporaryReservationManager
41
42 import testutils
43 import mocks
44
45
46 def _StubGetEntResolver():
47   return mocks.FakeGetentResolver()
48
49
50 class TestConfigRunner(unittest.TestCase):
51   """Testing case for HooksRunner"""
52   def setUp(self):
53     fd, self.cfg_file = tempfile.mkstemp()
54     os.close(fd)
55     self._init_cluster(self.cfg_file)
56
57   def tearDown(self):
58     try:
59       os.unlink(self.cfg_file)
60     except OSError:
61       pass
62
63   def _get_object(self):
64     """Returns a instance of ConfigWriter"""
65     cfg = config.ConfigWriter(cfg_file=self.cfg_file, offline=True,
66                               _getents=_StubGetEntResolver)
67     return cfg
68
69   def _init_cluster(self, cfg):
70     """Initializes the cfg object"""
71     me = netutils.Hostname()
72     ip = constants.IP4_ADDRESS_LOCALHOST
73
74     cluster_config = objects.Cluster(
75       serial_no=1,
76       rsahostkeypub="",
77       highest_used_port=(constants.FIRST_DRBD_PORT - 1),
78       mac_prefix="aa:00:00",
79       volume_group_name="xenvg",
80       drbd_usermode_helper="/bin/true",
81       nicparams={constants.PP_DEFAULT: constants.NICC_DEFAULTS},
82       ndparams=constants.NDC_DEFAULTS,
83       tcpudp_port_pool=set(),
84       enabled_hypervisors=[constants.HT_FAKE],
85       master_node=me.name,
86       master_ip="127.0.0.1",
87       master_netdev=constants.DEFAULT_BRIDGE,
88       cluster_name="cluster.local",
89       file_storage_dir="/tmp",
90       uid_pool=[],
91       )
92
93     master_node_config = objects.Node(name=me.name,
94                                       primary_ip=me.ip,
95                                       secondary_ip=ip,
96                                       serial_no=1,
97                                       master_candidate=True)
98
99     bootstrap.InitConfig(constants.CONFIG_VERSION,
100                          cluster_config, master_node_config, self.cfg_file)
101
102   def _create_instance(self):
103     """Create and return an instance object"""
104     inst = objects.Instance(name="test.example.com", disks=[], nics=[],
105                             disk_template=constants.DT_DISKLESS,
106                             primary_node=self._get_object().GetMasterNode())
107     return inst
108
109   def testEmpty(self):
110     """Test instantiate config object"""
111     self._get_object()
112
113   def testInit(self):
114     """Test initialize the config file"""
115     cfg = self._get_object()
116     self.failUnlessEqual(1, len(cfg.GetNodeList()))
117     self.failUnlessEqual(0, len(cfg.GetInstanceList()))
118
119   def testUpdateCluster(self):
120     """Test updates on the cluster object"""
121     cfg = self._get_object()
122     # construct a fake cluster object
123     fake_cl = objects.Cluster()
124     # fail if we didn't read the config
125     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)
126
127     cl = cfg.GetClusterInfo()
128     # first pass, must not fail
129     cfg.Update(cl, None)
130     # second pass, also must not fail (after the config has been written)
131     cfg.Update(cl, None)
132     # but the fake_cl update should still fail
133     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)
134
135   def testUpdateNode(self):
136     """Test updates on one node object"""
137     cfg = self._get_object()
138     # construct a fake node
139     fake_node = objects.Node()
140     # fail if we didn't read the config
141     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node,
142                           None)
143
144     node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
145     # first pass, must not fail
146     cfg.Update(node, None)
147     # second pass, also must not fail (after the config has been written)
148     cfg.Update(node, None)
149     # but the fake_node update should still fail
150     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node,
151                           None)
152
153   def testUpdateInstance(self):
154     """Test updates on one instance object"""
155     cfg = self._get_object()
156     # construct a fake instance
157     inst = self._create_instance()
158     fake_instance = objects.Instance()
159     # fail if we didn't read the config
160     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance,
161                           None)
162
163     cfg.AddInstance(inst, "my-job")
164     instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0])
165     # first pass, must not fail
166     cfg.Update(instance, None)
167     # second pass, also must not fail (after the config has been written)
168     cfg.Update(instance, None)
169     # but the fake_instance update should still fail
170     self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance,
171                           None)
172
173   def testNICParameterSyntaxCheck(self):
174     """Test the NIC's CheckParameterSyntax function"""
175     mode = constants.NIC_MODE
176     link = constants.NIC_LINK
177     m_bridged = constants.NIC_MODE_BRIDGED
178     m_routed = constants.NIC_MODE_ROUTED
179     CheckSyntax = objects.NIC.CheckParameterSyntax
180
181     CheckSyntax(constants.NICC_DEFAULTS)
182     CheckSyntax({mode: m_bridged, link: 'br1'})
183     CheckSyntax({mode: m_routed, link: 'default'})
184     self.assertRaises(errors.ConfigurationError,
185                       CheckSyntax, {mode: '000invalid', link: 'any'})
186     self.assertRaises(errors.ConfigurationError,
187                       CheckSyntax, {mode: m_bridged, link: None})
188     self.assertRaises(errors.ConfigurationError,
189                       CheckSyntax, {mode: m_bridged, link: ''})
190
191   def testGetNdParamsDefault(self):
192     cfg = self._get_object()
193     node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
194     self.assertEqual(cfg.GetNdParams(node), constants.NDC_DEFAULTS)
195
196   def testGetNdParamsModifiedNode(self):
197     my_ndparams = {
198         constants.ND_OOB_PROGRAM: "/bin/node-oob",
199         }
200
201     cfg = self._get_object()
202     node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
203     node.ndparams = my_ndparams
204     cfg.Update(node, None)
205     self.assertEqual(cfg.GetNdParams(node), my_ndparams)
206
207
208 class TestTRM(unittest.TestCase):
209   EC_ID = 1
210
211   def testEmpty(self):
212     t = TemporaryReservationManager()
213     t.Reserve(self.EC_ID, "a")
214     self.assertFalse(t.Reserved(self.EC_ID))
215     self.assertTrue(t.Reserved("a"))
216     self.assertEqual(len(t.GetReserved()), 1)
217
218   def testDuplicate(self):
219     t = TemporaryReservationManager()
220     t.Reserve(self.EC_ID, "a")
221     self.assertRaises(errors.ReservationError, t.Reserve, 2, "a")
222     t.DropECReservations(self.EC_ID)
223     self.assertFalse(t.Reserved("a"))
224
225
226 if __name__ == '__main__':
227   testutils.GanetiTestProgram()