Add utils unittests for new functions
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import tempfile
36
37 import ganeti
38 import testutils
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree
46 from ganeti.errors import LockError, UnitParseError
47
48 def _ChildHandler(signal, stack):
49   global _ChildFlag
50   _ChildFlag = True
51
52 class TestIsProcessAlive(unittest.TestCase):
53   """Testing case for IsProcessAlive"""
54
55   def setUp(self):
56     global _ChildFlag
57     # create a (most probably) non-existing process-id
58     self.pid_non_existing = os.fork()
59     if self.pid_non_existing == 0:
60       os._exit(0)
61     elif self.pid_non_existing > 0:
62       os.waitpid(self.pid_non_existing, 0)
63     else:
64       raise SystemError("can't fork")
65     _ChildFlag = False
66     # Use _ChildHandler for SIGCHLD
67     self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
68     # create a zombie
69     self.pid_zombie = os.fork()
70     if self.pid_zombie == 0:
71       os._exit(0)
72     elif self.pid_zombie < 0:
73       raise SystemError("can't fork")
74
75   def tearDown(self):
76     signal.signal(signal.SIGCHLD, self.chldOrig)
77
78   def testExists(self):
79     mypid = os.getpid()
80     self.assert_(IsProcessAlive(mypid),
81                  "can't find myself running")
82
83   def testZombie(self):
84     global _ChildFlag
85     timeout = 10
86
87     while not _ChildFlag:
88       if timeout >= 0:
89         time.sleep(0.2)
90         timeout -= 0.2
91       else:
92         self.fail("timed out waiting for child's signal")
93         break # not executed...
94
95     self.assert_(not IsProcessAlive(self.pid_zombie),
96                  "zombie not detected as zombie")
97
98   def testNotExisting(self):
99     self.assert_(not IsProcessAlive(self.pid_non_existing),
100                  "noexisting process detected")
101
102 class TestPidFileFunctions(unittest.TestCase):
103   """Tests for WritePidFile, RemovePidFile and IsPidFileAlive"""
104
105   def setUp(self):
106     self.dir = tempfile.mkdtemp()
107     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
108     utils._DaemonPidFileName = self.f_dpn
109
110   def testPidFileFunctions(self):
111     utils.WritePidFile('test')
112     self.assert_(os.path.exists(self.f_dpn('test')))
113     self.assert_(utils.IsPidFileAlive(self.f_dpn('test')))
114     utils.RemovePidFile('test')
115     self.assert_(not os.path.exists(self.f_dpn('test')))
116
117   def tearDown(self):
118     os.rmdir(self.dir)
119
120
121 class TestRunCmd(unittest.TestCase):
122   """Testing case for the RunCmd function"""
123
124   def setUp(self):
125     self.magic = time.ctime() + " ganeti test"
126
127   def testOk(self):
128     """Test successful exit code"""
129     result = RunCmd("/bin/sh -c 'exit 0'")
130     self.assertEqual(result.exit_code, 0)
131
132   def testFail(self):
133     """Test fail exit code"""
134     result = RunCmd("/bin/sh -c 'exit 1'")
135     self.assertEqual(result.exit_code, 1)
136
137
138   def testStdout(self):
139     """Test standard output"""
140     cmd = 'echo -n "%s"' % self.magic
141     result = RunCmd("/bin/sh -c '%s'" % cmd)
142     self.assertEqual(result.stdout, self.magic)
143
144
145   def testStderr(self):
146     """Test standard error"""
147     cmd = 'echo -n "%s"' % self.magic
148     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
149     self.assertEqual(result.stderr, self.magic)
150
151
152   def testCombined(self):
153     """Test combined output"""
154     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
155     result = RunCmd("/bin/sh -c '%s'" % cmd)
156     self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
157
158   def testSignal(self):
159     """Test signal"""
160     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
161     self.assertEqual(result.signal, 15)
162
163   def testListRun(self):
164     """Test list runs"""
165     result = RunCmd(["true"])
166     self.assertEqual(result.signal, None)
167     self.assertEqual(result.exit_code, 0)
168     result = RunCmd(["/bin/sh", "-c", "exit 1"])
169     self.assertEqual(result.signal, None)
170     self.assertEqual(result.exit_code, 1)
171     result = RunCmd(["echo", "-n", self.magic])
172     self.assertEqual(result.signal, None)
173     self.assertEqual(result.exit_code, 0)
174     self.assertEqual(result.stdout, self.magic)
175
176   def testLang(self):
177     """Test locale environment"""
178     old_env = os.environ.copy()
179     try:
180       os.environ["LANG"] = "en_US.UTF-8"
181       os.environ["LC_ALL"] = "en_US.UTF-8"
182       result = RunCmd(["locale"])
183       for line in result.output.splitlines():
184         key, value = line.split("=", 1)
185         # Ignore these variables, they're overridden by LC_ALL
186         if key == "LANG" or key == "LANGUAGE":
187           continue
188         self.failIf(value and value != "C" and value != '"C"',
189             "Variable %s is set to the invalid value '%s'" % (key, value))
190     finally:
191       os.environ = old_env
192
193
194 class TestRemoveFile(unittest.TestCase):
195   """Test case for the RemoveFile function"""
196
197   def setUp(self):
198     """Create a temp dir and file for each case"""
199     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
200     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
201     os.close(fd)
202
203   def tearDown(self):
204     if os.path.exists(self.tmpfile):
205       os.unlink(self.tmpfile)
206     os.rmdir(self.tmpdir)
207
208
209   def testIgnoreDirs(self):
210     """Test that RemoveFile() ignores directories"""
211     self.assertEqual(None, RemoveFile(self.tmpdir))
212
213
214   def testIgnoreNotExisting(self):
215     """Test that RemoveFile() ignores non-existing files"""
216     RemoveFile(self.tmpfile)
217     RemoveFile(self.tmpfile)
218
219
220   def testRemoveFile(self):
221     """Test that RemoveFile does remove a file"""
222     RemoveFile(self.tmpfile)
223     if os.path.exists(self.tmpfile):
224       self.fail("File '%s' not removed" % self.tmpfile)
225
226
227   def testRemoveSymlink(self):
228     """Test that RemoveFile does remove symlinks"""
229     symlink = self.tmpdir + "/symlink"
230     os.symlink("no-such-file", symlink)
231     RemoveFile(symlink)
232     if os.path.exists(symlink):
233       self.fail("File '%s' not removed" % symlink)
234     os.symlink(self.tmpfile, symlink)
235     RemoveFile(symlink)
236     if os.path.exists(symlink):
237       self.fail("File '%s' not removed" % symlink)
238
239
240 class TestCheckdict(unittest.TestCase):
241   """Test case for the CheckDict function"""
242
243   def testAdd(self):
244     """Test that CheckDict adds a missing key with the correct value"""
245
246     tgt = {'a':1}
247     tmpl = {'b': 2}
248     CheckDict(tgt, tmpl)
249     if 'b' not in tgt or tgt['b'] != 2:
250       self.fail("Failed to update dict")
251
252
253   def testNoUpdate(self):
254     """Test that CheckDict does not overwrite an existing key"""
255     tgt = {'a':1, 'b': 3}
256     tmpl = {'b': 2}
257     CheckDict(tgt, tmpl)
258     self.failUnlessEqual(tgt['b'], 3)
259
260
261 class TestMatchNameComponent(unittest.TestCase):
262   """Test case for the MatchNameComponent function"""
263
264   def testEmptyList(self):
265     """Test that there is no match against an empty list"""
266
267     self.failUnlessEqual(MatchNameComponent("", []), None)
268     self.failUnlessEqual(MatchNameComponent("test", []), None)
269
270   def testSingleMatch(self):
271     """Test that a single match is performed correctly"""
272     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
273     for key in "test2", "test2.example", "test2.example.com":
274       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
275
276   def testMultipleMatches(self):
277     """Test that a multiple match is returned as None"""
278     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
279     for key in "test1", "test1.example":
280       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
281
282
283 class TestFormatUnit(unittest.TestCase):
284   """Test case for the FormatUnit function"""
285
286   def testMiB(self):
287     self.assertEqual(FormatUnit(1), '1M')
288     self.assertEqual(FormatUnit(100), '100M')
289     self.assertEqual(FormatUnit(1023), '1023M')
290
291   def testGiB(self):
292     self.assertEqual(FormatUnit(1024), '1.0G')
293     self.assertEqual(FormatUnit(1536), '1.5G')
294     self.assertEqual(FormatUnit(17133), '16.7G')
295     self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
296
297   def testTiB(self):
298     self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
299     self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
300     self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
301
302
303 class TestParseUnit(unittest.TestCase):
304   """Test case for the ParseUnit function"""
305
306   SCALES = (('', 1),
307             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
308             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
309             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
310
311   def testRounding(self):
312     self.assertEqual(ParseUnit('0'), 0)
313     self.assertEqual(ParseUnit('1'), 4)
314     self.assertEqual(ParseUnit('2'), 4)
315     self.assertEqual(ParseUnit('3'), 4)
316
317     self.assertEqual(ParseUnit('124'), 124)
318     self.assertEqual(ParseUnit('125'), 128)
319     self.assertEqual(ParseUnit('126'), 128)
320     self.assertEqual(ParseUnit('127'), 128)
321     self.assertEqual(ParseUnit('128'), 128)
322     self.assertEqual(ParseUnit('129'), 132)
323     self.assertEqual(ParseUnit('130'), 132)
324
325   def testFloating(self):
326     self.assertEqual(ParseUnit('0'), 0)
327     self.assertEqual(ParseUnit('0.5'), 4)
328     self.assertEqual(ParseUnit('1.75'), 4)
329     self.assertEqual(ParseUnit('1.99'), 4)
330     self.assertEqual(ParseUnit('2.00'), 4)
331     self.assertEqual(ParseUnit('2.01'), 4)
332     self.assertEqual(ParseUnit('3.99'), 4)
333     self.assertEqual(ParseUnit('4.00'), 4)
334     self.assertEqual(ParseUnit('4.01'), 8)
335     self.assertEqual(ParseUnit('1.5G'), 1536)
336     self.assertEqual(ParseUnit('1.8G'), 1844)
337     self.assertEqual(ParseUnit('8.28T'), 8682212)
338
339   def testSuffixes(self):
340     for sep in ('', ' ', '   ', "\t", "\t "):
341       for suffix, scale in TestParseUnit.SCALES:
342         for func in (lambda x: x, str.lower, str.upper):
343           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
344                            1024 * scale)
345
346   def testInvalidInput(self):
347     for sep in ('-', '_', ',', 'a'):
348       for suffix, _ in TestParseUnit.SCALES:
349         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
350
351     for suffix, _ in TestParseUnit.SCALES:
352       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
353
354
355 class TestSshKeys(testutils.GanetiTestCase):
356   """Test case for the AddAuthorizedKey function"""
357
358   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
359   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
360            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
361
362   def setUp(self):
363     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
364     try:
365       handle = os.fdopen(fd, 'w')
366       try:
367         handle.write("%s\n" % TestSshKeys.KEY_A)
368         handle.write("%s\n" % TestSshKeys.KEY_B)
369       finally:
370         handle.close()
371     except:
372       utils.RemoveFile(self.tmpname)
373       raise
374
375   def tearDown(self):
376     utils.RemoveFile(self.tmpname)
377     del self.tmpname
378
379   def testAddingNewKey(self):
380     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
381
382     self.assertFileContent(self.tmpname,
383       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
384       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
385       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
386       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
387
388   def testAddingAlmostButNotCompletelyTheSameKey(self):
389     AddAuthorizedKey(self.tmpname,
390         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
391
392     self.assertFileContent(self.tmpname,
393       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
394       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
395       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
396       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
397
398   def testAddingExistingKeyWithSomeMoreSpaces(self):
399     AddAuthorizedKey(self.tmpname,
400         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
401
402     self.assertFileContent(self.tmpname,
403       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
404       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
405       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
406
407   def testRemovingExistingKeyWithSomeMoreSpaces(self):
408     RemoveAuthorizedKey(self.tmpname,
409         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
410
411     self.assertFileContent(self.tmpname,
412       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
413       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
414
415   def testRemovingNonExistingKey(self):
416     RemoveAuthorizedKey(self.tmpname,
417         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
418
419     self.assertFileContent(self.tmpname,
420       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
421       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
422       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
423
424
425 class TestEtcHosts(testutils.GanetiTestCase):
426   """Test functions modifying /etc/hosts"""
427
428   def setUp(self):
429     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
430     try:
431       handle = os.fdopen(fd, 'w')
432       try:
433         handle.write('# This is a test file for /etc/hosts\n')
434         handle.write('127.0.0.1\tlocalhost\n')
435         handle.write('192.168.1.1 router gw\n')
436       finally:
437         handle.close()
438     except:
439       utils.RemoveFile(self.tmpname)
440       raise
441
442   def tearDown(self):
443     utils.RemoveFile(self.tmpname)
444     del self.tmpname
445
446   def testSettingNewIp(self):
447     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
448
449     self.assertFileContent(self.tmpname,
450       "# This is a test file for /etc/hosts\n"
451       "127.0.0.1\tlocalhost\n"
452       "192.168.1.1 router gw\n"
453       "1.2.3.4\tmyhost.domain.tld myhost\n")
454
455   def testSettingExistingIp(self):
456     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
457                      ['myhost'])
458
459     self.assertFileContent(self.tmpname,
460       "# This is a test file for /etc/hosts\n"
461       "127.0.0.1\tlocalhost\n"
462       "192.168.1.1\tmyhost.domain.tld myhost\n")
463
464   def testSettingDuplicateName(self):
465     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
466
467     self.assertFileContent(self.tmpname,
468       "# This is a test file for /etc/hosts\n"
469       "127.0.0.1\tlocalhost\n"
470       "192.168.1.1 router gw\n"
471       "1.2.3.4\tmyhost\n")
472
473   def testRemovingExistingHost(self):
474     RemoveEtcHostsEntry(self.tmpname, 'router')
475
476     self.assertFileContent(self.tmpname,
477       "# This is a test file for /etc/hosts\n"
478       "127.0.0.1\tlocalhost\n"
479       "192.168.1.1 gw\n")
480
481   def testRemovingSingleExistingHost(self):
482     RemoveEtcHostsEntry(self.tmpname, 'localhost')
483
484     self.assertFileContent(self.tmpname,
485       "# This is a test file for /etc/hosts\n"
486       "192.168.1.1 router gw\n")
487
488   def testRemovingNonExistingHost(self):
489     RemoveEtcHostsEntry(self.tmpname, 'myhost')
490
491     self.assertFileContent(self.tmpname,
492       "# This is a test file for /etc/hosts\n"
493       "127.0.0.1\tlocalhost\n"
494       "192.168.1.1 router gw\n")
495
496   def testRemovingAlias(self):
497     RemoveEtcHostsEntry(self.tmpname, 'gw')
498
499     self.assertFileContent(self.tmpname,
500       "# This is a test file for /etc/hosts\n"
501       "127.0.0.1\tlocalhost\n"
502       "192.168.1.1 router\n")
503
504
505 class TestShellQuoting(unittest.TestCase):
506   """Test case for shell quoting functions"""
507
508   def testShellQuote(self):
509     self.assertEqual(ShellQuote('abc'), "abc")
510     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
511     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
512     self.assertEqual(ShellQuote("a b c"), "'a b c'")
513     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
514
515   def testShellQuoteArgs(self):
516     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
517     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
518     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
519
520
521 class TestTcpPing(unittest.TestCase):
522   """Testcase for TCP version of ping - against listen(2)ing port"""
523
524   def setUp(self):
525     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
526     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
527     self.listenerport = self.listener.getsockname()[1]
528     self.listener.listen(1)
529
530   def tearDown(self):
531     self.listener.shutdown(socket.SHUT_RDWR)
532     del self.listener
533     del self.listenerport
534
535   def testTcpPingToLocalHostAccept(self):
536     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
537                          self.listenerport,
538                          timeout=10,
539                          live_port_needed=True,
540                          source=constants.LOCALHOST_IP_ADDRESS,
541                          ),
542                  "failed to connect to test listener")
543
544     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
545                          self.listenerport,
546                          timeout=10,
547                          live_port_needed=True,
548                          ),
549                  "failed to connect to test listener (no source)")
550
551
552 class TestTcpPingDeaf(unittest.TestCase):
553   """Testcase for TCP version of ping - against non listen(2)ing port"""
554
555   def setUp(self):
556     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
557     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
558     self.deaflistenerport = self.deaflistener.getsockname()[1]
559
560   def tearDown(self):
561     del self.deaflistener
562     del self.deaflistenerport
563
564   def testTcpPingToLocalHostAcceptDeaf(self):
565     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
566                         self.deaflistenerport,
567                         timeout=constants.TCP_PING_TIMEOUT,
568                         live_port_needed=True,
569                         source=constants.LOCALHOST_IP_ADDRESS,
570                         ), # need successful connect(2)
571                 "successfully connected to deaf listener")
572
573     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
574                         self.deaflistenerport,
575                         timeout=constants.TCP_PING_TIMEOUT,
576                         live_port_needed=True,
577                         ), # need successful connect(2)
578                 "successfully connected to deaf listener (no source addr)")
579
580   def testTcpPingToLocalHostNoAccept(self):
581     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
582                          self.deaflistenerport,
583                          timeout=constants.TCP_PING_TIMEOUT,
584                          live_port_needed=False,
585                          source=constants.LOCALHOST_IP_ADDRESS,
586                          ), # ECONNREFUSED is OK
587                  "failed to ping alive host on deaf port")
588
589     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
590                          self.deaflistenerport,
591                          timeout=constants.TCP_PING_TIMEOUT,
592                          live_port_needed=False,
593                          ), # ECONNREFUSED is OK
594                  "failed to ping alive host on deaf port (no source addr)")
595
596
597 class TestListVisibleFiles(unittest.TestCase):
598   """Test case for ListVisibleFiles"""
599
600   def setUp(self):
601     self.path = tempfile.mkdtemp()
602
603   def tearDown(self):
604     shutil.rmtree(self.path)
605
606   def _test(self, files, expected):
607     # Sort a copy
608     expected = expected[:]
609     expected.sort()
610
611     for name in files:
612       f = open(os.path.join(self.path, name), 'w')
613       try:
614         f.write("Test\n")
615       finally:
616         f.close()
617
618     found = ListVisibleFiles(self.path)
619     found.sort()
620
621     self.assertEqual(found, expected)
622
623   def testAllVisible(self):
624     files = ["a", "b", "c"]
625     expected = files
626     self._test(files, expected)
627
628   def testNoneVisible(self):
629     files = [".a", ".b", ".c"]
630     expected = []
631     self._test(files, expected)
632
633   def testSomeVisible(self):
634     files = ["a", "b", ".c"]
635     expected = ["a", "b"]
636     self._test(files, expected)
637
638
639 class TestNewUUID(unittest.TestCase):
640   """Test case for NewUUID"""
641
642   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
643                         '[a-f0-9]{4}-[a-f0-9]{12}$')
644
645   def runTest(self):
646     self.failUnless(self._re_uuid.match(utils.NewUUID()))
647
648
649 class TestUniqueSequence(unittest.TestCase):
650   """Test case for UniqueSequence"""
651
652   def _test(self, input, expected):
653     self.assertEqual(utils.UniqueSequence(input), expected)
654
655   def runTest(self):
656     # Ordered input
657     self._test([1, 2, 3], [1, 2, 3])
658     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
659     self._test([1, 2, 2, 3], [1, 2, 3])
660     self._test([1, 2, 3, 3], [1, 2, 3])
661
662     # Unordered input
663     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
664     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
665
666     # Strings
667     self._test(["a", "a"], ["a"])
668     self._test(["a", "b"], ["a", "b"])
669     self._test(["a", "b", "a"], ["a", "b"])
670
671 class TestFirstFree(unittest.TestCase):
672   """Test case for the FirstFree function"""
673
674   def test(self):
675     """Test FirstFree"""
676     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
677     self.failUnlessEqual(FirstFree([]), None)
678     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
679     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
680     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
681
682 if __name__ == '__main__':
683   unittest.main()