Convert RunCmd to an epydoc docstring
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35
36 import ganeti
37 from ganeti import constants
38 from ganeti import utils
39 from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
40      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
41      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
42      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
43      SetEtcHostsEntry, RemoveEtcHostsEntry
44 from ganeti.errors import LockError, UnitParseError
45
46 def _ChildHandler(signal, stack):
47   global _ChildFlag
48   _ChildFlag = True
49
50 class GanetiTestCase(unittest.TestCase):
51   def assertFileContent(self, file_name, content):
52     """Checks the content of a file.
53
54     """
55     handle = open(file_name, 'r')
56     try:
57       self.assertEqual(handle.read(), content)
58     finally:
59       handle.close()
60
61
62 class TestIsProcessAlive(unittest.TestCase):
63   """Testing case for IsProcessAlive"""
64
65   def setUp(self):
66     global _ChildFlag
67     # create a (most probably) non-existing process-id
68     self.pid_non_existing = os.fork()
69     if self.pid_non_existing == 0:
70       os._exit(0)
71     elif self.pid_non_existing > 0:
72       os.waitpid(self.pid_non_existing, 0)
73     else:
74       raise SystemError("can't fork")
75     _ChildFlag = False
76     # Use _ChildHandler for SIGCHLD
77     self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
78     # create a zombie
79     self.pid_zombie = os.fork()
80     if self.pid_zombie == 0:
81       os._exit(0)
82     elif self.pid_zombie < 0:
83       raise SystemError("can't fork")
84
85   def tearDown(self):
86     signal.signal(signal.SIGCHLD, self.chldOrig)
87
88   def testExists(self):
89     mypid = os.getpid()
90     self.assert_(IsProcessAlive(mypid),
91                  "can't find myself running")
92
93   def testZombie(self):
94     global _ChildFlag
95     timeout = 10
96
97     while not _ChildFlag:
98       if timeout >= 0:
99         time.sleep(0.2)
100         timeout -= 0.2
101       else:
102         self.fail("timed out waiting for child's signal")
103         break # not executed...
104
105     self.assert_(not IsProcessAlive(self.pid_zombie),
106                  "zombie not detected as zombie")
107
108   def testNotExisting(self):
109     self.assert_(not IsProcessAlive(self.pid_non_existing),
110                  "noexisting process detected")
111
112
113 class TestLocking(unittest.TestCase):
114   """Testing case for the Lock/Unlock functions"""
115
116   def setUp(self):
117     lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
118                                 suffix=".locking")
119     self.old_lock_dir = constants.LOCK_DIR
120     constants.LOCK_DIR = lock_dir
121
122   def tearDown(self):
123     try:
124       ganeti.utils.Unlock("unittest")
125     except LockError:
126       pass
127     shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
128     constants.LOCK_DIR = self.old_lock_dir
129
130   def clean_lock(self, name):
131     try:
132       ganeti.utils.Unlock("unittest")
133     except LockError:
134       pass
135
136
137   def testLock(self):
138     self.clean_lock("unittest")
139     self.assertEqual(None, Lock("unittest"))
140
141
142   def testUnlock(self):
143     self.clean_lock("unittest")
144     ganeti.utils.Lock("unittest")
145     self.assertEqual(None, Unlock("unittest"))
146
147   def testDoubleLock(self):
148     self.clean_lock("unittest")
149     ganeti.utils.Lock("unittest")
150     self.assertRaises(LockError, Lock, "unittest")
151
152
153 class TestRunCmd(unittest.TestCase):
154   """Testing case for the RunCmd function"""
155
156   def setUp(self):
157     self.magic = time.ctime() + " ganeti test"
158
159   def testOk(self):
160     """Test successful exit code"""
161     result = RunCmd("/bin/sh -c 'exit 0'")
162     self.assertEqual(result.exit_code, 0)
163
164   def testFail(self):
165     """Test fail exit code"""
166     result = RunCmd("/bin/sh -c 'exit 1'")
167     self.assertEqual(result.exit_code, 1)
168
169
170   def testStdout(self):
171     """Test standard output"""
172     cmd = 'echo -n "%s"' % self.magic
173     result = RunCmd("/bin/sh -c '%s'" % cmd)
174     self.assertEqual(result.stdout, self.magic)
175
176
177   def testStderr(self):
178     """Test standard error"""
179     cmd = 'echo -n "%s"' % self.magic
180     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
181     self.assertEqual(result.stderr, self.magic)
182
183
184   def testCombined(self):
185     """Test combined output"""
186     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
187     result = RunCmd("/bin/sh -c '%s'" % cmd)
188     self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
189
190   def testSignal(self):
191     """Test signal"""
192     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
193     self.assertEqual(result.signal, 15)
194
195   def testListRun(self):
196     """Test list runs"""
197     result = RunCmd(["true"])
198     self.assertEqual(result.signal, None)
199     self.assertEqual(result.exit_code, 0)
200     result = RunCmd(["/bin/sh", "-c", "exit 1"])
201     self.assertEqual(result.signal, None)
202     self.assertEqual(result.exit_code, 1)
203     result = RunCmd(["echo", "-n", self.magic])
204     self.assertEqual(result.signal, None)
205     self.assertEqual(result.exit_code, 0)
206     self.assertEqual(result.stdout, self.magic)
207
208   def testLang(self):
209     """Test locale environment"""
210     old_env = os.environ.copy()
211     try:
212       os.environ["LANG"] = "en_US.UTF-8"
213       os.environ["LC_ALL"] = "en_US.UTF-8"
214       result = RunCmd(["locale"])
215       for line in result.output.splitlines():
216         key, value = line.split("=", 1)
217         # Ignore these variables, they're overridden by LC_ALL
218         if key == "LANG" or key == "LANGUAGE":
219           continue
220         self.failIf(value and value != "C" and value != '"C"',
221             "Variable %s is set to the invalid value '%s'" % (key, value))
222     finally:
223       os.environ = old_env
224
225
226 class TestRemoveFile(unittest.TestCase):
227   """Test case for the RemoveFile function"""
228
229   def setUp(self):
230     """Create a temp dir and file for each case"""
231     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
232     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
233     os.close(fd)
234
235   def tearDown(self):
236     if os.path.exists(self.tmpfile):
237       os.unlink(self.tmpfile)
238     os.rmdir(self.tmpdir)
239
240
241   def testIgnoreDirs(self):
242     """Test that RemoveFile() ignores directories"""
243     self.assertEqual(None, RemoveFile(self.tmpdir))
244
245
246   def testIgnoreNotExisting(self):
247     """Test that RemoveFile() ignores non-existing files"""
248     RemoveFile(self.tmpfile)
249     RemoveFile(self.tmpfile)
250
251
252   def testRemoveFile(self):
253     """Test that RemoveFile does remove a file"""
254     RemoveFile(self.tmpfile)
255     if os.path.exists(self.tmpfile):
256       self.fail("File '%s' not removed" % self.tmpfile)
257
258
259   def testRemoveSymlink(self):
260     """Test that RemoveFile does remove symlinks"""
261     symlink = self.tmpdir + "/symlink"
262     os.symlink("no-such-file", symlink)
263     RemoveFile(symlink)
264     if os.path.exists(symlink):
265       self.fail("File '%s' not removed" % symlink)
266     os.symlink(self.tmpfile, symlink)
267     RemoveFile(symlink)
268     if os.path.exists(symlink):
269       self.fail("File '%s' not removed" % symlink)
270
271
272 class TestCheckdict(unittest.TestCase):
273   """Test case for the CheckDict function"""
274
275   def testAdd(self):
276     """Test that CheckDict adds a missing key with the correct value"""
277
278     tgt = {'a':1}
279     tmpl = {'b': 2}
280     CheckDict(tgt, tmpl)
281     if 'b' not in tgt or tgt['b'] != 2:
282       self.fail("Failed to update dict")
283
284
285   def testNoUpdate(self):
286     """Test that CheckDict does not overwrite an existing key"""
287     tgt = {'a':1, 'b': 3}
288     tmpl = {'b': 2}
289     CheckDict(tgt, tmpl)
290     self.failUnlessEqual(tgt['b'], 3)
291
292
293 class TestMatchNameComponent(unittest.TestCase):
294   """Test case for the MatchNameComponent function"""
295
296   def testEmptyList(self):
297     """Test that there is no match against an empty list"""
298
299     self.failUnlessEqual(MatchNameComponent("", []), None)
300     self.failUnlessEqual(MatchNameComponent("test", []), None)
301
302   def testSingleMatch(self):
303     """Test that a single match is performed correctly"""
304     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
305     for key in "test2", "test2.example", "test2.example.com":
306       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
307
308   def testMultipleMatches(self):
309     """Test that a multiple match is returned as None"""
310     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
311     for key in "test1", "test1.example":
312       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
313
314
315 class TestFormatUnit(unittest.TestCase):
316   """Test case for the FormatUnit function"""
317
318   def testMiB(self):
319     self.assertEqual(FormatUnit(1), '1M')
320     self.assertEqual(FormatUnit(100), '100M')
321     self.assertEqual(FormatUnit(1023), '1023M')
322
323   def testGiB(self):
324     self.assertEqual(FormatUnit(1024), '1.0G')
325     self.assertEqual(FormatUnit(1536), '1.5G')
326     self.assertEqual(FormatUnit(17133), '16.7G')
327     self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
328
329   def testTiB(self):
330     self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
331     self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
332     self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
333
334
335 class TestParseUnit(unittest.TestCase):
336   """Test case for the ParseUnit function"""
337
338   SCALES = (('', 1),
339             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
340             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
341             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
342
343   def testRounding(self):
344     self.assertEqual(ParseUnit('0'), 0)
345     self.assertEqual(ParseUnit('1'), 4)
346     self.assertEqual(ParseUnit('2'), 4)
347     self.assertEqual(ParseUnit('3'), 4)
348
349     self.assertEqual(ParseUnit('124'), 124)
350     self.assertEqual(ParseUnit('125'), 128)
351     self.assertEqual(ParseUnit('126'), 128)
352     self.assertEqual(ParseUnit('127'), 128)
353     self.assertEqual(ParseUnit('128'), 128)
354     self.assertEqual(ParseUnit('129'), 132)
355     self.assertEqual(ParseUnit('130'), 132)
356
357   def testFloating(self):
358     self.assertEqual(ParseUnit('0'), 0)
359     self.assertEqual(ParseUnit('0.5'), 4)
360     self.assertEqual(ParseUnit('1.75'), 4)
361     self.assertEqual(ParseUnit('1.99'), 4)
362     self.assertEqual(ParseUnit('2.00'), 4)
363     self.assertEqual(ParseUnit('2.01'), 4)
364     self.assertEqual(ParseUnit('3.99'), 4)
365     self.assertEqual(ParseUnit('4.00'), 4)
366     self.assertEqual(ParseUnit('4.01'), 8)
367     self.assertEqual(ParseUnit('1.5G'), 1536)
368     self.assertEqual(ParseUnit('1.8G'), 1844)
369     self.assertEqual(ParseUnit('8.28T'), 8682212)
370
371   def testSuffixes(self):
372     for sep in ('', ' ', '   ', "\t", "\t "):
373       for suffix, scale in TestParseUnit.SCALES:
374         for func in (lambda x: x, str.lower, str.upper):
375           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
376                            1024 * scale)
377
378   def testInvalidInput(self):
379     for sep in ('-', '_', ',', 'a'):
380       for suffix, _ in TestParseUnit.SCALES:
381         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
382
383     for suffix, _ in TestParseUnit.SCALES:
384       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
385
386
387 class TestSshKeys(GanetiTestCase):
388   """Test case for the AddAuthorizedKey function"""
389
390   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
391   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
392            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
393
394   def setUp(self):
395     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
396     try:
397       handle = os.fdopen(fd, 'w')
398       try:
399         handle.write("%s\n" % TestSshKeys.KEY_A)
400         handle.write("%s\n" % TestSshKeys.KEY_B)
401       finally:
402         handle.close()
403     except:
404       utils.RemoveFile(self.tmpname)
405       raise
406
407   def tearDown(self):
408     utils.RemoveFile(self.tmpname)
409     del self.tmpname
410
411   def testAddingNewKey(self):
412     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
413
414     self.assertFileContent(self.tmpname,
415       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
416       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
417       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
418       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
419
420   def testAddingAlmostButNotCompletelyTheSameKey(self):
421     AddAuthorizedKey(self.tmpname,
422         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
423
424     self.assertFileContent(self.tmpname,
425       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
426       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
427       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
428       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
429
430   def testAddingExistingKeyWithSomeMoreSpaces(self):
431     AddAuthorizedKey(self.tmpname,
432         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
433
434     self.assertFileContent(self.tmpname,
435       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
436       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
437       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
438
439   def testRemovingExistingKeyWithSomeMoreSpaces(self):
440     RemoveAuthorizedKey(self.tmpname,
441         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
442
443     self.assertFileContent(self.tmpname,
444       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
445       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
446
447   def testRemovingNonExistingKey(self):
448     RemoveAuthorizedKey(self.tmpname,
449         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
450
451     self.assertFileContent(self.tmpname,
452       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
453       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
454       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
455
456
457 class TestEtcHosts(GanetiTestCase):
458   """Test functions modifying /etc/hosts"""
459
460   def setUp(self):
461     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
462     try:
463       handle = os.fdopen(fd, 'w')
464       try:
465         handle.write('# This is a test file for /etc/hosts\n')
466         handle.write('127.0.0.1\tlocalhost\n')
467         handle.write('192.168.1.1 router gw\n')
468       finally:
469         handle.close()
470     except:
471       utils.RemoveFile(self.tmpname)
472       raise
473
474   def tearDown(self):
475     utils.RemoveFile(self.tmpname)
476     del self.tmpname
477
478   def testSettingNewIp(self):
479     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
480
481     self.assertFileContent(self.tmpname,
482       "# This is a test file for /etc/hosts\n"
483       "127.0.0.1\tlocalhost\n"
484       "192.168.1.1 router gw\n"
485       "1.2.3.4\tmyhost.domain.tld myhost\n")
486
487   def testSettingExistingIp(self):
488     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
489                      ['myhost'])
490
491     self.assertFileContent(self.tmpname,
492       "# This is a test file for /etc/hosts\n"
493       "127.0.0.1\tlocalhost\n"
494       "192.168.1.1\tmyhost.domain.tld myhost\n")
495
496   def testSettingDuplicateName(self):
497     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
498
499     self.assertFileContent(self.tmpname,
500       "# This is a test file for /etc/hosts\n"
501       "127.0.0.1\tlocalhost\n"
502       "192.168.1.1 router gw\n"
503       "1.2.3.4\tmyhost\n")
504
505   def testRemovingExistingHost(self):
506     RemoveEtcHostsEntry(self.tmpname, 'router')
507
508     self.assertFileContent(self.tmpname,
509       "# This is a test file for /etc/hosts\n"
510       "127.0.0.1\tlocalhost\n"
511       "192.168.1.1 gw\n")
512
513   def testRemovingSingleExistingHost(self):
514     RemoveEtcHostsEntry(self.tmpname, 'localhost')
515
516     self.assertFileContent(self.tmpname,
517       "# This is a test file for /etc/hosts\n"
518       "192.168.1.1 router gw\n")
519
520   def testRemovingNonExistingHost(self):
521     RemoveEtcHostsEntry(self.tmpname, 'myhost')
522
523     self.assertFileContent(self.tmpname,
524       "# This is a test file for /etc/hosts\n"
525       "127.0.0.1\tlocalhost\n"
526       "192.168.1.1 router gw\n")
527
528   def testRemovingAlias(self):
529     RemoveEtcHostsEntry(self.tmpname, 'gw')
530
531     self.assertFileContent(self.tmpname,
532       "# This is a test file for /etc/hosts\n"
533       "127.0.0.1\tlocalhost\n"
534       "192.168.1.1 router\n")
535
536
537 class TestShellQuoting(unittest.TestCase):
538   """Test case for shell quoting functions"""
539
540   def testShellQuote(self):
541     self.assertEqual(ShellQuote('abc'), "abc")
542     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
543     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
544     self.assertEqual(ShellQuote("a b c"), "'a b c'")
545     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
546
547   def testShellQuoteArgs(self):
548     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
549     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
550     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
551
552
553 class TestTcpPing(unittest.TestCase):
554   """Testcase for TCP version of ping - against listen(2)ing port"""
555
556   def setUp(self):
557     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
558     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
559     self.listenerport = self.listener.getsockname()[1]
560     self.listener.listen(1)
561
562   def tearDown(self):
563     self.listener.shutdown(socket.SHUT_RDWR)
564     del self.listener
565     del self.listenerport
566
567   def testTcpPingToLocalHostAccept(self):
568     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
569                          self.listenerport,
570                          timeout=10,
571                          live_port_needed=True,
572                          source=constants.LOCALHOST_IP_ADDRESS,
573                          ),
574                  "failed to connect to test listener")
575
576     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
577                          self.listenerport,
578                          timeout=10,
579                          live_port_needed=True,
580                          ),
581                  "failed to connect to test listener (no source)")
582
583
584 class TestTcpPingDeaf(unittest.TestCase):
585   """Testcase for TCP version of ping - against non listen(2)ing port"""
586
587   def setUp(self):
588     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
589     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
590     self.deaflistenerport = self.deaflistener.getsockname()[1]
591
592   def tearDown(self):
593     del self.deaflistener
594     del self.deaflistenerport
595
596   def testTcpPingToLocalHostAcceptDeaf(self):
597     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
598                         self.deaflistenerport,
599                         timeout=constants.TCP_PING_TIMEOUT,
600                         live_port_needed=True,
601                         source=constants.LOCALHOST_IP_ADDRESS,
602                         ), # need successful connect(2)
603                 "successfully connected to deaf listener")
604
605     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
606                         self.deaflistenerport,
607                         timeout=constants.TCP_PING_TIMEOUT,
608                         live_port_needed=True,
609                         ), # need successful connect(2)
610                 "successfully connected to deaf listener (no source addr)")
611
612   def testTcpPingToLocalHostNoAccept(self):
613     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
614                          self.deaflistenerport,
615                          timeout=constants.TCP_PING_TIMEOUT,
616                          live_port_needed=False,
617                          source=constants.LOCALHOST_IP_ADDRESS,
618                          ), # ECONNREFUSED is OK
619                  "failed to ping alive host on deaf port")
620
621     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
622                          self.deaflistenerport,
623                          timeout=constants.TCP_PING_TIMEOUT,
624                          live_port_needed=False,
625                          ), # ECONNREFUSED is OK
626                  "failed to ping alive host on deaf port (no source addr)")
627
628
629 class TestListVisibleFiles(unittest.TestCase):
630   """Test case for ListVisibleFiles"""
631
632   def setUp(self):
633     self.path = tempfile.mkdtemp()
634
635   def tearDown(self):
636     shutil.rmtree(self.path)
637
638   def _test(self, files, expected):
639     # Sort a copy
640     expected = expected[:]
641     expected.sort()
642
643     for name in files:
644       f = open(os.path.join(self.path, name), 'w')
645       try:
646         f.write("Test\n")
647       finally:
648         f.close()
649
650     found = ListVisibleFiles(self.path)
651     found.sort()
652
653     self.assertEqual(found, expected)
654
655   def testAllVisible(self):
656     files = ["a", "b", "c"]
657     expected = files
658     self._test(files, expected)
659
660   def testNoneVisible(self):
661     files = [".a", ".b", ".c"]
662     expected = []
663     self._test(files, expected)
664
665   def testSomeVisible(self):
666     files = ["a", "b", ".c"]
667     expected = ["a", "b"]
668     self._test(files, expected)
669
670
671 class TestNewUUID(unittest.TestCase):
672   """Test case for NewUUID"""
673
674   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
675                         '[a-f0-9]{4}-[a-f0-9]{12}$')
676
677   def runTest(self):
678     self.failUnless(self._re_uuid.match(utils.NewUUID()))
679
680
681 class TestUniqueSequence(unittest.TestCase):
682   """Test case for UniqueSequence"""
683
684   def _test(self, input, expected):
685     self.assertEqual(utils.UniqueSequence(input), expected)
686
687   def runTest(self):
688     # Ordered input
689     self._test([1, 2, 3], [1, 2, 3])
690     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
691     self._test([1, 2, 2, 3], [1, 2, 3])
692     self._test([1, 2, 3, 3], [1, 2, 3])
693
694     # Unordered input
695     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
696     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
697
698     # Strings
699     self._test(["a", "a"], ["a"])
700     self._test(["a", "b"], ["a", "b"])
701     self._test(["a", "b", "a"], ["a", "b"])
702
703
704 if __name__ == '__main__':
705   unittest.main()