4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
42 from ganeti import constants
43 from ganeti import compat
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti.utils import RunCmd, \
51 class TestParseCpuMask(unittest.TestCase):
52 """Test case for the ParseCpuMask function."""
54 def testWellFormed(self):
55 self.assertEqual(utils.ParseCpuMask(""), [])
56 self.assertEqual(utils.ParseCpuMask("1"), [1])
57 self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
59 def testInvalidInput(self):
60 for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61 self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
64 class TestParseMultiCpuMask(unittest.TestCase):
65 """Test case for the ParseMultiCpuMask function."""
67 def testWellFormed(self):
68 self.assertEqual(utils.ParseMultiCpuMask(""), [])
69 self.assertEqual(utils.ParseMultiCpuMask("1"), [[1]])
70 self.assertEqual(utils.ParseMultiCpuMask("0-2,4,5-5"), [[0, 1, 2, 4, 5]])
71 self.assertEqual(utils.ParseMultiCpuMask("all"), [[-1]])
72 self.assertEqual(utils.ParseMultiCpuMask("0-2:all:4,6-8"),
73 [[0, 1, 2], [-1], [4, 6, 7, 8]])
75 def testInvalidInput(self):
76 for data in ["garbage", "0,", "0-1-2", "2-1", "1-a", "all-all"]:
77 self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
80 class TestGetMounts(unittest.TestCase):
81 """Test case for GetMounts()."""
84 "rootfs / rootfs rw 0 0\n"
85 "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
86 "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
89 self.tmpfile = tempfile.NamedTemporaryFile()
90 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
92 def testGetMounts(self):
93 self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
95 ("rootfs", "/", "rootfs", "rw"),
96 ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
97 ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
101 class TestFirstFree(unittest.TestCase):
102 """Test case for the FirstFree function"""
106 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
107 self.failUnlessEqual(FirstFree([]), None)
108 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
109 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
110 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
113 class TestTimeFunctions(unittest.TestCase):
114 """Test case for time functions"""
117 self.assertEqual(utils.SplitTime(1), (1, 0))
118 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
119 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
120 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
121 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
122 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
123 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
124 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
126 self.assertRaises(AssertionError, utils.SplitTime, -1)
128 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
129 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
130 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
132 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
134 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
136 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
137 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
138 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
139 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
140 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
143 class FieldSetTestCase(unittest.TestCase):
144 """Test case for FieldSets"""
146 def testSimpleMatch(self):
147 f = utils.FieldSet("a", "b", "c", "def")
148 self.failUnless(f.Matches("a"))
149 self.failIf(f.Matches("d"), "Substring matched")
150 self.failIf(f.Matches("defghi"), "Prefix string matched")
151 self.failIf(f.NonMatching(["b", "c"]))
152 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
153 self.failUnless(f.NonMatching(["a", "d"]))
155 def testRegexMatch(self):
156 f = utils.FieldSet("a", "b([0-9]+)", "c")
157 self.failUnless(f.Matches("b1"))
158 self.failUnless(f.Matches("b99"))
159 self.failIf(f.Matches("b/1"))
160 self.failIf(f.NonMatching(["b12", "c"]))
161 self.failUnless(f.NonMatching(["a", "1"]))
163 class TestForceDictType(unittest.TestCase):
164 """Test case for ForceDictType"""
166 "a": constants.VTYPE_INT,
167 "b": constants.VTYPE_BOOL,
168 "c": constants.VTYPE_STRING,
169 "d": constants.VTYPE_SIZE,
170 "e": constants.VTYPE_MAYBE_STRING,
173 def _fdt(self, dict, allowed_values=None):
174 if allowed_values is None:
175 utils.ForceDictType(dict, self.KEY_TYPES)
177 utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
181 def testSimpleDict(self):
182 self.assertEqual(self._fdt({}), {})
183 self.assertEqual(self._fdt({"a": 1}), {"a": 1})
184 self.assertEqual(self._fdt({"a": "1"}), {"a": 1})
185 self.assertEqual(self._fdt({"a": 1, "b": 1}), {"a":1, "b": True})
186 self.assertEqual(self._fdt({"b": 1, "c": "foo"}), {"b": True, "c": "foo"})
187 self.assertEqual(self._fdt({"b": 1, "c": False}), {"b": True, "c": ""})
188 self.assertEqual(self._fdt({"b": "false"}), {"b": False})
189 self.assertEqual(self._fdt({"b": "False"}), {"b": False})
190 self.assertEqual(self._fdt({"b": False}), {"b": False})
191 self.assertEqual(self._fdt({"b": "true"}), {"b": True})
192 self.assertEqual(self._fdt({"b": "True"}), {"b": True})
193 self.assertEqual(self._fdt({"d": "4"}), {"d": 4})
194 self.assertEqual(self._fdt({"d": "4M"}), {"d": 4})
195 self.assertEqual(self._fdt({"e": constants.VALUE_HS_NOTHING, }), {"e":
196 constants.VALUE_HS_NOTHING, })
197 self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
198 self.assertEqual(self._fdt({"e": False, }), {"e": "", })
199 self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
201 def testErrors(self):
202 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"a": "astring"})
203 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
204 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"c": True})
205 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"d": "astring"})
206 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"d": "4 L"})
207 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
208 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
209 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
210 self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
211 self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
212 {"b": "hello"}, {"b": "no-such-type"})
215 class TestValidateServiceName(unittest.TestCase):
218 0, 1, 2, 3, 1024, 65000, 65534, 65535,
223 "0", "80", "1111", "65535",
226 for name in testnames:
227 self.assertEqual(utils.ValidateServiceName(name), name)
229 def testInvalid(self):
231 -15756, -1, 65536, 133428083,
232 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
233 "-8546", "-1", "65536",
237 for name in testnames:
238 self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
241 class TestReadLockedPidFile(unittest.TestCase):
243 self.tmpdir = tempfile.mkdtemp()
246 shutil.rmtree(self.tmpdir)
248 def testNonExistent(self):
249 path = utils.PathJoin(self.tmpdir, "nonexist")
250 self.assert_(utils.ReadLockedPidFile(path) is None)
252 def testUnlocked(self):
253 path = utils.PathJoin(self.tmpdir, "pid")
254 utils.WriteFile(path, data="123")
255 self.assert_(utils.ReadLockedPidFile(path) is None)
257 def testLocked(self):
258 path = utils.PathJoin(self.tmpdir, "pid")
259 utils.WriteFile(path, data="123")
261 fl = utils.FileLock.Open(path)
263 fl.Exclusive(blocking=True)
265 self.assertEqual(utils.ReadLockedPidFile(path), 123)
269 self.assert_(utils.ReadLockedPidFile(path) is None)
272 path = utils.PathJoin(self.tmpdir, "foobar", "pid")
273 utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
274 # open(2) should return ENOTDIR
275 self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
278 class TestFindMatch(unittest.TestCase):
282 "bb": {"Two B": True},
283 re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
286 self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
287 self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
289 for i in ["foo", "bar", "bazX"]:
290 for j in range(1, 100, 7):
291 self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
292 ((1, 2, 3), [i, str(j)]))
294 def testNoMatch(self):
295 self.assert_(utils.FindMatch({}, "") is None)
296 self.assert_(utils.FindMatch({}, "foo") is None)
297 self.assert_(utils.FindMatch({}, 1234) is None)
301 re.compile("^(something)$"): "Hello World",
304 self.assert_(utils.FindMatch(data, "") is None)
305 self.assert_(utils.FindMatch(data, "Hello World") is None)
308 class TestTryConvert(unittest.TestCase):
310 for src, fn, result in [
316 self.assertEqual(utils.TryConvert(fn, src), result)
319 class TestVerifyDictOptions(unittest.TestCase):
322 "first_key": "foobar",
327 "another_key": "another_value",
332 "first_key": "blubb",
337 utils.VerifyDictOptions(some_keys, self.defaults)
339 def testInvalid(self):
341 "invalid_key": "blubb",
346 self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
347 some_keys, self.defaults)
349 def testNestedInvalid(self):
356 self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
357 some_keys, self.defaults)
359 def testMultiInvalid(self):
363 "key6": "Right here",
365 "invalid_with_sub": {
369 self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
370 some_keys, self.defaults)
373 class TestValidateDeviceNames(unittest.TestCase):
375 utils.ValidateDeviceNames("NIC", [])
376 utils.ValidateDeviceNames("disk", [])
378 def testNoName(self):
380 utils.ValidateDeviceNames("NIC", nics)
382 def testInvalidName(self):
383 self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
384 "disk", [{constants.IDISK_NAME: "42"}])
385 self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
386 "NIC", [{constants.INIC_NAME: "42"}])
388 def testUsedName(self):
389 disks = [{constants.IDISK_NAME: "name1"}, {constants.IDISK_NAME: "name1"}]
390 self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
394 if __name__ == "__main__":
395 testutils.GanetiTestProgram()