417a0016591b3f086462fde5c2de63945e9ec69e
[ganeti-local] / test / ganeti.utils.text_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.text"""
23
24 import re
25 import string
26 import time
27 import unittest
28 import os
29
30 from cStringIO import StringIO
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import errors
35
36 import testutils
37
38
39 class TestMatchNameComponent(unittest.TestCase):
40   """Test case for the MatchNameComponent function"""
41
42   def testEmptyList(self):
43     """Test that there is no match against an empty list"""
44     self.failUnlessEqual(utils.MatchNameComponent("", []), None)
45     self.failUnlessEqual(utils.MatchNameComponent("test", []), None)
46
47   def testSingleMatch(self):
48     """Test that a single match is performed correctly"""
49     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
50     for key in "test2", "test2.example", "test2.example.com":
51       self.failUnlessEqual(utils.MatchNameComponent(key, mlist), mlist[1])
52
53   def testMultipleMatches(self):
54     """Test that a multiple match is returned as None"""
55     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
56     for key in "test1", "test1.example":
57       self.failUnlessEqual(utils.MatchNameComponent(key, mlist), None)
58
59   def testFullMatch(self):
60     """Test that a full match is returned correctly"""
61     key1 = "test1"
62     key2 = "test1.example"
63     mlist = [key2, key2 + ".com"]
64     self.failUnlessEqual(utils.MatchNameComponent(key1, mlist), None)
65     self.failUnlessEqual(utils.MatchNameComponent(key2, mlist), key2)
66
67   def testCaseInsensitivePartialMatch(self):
68     """Test for the case_insensitive keyword"""
69     mlist = ["test1.example.com", "test2.example.net"]
70     self.assertEqual(utils.MatchNameComponent("test2", mlist,
71                                               case_sensitive=False),
72                      "test2.example.net")
73     self.assertEqual(utils.MatchNameComponent("Test2", mlist,
74                                               case_sensitive=False),
75                      "test2.example.net")
76     self.assertEqual(utils.MatchNameComponent("teSt2", mlist,
77                                               case_sensitive=False),
78                      "test2.example.net")
79     self.assertEqual(utils.MatchNameComponent("TeSt2", mlist,
80                                               case_sensitive=False),
81                      "test2.example.net")
82
83   def testCaseInsensitiveFullMatch(self):
84     mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
85
86     # Between the two ts1 a full string match non-case insensitive should work
87     self.assertEqual(utils.MatchNameComponent("Ts1", mlist,
88                                               case_sensitive=False),
89                      None)
90     self.assertEqual(utils.MatchNameComponent("Ts1.ex", mlist,
91                                               case_sensitive=False),
92                      "ts1.ex")
93     self.assertEqual(utils.MatchNameComponent("ts1.ex", mlist,
94                                               case_sensitive=False),
95                      "ts1.ex")
96
97     # Between the two ts2 only case differs, so only case-match works
98     self.assertEqual(utils.MatchNameComponent("ts2.ex", mlist,
99                                               case_sensitive=False),
100                      "ts2.ex")
101     self.assertEqual(utils.MatchNameComponent("Ts2.ex", mlist,
102                                               case_sensitive=False),
103                      "Ts2.ex")
104     self.assertEqual(utils.MatchNameComponent("TS2.ex", mlist,
105                                               case_sensitive=False),
106                      None)
107
108
109 class TestFormatUnit(unittest.TestCase):
110   """Test case for the FormatUnit function"""
111
112   def testMiB(self):
113     self.assertEqual(utils.FormatUnit(1, "h"), "1M")
114     self.assertEqual(utils.FormatUnit(100, "h"), "100M")
115     self.assertEqual(utils.FormatUnit(1023, "h"), "1023M")
116
117     self.assertEqual(utils.FormatUnit(1, "m"), "1")
118     self.assertEqual(utils.FormatUnit(100, "m"), "100")
119     self.assertEqual(utils.FormatUnit(1023, "m"), "1023")
120
121     self.assertEqual(utils.FormatUnit(1024, "m"), "1024")
122     self.assertEqual(utils.FormatUnit(1536, "m"), "1536")
123     self.assertEqual(utils.FormatUnit(17133, "m"), "17133")
124     self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "m"), "1048575")
125
126   def testGiB(self):
127     self.assertEqual(utils.FormatUnit(1024, "h"), "1.0G")
128     self.assertEqual(utils.FormatUnit(1536, "h"), "1.5G")
129     self.assertEqual(utils.FormatUnit(17133, "h"), "16.7G")
130     self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "h"), "1024.0G")
131
132     self.assertEqual(utils.FormatUnit(1024, "g"), "1.0")
133     self.assertEqual(utils.FormatUnit(1536, "g"), "1.5")
134     self.assertEqual(utils.FormatUnit(17133, "g"), "16.7")
135     self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "g"), "1024.0")
136
137     self.assertEqual(utils.FormatUnit(1024 * 1024, "g"), "1024.0")
138     self.assertEqual(utils.FormatUnit(5120 * 1024, "g"), "5120.0")
139     self.assertEqual(utils.FormatUnit(29829 * 1024, "g"), "29829.0")
140
141   def testTiB(self):
142     self.assertEqual(utils.FormatUnit(1024 * 1024, "h"), "1.0T")
143     self.assertEqual(utils.FormatUnit(5120 * 1024, "h"), "5.0T")
144     self.assertEqual(utils.FormatUnit(29829 * 1024, "h"), "29.1T")
145
146     self.assertEqual(utils.FormatUnit(1024 * 1024, "t"), "1.0")
147     self.assertEqual(utils.FormatUnit(5120 * 1024, "t"), "5.0")
148     self.assertEqual(utils.FormatUnit(29829 * 1024, "t"), "29.1")
149
150   def testErrors(self):
151     self.assertRaises(errors.ProgrammerError, utils.FormatUnit, 1, "a")
152
153
154 class TestParseUnit(unittest.TestCase):
155   """Test case for the ParseUnit function"""
156
157   SCALES = (("", 1),
158             ("M", 1), ("G", 1024), ("T", 1024 * 1024),
159             ("MB", 1), ("GB", 1024), ("TB", 1024 * 1024),
160             ("MiB", 1), ("GiB", 1024), ("TiB", 1024 * 1024))
161
162   def testRounding(self):
163     self.assertEqual(utils.ParseUnit("0"), 0)
164     self.assertEqual(utils.ParseUnit("1"), 4)
165     self.assertEqual(utils.ParseUnit("2"), 4)
166     self.assertEqual(utils.ParseUnit("3"), 4)
167
168     self.assertEqual(utils.ParseUnit("124"), 124)
169     self.assertEqual(utils.ParseUnit("125"), 128)
170     self.assertEqual(utils.ParseUnit("126"), 128)
171     self.assertEqual(utils.ParseUnit("127"), 128)
172     self.assertEqual(utils.ParseUnit("128"), 128)
173     self.assertEqual(utils.ParseUnit("129"), 132)
174     self.assertEqual(utils.ParseUnit("130"), 132)
175
176   def testFloating(self):
177     self.assertEqual(utils.ParseUnit("0"), 0)
178     self.assertEqual(utils.ParseUnit("0.5"), 4)
179     self.assertEqual(utils.ParseUnit("1.75"), 4)
180     self.assertEqual(utils.ParseUnit("1.99"), 4)
181     self.assertEqual(utils.ParseUnit("2.00"), 4)
182     self.assertEqual(utils.ParseUnit("2.01"), 4)
183     self.assertEqual(utils.ParseUnit("3.99"), 4)
184     self.assertEqual(utils.ParseUnit("4.00"), 4)
185     self.assertEqual(utils.ParseUnit("4.01"), 8)
186     self.assertEqual(utils.ParseUnit("1.5G"), 1536)
187     self.assertEqual(utils.ParseUnit("1.8G"), 1844)
188     self.assertEqual(utils.ParseUnit("8.28T"), 8682212)
189
190   def testSuffixes(self):
191     for sep in ("", " ", "   ", "\t", "\t "):
192       for suffix, scale in self.SCALES:
193         for func in (lambda x: x, str.lower, str.upper):
194           self.assertEqual(utils.ParseUnit("1024" + sep + func(suffix)),
195                            1024 * scale)
196
197   def testInvalidInput(self):
198     for sep in ("-", "_", ",", "a"):
199       for suffix, _ in self.SCALES:
200         self.assertRaises(errors.UnitParseError, utils.ParseUnit,
201                           "1" + sep + suffix)
202
203     for suffix, _ in self.SCALES:
204       self.assertRaises(errors.UnitParseError, utils.ParseUnit,
205                         "1,3" + suffix)
206
207
208 class TestShellQuoting(unittest.TestCase):
209   """Test case for shell quoting functions"""
210
211   def testShellQuote(self):
212     self.assertEqual(utils.ShellQuote('abc'), "abc")
213     self.assertEqual(utils.ShellQuote('ab"c'), "'ab\"c'")
214     self.assertEqual(utils.ShellQuote("a'bc"), "'a'\\''bc'")
215     self.assertEqual(utils.ShellQuote("a b c"), "'a b c'")
216     self.assertEqual(utils.ShellQuote("a b\\ c"), "'a b\\ c'")
217
218   def testShellQuoteArgs(self):
219     self.assertEqual(utils.ShellQuoteArgs(['a', 'b', 'c']), "a b c")
220     self.assertEqual(utils.ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
221     self.assertEqual(utils.ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
222
223
224 class TestShellWriter(unittest.TestCase):
225   def test(self):
226     buf = StringIO()
227     sw = utils.ShellWriter(buf)
228     sw.Write("#!/bin/bash")
229     sw.Write("if true; then")
230     sw.IncIndent()
231     try:
232       sw.Write("echo true")
233
234       sw.Write("for i in 1 2 3")
235       sw.Write("do")
236       sw.IncIndent()
237       try:
238         self.assertEqual(sw._indent, 2)
239         sw.Write("date")
240       finally:
241         sw.DecIndent()
242       sw.Write("done")
243     finally:
244       sw.DecIndent()
245     sw.Write("echo %s", utils.ShellQuote("Hello World"))
246     sw.Write("exit 0")
247
248     self.assertEqual(sw._indent, 0)
249
250     output = buf.getvalue()
251
252     self.assert_(output.endswith("\n"))
253
254     lines = output.splitlines()
255     self.assertEqual(len(lines), 9)
256     self.assertEqual(lines[0], "#!/bin/bash")
257     self.assert_(re.match(r"^\s+date$", lines[5]))
258     self.assertEqual(lines[7], "echo 'Hello World'")
259
260   def testEmpty(self):
261     buf = StringIO()
262     sw = utils.ShellWriter(buf)
263     sw = None
264     self.assertEqual(buf.getvalue(), "")
265
266
267 class TestNormalizeAndValidateMac(unittest.TestCase):
268   def testInvalid(self):
269     self.assertRaises(errors.OpPrereqError,
270                       utils.NormalizeAndValidateMac, "xxx")
271
272   def testNormalization(self):
273     for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]:
274       self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower())
275
276
277 class TestSafeEncode(unittest.TestCase):
278   """Test case for SafeEncode"""
279
280   def testAscii(self):
281     for txt in [string.digits, string.letters, string.punctuation]:
282       self.failUnlessEqual(txt, utils.SafeEncode(txt))
283
284   def testDoubleEncode(self):
285     for i in range(255):
286       txt = utils.SafeEncode(chr(i))
287       self.failUnlessEqual(txt, utils.SafeEncode(txt))
288
289   def testUnicode(self):
290     # 1024 is high enough to catch non-direct ASCII mappings
291     for i in range(1024):
292       txt = utils.SafeEncode(unichr(i))
293       self.failUnlessEqual(txt, utils.SafeEncode(txt))
294
295
296 class TestUnescapeAndSplit(unittest.TestCase):
297   """Testing case for UnescapeAndSplit"""
298
299   def setUp(self):
300     # testing more that one separator for regexp safety
301     self._seps = [",", "+", "."]
302
303   def testSimple(self):
304     a = ["a", "b", "c", "d"]
305     for sep in self._seps:
306       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), a)
307
308   def testEscape(self):
309     for sep in self._seps:
310       a = ["a", "b\\" + sep + "c", "d"]
311       b = ["a", "b" + sep + "c", "d"]
312       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
313
314   def testDoubleEscape(self):
315     for sep in self._seps:
316       a = ["a", "b\\\\", "c", "d"]
317       b = ["a", "b\\", "c", "d"]
318       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
319
320   def testThreeEscape(self):
321     for sep in self._seps:
322       a = ["a", "b\\\\\\" + sep + "c", "d"]
323       b = ["a", "b\\" + sep + "c", "d"]
324       self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
325
326
327 class TestCommaJoin(unittest.TestCase):
328   def test(self):
329     self.assertEqual(utils.CommaJoin([]), "")
330     self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
331     self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
332     self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
333     self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
334                      "Hello, World, 99")
335
336
337 class TestFormatTime(unittest.TestCase):
338   """Testing case for FormatTime"""
339
340   @staticmethod
341   def _TestInProcess(tz, timestamp, expected):
342     os.environ["TZ"] = tz
343     time.tzset()
344     return utils.FormatTime(timestamp) == expected
345
346   def _Test(self, *args):
347     # Need to use separate process as we want to change TZ
348     self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
349
350   def test(self):
351     self._Test("UTC", 0, "1970-01-01 00:00:00")
352     self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
353     self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
354     self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
355     self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
356
357   def testNone(self):
358     self.failUnlessEqual(utils.FormatTime(None), "N/A")
359
360   def testInvalid(self):
361     self.failUnlessEqual(utils.FormatTime(()), "N/A")
362
363   def testNow(self):
364     # tests that we accept time.time input
365     utils.FormatTime(time.time())
366     # tests that we accept int input
367     utils.FormatTime(int(time.time()))
368
369
370 class TestFormatSeconds(unittest.TestCase):
371   def test(self):
372     self.assertEqual(utils.FormatSeconds(1), "1s")
373     self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
374     self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
375     self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
376     self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
377     self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
378     self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
379     self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
380     self.assertEqual(utils.FormatSeconds(-1), "-1s")
381     self.assertEqual(utils.FormatSeconds(-282), "-282s")
382     self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
383
384   def testFloat(self):
385     self.assertEqual(utils.FormatSeconds(1.3), "1s")
386     self.assertEqual(utils.FormatSeconds(1.9), "2s")
387     self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
388     self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
389
390
391 class TestLineSplitter(unittest.TestCase):
392   def test(self):
393     lines = []
394     ls = utils.LineSplitter(lines.append)
395     ls.write("Hello World\n")
396     self.assertEqual(lines, [])
397     ls.write("Foo\n Bar\r\n ")
398     ls.write("Baz")
399     ls.write("Moo")
400     self.assertEqual(lines, [])
401     ls.flush()
402     self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
403     ls.close()
404     self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
405
406   def _testExtra(self, line, all_lines, p1, p2):
407     self.assertEqual(p1, 999)
408     self.assertEqual(p2, "extra")
409     all_lines.append(line)
410
411   def testExtraArgsNoFlush(self):
412     lines = []
413     ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
414     ls.write("\n\nHello World\n")
415     ls.write("Foo\n Bar\r\n ")
416     ls.write("")
417     ls.write("Baz")
418     ls.write("Moo\n\nx\n")
419     self.assertEqual(lines, [])
420     ls.close()
421     self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
422                              "", "x"])
423
424
425 class TestIsValidShellParam(unittest.TestCase):
426   def test(self):
427     for val, result in [
428       ("abc", True),
429       ("ab;cd", False),
430       ]:
431       self.assertEqual(utils.IsValidShellParam(val), result)
432
433
434 class TestBuildShellCmd(unittest.TestCase):
435   def test(self):
436     self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
437                       "ls %s", "ab;cd")
438     self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
439
440
441 class TestOrdinal(unittest.TestCase):
442   def test(self):
443     checks = {
444       0: "0th", 1: "1st", 2: "2nd", 3: "3rd", 4: "4th", 5: "5th", 6: "6th",
445       7: "7th", 8: "8th", 9: "9th", 10: "10th", 11: "11th", 12: "12th",
446       13: "13th", 14: "14th", 15: "15th", 16: "16th", 17: "17th",
447       18: "18th", 19: "19th", 20: "20th", 21: "21st", 25: "25th", 30: "30th",
448       32: "32nd", 40: "40th", 50: "50th", 55: "55th", 60: "60th", 62: "62nd",
449       70: "70th", 80: "80th", 83: "83rd", 90: "90th", 91: "91st",
450       582: "582nd", 999: "999th",
451       }
452
453     for value, ordinal in checks.items():
454       self.assertEqual(utils.FormatOrdinal(value), ordinal)
455
456
457 if __name__ == "__main__":
458   testutils.GanetiTestProgram()