utils: Move NewUUID to utils.io
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import errno
25 import fcntl
26 import glob
27 import os
28 import os.path
29 import re
30 import shutil
31 import signal
32 import socket
33 import stat
34 import tempfile
35 import time
36 import unittest
37 import warnings
38 import random
39 import operator
40
41 import testutils
42 from ganeti import constants
43 from ganeti import compat
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti.utils import RunCmd, \
47      FirstFree, \
48      RunParts
49
50
51 class TestParseCpuMask(unittest.TestCase):
52   """Test case for the ParseCpuMask function."""
53
54   def testWellFormed(self):
55     self.assertEqual(utils.ParseCpuMask(""), [])
56     self.assertEqual(utils.ParseCpuMask("1"), [1])
57     self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
58
59   def testInvalidInput(self):
60     for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61       self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
62
63
64 class TestGetMounts(unittest.TestCase):
65   """Test case for GetMounts()."""
66
67   TESTDATA = (
68     "rootfs /     rootfs rw 0 0\n"
69     "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
70     "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
71
72   def setUp(self):
73     self.tmpfile = tempfile.NamedTemporaryFile()
74     utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
75
76   def testGetMounts(self):
77     self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
78       [
79         ("rootfs", "/", "rootfs", "rw"),
80         ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
81         ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
82       ])
83
84
85 class TestFirstFree(unittest.TestCase):
86   """Test case for the FirstFree function"""
87
88   def test(self):
89     """Test FirstFree"""
90     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
91     self.failUnlessEqual(FirstFree([]), None)
92     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
93     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
94     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
95
96
97 class TestTimeFunctions(unittest.TestCase):
98   """Test case for time functions"""
99
100   def runTest(self):
101     self.assertEqual(utils.SplitTime(1), (1, 0))
102     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
103     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
104     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
105     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
106     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
107     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
108     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
109
110     self.assertRaises(AssertionError, utils.SplitTime, -1)
111
112     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
113     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
114     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
115
116     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
117                      1218448917.481)
118     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
119
120     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
121     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
122     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
123     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
124     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
125
126
127 class FieldSetTestCase(unittest.TestCase):
128   """Test case for FieldSets"""
129
130   def testSimpleMatch(self):
131     f = utils.FieldSet("a", "b", "c", "def")
132     self.failUnless(f.Matches("a"))
133     self.failIf(f.Matches("d"), "Substring matched")
134     self.failIf(f.Matches("defghi"), "Prefix string matched")
135     self.failIf(f.NonMatching(["b", "c"]))
136     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
137     self.failUnless(f.NonMatching(["a", "d"]))
138
139   def testRegexMatch(self):
140     f = utils.FieldSet("a", "b([0-9]+)", "c")
141     self.failUnless(f.Matches("b1"))
142     self.failUnless(f.Matches("b99"))
143     self.failIf(f.Matches("b/1"))
144     self.failIf(f.NonMatching(["b12", "c"]))
145     self.failUnless(f.NonMatching(["a", "1"]))
146
147 class TestForceDictType(unittest.TestCase):
148   """Test case for ForceDictType"""
149   KEY_TYPES = {
150     "a": constants.VTYPE_INT,
151     "b": constants.VTYPE_BOOL,
152     "c": constants.VTYPE_STRING,
153     "d": constants.VTYPE_SIZE,
154     "e": constants.VTYPE_MAYBE_STRING,
155     }
156
157   def _fdt(self, dict, allowed_values=None):
158     if allowed_values is None:
159       utils.ForceDictType(dict, self.KEY_TYPES)
160     else:
161       utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
162
163     return dict
164
165   def testSimpleDict(self):
166     self.assertEqual(self._fdt({}), {})
167     self.assertEqual(self._fdt({'a': 1}), {'a': 1})
168     self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
169     self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
170     self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
171     self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
172     self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
173     self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
174     self.assertEqual(self._fdt({'b': False}), {'b': False})
175     self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
176     self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
177     self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
178     self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
179     self.assertEqual(self._fdt({"e": None, }), {"e": None, })
180     self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
181     self.assertEqual(self._fdt({"e": False, }), {"e": '', })
182     self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
183
184   def testErrors(self):
185     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
186     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
187     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
188     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
189     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
190     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
191     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
192     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
193     self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
194     self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
195                       {"b": "hello"}, {"b": "no-such-type"})
196
197
198 class TestValidateServiceName(unittest.TestCase):
199   def testValid(self):
200     testnames = [
201       0, 1, 2, 3, 1024, 65000, 65534, 65535,
202       "ganeti",
203       "gnt-masterd",
204       "HELLO_WORLD_SVC",
205       "hello.world.1",
206       "0", "80", "1111", "65535",
207       ]
208
209     for name in testnames:
210       self.assertEqual(utils.ValidateServiceName(name), name)
211
212   def testInvalid(self):
213     testnames = [
214       -15756, -1, 65536, 133428083,
215       "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
216       "-8546", "-1", "65536",
217       (129 * "A"),
218       ]
219
220     for name in testnames:
221       self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
222
223
224 class TestReadLockedPidFile(unittest.TestCase):
225   def setUp(self):
226     self.tmpdir = tempfile.mkdtemp()
227
228   def tearDown(self):
229     shutil.rmtree(self.tmpdir)
230
231   def testNonExistent(self):
232     path = utils.PathJoin(self.tmpdir, "nonexist")
233     self.assert_(utils.ReadLockedPidFile(path) is None)
234
235   def testUnlocked(self):
236     path = utils.PathJoin(self.tmpdir, "pid")
237     utils.WriteFile(path, data="123")
238     self.assert_(utils.ReadLockedPidFile(path) is None)
239
240   def testLocked(self):
241     path = utils.PathJoin(self.tmpdir, "pid")
242     utils.WriteFile(path, data="123")
243
244     fl = utils.FileLock.Open(path)
245     try:
246       fl.Exclusive(blocking=True)
247
248       self.assertEqual(utils.ReadLockedPidFile(path), 123)
249     finally:
250       fl.Close()
251
252     self.assert_(utils.ReadLockedPidFile(path) is None)
253
254   def testError(self):
255     path = utils.PathJoin(self.tmpdir, "foobar", "pid")
256     utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
257     # open(2) should return ENOTDIR
258     self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
259
260
261 class TestFindMatch(unittest.TestCase):
262   def test(self):
263     data = {
264       "aaaa": "Four A",
265       "bb": {"Two B": True},
266       re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
267       }
268
269     self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
270     self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
271
272     for i in ["foo", "bar", "bazX"]:
273       for j in range(1, 100, 7):
274         self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
275                          ((1, 2, 3), [i, str(j)]))
276
277   def testNoMatch(self):
278     self.assert_(utils.FindMatch({}, "") is None)
279     self.assert_(utils.FindMatch({}, "foo") is None)
280     self.assert_(utils.FindMatch({}, 1234) is None)
281
282     data = {
283       "X": "Hello World",
284       re.compile("^(something)$"): "Hello World",
285       }
286
287     self.assert_(utils.FindMatch(data, "") is None)
288     self.assert_(utils.FindMatch(data, "Hello World") is None)
289
290
291 class TestTryConvert(unittest.TestCase):
292   def test(self):
293     for src, fn, result in [
294       ("1", int, 1),
295       ("a", int, "a"),
296       ("", bool, False),
297       ("a", bool, True),
298       ]:
299       self.assertEqual(utils.TryConvert(fn, src), result)
300
301
302 if __name__ == '__main__':
303   testutils.GanetiTestProgram()