4 # Copyright (C) 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.text"""
30 from cStringIO import StringIO
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
39 class TestMatchNameComponent(unittest.TestCase):
40 """Test case for the MatchNameComponent function"""
def testEmptyList(self):
    """Test that there is no match against an empty list"""
    # With no candidate names, both an empty key and a regular key are
    # expected to yield None.
    for key in ("", "test"):
        self.assertEqual(utils.MatchNameComponent(key, []), None)
def testSingleMatch(self):
    """Test that a single match is performed correctly"""
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
    wanted = mlist[1]
    # The short name, a partial name and the full name are all expected to
    # resolve to the single "test2" entry.
    for key in ("test2", "test2.example", "test2.example.com"):
        self.assertEqual(utils.MatchNameComponent(key, mlist), wanted)
def testMultipleMatches(self):
    """Test that a key matching multiple names is returned as None"""
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
    # Both keys are a prefix of every entry in the list, so the expected
    # result is None rather than any one of the candidates.
    for key in ("test1", "test1.example"):
        self.assertEqual(utils.MatchNameComponent(key, mlist), None)
59 def testFullMatch(self):
60 """Test that a full match is returned correctly"""
62 key2 = "test1.example"
63 mlist = [key2, key2 + ".com"]
64 self.failUnlessEqual(utils.MatchNameComponent(key1, mlist), None)
65 self.failUnlessEqual(utils.MatchNameComponent(key2, mlist), key2)
67 def testCaseInsensitivePartialMatch(self):
68 """Test for the case_insensitive keyword"""
69 mlist = ["test1.example.com", "test2.example.net"]
70 self.assertEqual(utils.MatchNameComponent("test2", mlist,
71 case_sensitive=False),
73 self.assertEqual(utils.MatchNameComponent("Test2", mlist,
74 case_sensitive=False),
76 self.assertEqual(utils.MatchNameComponent("teSt2", mlist,
77 case_sensitive=False),
79 self.assertEqual(utils.MatchNameComponent("TeSt2", mlist,
80 case_sensitive=False),
83 def testCaseInsensitiveFullMatch(self):
84 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
86 # Between the two ts1 a full string match non-case insensitive should work
87 self.assertEqual(utils.MatchNameComponent("Ts1", mlist,
88 case_sensitive=False),
90 self.assertEqual(utils.MatchNameComponent("Ts1.ex", mlist,
91 case_sensitive=False),
93 self.assertEqual(utils.MatchNameComponent("ts1.ex", mlist,
94 case_sensitive=False),
97 # Between the two ts2 only case differs, so only case-match works
98 self.assertEqual(utils.MatchNameComponent("ts2.ex", mlist,
99 case_sensitive=False),
101 self.assertEqual(utils.MatchNameComponent("Ts2.ex", mlist,
102 case_sensitive=False),
104 self.assertEqual(utils.MatchNameComponent("TS2.ex", mlist,
105 case_sensitive=False),
109 class TestFormatUnit(unittest.TestCase):
110 """Test case for the FormatUnit function"""
113 self.assertEqual(utils.FormatUnit(1, "h"), "1M")
114 self.assertEqual(utils.FormatUnit(100, "h"), "100M")
115 self.assertEqual(utils.FormatUnit(1023, "h"), "1023M")
117 self.assertEqual(utils.FormatUnit(1, "m"), "1")
118 self.assertEqual(utils.FormatUnit(100, "m"), "100")
119 self.assertEqual(utils.FormatUnit(1023, "m"), "1023")
121 self.assertEqual(utils.FormatUnit(1024, "m"), "1024")
122 self.assertEqual(utils.FormatUnit(1536, "m"), "1536")
123 self.assertEqual(utils.FormatUnit(17133, "m"), "17133")
124 self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "m"), "1048575")
127 self.assertEqual(utils.FormatUnit(1024, "h"), "1.0G")
128 self.assertEqual(utils.FormatUnit(1536, "h"), "1.5G")
129 self.assertEqual(utils.FormatUnit(17133, "h"), "16.7G")
130 self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "h"), "1024.0G")
132 self.assertEqual(utils.FormatUnit(1024, "g"), "1.0")
133 self.assertEqual(utils.FormatUnit(1536, "g"), "1.5")
134 self.assertEqual(utils.FormatUnit(17133, "g"), "16.7")
135 self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "g"), "1024.0")
137 self.assertEqual(utils.FormatUnit(1024 * 1024, "g"), "1024.0")
138 self.assertEqual(utils.FormatUnit(5120 * 1024, "g"), "5120.0")
139 self.assertEqual(utils.FormatUnit(29829 * 1024, "g"), "29829.0")
142 self.assertEqual(utils.FormatUnit(1024 * 1024, "h"), "1.0T")
143 self.assertEqual(utils.FormatUnit(5120 * 1024, "h"), "5.0T")
144 self.assertEqual(utils.FormatUnit(29829 * 1024, "h"), "29.1T")
146 self.assertEqual(utils.FormatUnit(1024 * 1024, "t"), "1.0")
147 self.assertEqual(utils.FormatUnit(5120 * 1024, "t"), "5.0")
148 self.assertEqual(utils.FormatUnit(29829 * 1024, "t"), "29.1")
def testErrors(self):
    """Test that FormatUnit raises ProgrammerError for the units value "a\""""
    bad_units = "a"
    self.assertRaises(errors.ProgrammerError,
                      utils.FormatUnit, 1, bad_units)
154 class TestParseUnit(unittest.TestCase):
155 """Test case for the ParseUnit function"""
158 ("M", 1), ("G", 1024), ("T", 1024 * 1024),
159 ("MB", 1), ("GB", 1024), ("TB", 1024 * 1024),
160 ("MiB", 1), ("GiB", 1024), ("TiB", 1024 * 1024))
def testRounding(self):
    """Test the values ParseUnit returns for plain integer strings"""
    # (input, expected) pairs, checked in order; every expected value
    # listed here is a multiple of four.
    expectations = [
        ("0", 0),
        ("1", 4),
        ("2", 4),
        ("3", 4),
        ("124", 124),
        ("125", 128),
        ("126", 128),
        ("127", 128),
        ("128", 128),
        ("129", 132),
        ("130", 132),
    ]
    for text, expected in expectations:
        self.assertEqual(utils.ParseUnit(text), expected)
def testFloating(self):
    """Test the values ParseUnit returns for fractional inputs"""
    # (input, expected) pairs, checked in order. The last three inputs
    # carry a "G" or "T" suffix in addition to the fractional part.
    expectations = [
        ("0", 0),
        ("0.5", 4),
        ("1.75", 4),
        ("1.99", 4),
        ("2.00", 4),
        ("2.01", 4),
        ("3.99", 4),
        ("4.00", 4),
        ("4.01", 8),
        ("1.5G", 1536),
        ("1.8G", 1844),
        ("8.28T", 8682212),
    ]
    for text, expected in expectations:
        self.assertEqual(utils.ParseUnit(text), expected)
190 def testSuffixes(self):
191 for sep in ("", " ", " ", "\t", "\t "):
192 for suffix, scale in self.SCALES:
193 for func in (lambda x: x, str.lower, str.upper):
194 self.assertEqual(utils.ParseUnit("1024" + sep + func(suffix)),
197 def testInvalidInput(self):
198 for sep in ("-", "_", ",", "a"):
199 for suffix, _ in self.SCALES:
200 self.assertRaises(errors.UnitParseError, utils.ParseUnit,
203 for suffix, _ in self.SCALES:
204 self.assertRaises(errors.UnitParseError, utils.ParseUnit,
class TestShellQuoting(unittest.TestCase):
    """Test case for shell quoting functions"""

    def testShellQuote(self):
        """Check ShellQuote against a table of (input, expected) pairs"""
        cases = [
            ('abc', "abc"),
            ('ab"c', "'ab\"c'"),
            ("a'bc", "'a'\\''bc'"),
            ("a b c", "'a b c'"),
            ("a b\\ c", "'a b\\ c'"),
        ]
        for raw, quoted in cases:
            self.assertEqual(utils.ShellQuote(raw), quoted)

    def testShellQuoteArgs(self):
        """Check ShellQuoteArgs against (argument list, expected) pairs"""
        cases = [
            (['a', 'b', 'c'], "a b c"),
            (['a', 'b"', 'c'], "a 'b\"' c"),
            (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
        ]
        for argv, joined in cases:
            self.assertEqual(utils.ShellQuoteArgs(argv), joined)
224 class TestShellWriter(unittest.TestCase):
227 sw = utils.ShellWriter(buf)
228 sw.Write("#!/bin/bash")
229 sw.Write("if true; then")
232 sw.Write("echo true")
234 sw.Write("for i in 1 2 3")
238 self.assertEqual(sw._indent, 2)
245 sw.Write("echo %s", utils.ShellQuote("Hello World"))
248 self.assertEqual(sw._indent, 0)
250 output = buf.getvalue()
252 self.assert_(output.endswith("\n"))
254 lines = output.splitlines()
255 self.assertEqual(len(lines), 9)
256 self.assertEqual(lines[0], "#!/bin/bash")
257 self.assert_(re.match(r"^\s+date$", lines[5]))
258 self.assertEqual(lines[7], "echo 'Hello World'")
262 sw = utils.ShellWriter(buf)
264 self.assertEqual(buf.getvalue(), "")
class TestNormalizeAndValidateMac(unittest.TestCase):
    """Test case for the NormalizeAndValidateMac function"""

    def testInvalid(self):
        """Test that the string "xxx" is rejected with OpPrereqError"""
        bad_mac = "xxx"
        self.assertRaises(errors.OpPrereqError,
                          utils.NormalizeAndValidateMac, bad_mac)

    def testNormalization(self):
        """Test that accepted addresses come back in lower case"""
        samples = ("aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc")
        for mac in samples:
            normalized = utils.NormalizeAndValidateMac(mac)
            self.assertEqual(normalized, mac.lower())
277 class TestSafeEncode(unittest.TestCase):
278 """Test case for SafeEncode"""
281 for txt in [string.digits, string.letters, string.punctuation]:
282 self.failUnlessEqual(txt, utils.SafeEncode(txt))
284 def testDoubleEncode(self):
286 txt = utils.SafeEncode(chr(i))
287 self.failUnlessEqual(txt, utils.SafeEncode(txt))
def testUnicode(self):
    """Test that re-encoding an encoded unicode character changes nothing"""
    # 1024 is high enough to catch non-direct ASCII mappings
    for codepoint in range(1024):
        encoded = utils.SafeEncode(unichr(codepoint))
        # Encoding the already-encoded text must give the same text back
        self.assertEqual(encoded, utils.SafeEncode(encoded))
296 class TestUnescapeAndSplit(unittest.TestCase):
297 """Testing case for UnescapeAndSplit"""
300 # testing more that one separator for regexp safety
301 self._seps = [",", "+", "."]
def testSimple(self):
    """Test splitting a string that contains no escape sequences"""
    parts = ["a", "b", "c", "d"]
    for sep in self._seps:
        joined = sep.join(parts)
        # Joining and then splitting on the same separator must round-trip
        self.assertEqual(utils.UnescapeAndSplit(joined, sep=sep), parts)
def testEscape(self):
    """Test that a backslash-escaped separator does not split an element"""
    for sep in self._seps:
        escaped = ["a", "b\\" + sep + "c", "d"]
        # Expected result: the escaped separator stays inside the middle
        # element and the escaping backslash is gone.
        expected = ["a", "b" + sep + "c", "d"]
        joined = sep.join(escaped)
        self.assertEqual(utils.UnescapeAndSplit(joined, sep=sep), expected)
def testDoubleEscape(self):
    """Test that a doubled backslash before a separator still splits"""
    for sep in self._seps:
        escaped = ["a", "b\\\\", "c", "d"]
        # Expected result: the two backslashes collapse into one and the
        # separator that follows them still ends the element.
        expected = ["a", "b\\", "c", "d"]
        joined = sep.join(escaped)
        self.assertEqual(utils.UnescapeAndSplit(joined, sep=sep), expected)
def testThreeEscape(self):
    """Test three backslashes followed by a separator"""
    for sep in self._seps:
        escaped = ["a", "b\\\\\\" + sep + "c", "d"]
        # Expected result: one literal backslash followed by the separator,
        # both kept inside the middle element (no split there).
        expected = ["a", "b\\" + sep + "c", "d"]
        joined = sep.join(escaped)
        self.assertEqual(utils.UnescapeAndSplit(joined, sep=sep), expected)
327 class TestCommaJoin(unittest.TestCase):
329 self.assertEqual(utils.CommaJoin([]), "")
330 self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
331 self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
332 self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
333 self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
337 class TestFormatTime(unittest.TestCase):
338 """Testing case for FormatTime"""
341 def _TestInProcess(tz, timestamp, expected):
342 os.environ["TZ"] = tz
344 return utils.FormatTime(timestamp) == expected
def _Test(self, *args):
    """Run _TestInProcess with the given arguments and require a true result

    The arguments are handed unchanged to self._TestInProcess through
    utils.RunInSeparateProcess.

    """
    # Need to use separate process as we want to change TZ
    outcome = utils.RunInSeparateProcess(self._TestInProcess, *args)
    self.assertTrue(outcome)
351 self._Test("UTC", 0, "1970-01-01 00:00:00")
352 self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
353 self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
354 self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
355 self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
358 self.failUnlessEqual(utils.FormatTime(None), "N/A")
def testInvalid(self):
    """Test that an empty tuple is formatted as "N/A\""""
    not_a_timestamp = ()
    self.assertEqual(utils.FormatTime(not_a_timestamp), "N/A")
364 # tests that we accept time.time input
365 utils.FormatTime(time.time())
366 # tests that we accept int input
367 utils.FormatTime(int(time.time()))
370 class TestFormatSeconds(unittest.TestCase):
372 self.assertEqual(utils.FormatSeconds(1), "1s")
373 self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
374 self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
375 self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
376 self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
377 self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
378 self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
379 self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
380 self.assertEqual(utils.FormatSeconds(-1), "-1s")
381 self.assertEqual(utils.FormatSeconds(-282), "-282s")
382 self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
385 self.assertEqual(utils.FormatSeconds(1.3), "1s")
386 self.assertEqual(utils.FormatSeconds(1.9), "2s")
387 self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
388 self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
391 class TestLineSplitter(unittest.TestCase):
394 ls = utils.LineSplitter(lines.append)
395 ls.write("Hello World\n")
396 self.assertEqual(lines, [])
397 ls.write("Foo\n Bar\r\n ")
400 self.assertEqual(lines, [])
402 self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
404 self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
def _testExtra(self, line, all_lines, p1, p2):
    """Line callback that verifies its extra arguments and records the line

    p1 must equal 999 and p2 must equal "extra"; only once both checks
    have passed is line appended to all_lines.

    """
    for received, wanted in ((p1, 999), (p2, "extra")):
        self.assertEqual(received, wanted)
    all_lines.append(line)
411 def testExtraArgsNoFlush(self):
413 ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
414 ls.write("\n\nHello World\n")
415 ls.write("Foo\n Bar\r\n ")
418 ls.write("Moo\n\nx\n")
419 self.assertEqual(lines, [])
421 self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
425 class TestIsValidShellParam(unittest.TestCase):
431 self.assertEqual(utils.IsValidShellParam(val), result)
434 class TestBuildShellCmd(unittest.TestCase):
436 self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
438 self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
441 class TestOrdinal(unittest.TestCase):
444 0: "0th", 1: "1st", 2: "2nd", 3: "3rd", 4: "4th", 5: "5th", 6: "6th",
445 7: "7th", 8: "8th", 9: "9th", 10: "10th", 11: "11th", 12: "12th",
446 13: "13th", 14: "14th", 15: "15th", 16: "16th", 17: "17th",
447 18: "18th", 19: "19th", 20: "20th", 21: "21st", 25: "25th", 30: "30th",
448 32: "32nd", 40: "40th", 50: "50th", 55: "55th", 60: "60th", 62: "62nd",
449 70: "70th", 80: "80th", 83: "83rd", 90: "90th", 91: "91st",
450 582: "582nd", 999: "999th",
453 for value, ordinal in checks.items():
454 self.assertEqual(utils.FormatOrdinal(value), ordinal)
# Only start the test program when this file is executed as a script.
if __name__ == "__main__":
    testutils.GanetiTestProgram()