Expose utils.DaemonPidFileName
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import tempfile
36
37 import ganeti
38 import testutils
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree
46 from ganeti.errors import LockError, UnitParseError, GenericError, \
47      ProgrammerError
48
49 def _ChildHandler(signal, stack):
50   global _ChildFlag
51   _ChildFlag = True
52
53
54 class TestIsProcessAlive(unittest.TestCase):
55   """Testing case for IsProcessAlive"""
56
57   def setUp(self):
58     global _ChildFlag
59     # create a (most probably) non-existing process-id
60     self.pid_non_existing = os.fork()
61     if self.pid_non_existing == 0:
62       os._exit(0)
63     elif self.pid_non_existing > 0:
64       os.waitpid(self.pid_non_existing, 0)
65     else:
66       raise SystemError("can't fork")
67     _ChildFlag = False
68     # Use _ChildHandler for SIGCHLD
69     self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
70     # create a zombie
71     self.pid_zombie = os.fork()
72     if self.pid_zombie == 0:
73       os._exit(0)
74     elif self.pid_zombie < 0:
75       raise SystemError("can't fork")
76
77   def tearDown(self):
78     signal.signal(signal.SIGCHLD, self.chldOrig)
79
80   def testExists(self):
81     mypid = os.getpid()
82     self.assert_(IsProcessAlive(mypid),
83                  "can't find myself running")
84
85   def testZombie(self):
86     global _ChildFlag
87     timeout = 10
88
89     while not _ChildFlag:
90       if timeout >= 0:
91         time.sleep(0.2)
92         timeout -= 0.2
93       else:
94         self.fail("timed out waiting for child's signal")
95         break # not executed...
96
97     self.assert_(not IsProcessAlive(self.pid_zombie),
98                  "zombie not detected as zombie")
99
100   def testNotExisting(self):
101     self.assert_(not IsProcessAlive(self.pid_non_existing),
102                  "noexisting process detected")
103
104
105 class TestPidFileFunctions(unittest.TestCase):
106   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
107
108   def setUp(self):
109     self.dir = tempfile.mkdtemp()
110     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
111     utils.DaemonPidFileName = self.f_dpn
112
113   def testPidFileFunctions(self):
114     pid_file = self.f_dpn('test')
115     utils.WritePidFile('test')
116     self.failUnless(os.path.exists(pid_file),
117                     "PID file should have been created")
118     read_pid = utils.ReadPidFile(pid_file)
119     self.failUnlessEqual(read_pid, os.getpid())
120     self.failUnless(utils.IsProcessAlive(read_pid))
121     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
122     utils.RemovePidFile('test')
123     self.failIf(os.path.exists(pid_file),
124                 "PID file should not exist anymore")
125     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
126                          "ReadPidFile should return 0 for missing pid file")
127     fh = open(pid_file, "w")
128     fh.write("blah\n")
129     fh.close()
130     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
131                          "ReadPidFile should return 0 for invalid pid file")
132     utils.RemovePidFile('test')
133     self.failIf(os.path.exists(pid_file),
134                 "PID file should not exist anymore")
135
136   def testKill(self):
137     pid_file = self.f_dpn('child')
138     r_fd, w_fd = os.pipe()
139     new_pid = os.fork()
140     if new_pid == 0: #child
141       utils.WritePidFile('child')
142       os.write(w_fd, 'a')
143       signal.pause()
144       os._exit(0)
145       return
146     # else we are in the parent
147     # wait until the child has written the pid file
148     os.read(r_fd, 1)
149     read_pid = utils.ReadPidFile(pid_file)
150     self.failUnlessEqual(read_pid, new_pid)
151     self.failUnless(utils.IsProcessAlive(new_pid))
152     utils.KillProcess(new_pid)
153     self.failIf(utils.IsProcessAlive(new_pid))
154     utils.RemovePidFile('child')
155     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
156
157   def tearDown(self):
158     for name in os.listdir(self.dir):
159       os.unlink(os.path.join(self.dir, name))
160     os.rmdir(self.dir)
161
162
163 class TestRunCmd(unittest.TestCase):
164   """Testing case for the RunCmd function"""
165
166   def setUp(self):
167     self.magic = time.ctime() + " ganeti test"
168
169   def testOk(self):
170     """Test successful exit code"""
171     result = RunCmd("/bin/sh -c 'exit 0'")
172     self.assertEqual(result.exit_code, 0)
173
174   def testFail(self):
175     """Test fail exit code"""
176     result = RunCmd("/bin/sh -c 'exit 1'")
177     self.assertEqual(result.exit_code, 1)
178
179
180   def testStdout(self):
181     """Test standard output"""
182     cmd = 'echo -n "%s"' % self.magic
183     result = RunCmd("/bin/sh -c '%s'" % cmd)
184     self.assertEqual(result.stdout, self.magic)
185
186
187   def testStderr(self):
188     """Test standard error"""
189     cmd = 'echo -n "%s"' % self.magic
190     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
191     self.assertEqual(result.stderr, self.magic)
192
193
194   def testCombined(self):
195     """Test combined output"""
196     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
197     result = RunCmd("/bin/sh -c '%s'" % cmd)
198     self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
199
200   def testSignal(self):
201     """Test signal"""
202     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
203     self.assertEqual(result.signal, 15)
204
205   def testListRun(self):
206     """Test list runs"""
207     result = RunCmd(["true"])
208     self.assertEqual(result.signal, None)
209     self.assertEqual(result.exit_code, 0)
210     result = RunCmd(["/bin/sh", "-c", "exit 1"])
211     self.assertEqual(result.signal, None)
212     self.assertEqual(result.exit_code, 1)
213     result = RunCmd(["echo", "-n", self.magic])
214     self.assertEqual(result.signal, None)
215     self.assertEqual(result.exit_code, 0)
216     self.assertEqual(result.stdout, self.magic)
217
218   def testLang(self):
219     """Test locale environment"""
220     old_env = os.environ.copy()
221     try:
222       os.environ["LANG"] = "en_US.UTF-8"
223       os.environ["LC_ALL"] = "en_US.UTF-8"
224       result = RunCmd(["locale"])
225       for line in result.output.splitlines():
226         key, value = line.split("=", 1)
227         # Ignore these variables, they're overridden by LC_ALL
228         if key == "LANG" or key == "LANGUAGE":
229           continue
230         self.failIf(value and value != "C" and value != '"C"',
231             "Variable %s is set to the invalid value '%s'" % (key, value))
232     finally:
233       os.environ = old_env
234
235
236 class TestRemoveFile(unittest.TestCase):
237   """Test case for the RemoveFile function"""
238
239   def setUp(self):
240     """Create a temp dir and file for each case"""
241     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
242     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
243     os.close(fd)
244
245   def tearDown(self):
246     if os.path.exists(self.tmpfile):
247       os.unlink(self.tmpfile)
248     os.rmdir(self.tmpdir)
249
250
251   def testIgnoreDirs(self):
252     """Test that RemoveFile() ignores directories"""
253     self.assertEqual(None, RemoveFile(self.tmpdir))
254
255
256   def testIgnoreNotExisting(self):
257     """Test that RemoveFile() ignores non-existing files"""
258     RemoveFile(self.tmpfile)
259     RemoveFile(self.tmpfile)
260
261
262   def testRemoveFile(self):
263     """Test that RemoveFile does remove a file"""
264     RemoveFile(self.tmpfile)
265     if os.path.exists(self.tmpfile):
266       self.fail("File '%s' not removed" % self.tmpfile)
267
268
269   def testRemoveSymlink(self):
270     """Test that RemoveFile does remove symlinks"""
271     symlink = self.tmpdir + "/symlink"
272     os.symlink("no-such-file", symlink)
273     RemoveFile(symlink)
274     if os.path.exists(symlink):
275       self.fail("File '%s' not removed" % symlink)
276     os.symlink(self.tmpfile, symlink)
277     RemoveFile(symlink)
278     if os.path.exists(symlink):
279       self.fail("File '%s' not removed" % symlink)
280
281
282 class TestCheckdict(unittest.TestCase):
283   """Test case for the CheckDict function"""
284
285   def testAdd(self):
286     """Test that CheckDict adds a missing key with the correct value"""
287
288     tgt = {'a':1}
289     tmpl = {'b': 2}
290     CheckDict(tgt, tmpl)
291     if 'b' not in tgt or tgt['b'] != 2:
292       self.fail("Failed to update dict")
293
294
295   def testNoUpdate(self):
296     """Test that CheckDict does not overwrite an existing key"""
297     tgt = {'a':1, 'b': 3}
298     tmpl = {'b': 2}
299     CheckDict(tgt, tmpl)
300     self.failUnlessEqual(tgt['b'], 3)
301
302
303 class TestMatchNameComponent(unittest.TestCase):
304   """Test case for the MatchNameComponent function"""
305
306   def testEmptyList(self):
307     """Test that there is no match against an empty list"""
308
309     self.failUnlessEqual(MatchNameComponent("", []), None)
310     self.failUnlessEqual(MatchNameComponent("test", []), None)
311
312   def testSingleMatch(self):
313     """Test that a single match is performed correctly"""
314     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
315     for key in "test2", "test2.example", "test2.example.com":
316       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
317
318   def testMultipleMatches(self):
319     """Test that a multiple match is returned as None"""
320     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
321     for key in "test1", "test1.example":
322       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
323
324
325 class TestFormatUnit(unittest.TestCase):
326   """Test case for the FormatUnit function"""
327
328   def testMiB(self):
329     self.assertEqual(FormatUnit(1), '1M')
330     self.assertEqual(FormatUnit(100), '100M')
331     self.assertEqual(FormatUnit(1023), '1023M')
332
333   def testGiB(self):
334     self.assertEqual(FormatUnit(1024), '1.0G')
335     self.assertEqual(FormatUnit(1536), '1.5G')
336     self.assertEqual(FormatUnit(17133), '16.7G')
337     self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
338
339   def testTiB(self):
340     self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
341     self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
342     self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
343
344
345 class TestParseUnit(unittest.TestCase):
346   """Test case for the ParseUnit function"""
347
348   SCALES = (('', 1),
349             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
350             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
351             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
352
353   def testRounding(self):
354     self.assertEqual(ParseUnit('0'), 0)
355     self.assertEqual(ParseUnit('1'), 4)
356     self.assertEqual(ParseUnit('2'), 4)
357     self.assertEqual(ParseUnit('3'), 4)
358
359     self.assertEqual(ParseUnit('124'), 124)
360     self.assertEqual(ParseUnit('125'), 128)
361     self.assertEqual(ParseUnit('126'), 128)
362     self.assertEqual(ParseUnit('127'), 128)
363     self.assertEqual(ParseUnit('128'), 128)
364     self.assertEqual(ParseUnit('129'), 132)
365     self.assertEqual(ParseUnit('130'), 132)
366
367   def testFloating(self):
368     self.assertEqual(ParseUnit('0'), 0)
369     self.assertEqual(ParseUnit('0.5'), 4)
370     self.assertEqual(ParseUnit('1.75'), 4)
371     self.assertEqual(ParseUnit('1.99'), 4)
372     self.assertEqual(ParseUnit('2.00'), 4)
373     self.assertEqual(ParseUnit('2.01'), 4)
374     self.assertEqual(ParseUnit('3.99'), 4)
375     self.assertEqual(ParseUnit('4.00'), 4)
376     self.assertEqual(ParseUnit('4.01'), 8)
377     self.assertEqual(ParseUnit('1.5G'), 1536)
378     self.assertEqual(ParseUnit('1.8G'), 1844)
379     self.assertEqual(ParseUnit('8.28T'), 8682212)
380
381   def testSuffixes(self):
382     for sep in ('', ' ', '   ', "\t", "\t "):
383       for suffix, scale in TestParseUnit.SCALES:
384         for func in (lambda x: x, str.lower, str.upper):
385           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
386                            1024 * scale)
387
388   def testInvalidInput(self):
389     for sep in ('-', '_', ',', 'a'):
390       for suffix, _ in TestParseUnit.SCALES:
391         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
392
393     for suffix, _ in TestParseUnit.SCALES:
394       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
395
396
397 class TestSshKeys(testutils.GanetiTestCase):
398   """Test case for the AddAuthorizedKey function"""
399
400   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
401   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
402            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
403
404   def setUp(self):
405     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
406     try:
407       handle = os.fdopen(fd, 'w')
408       try:
409         handle.write("%s\n" % TestSshKeys.KEY_A)
410         handle.write("%s\n" % TestSshKeys.KEY_B)
411       finally:
412         handle.close()
413     except:
414       utils.RemoveFile(self.tmpname)
415       raise
416
417   def tearDown(self):
418     utils.RemoveFile(self.tmpname)
419     del self.tmpname
420
421   def testAddingNewKey(self):
422     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
423
424     self.assertFileContent(self.tmpname,
425       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
426       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
427       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
428       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
429
430   def testAddingAlmostButNotCompletelyTheSameKey(self):
431     AddAuthorizedKey(self.tmpname,
432         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
433
434     self.assertFileContent(self.tmpname,
435       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
436       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
437       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
438       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
439
440   def testAddingExistingKeyWithSomeMoreSpaces(self):
441     AddAuthorizedKey(self.tmpname,
442         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
443
444     self.assertFileContent(self.tmpname,
445       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
446       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
447       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
448
449   def testRemovingExistingKeyWithSomeMoreSpaces(self):
450     RemoveAuthorizedKey(self.tmpname,
451         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
452
453     self.assertFileContent(self.tmpname,
454       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
455       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
456
457   def testRemovingNonExistingKey(self):
458     RemoveAuthorizedKey(self.tmpname,
459         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
460
461     self.assertFileContent(self.tmpname,
462       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
463       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
464       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
465
466
467 class TestEtcHosts(testutils.GanetiTestCase):
468   """Test functions modifying /etc/hosts"""
469
470   def setUp(self):
471     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
472     try:
473       handle = os.fdopen(fd, 'w')
474       try:
475         handle.write('# This is a test file for /etc/hosts\n')
476         handle.write('127.0.0.1\tlocalhost\n')
477         handle.write('192.168.1.1 router gw\n')
478       finally:
479         handle.close()
480     except:
481       utils.RemoveFile(self.tmpname)
482       raise
483
484   def tearDown(self):
485     utils.RemoveFile(self.tmpname)
486     del self.tmpname
487
488   def testSettingNewIp(self):
489     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
490
491     self.assertFileContent(self.tmpname,
492       "# This is a test file for /etc/hosts\n"
493       "127.0.0.1\tlocalhost\n"
494       "192.168.1.1 router gw\n"
495       "1.2.3.4\tmyhost.domain.tld myhost\n")
496
497   def testSettingExistingIp(self):
498     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
499                      ['myhost'])
500
501     self.assertFileContent(self.tmpname,
502       "# This is a test file for /etc/hosts\n"
503       "127.0.0.1\tlocalhost\n"
504       "192.168.1.1\tmyhost.domain.tld myhost\n")
505
506   def testSettingDuplicateName(self):
507     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
508
509     self.assertFileContent(self.tmpname,
510       "# This is a test file for /etc/hosts\n"
511       "127.0.0.1\tlocalhost\n"
512       "192.168.1.1 router gw\n"
513       "1.2.3.4\tmyhost\n")
514
515   def testRemovingExistingHost(self):
516     RemoveEtcHostsEntry(self.tmpname, 'router')
517
518     self.assertFileContent(self.tmpname,
519       "# This is a test file for /etc/hosts\n"
520       "127.0.0.1\tlocalhost\n"
521       "192.168.1.1 gw\n")
522
523   def testRemovingSingleExistingHost(self):
524     RemoveEtcHostsEntry(self.tmpname, 'localhost')
525
526     self.assertFileContent(self.tmpname,
527       "# This is a test file for /etc/hosts\n"
528       "192.168.1.1 router gw\n")
529
530   def testRemovingNonExistingHost(self):
531     RemoveEtcHostsEntry(self.tmpname, 'myhost')
532
533     self.assertFileContent(self.tmpname,
534       "# This is a test file for /etc/hosts\n"
535       "127.0.0.1\tlocalhost\n"
536       "192.168.1.1 router gw\n")
537
538   def testRemovingAlias(self):
539     RemoveEtcHostsEntry(self.tmpname, 'gw')
540
541     self.assertFileContent(self.tmpname,
542       "# This is a test file for /etc/hosts\n"
543       "127.0.0.1\tlocalhost\n"
544       "192.168.1.1 router\n")
545
546
547 class TestShellQuoting(unittest.TestCase):
548   """Test case for shell quoting functions"""
549
550   def testShellQuote(self):
551     self.assertEqual(ShellQuote('abc'), "abc")
552     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
553     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
554     self.assertEqual(ShellQuote("a b c"), "'a b c'")
555     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
556
557   def testShellQuoteArgs(self):
558     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
559     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
560     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
561
562
563 class TestTcpPing(unittest.TestCase):
564   """Testcase for TCP version of ping - against listen(2)ing port"""
565
566   def setUp(self):
567     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
568     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
569     self.listenerport = self.listener.getsockname()[1]
570     self.listener.listen(1)
571
572   def tearDown(self):
573     self.listener.shutdown(socket.SHUT_RDWR)
574     del self.listener
575     del self.listenerport
576
577   def testTcpPingToLocalHostAccept(self):
578     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
579                          self.listenerport,
580                          timeout=10,
581                          live_port_needed=True,
582                          source=constants.LOCALHOST_IP_ADDRESS,
583                          ),
584                  "failed to connect to test listener")
585
586     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
587                          self.listenerport,
588                          timeout=10,
589                          live_port_needed=True,
590                          ),
591                  "failed to connect to test listener (no source)")
592
593
594 class TestTcpPingDeaf(unittest.TestCase):
595   """Testcase for TCP version of ping - against non listen(2)ing port"""
596
597   def setUp(self):
598     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
599     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
600     self.deaflistenerport = self.deaflistener.getsockname()[1]
601
602   def tearDown(self):
603     del self.deaflistener
604     del self.deaflistenerport
605
606   def testTcpPingToLocalHostAcceptDeaf(self):
607     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
608                         self.deaflistenerport,
609                         timeout=constants.TCP_PING_TIMEOUT,
610                         live_port_needed=True,
611                         source=constants.LOCALHOST_IP_ADDRESS,
612                         ), # need successful connect(2)
613                 "successfully connected to deaf listener")
614
615     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
616                         self.deaflistenerport,
617                         timeout=constants.TCP_PING_TIMEOUT,
618                         live_port_needed=True,
619                         ), # need successful connect(2)
620                 "successfully connected to deaf listener (no source addr)")
621
622   def testTcpPingToLocalHostNoAccept(self):
623     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
624                          self.deaflistenerport,
625                          timeout=constants.TCP_PING_TIMEOUT,
626                          live_port_needed=False,
627                          source=constants.LOCALHOST_IP_ADDRESS,
628                          ), # ECONNREFUSED is OK
629                  "failed to ping alive host on deaf port")
630
631     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
632                          self.deaflistenerport,
633                          timeout=constants.TCP_PING_TIMEOUT,
634                          live_port_needed=False,
635                          ), # ECONNREFUSED is OK
636                  "failed to ping alive host on deaf port (no source addr)")
637
638
639 class TestListVisibleFiles(unittest.TestCase):
640   """Test case for ListVisibleFiles"""
641
642   def setUp(self):
643     self.path = tempfile.mkdtemp()
644
645   def tearDown(self):
646     shutil.rmtree(self.path)
647
648   def _test(self, files, expected):
649     # Sort a copy
650     expected = expected[:]
651     expected.sort()
652
653     for name in files:
654       f = open(os.path.join(self.path, name), 'w')
655       try:
656         f.write("Test\n")
657       finally:
658         f.close()
659
660     found = ListVisibleFiles(self.path)
661     found.sort()
662
663     self.assertEqual(found, expected)
664
665   def testAllVisible(self):
666     files = ["a", "b", "c"]
667     expected = files
668     self._test(files, expected)
669
670   def testNoneVisible(self):
671     files = [".a", ".b", ".c"]
672     expected = []
673     self._test(files, expected)
674
675   def testSomeVisible(self):
676     files = ["a", "b", ".c"]
677     expected = ["a", "b"]
678     self._test(files, expected)
679
680
681 class TestNewUUID(unittest.TestCase):
682   """Test case for NewUUID"""
683
684   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
685                         '[a-f0-9]{4}-[a-f0-9]{12}$')
686
687   def runTest(self):
688     self.failUnless(self._re_uuid.match(utils.NewUUID()))
689
690
691 class TestUniqueSequence(unittest.TestCase):
692   """Test case for UniqueSequence"""
693
694   def _test(self, input, expected):
695     self.assertEqual(utils.UniqueSequence(input), expected)
696
697   def runTest(self):
698     # Ordered input
699     self._test([1, 2, 3], [1, 2, 3])
700     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
701     self._test([1, 2, 2, 3], [1, 2, 3])
702     self._test([1, 2, 3, 3], [1, 2, 3])
703
704     # Unordered input
705     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
706     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
707
708     # Strings
709     self._test(["a", "a"], ["a"])
710     self._test(["a", "b"], ["a", "b"])
711     self._test(["a", "b", "a"], ["a", "b"])
712
713 class TestFirstFree(unittest.TestCase):
714   """Test case for the FirstFree function"""
715
716   def test(self):
717     """Test FirstFree"""
718     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
719     self.failUnlessEqual(FirstFree([]), None)
720     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
721     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
722     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
723
724 if __name__ == '__main__':
725   unittest.main()