Migrate lib/cli.py from constants to pathutils
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import errno
25 import fcntl
26 import glob
27 import os
28 import os.path
29 import re
30 import shutil
31 import signal
32 import socket
33 import stat
34 import tempfile
35 import time
36 import unittest
37 import warnings
38 import random
39 import operator
40
41 import testutils
42 from ganeti import constants
43 from ganeti import compat
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti.utils import RunCmd, \
47      FirstFree, \
48      RunParts
49
50
51 class TestParseCpuMask(unittest.TestCase):
52   """Test case for the ParseCpuMask function."""
53
54   def testWellFormed(self):
55     self.assertEqual(utils.ParseCpuMask(""), [])
56     self.assertEqual(utils.ParseCpuMask("1"), [1])
57     self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
58
59   def testInvalidInput(self):
60     for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61       self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
62
63
64 class TestParseMultiCpuMask(unittest.TestCase):
65   """Test case for the ParseMultiCpuMask function."""
66
67   def testWellFormed(self):
68     self.assertEqual(utils.ParseMultiCpuMask(""), [])
69     self.assertEqual(utils.ParseMultiCpuMask("1"), [[1]])
70     self.assertEqual(utils.ParseMultiCpuMask("0-2,4,5-5"), [[0, 1, 2, 4, 5]])
71     self.assertEqual(utils.ParseMultiCpuMask("all"), [[-1]])
72     self.assertEqual(utils.ParseMultiCpuMask("0-2:all:4,6-8"),
73       [[0, 1, 2], [-1], [4, 6, 7, 8]])
74
75   def testInvalidInput(self):
76     for data in ["garbage", "0,", "0-1-2", "2-1", "1-a", "all-all"]:
77       self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
78
79
80 class TestGetMounts(unittest.TestCase):
81   """Test case for GetMounts()."""
82
83   TESTDATA = (
84     "rootfs /     rootfs rw 0 0\n"
85     "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
86     "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
87
88   def setUp(self):
89     self.tmpfile = tempfile.NamedTemporaryFile()
90     utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
91
92   def testGetMounts(self):
93     self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
94       [
95         ("rootfs", "/", "rootfs", "rw"),
96         ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
97         ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
98       ])
99
100
101 class TestFirstFree(unittest.TestCase):
102   """Test case for the FirstFree function"""
103
104   def test(self):
105     """Test FirstFree"""
106     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
107     self.failUnlessEqual(FirstFree([]), None)
108     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
109     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
110     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
111
112
113 class TestTimeFunctions(unittest.TestCase):
114   """Test case for time functions"""
115
116   def runTest(self):
117     self.assertEqual(utils.SplitTime(1), (1, 0))
118     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
119     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
120     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
121     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
122     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
123     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
124     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
125
126     self.assertRaises(AssertionError, utils.SplitTime, -1)
127
128     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
129     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
130     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
131
132     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
133                      1218448917.481)
134     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
135
136     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
137     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
138     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
139     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
140     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
141
142
143 class FieldSetTestCase(unittest.TestCase):
144   """Test case for FieldSets"""
145
146   def testSimpleMatch(self):
147     f = utils.FieldSet("a", "b", "c", "def")
148     self.failUnless(f.Matches("a"))
149     self.failIf(f.Matches("d"), "Substring matched")
150     self.failIf(f.Matches("defghi"), "Prefix string matched")
151     self.failIf(f.NonMatching(["b", "c"]))
152     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
153     self.failUnless(f.NonMatching(["a", "d"]))
154
155   def testRegexMatch(self):
156     f = utils.FieldSet("a", "b([0-9]+)", "c")
157     self.failUnless(f.Matches("b1"))
158     self.failUnless(f.Matches("b99"))
159     self.failIf(f.Matches("b/1"))
160     self.failIf(f.NonMatching(["b12", "c"]))
161     self.failUnless(f.NonMatching(["a", "1"]))
162
163 class TestForceDictType(unittest.TestCase):
164   """Test case for ForceDictType"""
165   KEY_TYPES = {
166     "a": constants.VTYPE_INT,
167     "b": constants.VTYPE_BOOL,
168     "c": constants.VTYPE_STRING,
169     "d": constants.VTYPE_SIZE,
170     "e": constants.VTYPE_MAYBE_STRING,
171     }
172
173   def _fdt(self, dict, allowed_values=None):
174     if allowed_values is None:
175       utils.ForceDictType(dict, self.KEY_TYPES)
176     else:
177       utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
178
179     return dict
180
181   def testSimpleDict(self):
182     self.assertEqual(self._fdt({}), {})
183     self.assertEqual(self._fdt({'a': 1}), {'a': 1})
184     self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
185     self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
186     self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
187     self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
188     self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
189     self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
190     self.assertEqual(self._fdt({'b': False}), {'b': False})
191     self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
192     self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
193     self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
194     self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
195     self.assertEqual(self._fdt({"e": None, }), {"e": None, })
196     self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
197     self.assertEqual(self._fdt({"e": False, }), {"e": '', })
198     self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
199
200   def testErrors(self):
201     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
202     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
203     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
204     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
205     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
206     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
207     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
208     self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
209     self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
210     self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
211                       {"b": "hello"}, {"b": "no-such-type"})
212
213
214 class TestValidateServiceName(unittest.TestCase):
215   def testValid(self):
216     testnames = [
217       0, 1, 2, 3, 1024, 65000, 65534, 65535,
218       "ganeti",
219       "gnt-masterd",
220       "HELLO_WORLD_SVC",
221       "hello.world.1",
222       "0", "80", "1111", "65535",
223       ]
224
225     for name in testnames:
226       self.assertEqual(utils.ValidateServiceName(name), name)
227
228   def testInvalid(self):
229     testnames = [
230       -15756, -1, 65536, 133428083,
231       "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
232       "-8546", "-1", "65536",
233       (129 * "A"),
234       ]
235
236     for name in testnames:
237       self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
238
239
240 class TestReadLockedPidFile(unittest.TestCase):
241   def setUp(self):
242     self.tmpdir = tempfile.mkdtemp()
243
244   def tearDown(self):
245     shutil.rmtree(self.tmpdir)
246
247   def testNonExistent(self):
248     path = utils.PathJoin(self.tmpdir, "nonexist")
249     self.assert_(utils.ReadLockedPidFile(path) is None)
250
251   def testUnlocked(self):
252     path = utils.PathJoin(self.tmpdir, "pid")
253     utils.WriteFile(path, data="123")
254     self.assert_(utils.ReadLockedPidFile(path) is None)
255
256   def testLocked(self):
257     path = utils.PathJoin(self.tmpdir, "pid")
258     utils.WriteFile(path, data="123")
259
260     fl = utils.FileLock.Open(path)
261     try:
262       fl.Exclusive(blocking=True)
263
264       self.assertEqual(utils.ReadLockedPidFile(path), 123)
265     finally:
266       fl.Close()
267
268     self.assert_(utils.ReadLockedPidFile(path) is None)
269
270   def testError(self):
271     path = utils.PathJoin(self.tmpdir, "foobar", "pid")
272     utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
273     # open(2) should return ENOTDIR
274     self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
275
276
277 class TestFindMatch(unittest.TestCase):
278   def test(self):
279     data = {
280       "aaaa": "Four A",
281       "bb": {"Two B": True},
282       re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
283       }
284
285     self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
286     self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
287
288     for i in ["foo", "bar", "bazX"]:
289       for j in range(1, 100, 7):
290         self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
291                          ((1, 2, 3), [i, str(j)]))
292
293   def testNoMatch(self):
294     self.assert_(utils.FindMatch({}, "") is None)
295     self.assert_(utils.FindMatch({}, "foo") is None)
296     self.assert_(utils.FindMatch({}, 1234) is None)
297
298     data = {
299       "X": "Hello World",
300       re.compile("^(something)$"): "Hello World",
301       }
302
303     self.assert_(utils.FindMatch(data, "") is None)
304     self.assert_(utils.FindMatch(data, "Hello World") is None)
305
306
307 class TestTryConvert(unittest.TestCase):
308   def test(self):
309     for src, fn, result in [
310       ("1", int, 1),
311       ("a", int, "a"),
312       ("", bool, False),
313       ("a", bool, True),
314       ]:
315       self.assertEqual(utils.TryConvert(fn, src), result)
316
317
318 class TestVerifyDictOptions(unittest.TestCase):
319   def setUp(self):
320     self.defaults = {
321       "first_key": "foobar",
322       "foobar": {
323         "key1": "value2",
324         "key2": "value1",
325         },
326       "another_key": "another_value",
327       }
328
329   def test(self):
330     some_keys = {
331       "first_key": "blubb",
332       "foobar": {
333         "key2": "foo",
334         },
335       }
336     utils.VerifyDictOptions(some_keys, self.defaults)
337
338   def testInvalid(self):
339     some_keys = {
340       "invalid_key": "blubb",
341       "foobar": {
342         "key2": "foo",
343         },
344       }
345     self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
346                       some_keys, self.defaults)
347
348   def testNestedInvalid(self):
349     some_keys = {
350       "foobar": {
351         "key2": "foo",
352         "key3": "blibb"
353         },
354       }
355     self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
356                       some_keys, self.defaults)
357
358   def testMultiInvalid(self):
359     some_keys = {
360         "foobar": {
361           "key1": "value3",
362           "key6": "Right here",
363         },
364         "invalid_with_sub": {
365           "sub1": "value3",
366         },
367       }
368     self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
369                       some_keys, self.defaults)
370
371
372 if __name__ == '__main__':
373   testutils.GanetiTestProgram()