560c7aca7b78bcf89bcdfbae9d31131cd3701336
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import socket
32 import shutil
33 import re
34
35 import ganeti
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
39      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
40      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
41      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
42      SetEtcHostsEntry, RemoveEtcHostsEntry
43 from ganeti.errors import LockError, UnitParseError
44
45
46 class GanetiTestCase(unittest.TestCase):
47   def assertFileContent(self, file_name, content):
48     """Checks the content of a file.
49
50     """
51     handle = open(file_name, 'r')
52     try:
53       self.assertEqual(handle.read(), content)
54     finally:
55       handle.close()
56
57
58 class TestIsProcessAlive(unittest.TestCase):
59   """Testing case for IsProcessAlive"""
60   def setUp(self):
61     # create a zombie and a (hopefully) non-existing process id
62     self.pid_zombie = os.fork()
63     if self.pid_zombie == 0:
64       os._exit(0)
65     elif self.pid_zombie < 0:
66       raise SystemError("can't fork")
67     self.pid_non_existing = os.fork()
68     if self.pid_non_existing == 0:
69       os._exit(0)
70     elif self.pid_non_existing > 0:
71       os.waitpid(self.pid_non_existing, 0)
72     else:
73       raise SystemError("can't fork")
74
75
76   def testExists(self):
77     mypid = os.getpid()
78     self.assert_(IsProcessAlive(mypid),
79                  "can't find myself running")
80
81   def testZombie(self):
82     self.assert_(not IsProcessAlive(self.pid_zombie),
83                  "zombie not detected as zombie")
84
85
86   def testNotExisting(self):
87     self.assert_(not IsProcessAlive(self.pid_non_existing),
88                  "noexisting process detected")
89
90
91 class TestLocking(unittest.TestCase):
92   """Testing case for the Lock/Unlock functions"""
93
94   def setUp(self):
95     lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
96                                 suffix=".locking")
97     self.old_lock_dir = constants.LOCK_DIR
98     constants.LOCK_DIR = lock_dir
99
100   def tearDown(self):
101     try:
102       ganeti.utils.Unlock("unittest")
103     except LockError:
104       pass
105     shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
106     constants.LOCK_DIR = self.old_lock_dir
107
108   def clean_lock(self, name):
109     try:
110       ganeti.utils.Unlock("unittest")
111     except LockError:
112       pass
113
114
115   def testLock(self):
116     self.clean_lock("unittest")
117     self.assertEqual(None, Lock("unittest"))
118
119
120   def testUnlock(self):
121     self.clean_lock("unittest")
122     ganeti.utils.Lock("unittest")
123     self.assertEqual(None, Unlock("unittest"))
124
125   def testDoubleLock(self):
126     self.clean_lock("unittest")
127     ganeti.utils.Lock("unittest")
128     self.assertRaises(LockError, Lock, "unittest")
129
130
131 class TestRunCmd(unittest.TestCase):
132   """Testing case for the RunCmd function"""
133
134   def setUp(self):
135     self.magic = time.ctime() + " ganeti test"
136
137   def testOk(self):
138     """Test successful exit code"""
139     result = RunCmd("/bin/sh -c 'exit 0'")
140     self.assertEqual(result.exit_code, 0)
141
142   def testFail(self):
143     """Test fail exit code"""
144     result = RunCmd("/bin/sh -c 'exit 1'")
145     self.assertEqual(result.exit_code, 1)
146
147
148   def testStdout(self):
149     """Test standard output"""
150     cmd = 'echo -n "%s"' % self.magic
151     result = RunCmd("/bin/sh -c '%s'" % cmd)
152     self.assertEqual(result.stdout, self.magic)
153
154
155   def testStderr(self):
156     """Test standard error"""
157     cmd = 'echo -n "%s"' % self.magic
158     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
159     self.assertEqual(result.stderr, self.magic)
160
161
162   def testCombined(self):
163     """Test combined output"""
164     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
165     result = RunCmd("/bin/sh -c '%s'" % cmd)
166     self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
167
168   def testSignal(self):
169     """Test standard error"""
170     result = RunCmd("/bin/sh -c 'kill -15 $$'")
171     self.assertEqual(result.signal, 15)
172
173   def testListRun(self):
174     """Test list runs"""
175     result = RunCmd(["true"])
176     self.assertEqual(result.signal, None)
177     self.assertEqual(result.exit_code, 0)
178     result = RunCmd(["/bin/sh", "-c", "exit 1"])
179     self.assertEqual(result.signal, None)
180     self.assertEqual(result.exit_code, 1)
181     result = RunCmd(["echo", "-n", self.magic])
182     self.assertEqual(result.signal, None)
183     self.assertEqual(result.exit_code, 0)
184     self.assertEqual(result.stdout, self.magic)
185
186   def testLang(self):
187     """Test locale environment"""
188     old_env = os.environ.copy()
189     try:
190       os.environ["LANG"] = "en_US.UTF-8"
191       os.environ["LC_ALL"] = "en_US.UTF-8"
192       result = RunCmd(["locale"])
193       for line in result.output.splitlines():
194         key, value = line.split("=", 1)
195         # Ignore these variables, they're overridden by LC_ALL
196         if key == "LANG" or key == "LANGUAGE":
197           continue
198         self.failIf(value and value != "C" and value != '"C"',
199             "Variable %s is set to the invalid value '%s'" % (key, value))
200     finally:
201       os.environ = old_env
202
203
204 class TestRemoveFile(unittest.TestCase):
205   """Test case for the RemoveFile function"""
206
207   def setUp(self):
208     """Create a temp dir and file for each case"""
209     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
210     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
211     os.close(fd)
212
213   def tearDown(self):
214     if os.path.exists(self.tmpfile):
215       os.unlink(self.tmpfile)
216     os.rmdir(self.tmpdir)
217
218
219   def testIgnoreDirs(self):
220     """Test that RemoveFile() ignores directories"""
221     self.assertEqual(None, RemoveFile(self.tmpdir))
222
223
224   def testIgnoreNotExisting(self):
225     """Test that RemoveFile() ignores non-existing files"""
226     RemoveFile(self.tmpfile)
227     RemoveFile(self.tmpfile)
228
229
230   def testRemoveFile(self):
231     """Test that RemoveFile does remove a file"""
232     RemoveFile(self.tmpfile)
233     if os.path.exists(self.tmpfile):
234       self.fail("File '%s' not removed" % self.tmpfile)
235
236
237   def testRemoveSymlink(self):
238     """Test that RemoveFile does remove symlinks"""
239     symlink = self.tmpdir + "/symlink"
240     os.symlink("no-such-file", symlink)
241     RemoveFile(symlink)
242     if os.path.exists(symlink):
243       self.fail("File '%s' not removed" % symlink)
244     os.symlink(self.tmpfile, symlink)
245     RemoveFile(symlink)
246     if os.path.exists(symlink):
247       self.fail("File '%s' not removed" % symlink)
248
249
250 class TestCheckdict(unittest.TestCase):
251   """Test case for the CheckDict function"""
252
253   def testAdd(self):
254     """Test that CheckDict adds a missing key with the correct value"""
255
256     tgt = {'a':1}
257     tmpl = {'b': 2}
258     CheckDict(tgt, tmpl)
259     if 'b' not in tgt or tgt['b'] != 2:
260       self.fail("Failed to update dict")
261
262
263   def testNoUpdate(self):
264     """Test that CheckDict does not overwrite an existing key"""
265     tgt = {'a':1, 'b': 3}
266     tmpl = {'b': 2}
267     CheckDict(tgt, tmpl)
268     self.failUnlessEqual(tgt['b'], 3)
269
270
271 class TestMatchNameComponent(unittest.TestCase):
272   """Test case for the MatchNameComponent function"""
273
274   def testEmptyList(self):
275     """Test that there is no match against an empty list"""
276
277     self.failUnlessEqual(MatchNameComponent("", []), None)
278     self.failUnlessEqual(MatchNameComponent("test", []), None)
279
280   def testSingleMatch(self):
281     """Test that a single match is performed correctly"""
282     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
283     for key in "test2", "test2.example", "test2.example.com":
284       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
285
286   def testMultipleMatches(self):
287     """Test that a multiple match is returned as None"""
288     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
289     for key in "test1", "test1.example":
290       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
291
292
293 class TestFormatUnit(unittest.TestCase):
294   """Test case for the FormatUnit function"""
295
296   def testMiB(self):
297     self.assertEqual(FormatUnit(1), '1M')
298     self.assertEqual(FormatUnit(100), '100M')
299     self.assertEqual(FormatUnit(1023), '1023M')
300
301   def testGiB(self):
302     self.assertEqual(FormatUnit(1024), '1.0G')
303     self.assertEqual(FormatUnit(1536), '1.5G')
304     self.assertEqual(FormatUnit(17133), '16.7G')
305     self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
306
307   def testTiB(self):
308     self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
309     self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
310     self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
311
312
313 class TestParseUnit(unittest.TestCase):
314   """Test case for the ParseUnit function"""
315
316   SCALES = (('', 1),
317             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
318             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
319             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
320
321   def testRounding(self):
322     self.assertEqual(ParseUnit('0'), 0)
323     self.assertEqual(ParseUnit('1'), 4)
324     self.assertEqual(ParseUnit('2'), 4)
325     self.assertEqual(ParseUnit('3'), 4)
326
327     self.assertEqual(ParseUnit('124'), 124)
328     self.assertEqual(ParseUnit('125'), 128)
329     self.assertEqual(ParseUnit('126'), 128)
330     self.assertEqual(ParseUnit('127'), 128)
331     self.assertEqual(ParseUnit('128'), 128)
332     self.assertEqual(ParseUnit('129'), 132)
333     self.assertEqual(ParseUnit('130'), 132)
334
335   def testFloating(self):
336     self.assertEqual(ParseUnit('0'), 0)
337     self.assertEqual(ParseUnit('0.5'), 4)
338     self.assertEqual(ParseUnit('1.75'), 4)
339     self.assertEqual(ParseUnit('1.99'), 4)
340     self.assertEqual(ParseUnit('2.00'), 4)
341     self.assertEqual(ParseUnit('2.01'), 4)
342     self.assertEqual(ParseUnit('3.99'), 4)
343     self.assertEqual(ParseUnit('4.00'), 4)
344     self.assertEqual(ParseUnit('4.01'), 8)
345     self.assertEqual(ParseUnit('1.5G'), 1536)
346     self.assertEqual(ParseUnit('1.8G'), 1844)
347     self.assertEqual(ParseUnit('8.28T'), 8682212)
348
349   def testSuffixes(self):
350     for sep in ('', ' ', '   ', "\t", "\t "):
351       for suffix, scale in TestParseUnit.SCALES:
352         for func in (lambda x: x, str.lower, str.upper):
353           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
354                            1024 * scale)
355
356   def testInvalidInput(self):
357     for sep in ('-', '_', ',', 'a'):
358       for suffix, _ in TestParseUnit.SCALES:
359         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
360
361     for suffix, _ in TestParseUnit.SCALES:
362       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
363
364
365 class TestSshKeys(GanetiTestCase):
366   """Test case for the AddAuthorizedKey function"""
367
368   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
369   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
370            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
371
372   def setUp(self):
373     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
374     try:
375       handle = os.fdopen(fd, 'w')
376       try:
377         handle.write("%s\n" % TestSshKeys.KEY_A)
378         handle.write("%s\n" % TestSshKeys.KEY_B)
379       finally:
380         handle.close()
381     except:
382       utils.RemoveFile(self.tmpname)
383       raise
384
385   def tearDown(self):
386     utils.RemoveFile(self.tmpname)
387     del self.tmpname
388
389   def testAddingNewKey(self):
390     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
391
392     self.assertFileContent(self.tmpname,
393       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
394       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
395       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
396       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
397
398   def testAddingAlmostButNotCompletelyTheSameKey(self):
399     AddAuthorizedKey(self.tmpname,
400         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
401
402     self.assertFileContent(self.tmpname,
403       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
404       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
405       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
406       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
407
408   def testAddingExistingKeyWithSomeMoreSpaces(self):
409     AddAuthorizedKey(self.tmpname,
410         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
411
412     self.assertFileContent(self.tmpname,
413       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
414       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
415       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
416
417   def testRemovingExistingKeyWithSomeMoreSpaces(self):
418     RemoveAuthorizedKey(self.tmpname,
419         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
420
421     self.assertFileContent(self.tmpname,
422       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
423       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
424
425   def testRemovingNonExistingKey(self):
426     RemoveAuthorizedKey(self.tmpname,
427         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
428
429     self.assertFileContent(self.tmpname,
430       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
431       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
432       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
433
434
435 class TestEtcHosts(GanetiTestCase):
436   """Test functions modifying /etc/hosts"""
437
438   def setUp(self):
439     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
440     try:
441       handle = os.fdopen(fd, 'w')
442       try:
443         handle.write('# This is a test file for /etc/hosts\n')
444         handle.write('127.0.0.1\tlocalhost\n')
445         handle.write('192.168.1.1 router gw\n')
446       finally:
447         handle.close()
448     except:
449       utils.RemoveFile(self.tmpname)
450       raise
451
452   def tearDown(self):
453     utils.RemoveFile(self.tmpname)
454     del self.tmpname
455
456   def testSettingNewIp(self):
457     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
458
459     self.assertFileContent(self.tmpname,
460       "# This is a test file for /etc/hosts\n"
461       "127.0.0.1\tlocalhost\n"
462       "192.168.1.1 router gw\n"
463       "1.2.3.4\tmyhost.domain.tld myhost\n")
464
465   def testSettingExistingIp(self):
466     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
467                      ['myhost'])
468
469     self.assertFileContent(self.tmpname,
470       "# This is a test file for /etc/hosts\n"
471       "127.0.0.1\tlocalhost\n"
472       "192.168.1.1\tmyhost.domain.tld myhost\n")
473
474   def testSettingDuplicateName(self):
475     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
476
477     self.assertFileContent(self.tmpname,
478       "# This is a test file for /etc/hosts\n"
479       "127.0.0.1\tlocalhost\n"
480       "192.168.1.1 router gw\n"
481       "1.2.3.4\tmyhost\n")
482
483   def testRemovingExistingHost(self):
484     RemoveEtcHostsEntry(self.tmpname, 'router')
485
486     self.assertFileContent(self.tmpname,
487       "# This is a test file for /etc/hosts\n"
488       "127.0.0.1\tlocalhost\n"
489       "192.168.1.1 gw\n")
490
491   def testRemovingSingleExistingHost(self):
492     RemoveEtcHostsEntry(self.tmpname, 'localhost')
493
494     self.assertFileContent(self.tmpname,
495       "# This is a test file for /etc/hosts\n"
496       "192.168.1.1 router gw\n")
497
498   def testRemovingNonExistingHost(self):
499     RemoveEtcHostsEntry(self.tmpname, 'myhost')
500
501     self.assertFileContent(self.tmpname,
502       "# This is a test file for /etc/hosts\n"
503       "127.0.0.1\tlocalhost\n"
504       "192.168.1.1 router gw\n")
505
506   def testRemovingAlias(self):
507     RemoveEtcHostsEntry(self.tmpname, 'gw')
508
509     self.assertFileContent(self.tmpname,
510       "# This is a test file for /etc/hosts\n"
511       "127.0.0.1\tlocalhost\n"
512       "192.168.1.1 router\n")
513
514
515 class TestShellQuoting(unittest.TestCase):
516   """Test case for shell quoting functions"""
517
518   def testShellQuote(self):
519     self.assertEqual(ShellQuote('abc'), "abc")
520     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
521     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
522     self.assertEqual(ShellQuote("a b c"), "'a b c'")
523     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
524
525   def testShellQuoteArgs(self):
526     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
527     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
528     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
529
530
531 class TestTcpPing(unittest.TestCase):
532   """Testcase for TCP version of ping - against listen(2)ing port"""
533
534   def setUp(self):
535     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
536     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
537     self.listenerport = self.listener.getsockname()[1]
538     self.listener.listen(1)
539
540   def tearDown(self):
541     self.listener.shutdown(socket.SHUT_RDWR)
542     del self.listener
543     del self.listenerport
544
545   def testTcpPingToLocalHostAccept(self):
546     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
547                          constants.LOCALHOST_IP_ADDRESS,
548                          self.listenerport,
549                          timeout=10,
550                          live_port_needed=True),
551                  "failed to connect to test listener")
552
553
554 class TestTcpPingDeaf(unittest.TestCase):
555   """Testcase for TCP version of ping - against non listen(2)ing port"""
556
557   def setUp(self):
558     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
559     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
560     self.deaflistenerport = self.deaflistener.getsockname()[1]
561
562   def tearDown(self):
563     del self.deaflistener
564     del self.deaflistenerport
565
566   def testTcpPingToLocalHostAcceptDeaf(self):
567     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
568                         constants.LOCALHOST_IP_ADDRESS,
569                         self.deaflistenerport,
570                         timeout=constants.TCP_PING_TIMEOUT,
571                         live_port_needed=True), # need successful connect(2)
572                 "successfully connected to deaf listener")
573
574   def testTcpPingToLocalHostNoAccept(self):
575     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
576                          constants.LOCALHOST_IP_ADDRESS,
577                          self.deaflistenerport,
578                          timeout=constants.TCP_PING_TIMEOUT,
579                          live_port_needed=False), # ECONNREFUSED is OK
580                  "failed to ping alive host on deaf port")
581
582
583 class TestListVisibleFiles(unittest.TestCase):
584   """Test case for ListVisibleFiles"""
585
586   def setUp(self):
587     self.path = tempfile.mkdtemp()
588
589   def tearDown(self):
590     shutil.rmtree(self.path)
591
592   def _test(self, files, expected):
593     # Sort a copy
594     expected = expected[:]
595     expected.sort()
596
597     for name in files:
598       f = open(os.path.join(self.path, name), 'w')
599       try:
600         f.write("Test\n")
601       finally:
602         f.close()
603
604     found = ListVisibleFiles(self.path)
605     found.sort()
606
607     self.assertEqual(found, expected)
608
609   def testAllVisible(self):
610     files = ["a", "b", "c"]
611     expected = files
612     self._test(files, expected)
613
614   def testNoneVisible(self):
615     files = [".a", ".b", ".c"]
616     expected = []
617     self._test(files, expected)
618
619   def testSomeVisible(self):
620     files = ["a", "b", ".c"]
621     expected = ["a", "b"]
622     self._test(files, expected)
623
624
625 class TestNewUUID(unittest.TestCase):
626   """Test case for NewUUID"""
627
628   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
629                         '[a-f0-9]{4}-[a-f0-9]{12}$')
630
631   def runTest(self):
632     self.failUnless(self._re_uuid.match(utils.NewUUID()))
633
634
635 class TestUniqueSequence(unittest.TestCase):
636   """Test case for UniqueSequence"""
637
638   def _test(self, input, expected):
639     self.assertEqual(utils.UniqueSequence(input), expected)
640
641   def runTest(self):
642     # Ordered input
643     self._test([1, 2, 3], [1, 2, 3])
644     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
645     self._test([1, 2, 2, 3], [1, 2, 3])
646     self._test([1, 2, 3, 3], [1, 2, 3])
647
648     # Unordered input
649     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
650     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
651
652     # Strings
653     self._test(["a", "a"], ["a"])
654     self._test(["a", "b"], ["a", "b"])
655     self._test(["a", "b", "a"], ["a", "b"])
656
657
658 if __name__ == '__main__':
659   unittest.main()