Merge branch 'devel-2.5'
[ganeti-local] / test / ganeti.utils.text_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.text"""
23
24 import re
25 import string
26 import time
27 import unittest
28 import os
29
30 from cStringIO import StringIO
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35
36 import testutils
37
38
39 class TestMatchNameComponent(unittest.TestCase):
40   """Test case for the MatchNameComponent function"""
41
42   def testEmptyList(self):
43     """Test that there is no match against an empty list"""
44     self.failUnlessEqual(utils.MatchNameComponent("", []), None)
45     self.failUnlessEqual(utils.MatchNameComponent("test", []), None)
46
47   def testSingleMatch(self):
48     """Test that a single match is performed correctly"""
49     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
50     for key in "test2", "test2.example", "test2.example.com":
51       self.failUnlessEqual(utils.MatchNameComponent(key, mlist), mlist[1])
52
53   def testMultipleMatches(self):
54     """Test that a multiple match is returned as None"""
55     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
56     for key in "test1", "test1.example":
57       self.failUnlessEqual(utils.MatchNameComponent(key, mlist), None)
58
59   def testFullMatch(self):
60     """Test that a full match is returned correctly"""
61     key1 = "test1"
62     key2 = "test1.example"
63     mlist = [key2, key2 + ".com"]
64     self.failUnlessEqual(utils.MatchNameComponent(key1, mlist), None)
65     self.failUnlessEqual(utils.MatchNameComponent(key2, mlist), key2)
66
67   def testCaseInsensitivePartialMatch(self):
68     """Test for the case_insensitive keyword"""
69     mlist = ["test1.example.com", "test2.example.net"]
70     self.assertEqual(utils.MatchNameComponent("test2", mlist,
71                                               case_sensitive=False),
72                      "test2.example.net")
73     self.assertEqual(utils.MatchNameComponent("Test2", mlist,
74                                               case_sensitive=False),
75                      "test2.example.net")
76     self.assertEqual(utils.MatchNameComponent("teSt2", mlist,
77                                               case_sensitive=False),
78                      "test2.example.net")
79     self.assertEqual(utils.MatchNameComponent("TeSt2", mlist,
80                                               case_sensitive=False),
81                      "test2.example.net")
82
83   def testCaseInsensitiveFullMatch(self):
84     mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
85
86     # Between the two ts1 a full string match non-case insensitive should work
87     self.assertEqual(utils.MatchNameComponent("Ts1", mlist,
88                                               case_sensitive=False),
89                      None)
90     self.assertEqual(utils.MatchNameComponent("Ts1.ex", mlist,
91                                               case_sensitive=False),
92                      "ts1.ex")
93     self.assertEqual(utils.MatchNameComponent("ts1.ex", mlist,
94                                               case_sensitive=False),
95                      "ts1.ex")
96
97     # Between the two ts2 only case differs, so only case-match works
98     self.assertEqual(utils.MatchNameComponent("ts2.ex", mlist,
99                                               case_sensitive=False),
100                      "ts2.ex")
101     self.assertEqual(utils.MatchNameComponent("Ts2.ex", mlist,
102                                               case_sensitive=False),
103                      "Ts2.ex")
104     self.assertEqual(utils.MatchNameComponent("TS2.ex", mlist,
105                                               case_sensitive=False),
106                      None)
107
108
109 class TestDnsNameGlobPattern(unittest.TestCase):
110   def setUp(self):
111     self.names = [
112       "node1.example.com",
113       "node2-0.example.com",
114       "node2-1.example.com",
115       "node1.example.net",
116       "web1.example.com",
117       "web2.example.com",
118       "sub.site.example.com",
119       ]
120
121   def _Test(self, pattern):
122     re_pat = utils.DnsNameGlobPattern(pattern)
123
124     return filter(re.compile(re_pat).match, self.names)
125
126   def test(self):
127     for pattern in ["xyz", "node", " ", "example.net", "x*.example.*",
128                     "x*.example.com"]:
129       self.assertEqual(self._Test(pattern), [])
130
131     for pattern in ["*", "???*"]:
132       self.assertEqual(self._Test(pattern), self.names)
133
134     self.assertEqual(self._Test("node1.*.net"), ["node1.example.net"])
135     self.assertEqual(self._Test("*.example.net"), ["node1.example.net"])
136     self.assertEqual(self._Test("web1.example.com"), ["web1.example.com"])
137
138     for pattern in ["*.*.*.*", "???", "*.site"]:
139       self.assertEqual(self._Test(pattern), ["sub.site.example.com"])
140
141     self.assertEqual(self._Test("node1"), [
142       "node1.example.com",
143       "node1.example.net",
144       ])
145     self.assertEqual(self._Test("node?*.example.*"), [
146       "node1.example.com",
147       "node2-0.example.com",
148       "node2-1.example.com",
149       "node1.example.net",
150       ])
151     self.assertEqual(self._Test("*-?"), [
152       "node2-0.example.com",
153       "node2-1.example.com",
154       ])
155     self.assertEqual(self._Test("node2-?.example.com"), [
156       "node2-0.example.com",
157       "node2-1.example.com",
158       ])
159
160
161 class TestFormatUnit(unittest.TestCase):
162   """Test case for the FormatUnit function"""
163
164   def testMiB(self):
165     self.assertEqual(utils.FormatUnit(1, "h"), "1M")
166     self.assertEqual(utils.FormatUnit(100, "h"), "100M")
167     self.assertEqual(utils.FormatUnit(1023, "h"), "1023M")
168
169     self.assertEqual(utils.FormatUnit(1, "m"), "1")
170     self.assertEqual(utils.FormatUnit(100, "m"), "100")
171     self.assertEqual(utils.FormatUnit(1023, "m"), "1023")
172
173     self.assertEqual(utils.FormatUnit(1024, "m"), "1024")
174     self.assertEqual(utils.FormatUnit(1536, "m"), "1536")
175     self.assertEqual(utils.FormatUnit(17133, "m"), "17133")
176     self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "m"), "1048575")
177
178   def testGiB(self):
179     self.assertEqual(utils.FormatUnit(1024, "h"), "1.0G")
180     self.assertEqual(utils.FormatUnit(1536, "h"), "1.5G")
181     self.assertEqual(utils.FormatUnit(17133, "h"), "16.7G")
182     self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "h"), "1024.0G")
183
184     self.assertEqual(utils.FormatUnit(1024, "g"), "1.0")
185     self.assertEqual(utils.FormatUnit(1536, "g"), "1.5")
186     self.assertEqual(utils.FormatUnit(17133, "g"), "16.7")
187     self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "g"), "1024.0")
188
189     self.assertEqual(utils.FormatUnit(1024 * 1024, "g"), "1024.0")
190     self.assertEqual(utils.FormatUnit(5120 * 1024, "g"), "5120.0")
191     self.assertEqual(utils.FormatUnit(29829 * 1024, "g"), "29829.0")
192
193   def testTiB(self):
194     self.assertEqual(utils.FormatUnit(1024 * 1024, "h"), "1.0T")
195     self.assertEqual(utils.FormatUnit(5120 * 1024, "h"), "5.0T")
196     self.assertEqual(utils.FormatUnit(29829 * 1024, "h"), "29.1T")
197
198     self.assertEqual(utils.FormatUnit(1024 * 1024, "t"), "1.0")
199     self.assertEqual(utils.FormatUnit(5120 * 1024, "t"), "5.0")
200     self.assertEqual(utils.FormatUnit(29829 * 1024, "t"), "29.1")
201
202   def testErrors(self):
203     self.assertRaises(errors.ProgrammerError, utils.FormatUnit, 1, "a")
204
205
206 class TestParseUnit(unittest.TestCase):
207   """Test case for the ParseUnit function"""
208
209   SCALES = (("", 1),
210             ("M", 1), ("G", 1024), ("T", 1024 * 1024),
211             ("MB", 1), ("GB", 1024), ("TB", 1024 * 1024),
212             ("MiB", 1), ("GiB", 1024), ("TiB", 1024 * 1024))
213
214   def testRounding(self):
215     self.assertEqual(utils.ParseUnit("0"), 0)
216     self.assertEqual(utils.ParseUnit("1"), 4)
217     self.assertEqual(utils.ParseUnit("2"), 4)
218     self.assertEqual(utils.ParseUnit("3"), 4)
219
220     self.assertEqual(utils.ParseUnit("124"), 124)
221     self.assertEqual(utils.ParseUnit("125"), 128)
222     self.assertEqual(utils.ParseUnit("126"), 128)
223     self.assertEqual(utils.ParseUnit("127"), 128)
224     self.assertEqual(utils.ParseUnit("128"), 128)
225     self.assertEqual(utils.ParseUnit("129"), 132)
226     self.assertEqual(utils.ParseUnit("130"), 132)
227
228   def testFloating(self):
229     self.assertEqual(utils.ParseUnit("0"), 0)
230     self.assertEqual(utils.ParseUnit("0.5"), 4)
231     self.assertEqual(utils.ParseUnit("1.75"), 4)
232     self.assertEqual(utils.ParseUnit("1.99"), 4)
233     self.assertEqual(utils.ParseUnit("2.00"), 4)
234     self.assertEqual(utils.ParseUnit("2.01"), 4)
235     self.assertEqual(utils.ParseUnit("3.99"), 4)
236     self.assertEqual(utils.ParseUnit("4.00"), 4)
237     self.assertEqual(utils.ParseUnit("4.01"), 8)
238     self.assertEqual(utils.ParseUnit("1.5G"), 1536)
239     self.assertEqual(utils.ParseUnit("1.8G"), 1844)
240     self.assertEqual(utils.ParseUnit("8.28T"), 8682212)
241
242   def testSuffixes(self):
243     for sep in ("", " ", "   ", "\t", "\t "):
244       for suffix, scale in self.SCALES:
245         for func in (lambda x: x, str.lower, str.upper):
246           self.assertEqual(utils.ParseUnit("1024" + sep + func(suffix)),
247                            1024 * scale)
248
249   def testInvalidInput(self):
250     for sep in ("-", "_", ",", "a"):
251       for suffix, _ in self.SCALES:
252         self.assertRaises(errors.UnitParseError, utils.ParseUnit,
253                           "1" + sep + suffix)
254
255     for suffix, _ in self.SCALES:
256       self.assertRaises(errors.UnitParseError, utils.ParseUnit,
257                         "1,3" + suffix)
258
259
260 class TestShellQuoting(unittest.TestCase):
261   """Test case for shell quoting functions"""
262
263   def testShellQuote(self):
264     self.assertEqual(utils.ShellQuote('abc'), "abc")
265     self.assertEqual(utils.ShellQuote('ab"c'), "'ab\"c'")
266     self.assertEqual(utils.ShellQuote("a'bc"), "'a'\\''bc'")
267     self.assertEqual(utils.ShellQuote("a b c"), "'a b c'")
268     self.assertEqual(utils.ShellQuote("a b\\ c"), "'a b\\ c'")
269
270   def testShellQuoteArgs(self):
271     self.assertEqual(utils.ShellQuoteArgs(['a', 'b', 'c']), "a b c")
272     self.assertEqual(utils.ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
273     self.assertEqual(utils.ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
274
275
276 class TestShellWriter(unittest.TestCase):
277   def test(self):
278     buf = StringIO()
279     sw = utils.ShellWriter(buf)
280     sw.Write("#!/bin/bash")
281     sw.Write("if true; then")
282     sw.IncIndent()
283     try:
284       sw.Write("echo true")
285
286       sw.Write("for i in 1 2 3")
287       sw.Write("do")
288       sw.IncIndent()
289       try:
290         self.assertEqual(sw._indent, 2)
291         sw.Write("date")
292       finally:
293         sw.DecIndent()
294       sw.Write("done")
295     finally:
296       sw.DecIndent()
297     sw.Write("echo %s", utils.ShellQuote("Hello World"))
298     sw.Write("exit 0")
299
300     self.assertEqual(sw._indent, 0)
301
302     output = buf.getvalue()
303
304     self.assert_(output.endswith("\n"))
305
306     lines = output.splitlines()
307     self.assertEqual(len(lines), 9)
308     self.assertEqual(lines[0], "#!/bin/bash")
309     self.assert_(re.match(r"^\s+date$", lines[5]))
310     self.assertEqual(lines[7], "echo 'Hello World'")
311
312   def testEmpty(self):
313     buf = StringIO()
314     sw = utils.ShellWriter(buf)
315     sw = None
316     self.assertEqual(buf.getvalue(), "")
317
318
319 class TestNormalizeAndValidateMac(unittest.TestCase):
320   def testInvalid(self):
321     self.assertRaises(errors.OpPrereqError,
322                       utils.NormalizeAndValidateMac, "xxx")
323
324   def testNormalization(self):
325     for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]:
326       self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower())
327
328
329 class TestSafeEncode(unittest.TestCase):
330   """Test case for SafeEncode"""
331
332   def testAscii(self):
333     for txt in [string.digits, string.letters, string.punctuation]:
334       self.failUnlessEqual(txt, utils.SafeEncode(txt))
335
336   def testDoubleEncode(self):
337     for i in range(255):
338       txt = utils.SafeEncode(chr(i))
339       self.failUnlessEqual(txt, utils.SafeEncode(txt))
340
341   def testUnicode(self):
342     # 1024 is high enough to catch non-direct ASCII mappings
343     for i in range(1024):
344       txt = utils.SafeEncode(unichr(i))
345       self.failUnlessEqual(txt, utils.SafeEncode(txt))
346
347
348 class TestUnescapeAndSplit(unittest.TestCase):
349   """Testing case for UnescapeAndSplit"""
350
351   def setUp(self):
352     # testing more that one separator for regexp safety
353     self._seps = [",", "+", ".", ":"]
354
355   def testSimple(self):
356     a = ["a", "b", "c", "d"]
357     for sep in self._seps:
358       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), a)
359
360   def testEscape(self):
361     for sep in self._seps:
362       a = ["a", "b\\" + sep + "c", "d"]
363       b = ["a", "b" + sep + "c", "d"]
364       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
365
366   def testDoubleEscape(self):
367     for sep in self._seps:
368       a = ["a", "b\\\\", "c", "d"]
369       b = ["a", "b\\", "c", "d"]
370       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
371
372   def testThreeEscape(self):
373     for sep in self._seps:
374       a = ["a", "b\\\\\\" + sep + "c", "d"]
375       b = ["a", "b\\" + sep + "c", "d"]
376       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
377
378   def testEscapeAtEnd(self):
379     for sep in self._seps:
380       self.assertEqual(utils.UnescapeAndSplit("\\", sep=sep), ["\\"])
381
382       a = ["a", "b\\", "c"]
383       b = ["a", "b" + sep + "c\\"]
384       self.assertEqual(utils.UnescapeAndSplit("%s\\" % sep.join(a), sep=sep), b)
385
386       a = ["\\" + sep, "\\" + sep, "c", "d\\.moo"]
387       b = [sep, sep, "c", "d.moo\\"]
388       self.assertEqual(utils.UnescapeAndSplit("%s\\" % sep.join(a), sep=sep), b)
389
390
391 class TestCommaJoin(unittest.TestCase):
392   def test(self):
393     self.assertEqual(utils.CommaJoin([]), "")
394     self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
395     self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
396     self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
397     self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
398                      "Hello, World, 99")
399
400
401 class TestFormatTime(unittest.TestCase):
402   """Testing case for FormatTime"""
403
404   @staticmethod
405   def _TestInProcess(tz, timestamp, expected):
406     os.environ["TZ"] = tz
407     time.tzset()
408     return utils.FormatTime(timestamp) == expected
409
410   def _Test(self, *args):
411     # Need to use separate process as we want to change TZ
412     self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
413
414   def test(self):
415     self._Test("UTC", 0, "1970-01-01 00:00:00")
416     self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
417     self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
418     self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
419     self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
420
421   def testNone(self):
422     self.failUnlessEqual(utils.FormatTime(None), "N/A")
423
424   def testInvalid(self):
425     self.failUnlessEqual(utils.FormatTime(()), "N/A")
426
427   def testNow(self):
428     # tests that we accept time.time input
429     utils.FormatTime(time.time())
430     # tests that we accept int input
431     utils.FormatTime(int(time.time()))
432
433
434 class TestFormatSeconds(unittest.TestCase):
435   def test(self):
436     self.assertEqual(utils.FormatSeconds(1), "1s")
437     self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
438     self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
439     self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
440     self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
441     self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
442     self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
443     self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
444     self.assertEqual(utils.FormatSeconds(-1), "-1s")
445     self.assertEqual(utils.FormatSeconds(-282), "-282s")
446     self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
447
448   def testFloat(self):
449     self.assertEqual(utils.FormatSeconds(1.3), "1s")
450     self.assertEqual(utils.FormatSeconds(1.9), "2s")
451     self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
452     self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
453
454
455 class TestLineSplitter(unittest.TestCase):
456   def test(self):
457     lines = []
458     ls = utils.LineSplitter(lines.append)
459     ls.write("Hello World\n")
460     self.assertEqual(lines, [])
461     ls.write("Foo\n Bar\r\n ")
462     ls.write("Baz")
463     ls.write("Moo")
464     self.assertEqual(lines, [])
465     ls.flush()
466     self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
467     ls.close()
468     self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
469
470   def _testExtra(self, line, all_lines, p1, p2):
471     self.assertEqual(p1, 999)
472     self.assertEqual(p2, "extra")
473     all_lines.append(line)
474
475   def testExtraArgsNoFlush(self):
476     lines = []
477     ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
478     ls.write("\n\nHello World\n")
479     ls.write("Foo\n Bar\r\n ")
480     ls.write("")
481     ls.write("Baz")
482     ls.write("Moo\n\nx\n")
483     self.assertEqual(lines, [])
484     ls.close()
485     self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
486                              "", "x"])
487
488
489 class TestIsValidShellParam(unittest.TestCase):
490   def test(self):
491     for val, result in [
492       ("abc", True),
493       ("ab;cd", False),
494       ]:
495       self.assertEqual(utils.IsValidShellParam(val), result)
496
497
498 class TestBuildShellCmd(unittest.TestCase):
499   def test(self):
500     self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
501                       "ls %s", "ab;cd")
502     self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
503
504
505 class TestOrdinal(unittest.TestCase):
506   def test(self):
507     checks = {
508       0: "0th", 1: "1st", 2: "2nd", 3: "3rd", 4: "4th", 5: "5th", 6: "6th",
509       7: "7th", 8: "8th", 9: "9th", 10: "10th", 11: "11th", 12: "12th",
510       13: "13th", 14: "14th", 15: "15th", 16: "16th", 17: "17th",
511       18: "18th", 19: "19th", 20: "20th", 21: "21st", 25: "25th", 30: "30th",
512       32: "32nd", 40: "40th", 50: "50th", 55: "55th", 60: "60th", 62: "62nd",
513       70: "70th", 80: "80th", 83: "83rd", 90: "90th", 91: "91st",
514       582: "582nd", 999: "999th",
515       }
516
517     for value, ordinal in checks.items():
518       self.assertEqual(utils.FormatOrdinal(value), ordinal)
519
520
521 if __name__ == "__main__":
522   testutils.GanetiTestProgram()