4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
42 from ganeti import constants
43 from ganeti import compat
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti.utils import RunCmd, \
51 class TestParseCpuMask(unittest.TestCase):
52 """Test case for the ParseCpuMask function."""
54 def testWellFormed(self):
55 self.assertEqual(utils.ParseCpuMask(""), [])
56 self.assertEqual(utils.ParseCpuMask("1"), [1])
57 self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
59 def testInvalidInput(self):
60 for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61 self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
64 class TestGetMounts(unittest.TestCase):
65 """Test case for GetMounts()."""
68 "rootfs / rootfs rw 0 0\n"
69 "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
70 "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
73 self.tmpfile = tempfile.NamedTemporaryFile()
74 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
76 def testGetMounts(self):
77 self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
79 ("rootfs", "/", "rootfs", "rw"),
80 ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
81 ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
85 class TestFirstFree(unittest.TestCase):
86 """Test case for the FirstFree function"""
90 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
91 self.failUnlessEqual(FirstFree([]), None)
92 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
93 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
94 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
97 class TestTimeFunctions(unittest.TestCase):
98 """Test case for time functions"""
101 self.assertEqual(utils.SplitTime(1), (1, 0))
102 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
103 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
104 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
105 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
106 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
107 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
108 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
110 self.assertRaises(AssertionError, utils.SplitTime, -1)
112 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
113 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
114 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
116 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
118 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
120 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
121 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
122 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
123 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
124 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
127 class FieldSetTestCase(unittest.TestCase):
128 """Test case for FieldSets"""
130 def testSimpleMatch(self):
131 f = utils.FieldSet("a", "b", "c", "def")
132 self.failUnless(f.Matches("a"))
133 self.failIf(f.Matches("d"), "Substring matched")
134 self.failIf(f.Matches("defghi"), "Prefix string matched")
135 self.failIf(f.NonMatching(["b", "c"]))
136 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
137 self.failUnless(f.NonMatching(["a", "d"]))
139 def testRegexMatch(self):
140 f = utils.FieldSet("a", "b([0-9]+)", "c")
141 self.failUnless(f.Matches("b1"))
142 self.failUnless(f.Matches("b99"))
143 self.failIf(f.Matches("b/1"))
144 self.failIf(f.NonMatching(["b12", "c"]))
145 self.failUnless(f.NonMatching(["a", "1"]))
147 class TestForceDictType(unittest.TestCase):
148 """Test case for ForceDictType"""
150 "a": constants.VTYPE_INT,
151 "b": constants.VTYPE_BOOL,
152 "c": constants.VTYPE_STRING,
153 "d": constants.VTYPE_SIZE,
154 "e": constants.VTYPE_MAYBE_STRING,
157 def _fdt(self, dict, allowed_values=None):
158 if allowed_values is None:
159 utils.ForceDictType(dict, self.KEY_TYPES)
161 utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
165 def testSimpleDict(self):
166 self.assertEqual(self._fdt({}), {})
167 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
168 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
169 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
170 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
171 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
172 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
173 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
174 self.assertEqual(self._fdt({'b': False}), {'b': False})
175 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
176 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
177 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
178 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
179 self.assertEqual(self._fdt({"e": None, }), {"e": None, })
180 self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
181 self.assertEqual(self._fdt({"e": False, }), {"e": '', })
182 self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
184 def testErrors(self):
185 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
186 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
187 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
188 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
189 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
190 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
191 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
192 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
193 self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
194 self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
195 {"b": "hello"}, {"b": "no-such-type"})
198 class TestValidateServiceName(unittest.TestCase):
201 0, 1, 2, 3, 1024, 65000, 65534, 65535,
206 "0", "80", "1111", "65535",
209 for name in testnames:
210 self.assertEqual(utils.ValidateServiceName(name), name)
212 def testInvalid(self):
214 -15756, -1, 65536, 133428083,
215 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
216 "-8546", "-1", "65536",
220 for name in testnames:
221 self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
224 class TestReadLockedPidFile(unittest.TestCase):
226 self.tmpdir = tempfile.mkdtemp()
229 shutil.rmtree(self.tmpdir)
231 def testNonExistent(self):
232 path = utils.PathJoin(self.tmpdir, "nonexist")
233 self.assert_(utils.ReadLockedPidFile(path) is None)
235 def testUnlocked(self):
236 path = utils.PathJoin(self.tmpdir, "pid")
237 utils.WriteFile(path, data="123")
238 self.assert_(utils.ReadLockedPidFile(path) is None)
240 def testLocked(self):
241 path = utils.PathJoin(self.tmpdir, "pid")
242 utils.WriteFile(path, data="123")
244 fl = utils.FileLock.Open(path)
246 fl.Exclusive(blocking=True)
248 self.assertEqual(utils.ReadLockedPidFile(path), 123)
252 self.assert_(utils.ReadLockedPidFile(path) is None)
255 path = utils.PathJoin(self.tmpdir, "foobar", "pid")
256 utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
257 # open(2) should return ENOTDIR
258 self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
261 class TestFindMatch(unittest.TestCase):
265 "bb": {"Two B": True},
266 re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
269 self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
270 self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
272 for i in ["foo", "bar", "bazX"]:
273 for j in range(1, 100, 7):
274 self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
275 ((1, 2, 3), [i, str(j)]))
277 def testNoMatch(self):
278 self.assert_(utils.FindMatch({}, "") is None)
279 self.assert_(utils.FindMatch({}, "foo") is None)
280 self.assert_(utils.FindMatch({}, 1234) is None)
284 re.compile("^(something)$"): "Hello World",
287 self.assert_(utils.FindMatch(data, "") is None)
288 self.assert_(utils.FindMatch(data, "Hello World") is None)
291 class TestTryConvert(unittest.TestCase):
293 for src, fn, result in [
299 self.assertEqual(utils.TryConvert(fn, src), result)
302 if __name__ == '__main__':
303 testutils.GanetiTestProgram()