4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46 from ganeti.errors import LockError, UnitParseError, GenericError, \
50 class TestIsProcessAlive(unittest.TestCase):
51 """Testing case for IsProcessAlive"""
55 self.assert_(IsProcessAlive(mypid),
56 "can't find myself running")
58 def testNotExisting(self):
59 pid_non_existing = os.fork()
60 if pid_non_existing == 0:
62 elif pid_non_existing < 0:
63 raise SystemError("can't fork")
64 os.waitpid(pid_non_existing, 0)
65 self.assert_(not IsProcessAlive(pid_non_existing),
66 "nonexisting process detected")
69 class TestPidFileFunctions(unittest.TestCase):
70 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
73 self.dir = tempfile.mkdtemp()
74 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75 utils.DaemonPidFileName = self.f_dpn
77 def testPidFileFunctions(self):
78 pid_file = self.f_dpn('test')
79 utils.WritePidFile('test')
80 self.failUnless(os.path.exists(pid_file),
81 "PID file should have been created")
82 read_pid = utils.ReadPidFile(pid_file)
83 self.failUnlessEqual(read_pid, os.getpid())
84 self.failUnless(utils.IsProcessAlive(read_pid))
85 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86 utils.RemovePidFile('test')
87 self.failIf(os.path.exists(pid_file),
88 "PID file should not exist anymore")
89 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90 "ReadPidFile should return 0 for missing pid file")
91 fh = open(pid_file, "w")
94 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95 "ReadPidFile should return 0 for invalid pid file")
96 utils.RemovePidFile('test')
97 self.failIf(os.path.exists(pid_file),
98 "PID file should not exist anymore")
101 pid_file = self.f_dpn('child')
102 r_fd, w_fd = os.pipe()
104 if new_pid == 0: #child
105 utils.WritePidFile('child')
110 # else we are in the parent
111 # wait until the child has written the pid file
113 read_pid = utils.ReadPidFile(pid_file)
114 self.failUnlessEqual(read_pid, new_pid)
115 self.failUnless(utils.IsProcessAlive(new_pid))
116 utils.KillProcess(new_pid, waitpid=True)
117 self.failIf(utils.IsProcessAlive(new_pid))
118 utils.RemovePidFile('child')
119 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
122 for name in os.listdir(self.dir):
123 os.unlink(os.path.join(self.dir, name))
127 class TestRunCmd(testutils.GanetiTestCase):
128 """Testing case for the RunCmd function"""
131 self.magic = time.ctime() + " ganeti test"
132 fh, self.fname = tempfile.mkstemp()
137 utils.RemoveFile(self.fname)
140 """Test successful exit code"""
141 result = RunCmd("/bin/sh -c 'exit 0'")
142 self.assertEqual(result.exit_code, 0)
143 self.assertEqual(result.output, "")
146 """Test fail exit code"""
147 result = RunCmd("/bin/sh -c 'exit 1'")
148 self.assertEqual(result.exit_code, 1)
149 self.assertEqual(result.output, "")
151 def testStdout(self):
152 """Test standard output"""
153 cmd = 'echo -n "%s"' % self.magic
154 result = RunCmd("/bin/sh -c '%s'" % cmd)
155 self.assertEqual(result.stdout, self.magic)
156 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157 self.assertEqual(result.output, "")
158 self.assertFileContent(self.fname, self.magic)
160 def testStderr(self):
161 """Test standard error"""
162 cmd = 'echo -n "%s"' % self.magic
163 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164 self.assertEqual(result.stderr, self.magic)
165 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166 self.assertEqual(result.output, "")
167 self.assertFileContent(self.fname, self.magic)
169 def testCombined(self):
170 """Test combined output"""
171 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172 expected = "A" + self.magic + "B" + self.magic
173 result = RunCmd("/bin/sh -c '%s'" % cmd)
174 self.assertEqual(result.output, expected)
175 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176 self.assertEqual(result.output, "")
177 self.assertFileContent(self.fname, expected)
179 def testSignal(self):
181 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182 self.assertEqual(result.signal, 15)
183 self.assertEqual(result.output, "")
185 def testListRun(self):
187 result = RunCmd(["true"])
188 self.assertEqual(result.signal, None)
189 self.assertEqual(result.exit_code, 0)
190 result = RunCmd(["/bin/sh", "-c", "exit 1"])
191 self.assertEqual(result.signal, None)
192 self.assertEqual(result.exit_code, 1)
193 result = RunCmd(["echo", "-n", self.magic])
194 self.assertEqual(result.signal, None)
195 self.assertEqual(result.exit_code, 0)
196 self.assertEqual(result.stdout, self.magic)
198 def testFileEmptyOutput(self):
199 """Test file output"""
200 result = RunCmd(["true"], output=self.fname)
201 self.assertEqual(result.signal, None)
202 self.assertEqual(result.exit_code, 0)
203 self.assertFileContent(self.fname, "")
206 """Test locale environment"""
207 old_env = os.environ.copy()
209 os.environ["LANG"] = "en_US.UTF-8"
210 os.environ["LC_ALL"] = "en_US.UTF-8"
211 result = RunCmd(["locale"])
212 for line in result.output.splitlines():
213 key, value = line.split("=", 1)
214 # Ignore these variables, they're overridden by LC_ALL
215 if key == "LANG" or key == "LANGUAGE":
217 self.failIf(value and value != "C" and value != '"C"',
218 "Variable %s is set to the invalid value '%s'" % (key, value))
222 def testDefaultCwd(self):
223 """Test default working directory"""
224 self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227 """Test default working directory"""
228 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
231 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234 class TestRemoveFile(unittest.TestCase):
235 """Test case for the RemoveFile function"""
238 """Create a temp dir and file for each case"""
239 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
244 if os.path.exists(self.tmpfile):
245 os.unlink(self.tmpfile)
246 os.rmdir(self.tmpdir)
249 def testIgnoreDirs(self):
250 """Test that RemoveFile() ignores directories"""
251 self.assertEqual(None, RemoveFile(self.tmpdir))
254 def testIgnoreNotExisting(self):
255 """Test that RemoveFile() ignores non-existing files"""
256 RemoveFile(self.tmpfile)
257 RemoveFile(self.tmpfile)
260 def testRemoveFile(self):
261 """Test that RemoveFile does remove a file"""
262 RemoveFile(self.tmpfile)
263 if os.path.exists(self.tmpfile):
264 self.fail("File '%s' not removed" % self.tmpfile)
267 def testRemoveSymlink(self):
268 """Test that RemoveFile does remove symlinks"""
269 symlink = self.tmpdir + "/symlink"
270 os.symlink("no-such-file", symlink)
272 if os.path.exists(symlink):
273 self.fail("File '%s' not removed" % symlink)
274 os.symlink(self.tmpfile, symlink)
276 if os.path.exists(symlink):
277 self.fail("File '%s' not removed" % symlink)
280 class TestCheckdict(unittest.TestCase):
281 """Test case for the CheckDict function"""
284 """Test that CheckDict adds a missing key with the correct value"""
289 if 'b' not in tgt or tgt['b'] != 2:
290 self.fail("Failed to update dict")
293 def testNoUpdate(self):
294 """Test that CheckDict does not overwrite an existing key"""
295 tgt = {'a':1, 'b': 3}
298 self.failUnlessEqual(tgt['b'], 3)
301 class TestMatchNameComponent(unittest.TestCase):
302 """Test case for the MatchNameComponent function"""
304 def testEmptyList(self):
305 """Test that there is no match against an empty list"""
307 self.failUnlessEqual(MatchNameComponent("", []), None)
308 self.failUnlessEqual(MatchNameComponent("test", []), None)
310 def testSingleMatch(self):
311 """Test that a single match is performed correctly"""
312 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
313 for key in "test2", "test2.example", "test2.example.com":
314 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
316 def testMultipleMatches(self):
317 """Test that a multiple match is returned as None"""
318 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
319 for key in "test1", "test1.example":
320 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
323 class TestFormatUnit(unittest.TestCase):
324 """Test case for the FormatUnit function"""
327 self.assertEqual(FormatUnit(1), '1M')
328 self.assertEqual(FormatUnit(100), '100M')
329 self.assertEqual(FormatUnit(1023), '1023M')
332 self.assertEqual(FormatUnit(1024), '1.0G')
333 self.assertEqual(FormatUnit(1536), '1.5G')
334 self.assertEqual(FormatUnit(17133), '16.7G')
335 self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
338 self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
339 self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
340 self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
343 class TestParseUnit(unittest.TestCase):
344 """Test case for the ParseUnit function"""
347 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
348 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
349 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
351 def testRounding(self):
352 self.assertEqual(ParseUnit('0'), 0)
353 self.assertEqual(ParseUnit('1'), 4)
354 self.assertEqual(ParseUnit('2'), 4)
355 self.assertEqual(ParseUnit('3'), 4)
357 self.assertEqual(ParseUnit('124'), 124)
358 self.assertEqual(ParseUnit('125'), 128)
359 self.assertEqual(ParseUnit('126'), 128)
360 self.assertEqual(ParseUnit('127'), 128)
361 self.assertEqual(ParseUnit('128'), 128)
362 self.assertEqual(ParseUnit('129'), 132)
363 self.assertEqual(ParseUnit('130'), 132)
365 def testFloating(self):
366 self.assertEqual(ParseUnit('0'), 0)
367 self.assertEqual(ParseUnit('0.5'), 4)
368 self.assertEqual(ParseUnit('1.75'), 4)
369 self.assertEqual(ParseUnit('1.99'), 4)
370 self.assertEqual(ParseUnit('2.00'), 4)
371 self.assertEqual(ParseUnit('2.01'), 4)
372 self.assertEqual(ParseUnit('3.99'), 4)
373 self.assertEqual(ParseUnit('4.00'), 4)
374 self.assertEqual(ParseUnit('4.01'), 8)
375 self.assertEqual(ParseUnit('1.5G'), 1536)
376 self.assertEqual(ParseUnit('1.8G'), 1844)
377 self.assertEqual(ParseUnit('8.28T'), 8682212)
379 def testSuffixes(self):
380 for sep in ('', ' ', ' ', "\t", "\t "):
381 for suffix, scale in TestParseUnit.SCALES:
382 for func in (lambda x: x, str.lower, str.upper):
383 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
386 def testInvalidInput(self):
387 for sep in ('-', '_', ',', 'a'):
388 for suffix, _ in TestParseUnit.SCALES:
389 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
391 for suffix, _ in TestParseUnit.SCALES:
392 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
395 class TestSshKeys(testutils.GanetiTestCase):
396 """Test case for the AddAuthorizedKey function"""
398 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
399 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
400 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
403 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
405 handle = os.fdopen(fd, 'w')
407 handle.write("%s\n" % TestSshKeys.KEY_A)
408 handle.write("%s\n" % TestSshKeys.KEY_B)
412 utils.RemoveFile(self.tmpname)
416 utils.RemoveFile(self.tmpname)
419 def testAddingNewKey(self):
420 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
422 self.assertFileContent(self.tmpname,
423 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
424 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
425 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
426 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
428 def testAddingAlmostButNotCompletelyTheSameKey(self):
429 AddAuthorizedKey(self.tmpname,
430 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
432 self.assertFileContent(self.tmpname,
433 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
434 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
435 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
436 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
438 def testAddingExistingKeyWithSomeMoreSpaces(self):
439 AddAuthorizedKey(self.tmpname,
440 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
442 self.assertFileContent(self.tmpname,
443 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
444 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
445 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
447 def testRemovingExistingKeyWithSomeMoreSpaces(self):
448 RemoveAuthorizedKey(self.tmpname,
449 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
451 self.assertFileContent(self.tmpname,
452 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
453 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
455 def testRemovingNonExistingKey(self):
456 RemoveAuthorizedKey(self.tmpname,
457 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
459 self.assertFileContent(self.tmpname,
460 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
461 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
462 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
465 class TestEtcHosts(testutils.GanetiTestCase):
466 """Test functions modifying /etc/hosts"""
469 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
471 handle = os.fdopen(fd, 'w')
473 handle.write('# This is a test file for /etc/hosts\n')
474 handle.write('127.0.0.1\tlocalhost\n')
475 handle.write('192.168.1.1 router gw\n')
479 utils.RemoveFile(self.tmpname)
483 utils.RemoveFile(self.tmpname)
486 def testSettingNewIp(self):
487 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
489 self.assertFileContent(self.tmpname,
490 "# This is a test file for /etc/hosts\n"
491 "127.0.0.1\tlocalhost\n"
492 "192.168.1.1 router gw\n"
493 "1.2.3.4\tmyhost.domain.tld myhost\n")
495 def testSettingExistingIp(self):
496 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
499 self.assertFileContent(self.tmpname,
500 "# This is a test file for /etc/hosts\n"
501 "127.0.0.1\tlocalhost\n"
502 "192.168.1.1\tmyhost.domain.tld myhost\n")
504 def testSettingDuplicateName(self):
505 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
507 self.assertFileContent(self.tmpname,
508 "# This is a test file for /etc/hosts\n"
509 "127.0.0.1\tlocalhost\n"
510 "192.168.1.1 router gw\n"
513 def testRemovingExistingHost(self):
514 RemoveEtcHostsEntry(self.tmpname, 'router')
516 self.assertFileContent(self.tmpname,
517 "# This is a test file for /etc/hosts\n"
518 "127.0.0.1\tlocalhost\n"
521 def testRemovingSingleExistingHost(self):
522 RemoveEtcHostsEntry(self.tmpname, 'localhost')
524 self.assertFileContent(self.tmpname,
525 "# This is a test file for /etc/hosts\n"
526 "192.168.1.1 router gw\n")
528 def testRemovingNonExistingHost(self):
529 RemoveEtcHostsEntry(self.tmpname, 'myhost')
531 self.assertFileContent(self.tmpname,
532 "# This is a test file for /etc/hosts\n"
533 "127.0.0.1\tlocalhost\n"
534 "192.168.1.1 router gw\n")
536 def testRemovingAlias(self):
537 RemoveEtcHostsEntry(self.tmpname, 'gw')
539 self.assertFileContent(self.tmpname,
540 "# This is a test file for /etc/hosts\n"
541 "127.0.0.1\tlocalhost\n"
542 "192.168.1.1 router\n")
545 class TestShellQuoting(unittest.TestCase):
546 """Test case for shell quoting functions"""
548 def testShellQuote(self):
549 self.assertEqual(ShellQuote('abc'), "abc")
550 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
551 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
552 self.assertEqual(ShellQuote("a b c"), "'a b c'")
553 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
555 def testShellQuoteArgs(self):
556 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
557 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
558 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
561 class TestTcpPing(unittest.TestCase):
562 """Testcase for TCP version of ping - against listen(2)ing port"""
565 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
566 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
567 self.listenerport = self.listener.getsockname()[1]
568 self.listener.listen(1)
571 self.listener.shutdown(socket.SHUT_RDWR)
573 del self.listenerport
575 def testTcpPingToLocalHostAccept(self):
576 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
579 live_port_needed=True,
580 source=constants.LOCALHOST_IP_ADDRESS,
582 "failed to connect to test listener")
584 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
587 live_port_needed=True,
589 "failed to connect to test listener (no source)")
592 class TestTcpPingDeaf(unittest.TestCase):
593 """Testcase for TCP version of ping - against non listen(2)ing port"""
596 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
597 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
598 self.deaflistenerport = self.deaflistener.getsockname()[1]
601 del self.deaflistener
602 del self.deaflistenerport
604 def testTcpPingToLocalHostAcceptDeaf(self):
605 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
606 self.deaflistenerport,
607 timeout=constants.TCP_PING_TIMEOUT,
608 live_port_needed=True,
609 source=constants.LOCALHOST_IP_ADDRESS,
610 ), # need successful connect(2)
611 "successfully connected to deaf listener")
613 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
614 self.deaflistenerport,
615 timeout=constants.TCP_PING_TIMEOUT,
616 live_port_needed=True,
617 ), # need successful connect(2)
618 "successfully connected to deaf listener (no source addr)")
620 def testTcpPingToLocalHostNoAccept(self):
621 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
622 self.deaflistenerport,
623 timeout=constants.TCP_PING_TIMEOUT,
624 live_port_needed=False,
625 source=constants.LOCALHOST_IP_ADDRESS,
626 ), # ECONNREFUSED is OK
627 "failed to ping alive host on deaf port")
629 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
630 self.deaflistenerport,
631 timeout=constants.TCP_PING_TIMEOUT,
632 live_port_needed=False,
633 ), # ECONNREFUSED is OK
634 "failed to ping alive host on deaf port (no source addr)")
637 class TestOwnIpAddress(unittest.TestCase):
638 """Testcase for OwnIpAddress"""
640 def testOwnLoopback(self):
641 """check having the loopback ip"""
642 self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
643 "Should own the loopback address")
645 def testNowOwnAddress(self):
646 """check that I don't own an address"""
648 # network 192.0.2.0/24 is reserved for test/documentation as per
649 # rfc 3330, so we *should* not have an address of this range... if
650 # this fails, we should extend the test to multiple addresses
652 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
655 class TestListVisibleFiles(unittest.TestCase):
656 """Test case for ListVisibleFiles"""
659 self.path = tempfile.mkdtemp()
662 shutil.rmtree(self.path)
664 def _test(self, files, expected):
666 expected = expected[:]
670 f = open(os.path.join(self.path, name), 'w')
676 found = ListVisibleFiles(self.path)
679 self.assertEqual(found, expected)
681 def testAllVisible(self):
682 files = ["a", "b", "c"]
684 self._test(files, expected)
686 def testNoneVisible(self):
687 files = [".a", ".b", ".c"]
689 self._test(files, expected)
691 def testSomeVisible(self):
692 files = ["a", "b", ".c"]
693 expected = ["a", "b"]
694 self._test(files, expected)
697 class TestNewUUID(unittest.TestCase):
698 """Test case for NewUUID"""
700 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
701 '[a-f0-9]{4}-[a-f0-9]{12}$')
704 self.failUnless(self._re_uuid.match(utils.NewUUID()))
707 class TestUniqueSequence(unittest.TestCase):
708 """Test case for UniqueSequence"""
710 def _test(self, input, expected):
711 self.assertEqual(utils.UniqueSequence(input), expected)
715 self._test([1, 2, 3], [1, 2, 3])
716 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
717 self._test([1, 2, 2, 3], [1, 2, 3])
718 self._test([1, 2, 3, 3], [1, 2, 3])
721 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
722 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
725 self._test(["a", "a"], ["a"])
726 self._test(["a", "b"], ["a", "b"])
727 self._test(["a", "b", "a"], ["a", "b"])
730 class TestFirstFree(unittest.TestCase):
731 """Test case for the FirstFree function"""
735 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
736 self.failUnlessEqual(FirstFree([]), None)
737 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
738 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
739 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
742 class TestFileLock(unittest.TestCase):
743 """Test case for the FileLock class"""
746 self.tmpfile = tempfile.NamedTemporaryFile()
747 self.lock = utils.FileLock(self.tmpfile.name)
749 def testSharedNonblocking(self):
750 self.lock.Shared(blocking=False)
753 def testExclusiveNonblocking(self):
754 self.lock.Exclusive(blocking=False)
757 def testUnlockNonblocking(self):
758 self.lock.Unlock(blocking=False)
761 def testSharedBlocking(self):
762 self.lock.Shared(blocking=True)
765 def testExclusiveBlocking(self):
766 self.lock.Exclusive(blocking=True)
769 def testUnlockBlocking(self):
770 self.lock.Unlock(blocking=True)
773 def testSharedExclusiveUnlock(self):
774 self.lock.Shared(blocking=False)
775 self.lock.Exclusive(blocking=False)
776 self.lock.Unlock(blocking=False)
779 def testExclusiveSharedUnlock(self):
780 self.lock.Exclusive(blocking=False)
781 self.lock.Shared(blocking=False)
782 self.lock.Unlock(blocking=False)
785 def testCloseShared(self):
787 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
789 def testCloseExclusive(self):
791 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
793 def testCloseUnlock(self):
795 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
798 class TestTimeFunctions(unittest.TestCase):
799 """Test case for time functions"""
802 self.assertEqual(utils.SplitTime(1), (1, 0))
803 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
804 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
805 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
806 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
807 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
808 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
809 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
811 self.assertRaises(AssertionError, utils.SplitTime, -1)
813 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
814 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
815 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
817 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
818 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
820 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
821 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
822 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
823 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
824 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
827 if __name__ == '__main__':