Merge branch 'stable-2.9' into stable-2.10
[ganeti-local] / test / py / ganeti.ssconf_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.ssconf"""
23
24 import os
25 import unittest
26 import tempfile
27 import shutil
28 import errno
29
30 from ganeti import utils
31 from ganeti import constants
32 from ganeti import errors
33 from ganeti import ssconf
34
35 import testutils
36 import mock
37
38
39 class TestReadSsconfFile(unittest.TestCase):
40   def setUp(self):
41     self.tmpdir = tempfile.mkdtemp()
42
43   def tearDown(self):
44     shutil.rmtree(self.tmpdir)
45
46   def testReadDirectory(self):
47     self.assertRaises(EnvironmentError, ssconf.ReadSsconfFile, self.tmpdir)
48
49   def testNonExistantFile(self):
50     testfile = utils.PathJoin(self.tmpdir, "does.not.exist")
51
52     self.assertFalse(os.path.exists(testfile))
53
54     try:
55       ssconf.ReadSsconfFile(testfile)
56     except EnvironmentError, err:
57       self.assertEqual(err.errno, errno.ENOENT)
58     else:
59       self.fail("Exception was not raised")
60
61   def testEmptyFile(self):
62     testfile = utils.PathJoin(self.tmpdir, "empty")
63
64     utils.WriteFile(testfile, data="")
65
66     self.assertEqual(ssconf.ReadSsconfFile(testfile), "")
67
68   def testSingleLine(self):
69     testfile = utils.PathJoin(self.tmpdir, "data")
70
71     for nl in range(0, 10):
72       utils.WriteFile(testfile, data="Hello World" + ("\n" * nl))
73
74       self.assertEqual(ssconf.ReadSsconfFile(testfile),
75                        "Hello World")
76
77   def testExactlyMaxSize(self):
78     testfile = utils.PathJoin(self.tmpdir, "data")
79
80     data = "A" * ssconf._MAX_SIZE
81     utils.WriteFile(testfile, data=data)
82
83     self.assertEqual(os.path.getsize(testfile), ssconf._MAX_SIZE)
84
85     self.assertEqual(ssconf.ReadSsconfFile(testfile),
86                      data)
87
88   def testLargeFile(self):
89     testfile = utils.PathJoin(self.tmpdir, "data")
90
91     for size in [ssconf._MAX_SIZE + 1, ssconf._MAX_SIZE * 2]:
92       utils.WriteFile(testfile, data="A" * size)
93       self.assertTrue(os.path.getsize(testfile) > ssconf._MAX_SIZE)
94       self.assertRaises(RuntimeError, ssconf.ReadSsconfFile, testfile)
95
96
97 class TestSimpleStore(unittest.TestCase):
98   def setUp(self):
99     self._tmpdir = tempfile.mkdtemp()
100     self.ssdir = utils.PathJoin(self._tmpdir, "files")
101     lockfile = utils.PathJoin(self._tmpdir, "lock")
102
103     os.mkdir(self.ssdir)
104
105     self.sstore = ssconf.SimpleStore(cfg_location=self.ssdir,
106                                      _lockfile=lockfile)
107
108   def tearDown(self):
109     shutil.rmtree(self._tmpdir)
110
111   def _ReadSsFile(self, filename):
112     return utils.ReadFile(utils.PathJoin(self.ssdir, "ssconf_%s" % filename))
113
114   def testInvalidKey(self):
115     self.assertRaises(errors.ProgrammerError, self.sstore.KeyToFilename,
116                       "not a valid key")
117     self.assertRaises(errors.ProgrammerError, self.sstore._ReadFile,
118                       "not a valid key")
119
120   def testKeyToFilename(self):
121     for key in ssconf._VALID_KEYS:
122       result = self.sstore.KeyToFilename(key)
123       self.assertTrue(utils.IsBelowDir(self.ssdir, result))
124       self.assertTrue(os.path.basename(result).startswith("ssconf_"))
125
126   def testReadFileNonExistingFile(self):
127     filename = self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME)
128
129     self.assertFalse(os.path.exists(filename))
130     try:
131       self.sstore._ReadFile(constants.SS_CLUSTER_NAME)
132     except errors.ConfigurationError, err:
133       self.assertTrue(str(err).startswith("Can't read ssconf file"))
134     else:
135       self.fail("Exception was not raised")
136
137     for default in ["", "Hello World", 0, 100]:
138       self.assertFalse(os.path.exists(filename))
139       result = self.sstore._ReadFile(constants.SS_CLUSTER_NAME, default=default)
140       self.assertEqual(result, default)
141
142   def testReadFile(self):
143     utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
144                     data="cluster.example.com")
145
146     self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME),
147                      "cluster.example.com")
148
149     self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME,
150                                            default="something.example.com"),
151                      "cluster.example.com")
152
153   def testReadAllNoFiles(self):
154     self.assertEqual(self.sstore.ReadAll(), {})
155
156   def testReadAllSingleFile(self):
157     utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
158                     data="cluster.example.com")
159     self.assertEqual(self.sstore.ReadAll(), {
160       constants.SS_CLUSTER_NAME: "cluster.example.com",
161       })
162
163   def testWriteFiles(self):
164     values = {
165       constants.SS_CLUSTER_NAME: "cluster.example.com",
166       constants.SS_CLUSTER_TAGS: "value\nwith\nnewlines\n",
167       constants.SS_INSTANCE_LIST: "",
168       }
169
170     self.sstore.WriteFiles(values)
171
172     self.assertEqual(sorted(os.listdir(self.ssdir)), sorted([
173       "ssconf_cluster_name",
174       "ssconf_cluster_tags",
175       "ssconf_instance_list",
176       ]))
177
178     self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_NAME),
179                      "cluster.example.com\n")
180     self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_TAGS),
181                      "value\nwith\nnewlines\n")
182     self.assertEqual(self._ReadSsFile(constants.SS_INSTANCE_LIST), "")
183
184   def testWriteFilesUnknownKey(self):
185     values = {
186       "unknown key": "value",
187       }
188
189     self.assertRaises(errors.ProgrammerError, self.sstore.WriteFiles,
190                       values, dry_run=True)
191
192     self.assertEqual(os.listdir(self.ssdir), [])
193
194   def testWriteFilesDryRun(self):
195     values = {
196       constants.SS_CLUSTER_NAME: "cluster.example.com",
197       }
198
199     self.sstore.WriteFiles(values, dry_run=True)
200
201     self.assertEqual(os.listdir(self.ssdir), [])
202
203   def testWriteFilesNoValues(self):
204     for dry_run in [False, True]:
205       self.sstore.WriteFiles({}, dry_run=dry_run)
206
207       self.assertEqual(os.listdir(self.ssdir), [])
208
209   def testWriteFilesTooLong(self):
210     values = {
211       constants.SS_INSTANCE_LIST: "A" * ssconf._MAX_SIZE,
212       }
213
214     for dry_run in [False, True]:
215       try:
216         self.sstore.WriteFiles(values, dry_run=dry_run)
217       except errors.ConfigurationError, err:
218         self.assertTrue(str(err).startswith("Value 'instance_list' has"))
219       else:
220         self.fail("Exception was not raised")
221
222       self.assertEqual(os.listdir(self.ssdir), [])
223
224   def testGetHvparamsForHypervisor(self):
225     hvparams = [("a", "A"), ("b", "B"), ("c", "C")]
226     ssconf_file_content = '\n'.join("%s=%s" % (key, value) for (key, value)
227                                     in hvparams)
228     self.sstore._ReadFile = mock.Mock(return_value=ssconf_file_content)
229     result = self.sstore.GetHvparamsForHypervisor("foo")
230     for (key, value) in hvparams:
231       self.assertTrue(key in result)
232       self.assertEqual(value, result[key])
233
234
class TestVerifyClusterName(unittest.TestCase):
  """Exercises ssconf.VerifyClusterName with a temporary config location."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testMissingFile(self):
    # No assertion here: the call is simply expected not to raise when the
    # configuration location does not exist
    missing = utils.PathJoin(self.tmpdir, "does-not-exist")
    ssconf.VerifyClusterName(NotImplemented, _cfg_location=missing)

  def testMatchingName(self):
    path = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")

    # The name must be accepted with and without trailing newlines
    for data in ("cluster.example.com", "cluster.example.com\n\n"):
      utils.WriteFile(path, data=data)
      ssconf.VerifyClusterName("cluster.example.com",
                               _cfg_location=self.tmpdir)

  def testNameMismatch(self):
    path = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")

    # Second case: the expected name only shows up after other lines, which
    # must still be treated as a mismatch
    for data in ("something.example.com", "foobar\n\ncluster.example.com"):
      utils.WriteFile(path, data=data)
      self.assertRaises(errors.GenericError, ssconf.VerifyClusterName,
                        "cluster.example.com", _cfg_location=self.tmpdir)
261
262
class TestVerifyKeys(unittest.TestCase):
  """Exercises ssconf.VerifyKeys with valid and invalid key collections."""

  def testNoKeys(self):
    # An empty collection is expected to pass without raising
    ssconf.VerifyKeys({})

  def testValidKeys(self):
    valid = ssconf._VALID_KEYS

    # First all valid keys at once, then each one on its own
    ssconf.VerifyKeys(valid)
    for key in valid:
      ssconf.VerifyKeys([key])

  def testInvalidKeys(self):
    invalid = ("", ".", " ", "foo", "bar", "HelloWorld")

    for key in invalid:
      self.assertRaises(errors.GenericError, ssconf.VerifyKeys, [key])
276
277
# When executed as a script (not imported), run via testutils.GanetiTestProgram
if __name__ == "__main__":
  testutils.GanetiTestProgram()