Convert the locking unittests to repetition-test
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import select
36
37 import ganeti
38 import testutils
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46 from ganeti.errors import LockError, UnitParseError, GenericError, \
47      ProgrammerError
48
49
50 class TestIsProcessAlive(unittest.TestCase):
51   """Testing case for IsProcessAlive"""
52
53   def testExists(self):
54     mypid = os.getpid()
55     self.assert_(IsProcessAlive(mypid),
56                  "can't find myself running")
57
58   def testNotExisting(self):
59     pid_non_existing = os.fork()
60     if pid_non_existing == 0:
61       os._exit(0)
62     elif pid_non_existing < 0:
63       raise SystemError("can't fork")
64     os.waitpid(pid_non_existing, 0)
65     self.assert_(not IsProcessAlive(pid_non_existing),
66                  "nonexisting process detected")
67
68
69 class TestPidFileFunctions(unittest.TestCase):
70   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
71
72   def setUp(self):
73     self.dir = tempfile.mkdtemp()
74     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75     utils.DaemonPidFileName = self.f_dpn
76
77   def testPidFileFunctions(self):
78     pid_file = self.f_dpn('test')
79     utils.WritePidFile('test')
80     self.failUnless(os.path.exists(pid_file),
81                     "PID file should have been created")
82     read_pid = utils.ReadPidFile(pid_file)
83     self.failUnlessEqual(read_pid, os.getpid())
84     self.failUnless(utils.IsProcessAlive(read_pid))
85     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86     utils.RemovePidFile('test')
87     self.failIf(os.path.exists(pid_file),
88                 "PID file should not exist anymore")
89     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90                          "ReadPidFile should return 0 for missing pid file")
91     fh = open(pid_file, "w")
92     fh.write("blah\n")
93     fh.close()
94     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95                          "ReadPidFile should return 0 for invalid pid file")
96     utils.RemovePidFile('test')
97     self.failIf(os.path.exists(pid_file),
98                 "PID file should not exist anymore")
99
100   def testKill(self):
101     pid_file = self.f_dpn('child')
102     r_fd, w_fd = os.pipe()
103     new_pid = os.fork()
104     if new_pid == 0: #child
105       utils.WritePidFile('child')
106       os.write(w_fd, 'a')
107       signal.pause()
108       os._exit(0)
109       return
110     # else we are in the parent
111     # wait until the child has written the pid file
112     os.read(r_fd, 1)
113     read_pid = utils.ReadPidFile(pid_file)
114     self.failUnlessEqual(read_pid, new_pid)
115     self.failUnless(utils.IsProcessAlive(new_pid))
116     utils.KillProcess(new_pid, waitpid=True)
117     self.failIf(utils.IsProcessAlive(new_pid))
118     utils.RemovePidFile('child')
119     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
120
121   def tearDown(self):
122     for name in os.listdir(self.dir):
123       os.unlink(os.path.join(self.dir, name))
124     os.rmdir(self.dir)
125
126
127 class TestRunCmd(testutils.GanetiTestCase):
128   """Testing case for the RunCmd function"""
129
130   def setUp(self):
131     self.magic = time.ctime() + " ganeti test"
132     fh, self.fname = tempfile.mkstemp()
133     os.close(fh)
134
135   def tearDown(self):
136     if self.fname:
137       utils.RemoveFile(self.fname)
138
139   def testOk(self):
140     """Test successful exit code"""
141     result = RunCmd("/bin/sh -c 'exit 0'")
142     self.assertEqual(result.exit_code, 0)
143     self.assertEqual(result.output, "")
144
145   def testFail(self):
146     """Test fail exit code"""
147     result = RunCmd("/bin/sh -c 'exit 1'")
148     self.assertEqual(result.exit_code, 1)
149     self.assertEqual(result.output, "")
150
151   def testStdout(self):
152     """Test standard output"""
153     cmd = 'echo -n "%s"' % self.magic
154     result = RunCmd("/bin/sh -c '%s'" % cmd)
155     self.assertEqual(result.stdout, self.magic)
156     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157     self.assertEqual(result.output, "")
158     self.assertFileContent(self.fname, self.magic)
159
160   def testStderr(self):
161     """Test standard error"""
162     cmd = 'echo -n "%s"' % self.magic
163     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164     self.assertEqual(result.stderr, self.magic)
165     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166     self.assertEqual(result.output, "")
167     self.assertFileContent(self.fname, self.magic)
168
169   def testCombined(self):
170     """Test combined output"""
171     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172     expected = "A" + self.magic + "B" + self.magic
173     result = RunCmd("/bin/sh -c '%s'" % cmd)
174     self.assertEqual(result.output, expected)
175     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176     self.assertEqual(result.output, "")
177     self.assertFileContent(self.fname, expected)
178
179   def testSignal(self):
180     """Test signal"""
181     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182     self.assertEqual(result.signal, 15)
183     self.assertEqual(result.output, "")
184
185   def testListRun(self):
186     """Test list runs"""
187     result = RunCmd(["true"])
188     self.assertEqual(result.signal, None)
189     self.assertEqual(result.exit_code, 0)
190     result = RunCmd(["/bin/sh", "-c", "exit 1"])
191     self.assertEqual(result.signal, None)
192     self.assertEqual(result.exit_code, 1)
193     result = RunCmd(["echo", "-n", self.magic])
194     self.assertEqual(result.signal, None)
195     self.assertEqual(result.exit_code, 0)
196     self.assertEqual(result.stdout, self.magic)
197
198   def testFileEmptyOutput(self):
199     """Test file output"""
200     result = RunCmd(["true"], output=self.fname)
201     self.assertEqual(result.signal, None)
202     self.assertEqual(result.exit_code, 0)
203     self.assertFileContent(self.fname, "")
204
205   def testLang(self):
206     """Test locale environment"""
207     old_env = os.environ.copy()
208     try:
209       os.environ["LANG"] = "en_US.UTF-8"
210       os.environ["LC_ALL"] = "en_US.UTF-8"
211       result = RunCmd(["locale"])
212       for line in result.output.splitlines():
213         key, value = line.split("=", 1)
214         # Ignore these variables, they're overridden by LC_ALL
215         if key == "LANG" or key == "LANGUAGE":
216           continue
217         self.failIf(value and value != "C" and value != '"C"',
218             "Variable %s is set to the invalid value '%s'" % (key, value))
219     finally:
220       os.environ = old_env
221
222   def testDefaultCwd(self):
223     """Test default working directory"""
224     self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225
226   def testCwd(self):
227     """Test default working directory"""
228     self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229     self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230     cwd = os.getcwd()
231     self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232
233
234 class TestRemoveFile(unittest.TestCase):
235   """Test case for the RemoveFile function"""
236
237   def setUp(self):
238     """Create a temp dir and file for each case"""
239     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241     os.close(fd)
242
243   def tearDown(self):
244     if os.path.exists(self.tmpfile):
245       os.unlink(self.tmpfile)
246     os.rmdir(self.tmpdir)
247
248
249   def testIgnoreDirs(self):
250     """Test that RemoveFile() ignores directories"""
251     self.assertEqual(None, RemoveFile(self.tmpdir))
252
253
254   def testIgnoreNotExisting(self):
255     """Test that RemoveFile() ignores non-existing files"""
256     RemoveFile(self.tmpfile)
257     RemoveFile(self.tmpfile)
258
259
260   def testRemoveFile(self):
261     """Test that RemoveFile does remove a file"""
262     RemoveFile(self.tmpfile)
263     if os.path.exists(self.tmpfile):
264       self.fail("File '%s' not removed" % self.tmpfile)
265
266
267   def testRemoveSymlink(self):
268     """Test that RemoveFile does remove symlinks"""
269     symlink = self.tmpdir + "/symlink"
270     os.symlink("no-such-file", symlink)
271     RemoveFile(symlink)
272     if os.path.exists(symlink):
273       self.fail("File '%s' not removed" % symlink)
274     os.symlink(self.tmpfile, symlink)
275     RemoveFile(symlink)
276     if os.path.exists(symlink):
277       self.fail("File '%s' not removed" % symlink)
278
279
280 class TestCheckdict(unittest.TestCase):
281   """Test case for the CheckDict function"""
282
283   def testAdd(self):
284     """Test that CheckDict adds a missing key with the correct value"""
285
286     tgt = {'a':1}
287     tmpl = {'b': 2}
288     CheckDict(tgt, tmpl)
289     if 'b' not in tgt or tgt['b'] != 2:
290       self.fail("Failed to update dict")
291
292
293   def testNoUpdate(self):
294     """Test that CheckDict does not overwrite an existing key"""
295     tgt = {'a':1, 'b': 3}
296     tmpl = {'b': 2}
297     CheckDict(tgt, tmpl)
298     self.failUnlessEqual(tgt['b'], 3)
299
300
301 class TestMatchNameComponent(unittest.TestCase):
302   """Test case for the MatchNameComponent function"""
303
304   def testEmptyList(self):
305     """Test that there is no match against an empty list"""
306
307     self.failUnlessEqual(MatchNameComponent("", []), None)
308     self.failUnlessEqual(MatchNameComponent("test", []), None)
309
310   def testSingleMatch(self):
311     """Test that a single match is performed correctly"""
312     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
313     for key in "test2", "test2.example", "test2.example.com":
314       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
315
316   def testMultipleMatches(self):
317     """Test that a multiple match is returned as None"""
318     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
319     for key in "test1", "test1.example":
320       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
321
322
323 class TestFormatUnit(unittest.TestCase):
324   """Test case for the FormatUnit function"""
325
326   def testMiB(self):
327     self.assertEqual(FormatUnit(1), '1M')
328     self.assertEqual(FormatUnit(100), '100M')
329     self.assertEqual(FormatUnit(1023), '1023M')
330
331   def testGiB(self):
332     self.assertEqual(FormatUnit(1024), '1.0G')
333     self.assertEqual(FormatUnit(1536), '1.5G')
334     self.assertEqual(FormatUnit(17133), '16.7G')
335     self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
336
337   def testTiB(self):
338     self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
339     self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
340     self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
341
342
343 class TestParseUnit(unittest.TestCase):
344   """Test case for the ParseUnit function"""
345
346   SCALES = (('', 1),
347             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
348             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
349             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
350
351   def testRounding(self):
352     self.assertEqual(ParseUnit('0'), 0)
353     self.assertEqual(ParseUnit('1'), 4)
354     self.assertEqual(ParseUnit('2'), 4)
355     self.assertEqual(ParseUnit('3'), 4)
356
357     self.assertEqual(ParseUnit('124'), 124)
358     self.assertEqual(ParseUnit('125'), 128)
359     self.assertEqual(ParseUnit('126'), 128)
360     self.assertEqual(ParseUnit('127'), 128)
361     self.assertEqual(ParseUnit('128'), 128)
362     self.assertEqual(ParseUnit('129'), 132)
363     self.assertEqual(ParseUnit('130'), 132)
364
365   def testFloating(self):
366     self.assertEqual(ParseUnit('0'), 0)
367     self.assertEqual(ParseUnit('0.5'), 4)
368     self.assertEqual(ParseUnit('1.75'), 4)
369     self.assertEqual(ParseUnit('1.99'), 4)
370     self.assertEqual(ParseUnit('2.00'), 4)
371     self.assertEqual(ParseUnit('2.01'), 4)
372     self.assertEqual(ParseUnit('3.99'), 4)
373     self.assertEqual(ParseUnit('4.00'), 4)
374     self.assertEqual(ParseUnit('4.01'), 8)
375     self.assertEqual(ParseUnit('1.5G'), 1536)
376     self.assertEqual(ParseUnit('1.8G'), 1844)
377     self.assertEqual(ParseUnit('8.28T'), 8682212)
378
379   def testSuffixes(self):
380     for sep in ('', ' ', '   ', "\t", "\t "):
381       for suffix, scale in TestParseUnit.SCALES:
382         for func in (lambda x: x, str.lower, str.upper):
383           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
384                            1024 * scale)
385
386   def testInvalidInput(self):
387     for sep in ('-', '_', ',', 'a'):
388       for suffix, _ in TestParseUnit.SCALES:
389         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
390
391     for suffix, _ in TestParseUnit.SCALES:
392       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
393
394
395 class TestSshKeys(testutils.GanetiTestCase):
396   """Test case for the AddAuthorizedKey function"""
397
398   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
399   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
400            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
401
402   def setUp(self):
403     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
404     try:
405       handle = os.fdopen(fd, 'w')
406       try:
407         handle.write("%s\n" % TestSshKeys.KEY_A)
408         handle.write("%s\n" % TestSshKeys.KEY_B)
409       finally:
410         handle.close()
411     except:
412       utils.RemoveFile(self.tmpname)
413       raise
414
415   def tearDown(self):
416     utils.RemoveFile(self.tmpname)
417     del self.tmpname
418
419   def testAddingNewKey(self):
420     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
421
422     self.assertFileContent(self.tmpname,
423       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
424       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
425       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
426       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
427
428   def testAddingAlmostButNotCompletelyTheSameKey(self):
429     AddAuthorizedKey(self.tmpname,
430         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
431
432     self.assertFileContent(self.tmpname,
433       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
434       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
435       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
436       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
437
438   def testAddingExistingKeyWithSomeMoreSpaces(self):
439     AddAuthorizedKey(self.tmpname,
440         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
441
442     self.assertFileContent(self.tmpname,
443       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
444       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
445       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
446
447   def testRemovingExistingKeyWithSomeMoreSpaces(self):
448     RemoveAuthorizedKey(self.tmpname,
449         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
450
451     self.assertFileContent(self.tmpname,
452       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
453       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
454
455   def testRemovingNonExistingKey(self):
456     RemoveAuthorizedKey(self.tmpname,
457         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
458
459     self.assertFileContent(self.tmpname,
460       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
461       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
462       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
463
464
465 class TestEtcHosts(testutils.GanetiTestCase):
466   """Test functions modifying /etc/hosts"""
467
468   def setUp(self):
469     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
470     try:
471       handle = os.fdopen(fd, 'w')
472       try:
473         handle.write('# This is a test file for /etc/hosts\n')
474         handle.write('127.0.0.1\tlocalhost\n')
475         handle.write('192.168.1.1 router gw\n')
476       finally:
477         handle.close()
478     except:
479       utils.RemoveFile(self.tmpname)
480       raise
481
482   def tearDown(self):
483     utils.RemoveFile(self.tmpname)
484     del self.tmpname
485
486   def testSettingNewIp(self):
487     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
488
489     self.assertFileContent(self.tmpname,
490       "# This is a test file for /etc/hosts\n"
491       "127.0.0.1\tlocalhost\n"
492       "192.168.1.1 router gw\n"
493       "1.2.3.4\tmyhost.domain.tld myhost\n")
494
495   def testSettingExistingIp(self):
496     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
497                      ['myhost'])
498
499     self.assertFileContent(self.tmpname,
500       "# This is a test file for /etc/hosts\n"
501       "127.0.0.1\tlocalhost\n"
502       "192.168.1.1\tmyhost.domain.tld myhost\n")
503
504   def testSettingDuplicateName(self):
505     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
506
507     self.assertFileContent(self.tmpname,
508       "# This is a test file for /etc/hosts\n"
509       "127.0.0.1\tlocalhost\n"
510       "192.168.1.1 router gw\n"
511       "1.2.3.4\tmyhost\n")
512
513   def testRemovingExistingHost(self):
514     RemoveEtcHostsEntry(self.tmpname, 'router')
515
516     self.assertFileContent(self.tmpname,
517       "# This is a test file for /etc/hosts\n"
518       "127.0.0.1\tlocalhost\n"
519       "192.168.1.1 gw\n")
520
521   def testRemovingSingleExistingHost(self):
522     RemoveEtcHostsEntry(self.tmpname, 'localhost')
523
524     self.assertFileContent(self.tmpname,
525       "# This is a test file for /etc/hosts\n"
526       "192.168.1.1 router gw\n")
527
528   def testRemovingNonExistingHost(self):
529     RemoveEtcHostsEntry(self.tmpname, 'myhost')
530
531     self.assertFileContent(self.tmpname,
532       "# This is a test file for /etc/hosts\n"
533       "127.0.0.1\tlocalhost\n"
534       "192.168.1.1 router gw\n")
535
536   def testRemovingAlias(self):
537     RemoveEtcHostsEntry(self.tmpname, 'gw')
538
539     self.assertFileContent(self.tmpname,
540       "# This is a test file for /etc/hosts\n"
541       "127.0.0.1\tlocalhost\n"
542       "192.168.1.1 router\n")
543
544
545 class TestShellQuoting(unittest.TestCase):
546   """Test case for shell quoting functions"""
547
548   def testShellQuote(self):
549     self.assertEqual(ShellQuote('abc'), "abc")
550     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
551     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
552     self.assertEqual(ShellQuote("a b c"), "'a b c'")
553     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
554
555   def testShellQuoteArgs(self):
556     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
557     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
558     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
559
560
561 class TestTcpPing(unittest.TestCase):
562   """Testcase for TCP version of ping - against listen(2)ing port"""
563
564   def setUp(self):
565     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
566     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
567     self.listenerport = self.listener.getsockname()[1]
568     self.listener.listen(1)
569
570   def tearDown(self):
571     self.listener.shutdown(socket.SHUT_RDWR)
572     del self.listener
573     del self.listenerport
574
575   def testTcpPingToLocalHostAccept(self):
576     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
577                          self.listenerport,
578                          timeout=10,
579                          live_port_needed=True,
580                          source=constants.LOCALHOST_IP_ADDRESS,
581                          ),
582                  "failed to connect to test listener")
583
584     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
585                          self.listenerport,
586                          timeout=10,
587                          live_port_needed=True,
588                          ),
589                  "failed to connect to test listener (no source)")
590
591
592 class TestTcpPingDeaf(unittest.TestCase):
593   """Testcase for TCP version of ping - against non listen(2)ing port"""
594
595   def setUp(self):
596     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
597     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
598     self.deaflistenerport = self.deaflistener.getsockname()[1]
599
600   def tearDown(self):
601     del self.deaflistener
602     del self.deaflistenerport
603
604   def testTcpPingToLocalHostAcceptDeaf(self):
605     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
606                         self.deaflistenerport,
607                         timeout=constants.TCP_PING_TIMEOUT,
608                         live_port_needed=True,
609                         source=constants.LOCALHOST_IP_ADDRESS,
610                         ), # need successful connect(2)
611                 "successfully connected to deaf listener")
612
613     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
614                         self.deaflistenerport,
615                         timeout=constants.TCP_PING_TIMEOUT,
616                         live_port_needed=True,
617                         ), # need successful connect(2)
618                 "successfully connected to deaf listener (no source addr)")
619
620   def testTcpPingToLocalHostNoAccept(self):
621     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
622                          self.deaflistenerport,
623                          timeout=constants.TCP_PING_TIMEOUT,
624                          live_port_needed=False,
625                          source=constants.LOCALHOST_IP_ADDRESS,
626                          ), # ECONNREFUSED is OK
627                  "failed to ping alive host on deaf port")
628
629     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
630                          self.deaflistenerport,
631                          timeout=constants.TCP_PING_TIMEOUT,
632                          live_port_needed=False,
633                          ), # ECONNREFUSED is OK
634                  "failed to ping alive host on deaf port (no source addr)")
635
636
637 class TestOwnIpAddress(unittest.TestCase):
638   """Testcase for OwnIpAddress"""
639
640   def testOwnLoopback(self):
641     """check having the loopback ip"""
642     self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
643                     "Should own the loopback address")
644
645   def testNowOwnAddress(self):
646     """check that I don't own an address"""
647
648     # network 192.0.2.0/24 is reserved for test/documentation as per
649     # rfc 3330, so we *should* not have an address of this range... if
650     # this fails, we should extend the test to multiple addresses
651     DST_IP = "192.0.2.1"
652     self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
653
654
655 class TestListVisibleFiles(unittest.TestCase):
656   """Test case for ListVisibleFiles"""
657
658   def setUp(self):
659     self.path = tempfile.mkdtemp()
660
661   def tearDown(self):
662     shutil.rmtree(self.path)
663
664   def _test(self, files, expected):
665     # Sort a copy
666     expected = expected[:]
667     expected.sort()
668
669     for name in files:
670       f = open(os.path.join(self.path, name), 'w')
671       try:
672         f.write("Test\n")
673       finally:
674         f.close()
675
676     found = ListVisibleFiles(self.path)
677     found.sort()
678
679     self.assertEqual(found, expected)
680
681   def testAllVisible(self):
682     files = ["a", "b", "c"]
683     expected = files
684     self._test(files, expected)
685
686   def testNoneVisible(self):
687     files = [".a", ".b", ".c"]
688     expected = []
689     self._test(files, expected)
690
691   def testSomeVisible(self):
692     files = ["a", "b", ".c"]
693     expected = ["a", "b"]
694     self._test(files, expected)
695
696
697 class TestNewUUID(unittest.TestCase):
698   """Test case for NewUUID"""
699
700   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
701                         '[a-f0-9]{4}-[a-f0-9]{12}$')
702
703   def runTest(self):
704     self.failUnless(self._re_uuid.match(utils.NewUUID()))
705
706
707 class TestUniqueSequence(unittest.TestCase):
708   """Test case for UniqueSequence"""
709
710   def _test(self, input, expected):
711     self.assertEqual(utils.UniqueSequence(input), expected)
712
713   def runTest(self):
714     # Ordered input
715     self._test([1, 2, 3], [1, 2, 3])
716     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
717     self._test([1, 2, 2, 3], [1, 2, 3])
718     self._test([1, 2, 3, 3], [1, 2, 3])
719
720     # Unordered input
721     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
722     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
723
724     # Strings
725     self._test(["a", "a"], ["a"])
726     self._test(["a", "b"], ["a", "b"])
727     self._test(["a", "b", "a"], ["a", "b"])
728
729
730 class TestFirstFree(unittest.TestCase):
731   """Test case for the FirstFree function"""
732
733   def test(self):
734     """Test FirstFree"""
735     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
736     self.failUnlessEqual(FirstFree([]), None)
737     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
738     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
739     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
740
741
742 class TestFileLock(unittest.TestCase):
743   """Test case for the FileLock class"""
744
745   def setUp(self):
746     self.tmpfile = tempfile.NamedTemporaryFile()
747     self.lock = utils.FileLock(self.tmpfile.name)
748
749   def testSharedNonblocking(self):
750     self.lock.Shared(blocking=False)
751     self.lock.Close()
752
753   def testExclusiveNonblocking(self):
754     self.lock.Exclusive(blocking=False)
755     self.lock.Close()
756
757   def testUnlockNonblocking(self):
758     self.lock.Unlock(blocking=False)
759     self.lock.Close()
760
761   def testSharedBlocking(self):
762     self.lock.Shared(blocking=True)
763     self.lock.Close()
764
765   def testExclusiveBlocking(self):
766     self.lock.Exclusive(blocking=True)
767     self.lock.Close()
768
769   def testUnlockBlocking(self):
770     self.lock.Unlock(blocking=True)
771     self.lock.Close()
772
773   def testSharedExclusiveUnlock(self):
774     self.lock.Shared(blocking=False)
775     self.lock.Exclusive(blocking=False)
776     self.lock.Unlock(blocking=False)
777     self.lock.Close()
778
779   def testExclusiveSharedUnlock(self):
780     self.lock.Exclusive(blocking=False)
781     self.lock.Shared(blocking=False)
782     self.lock.Unlock(blocking=False)
783     self.lock.Close()
784
785   def testCloseShared(self):
786     self.lock.Close()
787     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
788
789   def testCloseExclusive(self):
790     self.lock.Close()
791     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
792
793   def testCloseUnlock(self):
794     self.lock.Close()
795     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
796
797
798 class TestTimeFunctions(unittest.TestCase):
799   """Test case for time functions"""
800
801   def runTest(self):
802     self.assertEqual(utils.SplitTime(1), (1, 0))
803     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
804     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
805     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
806     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
807     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
808     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
809     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
810
811     self.assertRaises(AssertionError, utils.SplitTime, -1)
812
813     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
814     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
815     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
816
817     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
818     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
819
820     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
821     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
822     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
823     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
824     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
825
826
827 if __name__ == '__main__':
828   unittest.main()