4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46 from ganeti.errors import LockError, UnitParseError, GenericError, \
50 class TestIsProcessAlive(unittest.TestCase):
51 """Testing case for IsProcessAlive"""
53 def _CreateZombie(self):
55 r_fd, w_fd = os.pipe()
56 pid_zombie = os.fork()
58 # explicit close of read, write end will be closed only due to exit
62 raise SystemError("can't fork")
63 # parent: we close our end of the w_fd, so reads will error as
64 # soon as the OS cleans the child's filedescriptors on its exit
66 # wait for 60 seconds at max for the exit (pathological case, just
67 # so that the test doesn't hang indefinitely)
68 r_set, w_set, e_set = select.select([r_fd], [], [], 60)
69 if not r_set and not e_set:
70 self.fail("Timeout exceeded in zombie creation")
75 self.assert_(IsProcessAlive(mypid),
76 "can't find myself running")
79 pid_zombie = self._CreateZombie()
80 is_zombie = not IsProcessAlive(pid_zombie)
81 self.assert_(is_zombie, "zombie not detected as zombie")
82 os.waitpid(pid_zombie, os.WNOHANG)
84 def testNotExisting(self):
85 pid_non_existing = os.fork()
86 if pid_non_existing == 0:
88 elif pid_non_existing < 0:
89 raise SystemError("can't fork")
90 os.waitpid(pid_non_existing, 0)
91 self.assert_(not IsProcessAlive(pid_non_existing),
92 "nonexisting process detected")
95 class TestPidFileFunctions(unittest.TestCase):
96 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
99 self.dir = tempfile.mkdtemp()
100 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
101 utils.DaemonPidFileName = self.f_dpn
103 def testPidFileFunctions(self):
104 pid_file = self.f_dpn('test')
105 utils.WritePidFile('test')
106 self.failUnless(os.path.exists(pid_file),
107 "PID file should have been created")
108 read_pid = utils.ReadPidFile(pid_file)
109 self.failUnlessEqual(read_pid, os.getpid())
110 self.failUnless(utils.IsProcessAlive(read_pid))
111 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
112 utils.RemovePidFile('test')
113 self.failIf(os.path.exists(pid_file),
114 "PID file should not exist anymore")
115 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
116 "ReadPidFile should return 0 for missing pid file")
117 fh = open(pid_file, "w")
120 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
121 "ReadPidFile should return 0 for invalid pid file")
122 utils.RemovePidFile('test')
123 self.failIf(os.path.exists(pid_file),
124 "PID file should not exist anymore")
127 pid_file = self.f_dpn('child')
128 r_fd, w_fd = os.pipe()
130 if new_pid == 0: #child
131 utils.WritePidFile('child')
136 # else we are in the parent
137 # wait until the child has written the pid file
139 read_pid = utils.ReadPidFile(pid_file)
140 self.failUnlessEqual(read_pid, new_pid)
141 self.failUnless(utils.IsProcessAlive(new_pid))
142 utils.KillProcess(new_pid, waitpid=True)
143 self.failIf(utils.IsProcessAlive(new_pid))
144 utils.RemovePidFile('child')
145 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
148 for name in os.listdir(self.dir):
149 os.unlink(os.path.join(self.dir, name))
153 class TestRunCmd(testutils.GanetiTestCase):
154 """Testing case for the RunCmd function"""
157 self.magic = time.ctime() + " ganeti test"
158 fh, self.fname = tempfile.mkstemp()
163 utils.RemoveFile(self.fname)
166 """Test successful exit code"""
167 result = RunCmd("/bin/sh -c 'exit 0'")
168 self.assertEqual(result.exit_code, 0)
169 self.assertEqual(result.output, "")
172 """Test fail exit code"""
173 result = RunCmd("/bin/sh -c 'exit 1'")
174 self.assertEqual(result.exit_code, 1)
175 self.assertEqual(result.output, "")
def testStdout(self):
  """Check stdout handling, both captured and redirected via output="""
  shell_line = "/bin/sh -c '%s'" % ('echo -n "%s"' % self.magic)

  # No output file: the echoed text is expected on the result object
  captured = RunCmd(shell_line)
  self.assertEqual(captured.stdout, self.magic)

  # Output file given: the result is expected to be empty and the file
  # to hold the echoed text instead
  redirected = RunCmd(shell_line, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testStderr(self):
  """Check stderr handling, both captured and redirected via output="""
  shell_line = "/bin/sh -c '%s' 1>&2" % ('echo -n "%s"' % self.magic)

  # No output file: the text sent to fd 2 is expected in result.stderr
  captured = RunCmd(shell_line)
  self.assertEqual(captured.stderr, self.magic)

  # Output file given: the result is expected to be empty and the file
  # to hold the text instead
  redirected = RunCmd(shell_line, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testCombined(self):
  """Check that stdout and stderr end up together in the output"""
  marker = self.magic
  script = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (marker, marker)
  shell_line = "/bin/sh -c '%s'" % script
  # stdout part ("A...") first, then the stderr part ("B...")
  wanted = "".join(["A", marker, "B", marker])

  captured = RunCmd(shell_line)
  self.assertEqual(captured.output, wanted)

  redirected = RunCmd(shell_line, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, wanted)
205 def testSignal(self):
207 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
208 self.assertEqual(result.signal, 15)
209 self.assertEqual(result.output, "")
211 def testListRun(self):
213 result = RunCmd(["true"])
214 self.assertEqual(result.signal, None)
215 self.assertEqual(result.exit_code, 0)
216 result = RunCmd(["/bin/sh", "-c", "exit 1"])
217 self.assertEqual(result.signal, None)
218 self.assertEqual(result.exit_code, 1)
219 result = RunCmd(["echo", "-n", self.magic])
220 self.assertEqual(result.signal, None)
221 self.assertEqual(result.exit_code, 0)
222 self.assertEqual(result.stdout, self.magic)
def testFileEmptyOutput(self):
  """Run a silent command with an output file and expect it to stay empty"""
  outcome = RunCmd(["true"], output=self.fname)
  # Neither killed by a signal nor a non-zero exit status
  self.assertEqual((outcome.signal, outcome.exit_code), (None, 0))
  self.assertFileContent(self.fname, "")
232 """Test locale environment"""
233 old_env = os.environ.copy()
235 os.environ["LANG"] = "en_US.UTF-8"
236 os.environ["LC_ALL"] = "en_US.UTF-8"
237 result = RunCmd(["locale"])
238 for line in result.output.splitlines():
239 key, value = line.split("=", 1)
240 # Ignore these variables, they're overridden by LC_ALL
241 if key == "LANG" or key == "LANGUAGE":
243 self.failIf(value and value != "C" and value != '"C"',
244 "Variable %s is set to the invalid value '%s'" % (key, value))
def testDefaultCwd(self):
  """Test default working directory

  Without an explicit cwd argument, "pwd" run through RunCmd is expected
  to print "/".

  """
  # assertEqual instead of the deprecated failUnlessEqual alias
  self.assertEqual(RunCmd(["pwd"]).stdout.strip(), "/")
253 """Test default working directory"""
254 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
255 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
257 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
260 class TestRemoveFile(unittest.TestCase):
261 """Test case for the RemoveFile function"""
264 """Create a temp dir and file for each case"""
265 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
266 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
270 if os.path.exists(self.tmpfile):
271 os.unlink(self.tmpfile)
272 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
  """Calling RemoveFile() on a directory is expected to return None"""
  outcome = RemoveFile(self.tmpdir)
  self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
  """Removing the same file twice must not raise"""
  # The second pass operates on a path that is already gone
  for _ in range(2):
    RemoveFile(self.tmpfile)
def testRemoveFile(self):
  """After RemoveFile() the temporary file must no longer exist"""
  target = self.tmpfile
  RemoveFile(target)
  if not os.path.exists(target):
    return
  self.fail("File '%s' not removed" % target)
293 def testRemoveSymlink(self):
294 """Test that RemoveFile does remove symlinks"""
295 symlink = self.tmpdir + "/symlink"
296 os.symlink("no-such-file", symlink)
298 if os.path.exists(symlink):
299 self.fail("File '%s' not removed" % symlink)
300 os.symlink(self.tmpfile, symlink)
302 if os.path.exists(symlink):
303 self.fail("File '%s' not removed" % symlink)
306 class TestCheckdict(unittest.TestCase):
307 """Test case for the CheckDict function"""
310 """Test that CheckDict adds a missing key with the correct value"""
315 if 'b' not in tgt or tgt['b'] != 2:
316 self.fail("Failed to update dict")
319 def testNoUpdate(self):
320 """Test that CheckDict does not overwrite an existing key"""
321 tgt = {'a':1, 'b': 3}
324 self.failUnlessEqual(tgt['b'], 3)
class TestMatchNameComponent(unittest.TestCase):
  """Test case for the MatchNameComponent function

  The deprecated failUnlessEqual alias is replaced by assertEqual, which
  other tests in this file already use.

  """

  def testEmptyList(self):
    """Test that there is no match against an empty list"""
    self.assertEqual(MatchNameComponent("", []), None)
    self.assertEqual(MatchNameComponent("test", []), None)

  def testSingleMatch(self):
    """Test that a single match is performed correctly"""
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
    # Short name, partial name and full name are all expected to resolve
    # to the second list entry
    for key in "test2", "test2.example", "test2.example.com":
      self.assertEqual(MatchNameComponent(key, mlist), mlist[1])

  def testMultipleMatches(self):
    """Test that a multiple match is returned as None"""
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
    # Both keys are prefixes of every entry, so no unique answer exists
    for key in "test1", "test1.example":
      self.assertEqual(MatchNameComponent(key, mlist), None)
349 class TestFormatUnit(unittest.TestCase):
350 """Test case for the FormatUnit function"""
353 self.assertEqual(FormatUnit(1), '1M')
354 self.assertEqual(FormatUnit(100), '100M')
355 self.assertEqual(FormatUnit(1023), '1023M')
358 self.assertEqual(FormatUnit(1024), '1.0G')
359 self.assertEqual(FormatUnit(1536), '1.5G')
360 self.assertEqual(FormatUnit(17133), '16.7G')
361 self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
364 self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
365 self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
366 self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
369 class TestParseUnit(unittest.TestCase):
370 """Test case for the ParseUnit function"""
373 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
374 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
375 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
  """Check ParseUnit on plain integers

  Every expected value in the table is a multiple of four that is not
  smaller than the input.

  """
  expectations = (
    ('0', 0),
    ('1', 4),
    ('2', 4),
    ('3', 4),
    ('124', 124),
    ('125', 128),
    ('126', 128),
    ('127', 128),
    ('128', 128),
    ('129', 132),
    ('130', 132),
    )
  for text, wanted in expectations:
    self.assertEqual(ParseUnit(text), wanted)
def testFloating(self):
  """Check ParseUnit on fractional inputs, with and without a unit suffix"""
  expectations = (
    ('0', 0),
    ('0.5', 4),
    ('1.75', 4),
    ('1.99', 4),
    ('2.00', 4),
    ('2.01', 4),
    ('3.99', 4),
    ('4.00', 4),
    ('4.01', 8),
    ('1.5G', 1536),
    ('1.8G', 1844),
    ('8.28T', 8682212),
    )
  for text, wanted in expectations:
    self.assertEqual(ParseUnit(text), wanted)
405 def testSuffixes(self):
406 for sep in ('', ' ', ' ', "\t", "\t "):
407 for suffix, scale in TestParseUnit.SCALES:
408 for func in (lambda x: x, str.lower, str.upper):
409 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
def testInvalidInput(self):
  """Malformed unit strings must raise UnitParseError"""
  suffixes = [entry[0] for entry in TestParseUnit.SCALES]

  # A stray separator character between the number and the suffix
  rejected = []
  for sep in ('-', '_', ',', 'a'):
    rejected += ['1' + sep + sfx for sfx in suffixes]

  # A comma used in place of the decimal point
  rejected += ['1,3' + sfx for sfx in suffixes]

  for text in rejected:
    self.assertRaises(UnitParseError, ParseUnit, text)
421 class TestSshKeys(testutils.GanetiTestCase):
422 """Test case for the AddAuthorizedKey function"""
424 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
425 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
426 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
429 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
431 handle = os.fdopen(fd, 'w')
433 handle.write("%s\n" % TestSshKeys.KEY_A)
434 handle.write("%s\n" % TestSshKeys.KEY_B)
438 utils.RemoveFile(self.tmpname)
442 utils.RemoveFile(self.tmpname)
445 def testAddingNewKey(self):
446 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
448 self.assertFileContent(self.tmpname,
449 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
450 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
451 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
452 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
454 def testAddingAlmostButNotCompletelyTheSameKey(self):
455 AddAuthorizedKey(self.tmpname,
456 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
458 self.assertFileContent(self.tmpname,
459 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
460 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
461 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
462 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
464 def testAddingExistingKeyWithSomeMoreSpaces(self):
465 AddAuthorizedKey(self.tmpname,
466 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
468 self.assertFileContent(self.tmpname,
469 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
470 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
471 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
473 def testRemovingExistingKeyWithSomeMoreSpaces(self):
474 RemoveAuthorizedKey(self.tmpname,
475 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
477 self.assertFileContent(self.tmpname,
478 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
479 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
481 def testRemovingNonExistingKey(self):
482 RemoveAuthorizedKey(self.tmpname,
483 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
485 self.assertFileContent(self.tmpname,
486 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
487 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
488 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
491 class TestEtcHosts(testutils.GanetiTestCase):
492 """Test functions modifying /etc/hosts"""
495 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
497 handle = os.fdopen(fd, 'w')
499 handle.write('# This is a test file for /etc/hosts\n')
500 handle.write('127.0.0.1\tlocalhost\n')
501 handle.write('192.168.1.1 router gw\n')
505 utils.RemoveFile(self.tmpname)
509 utils.RemoveFile(self.tmpname)
512 def testSettingNewIp(self):
513 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
515 self.assertFileContent(self.tmpname,
516 "# This is a test file for /etc/hosts\n"
517 "127.0.0.1\tlocalhost\n"
518 "192.168.1.1 router gw\n"
519 "1.2.3.4\tmyhost.domain.tld myhost\n")
521 def testSettingExistingIp(self):
522 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
525 self.assertFileContent(self.tmpname,
526 "# This is a test file for /etc/hosts\n"
527 "127.0.0.1\tlocalhost\n"
528 "192.168.1.1\tmyhost.domain.tld myhost\n")
530 def testSettingDuplicateName(self):
531 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
533 self.assertFileContent(self.tmpname,
534 "# This is a test file for /etc/hosts\n"
535 "127.0.0.1\tlocalhost\n"
536 "192.168.1.1 router gw\n"
539 def testRemovingExistingHost(self):
540 RemoveEtcHostsEntry(self.tmpname, 'router')
542 self.assertFileContent(self.tmpname,
543 "# This is a test file for /etc/hosts\n"
544 "127.0.0.1\tlocalhost\n"
547 def testRemovingSingleExistingHost(self):
548 RemoveEtcHostsEntry(self.tmpname, 'localhost')
550 self.assertFileContent(self.tmpname,
551 "# This is a test file for /etc/hosts\n"
552 "192.168.1.1 router gw\n")
554 def testRemovingNonExistingHost(self):
555 RemoveEtcHostsEntry(self.tmpname, 'myhost')
557 self.assertFileContent(self.tmpname,
558 "# This is a test file for /etc/hosts\n"
559 "127.0.0.1\tlocalhost\n"
560 "192.168.1.1 router gw\n")
562 def testRemovingAlias(self):
563 RemoveEtcHostsEntry(self.tmpname, 'gw')
565 self.assertFileContent(self.tmpname,
566 "# This is a test file for /etc/hosts\n"
567 "127.0.0.1\tlocalhost\n"
568 "192.168.1.1 router\n")
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Compare ShellQuote results for single strings with the expected form"""
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]
    for raw, quoted in cases:
      self.assertEqual(ShellQuote(raw), quoted)

  def testShellQuoteArgs(self):
    """Compare ShellQuoteArgs results for argument lists with the expected form"""
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]
    for args, quoted in cases:
      self.assertEqual(ShellQuoteArgs(args), quoted)
587 class TestTcpPing(unittest.TestCase):
588 """Testcase for TCP version of ping - against listen(2)ing port"""
591 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
592 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
593 self.listenerport = self.listener.getsockname()[1]
594 self.listener.listen(1)
597 self.listener.shutdown(socket.SHUT_RDWR)
599 del self.listenerport
601 def testTcpPingToLocalHostAccept(self):
602 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
605 live_port_needed=True,
606 source=constants.LOCALHOST_IP_ADDRESS,
608 "failed to connect to test listener")
610 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
613 live_port_needed=True,
615 "failed to connect to test listener (no source)")
618 class TestTcpPingDeaf(unittest.TestCase):
619 """Testcase for TCP version of ping - against non listen(2)ing port"""
622 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
623 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
624 self.deaflistenerport = self.deaflistener.getsockname()[1]
627 del self.deaflistener
628 del self.deaflistenerport
def testTcpPingToLocalHostAcceptDeaf(self):
  """With live_port_needed=True the deaf port must not be reported alive

  The ping is tried once with an explicit source address and once
  without; both are expected to fail. assertFalse replaces the
  deprecated failIf alias.

  """
  self.assertFalse(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                           self.deaflistenerport,
                           timeout=constants.TCP_PING_TIMEOUT,
                           live_port_needed=True,
                           source=constants.LOCALHOST_IP_ADDRESS,
                           ), # need successful connect(2)
                   "successfully connected to deaf listener")

  self.assertFalse(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                           self.deaflistenerport,
                           timeout=constants.TCP_PING_TIMEOUT,
                           live_port_needed=True,
                           ), # need successful connect(2)
                   "successfully connected to deaf listener (no source addr)")
def testTcpPingToLocalHostNoAccept(self):
  """With live_port_needed=False a ping to the deaf port must succeed

  The ping is tried once with an explicit source address and once
  without; both are expected to succeed. assertTrue replaces the
  deprecated assert_ alias.

  """
  self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
                          live_port_needed=False,
                          source=constants.LOCALHOST_IP_ADDRESS,
                          ), # ECONNREFUSED is OK
                  "failed to ping alive host on deaf port")

  self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
                          live_port_needed=False,
                          ), # ECONNREFUSED is OK
                  "failed to ping alive host on deaf port (no source addr)")
663 class TestOwnIpAddress(unittest.TestCase):
664 """Testcase for OwnIpAddress"""
def testOwnLoopback(self):
  """check having the loopback ip

  OwnIpAddress is expected to return a true value for
  constants.LOCALHOST_IP_ADDRESS.

  """
  # assertTrue instead of the deprecated failUnless alias
  self.assertTrue(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
                  "Should own the loopback address")
671 def testNowOwnAddress(self):
672 """check that I don't own an address"""
674 # network 192.0.2.0/24 is reserved for test/documentation as per
675 # rfc 3330, so we *should* not have an address of this range... if
676 # this fails, we should extend the test to multiple addresses
678 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
681 class TestListVisibleFiles(unittest.TestCase):
682 """Test case for ListVisibleFiles"""
685 self.path = tempfile.mkdtemp()
688 shutil.rmtree(self.path)
690 def _test(self, files, expected):
692 expected = expected[:]
696 f = open(os.path.join(self.path, name), 'w')
702 found = ListVisibleFiles(self.path)
705 self.assertEqual(found, expected)
707 def testAllVisible(self):
708 files = ["a", "b", "c"]
710 self._test(files, expected)
712 def testNoneVisible(self):
713 files = [".a", ".b", ".c"]
715 self._test(files, expected)
def testSomeVisible(self):
  """Mix of dot-files and regular files; only "a" and "b" are expected"""
  self._test(["a", "b", ".c"], ["a", "b"])
723 class TestNewUUID(unittest.TestCase):
724 """Test case for NewUUID"""
726 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
727 '[a-f0-9]{4}-[a-f0-9]{12}$')
730 self.failUnless(self._re_uuid.match(utils.NewUUID()))
733 class TestUniqueSequence(unittest.TestCase):
734 """Test case for UniqueSequence"""
def _test(self, input, expected):
  """Run UniqueSequence on input and compare the result with expected"""
  actual = utils.UniqueSequence(input)
  self.assertEqual(actual, expected)
741 self._test([1, 2, 3], [1, 2, 3])
742 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
743 self._test([1, 2, 2, 3], [1, 2, 3])
744 self._test([1, 2, 3, 3], [1, 2, 3])
747 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
748 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
751 self._test(["a", "a"], ["a"])
752 self._test(["a", "b"], ["a", "b"])
753 self._test(["a", "b", "a"], ["a", "b"])
756 class TestFirstFree(unittest.TestCase):
757 """Test case for the FirstFree function"""
761 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
762 self.failUnlessEqual(FirstFree([]), None)
763 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
764 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
765 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
768 class TestFileLock(unittest.TestCase):
769 """Test case for the FileLock class"""
772 self.tmpfile = tempfile.NamedTemporaryFile()
773 self.lock = utils.FileLock(self.tmpfile.name)
775 def testSharedNonblocking(self):
776 self.lock.Shared(blocking=False)
779 def testExclusiveNonblocking(self):
780 self.lock.Exclusive(blocking=False)
783 def testUnlockNonblocking(self):
784 self.lock.Unlock(blocking=False)
787 def testSharedBlocking(self):
788 self.lock.Shared(blocking=True)
791 def testExclusiveBlocking(self):
792 self.lock.Exclusive(blocking=True)
795 def testUnlockBlocking(self):
796 self.lock.Unlock(blocking=True)
799 def testSharedExclusiveUnlock(self):
800 self.lock.Shared(blocking=False)
801 self.lock.Exclusive(blocking=False)
802 self.lock.Unlock(blocking=False)
805 def testExclusiveSharedUnlock(self):
806 self.lock.Exclusive(blocking=False)
807 self.lock.Shared(blocking=False)
808 self.lock.Unlock(blocking=False)
811 def testCloseShared(self):
813 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
815 def testCloseExclusive(self):
817 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
819 def testCloseUnlock(self):
821 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
824 class TestTimeFunctions(unittest.TestCase):
825 """Test case for time functions"""
828 self.assertEqual(utils.SplitTime(1), (1, 0))
829 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
830 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
831 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
832 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
833 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
834 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
835 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
837 self.assertRaises(AssertionError, utils.SplitTime, -1)
839 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
840 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
841 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
843 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
844 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
846 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
847 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
848 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
849 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
850 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
853 if __name__ == '__main__':