Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.text_unittest.py @ b39d17b1

History | View | Annotate | Download (16.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.text"""
23

    
24
import re
25
import string
26
import time
27
import unittest
28
import os
29

    
30
from cStringIO import StringIO
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35

    
36
import testutils
37

    
38

    
39
class TestMatchNameComponent(unittest.TestCase):
40
  """Test case for the MatchNameComponent function"""
41

    
42
  def testEmptyList(self):
43
    """Test that there is no match against an empty list"""
44
    self.failUnlessEqual(utils.MatchNameComponent("", []), None)
45
    self.failUnlessEqual(utils.MatchNameComponent("test", []), None)
46

    
47
  def testSingleMatch(self):
48
    """Test that a single match is performed correctly"""
49
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
50
    for key in "test2", "test2.example", "test2.example.com":
51
      self.failUnlessEqual(utils.MatchNameComponent(key, mlist), mlist[1])
52

    
53
  def testMultipleMatches(self):
54
    """Test that a multiple match is returned as None"""
55
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
56
    for key in "test1", "test1.example":
57
      self.failUnlessEqual(utils.MatchNameComponent(key, mlist), None)
58

    
59
  def testFullMatch(self):
60
    """Test that a full match is returned correctly"""
61
    key1 = "test1"
62
    key2 = "test1.example"
63
    mlist = [key2, key2 + ".com"]
64
    self.failUnlessEqual(utils.MatchNameComponent(key1, mlist), None)
65
    self.failUnlessEqual(utils.MatchNameComponent(key2, mlist), key2)
66

    
67
  def testCaseInsensitivePartialMatch(self):
68
    """Test for the case_insensitive keyword"""
69
    mlist = ["test1.example.com", "test2.example.net"]
70
    self.assertEqual(utils.MatchNameComponent("test2", mlist,
71
                                              case_sensitive=False),
72
                     "test2.example.net")
73
    self.assertEqual(utils.MatchNameComponent("Test2", mlist,
74
                                              case_sensitive=False),
75
                     "test2.example.net")
76
    self.assertEqual(utils.MatchNameComponent("teSt2", mlist,
77
                                              case_sensitive=False),
78
                     "test2.example.net")
79
    self.assertEqual(utils.MatchNameComponent("TeSt2", mlist,
80
                                              case_sensitive=False),
81
                     "test2.example.net")
82

    
83
  def testCaseInsensitiveFullMatch(self):
84
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
85

    
86
    # Between the two ts1 a full string match non-case insensitive should work
87
    self.assertEqual(utils.MatchNameComponent("Ts1", mlist,
88
                                              case_sensitive=False),
89
                     None)
90
    self.assertEqual(utils.MatchNameComponent("Ts1.ex", mlist,
91
                                              case_sensitive=False),
92
                     "ts1.ex")
93
    self.assertEqual(utils.MatchNameComponent("ts1.ex", mlist,
94
                                              case_sensitive=False),
95
                     "ts1.ex")
96

    
97
    # Between the two ts2 only case differs, so only case-match works
98
    self.assertEqual(utils.MatchNameComponent("ts2.ex", mlist,
99
                                              case_sensitive=False),
100
                     "ts2.ex")
101
    self.assertEqual(utils.MatchNameComponent("Ts2.ex", mlist,
102
                                              case_sensitive=False),
103
                     "Ts2.ex")
104
    self.assertEqual(utils.MatchNameComponent("TS2.ex", mlist,
105
                                              case_sensitive=False),
106
                     None)
107

    
108

    
109
class TestFormatUnit(unittest.TestCase):
110
  """Test case for the FormatUnit function"""
111

    
112
  def testMiB(self):
113
    self.assertEqual(utils.FormatUnit(1, "h"), "1M")
114
    self.assertEqual(utils.FormatUnit(100, "h"), "100M")
115
    self.assertEqual(utils.FormatUnit(1023, "h"), "1023M")
116

    
117
    self.assertEqual(utils.FormatUnit(1, "m"), "1")
118
    self.assertEqual(utils.FormatUnit(100, "m"), "100")
119
    self.assertEqual(utils.FormatUnit(1023, "m"), "1023")
120

    
121
    self.assertEqual(utils.FormatUnit(1024, "m"), "1024")
122
    self.assertEqual(utils.FormatUnit(1536, "m"), "1536")
123
    self.assertEqual(utils.FormatUnit(17133, "m"), "17133")
124
    self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "m"), "1048575")
125

    
126
  def testGiB(self):
127
    self.assertEqual(utils.FormatUnit(1024, "h"), "1.0G")
128
    self.assertEqual(utils.FormatUnit(1536, "h"), "1.5G")
129
    self.assertEqual(utils.FormatUnit(17133, "h"), "16.7G")
130
    self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "h"), "1024.0G")
131

    
132
    self.assertEqual(utils.FormatUnit(1024, "g"), "1.0")
133
    self.assertEqual(utils.FormatUnit(1536, "g"), "1.5")
134
    self.assertEqual(utils.FormatUnit(17133, "g"), "16.7")
135
    self.assertEqual(utils.FormatUnit(1024 * 1024 - 1, "g"), "1024.0")
136

    
137
    self.assertEqual(utils.FormatUnit(1024 * 1024, "g"), "1024.0")
138
    self.assertEqual(utils.FormatUnit(5120 * 1024, "g"), "5120.0")
139
    self.assertEqual(utils.FormatUnit(29829 * 1024, "g"), "29829.0")
140

    
141
  def testTiB(self):
142
    self.assertEqual(utils.FormatUnit(1024 * 1024, "h"), "1.0T")
143
    self.assertEqual(utils.FormatUnit(5120 * 1024, "h"), "5.0T")
144
    self.assertEqual(utils.FormatUnit(29829 * 1024, "h"), "29.1T")
145

    
146
    self.assertEqual(utils.FormatUnit(1024 * 1024, "t"), "1.0")
147
    self.assertEqual(utils.FormatUnit(5120 * 1024, "t"), "5.0")
148
    self.assertEqual(utils.FormatUnit(29829 * 1024, "t"), "29.1")
149

    
150
  def testErrors(self):
151
    self.assertRaises(errors.ProgrammerError, utils.FormatUnit, 1, "a")
152

    
153

    
154
class TestParseUnit(unittest.TestCase):
155
  """Test case for the ParseUnit function"""
156

    
157
  SCALES = (("", 1),
158
            ("M", 1), ("G", 1024), ("T", 1024 * 1024),
159
            ("MB", 1), ("GB", 1024), ("TB", 1024 * 1024),
160
            ("MiB", 1), ("GiB", 1024), ("TiB", 1024 * 1024))
161

    
162
  def testRounding(self):
163
    self.assertEqual(utils.ParseUnit("0"), 0)
164
    self.assertEqual(utils.ParseUnit("1"), 4)
165
    self.assertEqual(utils.ParseUnit("2"), 4)
166
    self.assertEqual(utils.ParseUnit("3"), 4)
167

    
168
    self.assertEqual(utils.ParseUnit("124"), 124)
169
    self.assertEqual(utils.ParseUnit("125"), 128)
170
    self.assertEqual(utils.ParseUnit("126"), 128)
171
    self.assertEqual(utils.ParseUnit("127"), 128)
172
    self.assertEqual(utils.ParseUnit("128"), 128)
173
    self.assertEqual(utils.ParseUnit("129"), 132)
174
    self.assertEqual(utils.ParseUnit("130"), 132)
175

    
176
  def testFloating(self):
177
    self.assertEqual(utils.ParseUnit("0"), 0)
178
    self.assertEqual(utils.ParseUnit("0.5"), 4)
179
    self.assertEqual(utils.ParseUnit("1.75"), 4)
180
    self.assertEqual(utils.ParseUnit("1.99"), 4)
181
    self.assertEqual(utils.ParseUnit("2.00"), 4)
182
    self.assertEqual(utils.ParseUnit("2.01"), 4)
183
    self.assertEqual(utils.ParseUnit("3.99"), 4)
184
    self.assertEqual(utils.ParseUnit("4.00"), 4)
185
    self.assertEqual(utils.ParseUnit("4.01"), 8)
186
    self.assertEqual(utils.ParseUnit("1.5G"), 1536)
187
    self.assertEqual(utils.ParseUnit("1.8G"), 1844)
188
    self.assertEqual(utils.ParseUnit("8.28T"), 8682212)
189

    
190
  def testSuffixes(self):
191
    for sep in ("", " ", "   ", "\t", "\t "):
192
      for suffix, scale in self.SCALES:
193
        for func in (lambda x: x, str.lower, str.upper):
194
          self.assertEqual(utils.ParseUnit("1024" + sep + func(suffix)),
195
                           1024 * scale)
196

    
197
  def testInvalidInput(self):
198
    for sep in ("-", "_", ",", "a"):
199
      for suffix, _ in self.SCALES:
200
        self.assertRaises(errors.UnitParseError, utils.ParseUnit,
201
                          "1" + sep + suffix)
202

    
203
    for suffix, _ in self.SCALES:
204
      self.assertRaises(errors.UnitParseError, utils.ParseUnit,
205
                        "1,3" + suffix)
206

    
207

    
208
class TestShellQuoting(unittest.TestCase):
209
  """Test case for shell quoting functions"""
210

    
211
  def testShellQuote(self):
212
    self.assertEqual(utils.ShellQuote('abc'), "abc")
213
    self.assertEqual(utils.ShellQuote('ab"c'), "'ab\"c'")
214
    self.assertEqual(utils.ShellQuote("a'bc"), "'a'\\''bc'")
215
    self.assertEqual(utils.ShellQuote("a b c"), "'a b c'")
216
    self.assertEqual(utils.ShellQuote("a b\\ c"), "'a b\\ c'")
217

    
218
  def testShellQuoteArgs(self):
219
    self.assertEqual(utils.ShellQuoteArgs(['a', 'b', 'c']), "a b c")
220
    self.assertEqual(utils.ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
221
    self.assertEqual(utils.ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
222

    
223

    
224
class TestShellWriter(unittest.TestCase):
225
  def test(self):
226
    buf = StringIO()
227
    sw = utils.ShellWriter(buf)
228
    sw.Write("#!/bin/bash")
229
    sw.Write("if true; then")
230
    sw.IncIndent()
231
    try:
232
      sw.Write("echo true")
233

    
234
      sw.Write("for i in 1 2 3")
235
      sw.Write("do")
236
      sw.IncIndent()
237
      try:
238
        self.assertEqual(sw._indent, 2)
239
        sw.Write("date")
240
      finally:
241
        sw.DecIndent()
242
      sw.Write("done")
243
    finally:
244
      sw.DecIndent()
245
    sw.Write("echo %s", utils.ShellQuote("Hello World"))
246
    sw.Write("exit 0")
247

    
248
    self.assertEqual(sw._indent, 0)
249

    
250
    output = buf.getvalue()
251

    
252
    self.assert_(output.endswith("\n"))
253

    
254
    lines = output.splitlines()
255
    self.assertEqual(len(lines), 9)
256
    self.assertEqual(lines[0], "#!/bin/bash")
257
    self.assert_(re.match(r"^\s+date$", lines[5]))
258
    self.assertEqual(lines[7], "echo 'Hello World'")
259

    
260
  def testEmpty(self):
261
    buf = StringIO()
262
    sw = utils.ShellWriter(buf)
263
    sw = None
264
    self.assertEqual(buf.getvalue(), "")
265

    
266

    
267
class TestNormalizeAndValidateMac(unittest.TestCase):
268
  def testInvalid(self):
269
    self.assertRaises(errors.OpPrereqError,
270
                      utils.NormalizeAndValidateMac, "xxx")
271

    
272
  def testNormalization(self):
273
    for mac in ["aa:bb:cc:dd:ee:ff", "00:AA:11:bB:22:cc"]:
274
      self.assertEqual(utils.NormalizeAndValidateMac(mac), mac.lower())
275

    
276

    
277
class TestSafeEncode(unittest.TestCase):
278
  """Test case for SafeEncode"""
279

    
280
  def testAscii(self):
281
    for txt in [string.digits, string.letters, string.punctuation]:
282
      self.failUnlessEqual(txt, utils.SafeEncode(txt))
283

    
284
  def testDoubleEncode(self):
285
    for i in range(255):
286
      txt = utils.SafeEncode(chr(i))
287
      self.failUnlessEqual(txt, utils.SafeEncode(txt))
288

    
289
  def testUnicode(self):
290
    # 1024 is high enough to catch non-direct ASCII mappings
291
    for i in range(1024):
292
      txt = utils.SafeEncode(unichr(i))
293
      self.failUnlessEqual(txt, utils.SafeEncode(txt))
294

    
295

    
296
class TestUnescapeAndSplit(unittest.TestCase):
297
  """Testing case for UnescapeAndSplit"""
298

    
299
  def setUp(self):
300
    # testing more that one separator for regexp safety
301
    self._seps = [",", "+", ".", ":"]
302

    
303
  def testSimple(self):
304
    a = ["a", "b", "c", "d"]
305
    for sep in self._seps:
306
      self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), a)
307

    
308
  def testEscape(self):
309
    for sep in self._seps:
310
      a = ["a", "b\\" + sep + "c", "d"]
311
      b = ["a", "b" + sep + "c", "d"]
312
      self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
313

    
314
  def testDoubleEscape(self):
315
    for sep in self._seps:
316
      a = ["a", "b\\\\", "c", "d"]
317
      b = ["a", "b\\", "c", "d"]
318
      self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
319

    
320
  def testThreeEscape(self):
321
    for sep in self._seps:
322
      a = ["a", "b\\\\\\" + sep + "c", "d"]
323
      b = ["a", "b\\" + sep + "c", "d"]
324
      self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
325

    
326
  def testEscapeAtEnd(self):
327
    for sep in self._seps:
328
      self.assertEqual(utils.UnescapeAndSplit("\\", sep=sep), ["\\"])
329

    
330
      a = ["a", "b\\", "c"]
331
      b = ["a", "b" + sep + "c\\"]
332
      self.assertEqual(utils.UnescapeAndSplit("%s\\" % sep.join(a), sep=sep), b)
333

    
334
      a = ["\\" + sep, "\\" + sep, "c", "d\\.moo"]
335
      b = [sep, sep, "c", "d.moo\\"]
336
      self.assertEqual(utils.UnescapeAndSplit("%s\\" % sep.join(a), sep=sep), b)
337

    
338
  def testMultipleEscapes(self):
339
    for sep in self._seps:
340
      a = ["a", "b\\" + sep + "c", "d\\" + sep + "e\\" + sep + "f", "g"]
341
      b = ["a", "b" + sep + "c", "d" + sep + "e" + sep + "f", "g"]
342
      self.failUnlessEqual(utils.UnescapeAndSplit(sep.join(a), sep=sep), b)
343

    
344

    
345
class TestCommaJoin(unittest.TestCase):
346
  def test(self):
347
    self.assertEqual(utils.CommaJoin([]), "")
348
    self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
349
    self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
350
    self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
351
    self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
352
                     "Hello, World, 99")
353

    
354

    
355
class TestFormatTime(unittest.TestCase):
356
  """Testing case for FormatTime"""
357

    
358
  @staticmethod
359
  def _TestInProcess(tz, timestamp, expected):
360
    os.environ["TZ"] = tz
361
    time.tzset()
362
    return utils.FormatTime(timestamp) == expected
363

    
364
  def _Test(self, *args):
365
    # Need to use separate process as we want to change TZ
366
    self.assert_(utils.RunInSeparateProcess(self._TestInProcess, *args))
367

    
368
  def test(self):
369
    self._Test("UTC", 0, "1970-01-01 00:00:00")
370
    self._Test("America/Sao_Paulo", 1292606926, "2010-12-17 15:28:46")
371
    self._Test("Europe/London", 1292606926, "2010-12-17 17:28:46")
372
    self._Test("Europe/Zurich", 1292606926, "2010-12-17 18:28:46")
373
    self._Test("Australia/Sydney", 1292606926, "2010-12-18 04:28:46")
374

    
375
  def testNone(self):
376
    self.failUnlessEqual(utils.FormatTime(None), "N/A")
377

    
378
  def testInvalid(self):
379
    self.failUnlessEqual(utils.FormatTime(()), "N/A")
380

    
381
  def testNow(self):
382
    # tests that we accept time.time input
383
    utils.FormatTime(time.time())
384
    # tests that we accept int input
385
    utils.FormatTime(int(time.time()))
386

    
387

    
388
class TestFormatSeconds(unittest.TestCase):
389
  def test(self):
390
    self.assertEqual(utils.FormatSeconds(1), "1s")
391
    self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
392
    self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
393
    self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
394
    self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
395
    self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
396
    self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
397
    self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
398
    self.assertEqual(utils.FormatSeconds(-1), "-1s")
399
    self.assertEqual(utils.FormatSeconds(-282), "-282s")
400
    self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
401

    
402
  def testFloat(self):
403
    self.assertEqual(utils.FormatSeconds(1.3), "1s")
404
    self.assertEqual(utils.FormatSeconds(1.9), "2s")
405
    self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
406
    self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
407

    
408

    
409
class TestLineSplitter(unittest.TestCase):
410
  def test(self):
411
    lines = []
412
    ls = utils.LineSplitter(lines.append)
413
    ls.write("Hello World\n")
414
    self.assertEqual(lines, [])
415
    ls.write("Foo\n Bar\r\n ")
416
    ls.write("Baz")
417
    ls.write("Moo")
418
    self.assertEqual(lines, [])
419
    ls.flush()
420
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
421
    ls.close()
422
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
423

    
424
  def _testExtra(self, line, all_lines, p1, p2):
425
    self.assertEqual(p1, 999)
426
    self.assertEqual(p2, "extra")
427
    all_lines.append(line)
428

    
429
  def testExtraArgsNoFlush(self):
430
    lines = []
431
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
432
    ls.write("\n\nHello World\n")
433
    ls.write("Foo\n Bar\r\n ")
434
    ls.write("")
435
    ls.write("Baz")
436
    ls.write("Moo\n\nx\n")
437
    self.assertEqual(lines, [])
438
    ls.close()
439
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
440
                             "", "x"])
441

    
442

    
443
class TestIsValidShellParam(unittest.TestCase):
444
  def test(self):
445
    for val, result in [
446
      ("abc", True),
447
      ("ab;cd", False),
448
      ]:
449
      self.assertEqual(utils.IsValidShellParam(val), result)
450

    
451

    
452
class TestBuildShellCmd(unittest.TestCase):
453
  def test(self):
454
    self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
455
                      "ls %s", "ab;cd")
456
    self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
457

    
458

    
459
if __name__ == "__main__":
460
  testutils.GanetiTestProgram()