Modify utils.TcpPing to make source address optional
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import socket
32 import shutil
33 import re
34
35 import ganeti
36 from ganeti import constants
37 from ganeti import utils
38 from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
39      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
40      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
41      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
42      SetEtcHostsEntry, RemoveEtcHostsEntry
43 from ganeti.errors import LockError, UnitParseError
44
45
46 class GanetiTestCase(unittest.TestCase):
47   def assertFileContent(self, file_name, content):
48     """Checks the content of a file.
49
50     """
51     handle = open(file_name, 'r')
52     try:
53       self.assertEqual(handle.read(), content)
54     finally:
55       handle.close()
56
57
58 class TestIsProcessAlive(unittest.TestCase):
59   """Testing case for IsProcessAlive"""
60   def setUp(self):
61     # create a zombie and a (hopefully) non-existing process id
62     self.pid_zombie = os.fork()
63     if self.pid_zombie == 0:
64       os._exit(0)
65     elif self.pid_zombie < 0:
66       raise SystemError("can't fork")
67     self.pid_non_existing = os.fork()
68     if self.pid_non_existing == 0:
69       os._exit(0)
70     elif self.pid_non_existing > 0:
71       os.waitpid(self.pid_non_existing, 0)
72     else:
73       raise SystemError("can't fork")
74
75
76   def testExists(self):
77     mypid = os.getpid()
78     self.assert_(IsProcessAlive(mypid),
79                  "can't find myself running")
80
81   def testZombie(self):
82     self.assert_(not IsProcessAlive(self.pid_zombie),
83                  "zombie not detected as zombie")
84
85
86   def testNotExisting(self):
87     self.assert_(not IsProcessAlive(self.pid_non_existing),
88                  "noexisting process detected")
89
90
91 class TestLocking(unittest.TestCase):
92   """Testing case for the Lock/Unlock functions"""
93
94   def setUp(self):
95     lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
96                                 suffix=".locking")
97     self.old_lock_dir = constants.LOCK_DIR
98     constants.LOCK_DIR = lock_dir
99
100   def tearDown(self):
101     try:
102       ganeti.utils.Unlock("unittest")
103     except LockError:
104       pass
105     shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
106     constants.LOCK_DIR = self.old_lock_dir
107
108   def clean_lock(self, name):
109     try:
110       ganeti.utils.Unlock("unittest")
111     except LockError:
112       pass
113
114
115   def testLock(self):
116     self.clean_lock("unittest")
117     self.assertEqual(None, Lock("unittest"))
118
119
120   def testUnlock(self):
121     self.clean_lock("unittest")
122     ganeti.utils.Lock("unittest")
123     self.assertEqual(None, Unlock("unittest"))
124
125   def testDoubleLock(self):
126     self.clean_lock("unittest")
127     ganeti.utils.Lock("unittest")
128     self.assertRaises(LockError, Lock, "unittest")
129
130
131 class TestRunCmd(unittest.TestCase):
132   """Testing case for the RunCmd function"""
133
134   def setUp(self):
135     self.magic = time.ctime() + " ganeti test"
136
137   def testOk(self):
138     """Test successful exit code"""
139     result = RunCmd("/bin/sh -c 'exit 0'")
140     self.assertEqual(result.exit_code, 0)
141
142   def testFail(self):
143     """Test fail exit code"""
144     result = RunCmd("/bin/sh -c 'exit 1'")
145     self.assertEqual(result.exit_code, 1)
146
147
148   def testStdout(self):
149     """Test standard output"""
150     cmd = 'echo -n "%s"' % self.magic
151     result = RunCmd("/bin/sh -c '%s'" % cmd)
152     self.assertEqual(result.stdout, self.magic)
153
154
155   def testStderr(self):
156     """Test standard error"""
157     cmd = 'echo -n "%s"' % self.magic
158     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
159     self.assertEqual(result.stderr, self.magic)
160
161
162   def testCombined(self):
163     """Test combined output"""
164     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
165     result = RunCmd("/bin/sh -c '%s'" % cmd)
166     self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
167
168   def testSignal(self):
169     """Test standard error"""
170     result = RunCmd("/bin/sh -c 'kill -15 $$'")
171     self.assertEqual(result.signal, 15)
172
173   def testListRun(self):
174     """Test list runs"""
175     result = RunCmd(["true"])
176     self.assertEqual(result.signal, None)
177     self.assertEqual(result.exit_code, 0)
178     result = RunCmd(["/bin/sh", "-c", "exit 1"])
179     self.assertEqual(result.signal, None)
180     self.assertEqual(result.exit_code, 1)
181     result = RunCmd(["echo", "-n", self.magic])
182     self.assertEqual(result.signal, None)
183     self.assertEqual(result.exit_code, 0)
184     self.assertEqual(result.stdout, self.magic)
185
186   def testLang(self):
187     """Test locale environment"""
188     old_env = os.environ.copy()
189     try:
190       os.environ["LANG"] = "en_US.UTF-8"
191       os.environ["LC_ALL"] = "en_US.UTF-8"
192       result = RunCmd(["locale"])
193       for line in result.output.splitlines():
194         key, value = line.split("=", 1)
195         # Ignore these variables, they're overridden by LC_ALL
196         if key == "LANG" or key == "LANGUAGE":
197           continue
198         self.failIf(value and value != "C" and value != '"C"',
199             "Variable %s is set to the invalid value '%s'" % (key, value))
200     finally:
201       os.environ = old_env
202
203
204 class TestRemoveFile(unittest.TestCase):
205   """Test case for the RemoveFile function"""
206
207   def setUp(self):
208     """Create a temp dir and file for each case"""
209     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
210     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
211     os.close(fd)
212
213   def tearDown(self):
214     if os.path.exists(self.tmpfile):
215       os.unlink(self.tmpfile)
216     os.rmdir(self.tmpdir)
217
218
219   def testIgnoreDirs(self):
220     """Test that RemoveFile() ignores directories"""
221     self.assertEqual(None, RemoveFile(self.tmpdir))
222
223
224   def testIgnoreNotExisting(self):
225     """Test that RemoveFile() ignores non-existing files"""
226     RemoveFile(self.tmpfile)
227     RemoveFile(self.tmpfile)
228
229
230   def testRemoveFile(self):
231     """Test that RemoveFile does remove a file"""
232     RemoveFile(self.tmpfile)
233     if os.path.exists(self.tmpfile):
234       self.fail("File '%s' not removed" % self.tmpfile)
235
236
237   def testRemoveSymlink(self):
238     """Test that RemoveFile does remove symlinks"""
239     symlink = self.tmpdir + "/symlink"
240     os.symlink("no-such-file", symlink)
241     RemoveFile(symlink)
242     if os.path.exists(symlink):
243       self.fail("File '%s' not removed" % symlink)
244     os.symlink(self.tmpfile, symlink)
245     RemoveFile(symlink)
246     if os.path.exists(symlink):
247       self.fail("File '%s' not removed" % symlink)
248
249
250 class TestCheckdict(unittest.TestCase):
251   """Test case for the CheckDict function"""
252
253   def testAdd(self):
254     """Test that CheckDict adds a missing key with the correct value"""
255
256     tgt = {'a':1}
257     tmpl = {'b': 2}
258     CheckDict(tgt, tmpl)
259     if 'b' not in tgt or tgt['b'] != 2:
260       self.fail("Failed to update dict")
261
262
263   def testNoUpdate(self):
264     """Test that CheckDict does not overwrite an existing key"""
265     tgt = {'a':1, 'b': 3}
266     tmpl = {'b': 2}
267     CheckDict(tgt, tmpl)
268     self.failUnlessEqual(tgt['b'], 3)
269
270
271 class TestMatchNameComponent(unittest.TestCase):
272   """Test case for the MatchNameComponent function"""
273
274   def testEmptyList(self):
275     """Test that there is no match against an empty list"""
276
277     self.failUnlessEqual(MatchNameComponent("", []), None)
278     self.failUnlessEqual(MatchNameComponent("test", []), None)
279
280   def testSingleMatch(self):
281     """Test that a single match is performed correctly"""
282     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
283     for key in "test2", "test2.example", "test2.example.com":
284       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
285
286   def testMultipleMatches(self):
287     """Test that a multiple match is returned as None"""
288     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
289     for key in "test1", "test1.example":
290       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
291
292
293 class TestFormatUnit(unittest.TestCase):
294   """Test case for the FormatUnit function"""
295
296   def testMiB(self):
297     self.assertEqual(FormatUnit(1), '1M')
298     self.assertEqual(FormatUnit(100), '100M')
299     self.assertEqual(FormatUnit(1023), '1023M')
300
301   def testGiB(self):
302     self.assertEqual(FormatUnit(1024), '1.0G')
303     self.assertEqual(FormatUnit(1536), '1.5G')
304     self.assertEqual(FormatUnit(17133), '16.7G')
305     self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
306
307   def testTiB(self):
308     self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
309     self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
310     self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
311
312
313 class TestParseUnit(unittest.TestCase):
314   """Test case for the ParseUnit function"""
315
316   SCALES = (('', 1),
317             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
318             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
319             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
320
321   def testRounding(self):
322     self.assertEqual(ParseUnit('0'), 0)
323     self.assertEqual(ParseUnit('1'), 4)
324     self.assertEqual(ParseUnit('2'), 4)
325     self.assertEqual(ParseUnit('3'), 4)
326
327     self.assertEqual(ParseUnit('124'), 124)
328     self.assertEqual(ParseUnit('125'), 128)
329     self.assertEqual(ParseUnit('126'), 128)
330     self.assertEqual(ParseUnit('127'), 128)
331     self.assertEqual(ParseUnit('128'), 128)
332     self.assertEqual(ParseUnit('129'), 132)
333     self.assertEqual(ParseUnit('130'), 132)
334
335   def testFloating(self):
336     self.assertEqual(ParseUnit('0'), 0)
337     self.assertEqual(ParseUnit('0.5'), 4)
338     self.assertEqual(ParseUnit('1.75'), 4)
339     self.assertEqual(ParseUnit('1.99'), 4)
340     self.assertEqual(ParseUnit('2.00'), 4)
341     self.assertEqual(ParseUnit('2.01'), 4)
342     self.assertEqual(ParseUnit('3.99'), 4)
343     self.assertEqual(ParseUnit('4.00'), 4)
344     self.assertEqual(ParseUnit('4.01'), 8)
345     self.assertEqual(ParseUnit('1.5G'), 1536)
346     self.assertEqual(ParseUnit('1.8G'), 1844)
347     self.assertEqual(ParseUnit('8.28T'), 8682212)
348
349   def testSuffixes(self):
350     for sep in ('', ' ', '   ', "\t", "\t "):
351       for suffix, scale in TestParseUnit.SCALES:
352         for func in (lambda x: x, str.lower, str.upper):
353           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
354                            1024 * scale)
355
356   def testInvalidInput(self):
357     for sep in ('-', '_', ',', 'a'):
358       for suffix, _ in TestParseUnit.SCALES:
359         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
360
361     for suffix, _ in TestParseUnit.SCALES:
362       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
363
364
365 class TestSshKeys(GanetiTestCase):
366   """Test case for the AddAuthorizedKey function"""
367
368   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
369   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
370            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
371
372   def setUp(self):
373     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
374     try:
375       handle = os.fdopen(fd, 'w')
376       try:
377         handle.write("%s\n" % TestSshKeys.KEY_A)
378         handle.write("%s\n" % TestSshKeys.KEY_B)
379       finally:
380         handle.close()
381     except:
382       utils.RemoveFile(self.tmpname)
383       raise
384
385   def tearDown(self):
386     utils.RemoveFile(self.tmpname)
387     del self.tmpname
388
389   def testAddingNewKey(self):
390     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
391
392     self.assertFileContent(self.tmpname,
393       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
394       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
395       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
396       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
397
398   def testAddingAlmostButNotCompletelyTheSameKey(self):
399     AddAuthorizedKey(self.tmpname,
400         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
401
402     self.assertFileContent(self.tmpname,
403       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
404       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
405       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
406       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
407
408   def testAddingExistingKeyWithSomeMoreSpaces(self):
409     AddAuthorizedKey(self.tmpname,
410         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
411
412     self.assertFileContent(self.tmpname,
413       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
414       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
415       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
416
417   def testRemovingExistingKeyWithSomeMoreSpaces(self):
418     RemoveAuthorizedKey(self.tmpname,
419         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
420
421     self.assertFileContent(self.tmpname,
422       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
423       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
424
425   def testRemovingNonExistingKey(self):
426     RemoveAuthorizedKey(self.tmpname,
427         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
428
429     self.assertFileContent(self.tmpname,
430       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
431       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
432       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
433
434
435 class TestEtcHosts(GanetiTestCase):
436   """Test functions modifying /etc/hosts"""
437
438   def setUp(self):
439     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
440     try:
441       handle = os.fdopen(fd, 'w')
442       try:
443         handle.write('# This is a test file for /etc/hosts\n')
444         handle.write('127.0.0.1\tlocalhost\n')
445         handle.write('192.168.1.1 router gw\n')
446       finally:
447         handle.close()
448     except:
449       utils.RemoveFile(self.tmpname)
450       raise
451
452   def tearDown(self):
453     utils.RemoveFile(self.tmpname)
454     del self.tmpname
455
456   def testSettingNewIp(self):
457     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
458
459     self.assertFileContent(self.tmpname,
460       "# This is a test file for /etc/hosts\n"
461       "127.0.0.1\tlocalhost\n"
462       "192.168.1.1 router gw\n"
463       "1.2.3.4\tmyhost.domain.tld myhost\n")
464
465   def testSettingExistingIp(self):
466     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
467                      ['myhost'])
468
469     self.assertFileContent(self.tmpname,
470       "# This is a test file for /etc/hosts\n"
471       "127.0.0.1\tlocalhost\n"
472       "192.168.1.1\tmyhost.domain.tld myhost\n")
473
474   def testSettingDuplicateName(self):
475     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
476
477     self.assertFileContent(self.tmpname,
478       "# This is a test file for /etc/hosts\n"
479       "127.0.0.1\tlocalhost\n"
480       "192.168.1.1 router gw\n"
481       "1.2.3.4\tmyhost\n")
482
483   def testRemovingExistingHost(self):
484     RemoveEtcHostsEntry(self.tmpname, 'router')
485
486     self.assertFileContent(self.tmpname,
487       "# This is a test file for /etc/hosts\n"
488       "127.0.0.1\tlocalhost\n"
489       "192.168.1.1 gw\n")
490
491   def testRemovingSingleExistingHost(self):
492     RemoveEtcHostsEntry(self.tmpname, 'localhost')
493
494     self.assertFileContent(self.tmpname,
495       "# This is a test file for /etc/hosts\n"
496       "192.168.1.1 router gw\n")
497
498   def testRemovingNonExistingHost(self):
499     RemoveEtcHostsEntry(self.tmpname, 'myhost')
500
501     self.assertFileContent(self.tmpname,
502       "# This is a test file for /etc/hosts\n"
503       "127.0.0.1\tlocalhost\n"
504       "192.168.1.1 router gw\n")
505
506   def testRemovingAlias(self):
507     RemoveEtcHostsEntry(self.tmpname, 'gw')
508
509     self.assertFileContent(self.tmpname,
510       "# This is a test file for /etc/hosts\n"
511       "127.0.0.1\tlocalhost\n"
512       "192.168.1.1 router\n")
513
514
515 class TestShellQuoting(unittest.TestCase):
516   """Test case for shell quoting functions"""
517
518   def testShellQuote(self):
519     self.assertEqual(ShellQuote('abc'), "abc")
520     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
521     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
522     self.assertEqual(ShellQuote("a b c"), "'a b c'")
523     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
524
525   def testShellQuoteArgs(self):
526     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
527     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
528     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
529
530
531 class TestTcpPing(unittest.TestCase):
532   """Testcase for TCP version of ping - against listen(2)ing port"""
533
534   def setUp(self):
535     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
536     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
537     self.listenerport = self.listener.getsockname()[1]
538     self.listener.listen(1)
539
540   def tearDown(self):
541     self.listener.shutdown(socket.SHUT_RDWR)
542     del self.listener
543     del self.listenerport
544
545   def testTcpPingToLocalHostAccept(self):
546     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
547                          self.listenerport,
548                          timeout=10,
549                          live_port_needed=True,
550                          source=constants.LOCALHOST_IP_ADDRESS,
551                          ),
552                  "failed to connect to test listener")
553
554     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
555                          self.listenerport,
556                          timeout=10,
557                          live_port_needed=True,
558                          ),
559                  "failed to connect to test listener (no source)")
560
561
562 class TestTcpPingDeaf(unittest.TestCase):
563   """Testcase for TCP version of ping - against non listen(2)ing port"""
564
565   def setUp(self):
566     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
567     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
568     self.deaflistenerport = self.deaflistener.getsockname()[1]
569
570   def tearDown(self):
571     del self.deaflistener
572     del self.deaflistenerport
573
574   def testTcpPingToLocalHostAcceptDeaf(self):
575     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
576                         self.deaflistenerport,
577                         timeout=constants.TCP_PING_TIMEOUT,
578                         live_port_needed=True,
579                         source=constants.LOCALHOST_IP_ADDRESS,
580                         ), # need successful connect(2)
581                 "successfully connected to deaf listener")
582
583     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
584                         self.deaflistenerport,
585                         timeout=constants.TCP_PING_TIMEOUT,
586                         live_port_needed=True,
587                         ), # need successful connect(2)
588                 "successfully connected to deaf listener (no source addr)")
589
590   def testTcpPingToLocalHostNoAccept(self):
591     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
592                          self.deaflistenerport,
593                          timeout=constants.TCP_PING_TIMEOUT,
594                          live_port_needed=False,
595                          source=constants.LOCALHOST_IP_ADDRESS,
596                          ), # ECONNREFUSED is OK
597                  "failed to ping alive host on deaf port")
598
599     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
600                          self.deaflistenerport,
601                          timeout=constants.TCP_PING_TIMEOUT,
602                          live_port_needed=False,
603                          ), # ECONNREFUSED is OK
604                  "failed to ping alive host on deaf port (no source addr)")
605
606
607 class TestListVisibleFiles(unittest.TestCase):
608   """Test case for ListVisibleFiles"""
609
610   def setUp(self):
611     self.path = tempfile.mkdtemp()
612
613   def tearDown(self):
614     shutil.rmtree(self.path)
615
616   def _test(self, files, expected):
617     # Sort a copy
618     expected = expected[:]
619     expected.sort()
620
621     for name in files:
622       f = open(os.path.join(self.path, name), 'w')
623       try:
624         f.write("Test\n")
625       finally:
626         f.close()
627
628     found = ListVisibleFiles(self.path)
629     found.sort()
630
631     self.assertEqual(found, expected)
632
633   def testAllVisible(self):
634     files = ["a", "b", "c"]
635     expected = files
636     self._test(files, expected)
637
638   def testNoneVisible(self):
639     files = [".a", ".b", ".c"]
640     expected = []
641     self._test(files, expected)
642
643   def testSomeVisible(self):
644     files = ["a", "b", ".c"]
645     expected = ["a", "b"]
646     self._test(files, expected)
647
648
649 class TestNewUUID(unittest.TestCase):
650   """Test case for NewUUID"""
651
652   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
653                         '[a-f0-9]{4}-[a-f0-9]{12}$')
654
655   def runTest(self):
656     self.failUnless(self._re_uuid.match(utils.NewUUID()))
657
658
659 class TestUniqueSequence(unittest.TestCase):
660   """Test case for UniqueSequence"""
661
662   def _test(self, input, expected):
663     self.assertEqual(utils.UniqueSequence(input), expected)
664
665   def runTest(self):
666     # Ordered input
667     self._test([1, 2, 3], [1, 2, 3])
668     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
669     self._test([1, 2, 2, 3], [1, 2, 3])
670     self._test([1, 2, 3, 3], [1, 2, 3])
671
672     # Unordered input
673     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
674     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
675
676     # Strings
677     self._test(["a", "a"], ["a"])
678     self._test(["a", "b"], ["a", "b"])
679     self._test(["a", "b", "a"], ["a", "b"])
680
681
682 if __name__ == '__main__':
683   unittest.main()