4 # Copyright (C) 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.ssconf"""
30 from ganeti import utils
31 from ganeti import constants
32 from ganeti import errors
33 from ganeti import ssconf
39 class TestReadSsconfFile(unittest.TestCase):
41 self.tmpdir = tempfile.mkdtemp()
44 shutil.rmtree(self.tmpdir)
46 def testReadDirectory(self):
47 self.assertRaises(EnvironmentError, ssconf.ReadSsconfFile, self.tmpdir)
49 def testNonExistantFile(self):
50 testfile = utils.PathJoin(self.tmpdir, "does.not.exist")
52 self.assertFalse(os.path.exists(testfile))
55 ssconf.ReadSsconfFile(testfile)
56 except EnvironmentError, err:
57 self.assertEqual(err.errno, errno.ENOENT)
59 self.fail("Exception was not raised")
61 def testEmptyFile(self):
62 testfile = utils.PathJoin(self.tmpdir, "empty")
64 utils.WriteFile(testfile, data="")
66 self.assertEqual(ssconf.ReadSsconfFile(testfile), "")
68 def testSingleLine(self):
69 testfile = utils.PathJoin(self.tmpdir, "data")
71 for nl in range(0, 10):
72 utils.WriteFile(testfile, data="Hello World" + ("\n" * nl))
74 self.assertEqual(ssconf.ReadSsconfFile(testfile),
77 def testExactlyMaxSize(self):
78 testfile = utils.PathJoin(self.tmpdir, "data")
80 data = "A" * ssconf._MAX_SIZE
81 utils.WriteFile(testfile, data=data)
83 self.assertEqual(os.path.getsize(testfile), ssconf._MAX_SIZE)
85 self.assertEqual(ssconf.ReadSsconfFile(testfile),
88 def testLargeFile(self):
89 testfile = utils.PathJoin(self.tmpdir, "data")
91 for size in [ssconf._MAX_SIZE + 1, ssconf._MAX_SIZE * 2]:
92 utils.WriteFile(testfile, data="A" * size)
93 self.assertTrue(os.path.getsize(testfile) > ssconf._MAX_SIZE)
94 self.assertRaises(RuntimeError, ssconf.ReadSsconfFile, testfile)
97 class TestSimpleStore(unittest.TestCase):
99 self._tmpdir = tempfile.mkdtemp()
100 self.ssdir = utils.PathJoin(self._tmpdir, "files")
101 lockfile = utils.PathJoin(self._tmpdir, "lock")
105 self.sstore = ssconf.SimpleStore(cfg_location=self.ssdir,
109 shutil.rmtree(self._tmpdir)
111 def _ReadSsFile(self, filename):
112 return utils.ReadFile(utils.PathJoin(self.ssdir, "ssconf_%s" % filename))
114 def testInvalidKey(self):
115 self.assertRaises(errors.ProgrammerError, self.sstore.KeyToFilename,
117 self.assertRaises(errors.ProgrammerError, self.sstore._ReadFile,
120 def testKeyToFilename(self):
121 for key in ssconf._VALID_KEYS:
122 result = self.sstore.KeyToFilename(key)
123 self.assertTrue(utils.IsBelowDir(self.ssdir, result))
124 self.assertTrue(os.path.basename(result).startswith("ssconf_"))
126 def testReadFileNonExistingFile(self):
127 filename = self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME)
129 self.assertFalse(os.path.exists(filename))
131 self.sstore._ReadFile(constants.SS_CLUSTER_NAME)
132 except errors.ConfigurationError, err:
133 self.assertTrue(str(err).startswith("Can't read ssconf file"))
135 self.fail("Exception was not raised")
137 for default in ["", "Hello World", 0, 100]:
138 self.assertFalse(os.path.exists(filename))
139 result = self.sstore._ReadFile(constants.SS_CLUSTER_NAME, default=default)
140 self.assertEqual(result, default)
142 def testReadFile(self):
143 utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
144 data="cluster.example.com")
146 self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME),
147 "cluster.example.com")
149 self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME,
150 default="something.example.com"),
151 "cluster.example.com")
153 def testReadAllNoFiles(self):
154 self.assertEqual(self.sstore.ReadAll(), {})
156 def testReadAllSingleFile(self):
157 utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
158 data="cluster.example.com")
159 self.assertEqual(self.sstore.ReadAll(), {
160 constants.SS_CLUSTER_NAME: "cluster.example.com",
163 def testWriteFiles(self):
165 constants.SS_CLUSTER_NAME: "cluster.example.com",
166 constants.SS_CLUSTER_TAGS: "value\nwith\nnewlines\n",
167 constants.SS_INSTANCE_LIST: "",
170 self.sstore.WriteFiles(values)
172 self.assertEqual(sorted(os.listdir(self.ssdir)), sorted([
173 "ssconf_cluster_name",
174 "ssconf_cluster_tags",
175 "ssconf_instance_list",
178 self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_NAME),
179 "cluster.example.com\n")
180 self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_TAGS),
181 "value\nwith\nnewlines\n")
182 self.assertEqual(self._ReadSsFile(constants.SS_INSTANCE_LIST), "")
184 def testWriteFilesUnknownKey(self):
186 "unknown key": "value",
189 self.assertRaises(errors.ProgrammerError, self.sstore.WriteFiles,
190 values, dry_run=True)
192 self.assertEqual(os.listdir(self.ssdir), [])
194 def testWriteFilesDryRun(self):
196 constants.SS_CLUSTER_NAME: "cluster.example.com",
199 self.sstore.WriteFiles(values, dry_run=True)
201 self.assertEqual(os.listdir(self.ssdir), [])
203 def testWriteFilesNoValues(self):
204 for dry_run in [False, True]:
205 self.sstore.WriteFiles({}, dry_run=dry_run)
207 self.assertEqual(os.listdir(self.ssdir), [])
209 def testWriteFilesTooLong(self):
211 constants.SS_INSTANCE_LIST: "A" * ssconf._MAX_SIZE,
214 for dry_run in [False, True]:
216 self.sstore.WriteFiles(values, dry_run=dry_run)
217 except errors.ConfigurationError, err:
218 self.assertTrue(str(err).startswith("Value 'instance_list' has"))
220 self.fail("Exception was not raised")
222 self.assertEqual(os.listdir(self.ssdir), [])
224 def testGetHvparamsForHypervisor(self):
225 hvparams = [("a", "A"), ("b", "B"), ("c", "C")]
226 ssconf_file_content = '\n'.join("%s=%s" % (key, value) for (key, value)
228 self.sstore._ReadFile = mock.Mock(return_value=ssconf_file_content)
229 result = self.sstore.GetHvparamsForHypervisor("foo")
230 for (key, value) in hvparams:
231 self.assertTrue(key in result)
232 self.assertEqual(value, result[key])
235 class TestVerifyClusterName(unittest.TestCase):
237 self.tmpdir = tempfile.mkdtemp()
240 shutil.rmtree(self.tmpdir)
242 def testMissingFile(self):
243 tmploc = utils.PathJoin(self.tmpdir, "does-not-exist")
244 ssconf.VerifyClusterName(NotImplemented, _cfg_location=tmploc)
246 def testMatchingName(self):
247 tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
249 for content in ["cluster.example.com", "cluster.example.com\n\n"]:
250 utils.WriteFile(tmpfile, data=content)
251 ssconf.VerifyClusterName("cluster.example.com",
252 _cfg_location=self.tmpdir)
254 def testNameMismatch(self):
255 tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
257 for content in ["something.example.com", "foobar\n\ncluster.example.com"]:
258 utils.WriteFile(tmpfile, data=content)
259 self.assertRaises(errors.GenericError, ssconf.VerifyClusterName,
260 "cluster.example.com", _cfg_location=self.tmpdir)
263 class TestVerifyKeys(unittest.TestCase):
264 def testNoKeys(self):
265 ssconf.VerifyKeys({})
267 def testValidKeys(self):
268 ssconf.VerifyKeys(ssconf._VALID_KEYS)
270 for key in ssconf._VALID_KEYS:
271 ssconf.VerifyKeys([key])
273 def testInvalidKeys(self):
274 for key in ["", ".", " ", "foo", "bar", "HelloWorld"]:
275 self.assertRaises(errors.GenericError, ssconf.VerifyKeys, [key])
278 if __name__ == "__main__":
279 testutils.GanetiTestProgram()