4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46 from ganeti.errors import LockError, UnitParseError, GenericError, \
50 class TestIsProcessAlive(unittest.TestCase):
51 """Testing case for IsProcessAlive"""
55 self.assert_(IsProcessAlive(mypid),
56 "can't find myself running")
58 def testNotExisting(self):
59 pid_non_existing = os.fork()
60 if pid_non_existing == 0:
62 elif pid_non_existing < 0:
63 raise SystemError("can't fork")
64 os.waitpid(pid_non_existing, 0)
65 self.assert_(not IsProcessAlive(pid_non_existing),
66 "nonexisting process detected")
69 class TestPidFileFunctions(unittest.TestCase):
70 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
73 self.dir = tempfile.mkdtemp()
74 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75 utils.DaemonPidFileName = self.f_dpn
77 def testPidFileFunctions(self):
78 pid_file = self.f_dpn('test')
79 utils.WritePidFile('test')
80 self.failUnless(os.path.exists(pid_file),
81 "PID file should have been created")
82 read_pid = utils.ReadPidFile(pid_file)
83 self.failUnlessEqual(read_pid, os.getpid())
84 self.failUnless(utils.IsProcessAlive(read_pid))
85 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86 utils.RemovePidFile('test')
87 self.failIf(os.path.exists(pid_file),
88 "PID file should not exist anymore")
89 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90 "ReadPidFile should return 0 for missing pid file")
91 fh = open(pid_file, "w")
94 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95 "ReadPidFile should return 0 for invalid pid file")
96 utils.RemovePidFile('test')
97 self.failIf(os.path.exists(pid_file),
98 "PID file should not exist anymore")
101 pid_file = self.f_dpn('child')
102 r_fd, w_fd = os.pipe()
104 if new_pid == 0: #child
105 utils.WritePidFile('child')
110 # else we are in the parent
111 # wait until the child has written the pid file
113 read_pid = utils.ReadPidFile(pid_file)
114 self.failUnlessEqual(read_pid, new_pid)
115 self.failUnless(utils.IsProcessAlive(new_pid))
116 utils.KillProcess(new_pid, waitpid=True)
117 self.failIf(utils.IsProcessAlive(new_pid))
118 utils.RemovePidFile('child')
119 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
122 for name in os.listdir(self.dir):
123 os.unlink(os.path.join(self.dir, name))
127 class TestRunCmd(testutils.GanetiTestCase):
128 """Testing case for the RunCmd function"""
131 self.magic = time.ctime() + " ganeti test"
132 fh, self.fname = tempfile.mkstemp()
137 utils.RemoveFile(self.fname)
140 """Test successful exit code"""
141 result = RunCmd("/bin/sh -c 'exit 0'")
142 self.assertEqual(result.exit_code, 0)
143 self.assertEqual(result.output, "")
146 """Test fail exit code"""
147 result = RunCmd("/bin/sh -c 'exit 1'")
148 self.assertEqual(result.exit_code, 1)
149 self.assertEqual(result.output, "")
151 def testStdout(self):
152 """Test standard output"""
153 cmd = 'echo -n "%s"' % self.magic
154 result = RunCmd("/bin/sh -c '%s'" % cmd)
155 self.assertEqual(result.stdout, self.magic)
156 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157 self.assertEqual(result.output, "")
158 self.assertFileContent(self.fname, self.magic)
160 def testStderr(self):
161 """Test standard error"""
162 cmd = 'echo -n "%s"' % self.magic
163 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164 self.assertEqual(result.stderr, self.magic)
165 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166 self.assertEqual(result.output, "")
167 self.assertFileContent(self.fname, self.magic)
169 def testCombined(self):
170 """Test combined output"""
171 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172 expected = "A" + self.magic + "B" + self.magic
173 result = RunCmd("/bin/sh -c '%s'" % cmd)
174 self.assertEqual(result.output, expected)
175 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176 self.assertEqual(result.output, "")
177 self.assertFileContent(self.fname, expected)
179 def testSignal(self):
181 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182 self.assertEqual(result.signal, 15)
183 self.assertEqual(result.output, "")
185 def testListRun(self):
187 result = RunCmd(["true"])
188 self.assertEqual(result.signal, None)
189 self.assertEqual(result.exit_code, 0)
190 result = RunCmd(["/bin/sh", "-c", "exit 1"])
191 self.assertEqual(result.signal, None)
192 self.assertEqual(result.exit_code, 1)
193 result = RunCmd(["echo", "-n", self.magic])
194 self.assertEqual(result.signal, None)
195 self.assertEqual(result.exit_code, 0)
196 self.assertEqual(result.stdout, self.magic)
198 def testFileEmptyOutput(self):
199 """Test file output"""
200 result = RunCmd(["true"], output=self.fname)
201 self.assertEqual(result.signal, None)
202 self.assertEqual(result.exit_code, 0)
203 self.assertFileContent(self.fname, "")
206 """Test locale environment"""
207 old_env = os.environ.copy()
209 os.environ["LANG"] = "en_US.UTF-8"
210 os.environ["LC_ALL"] = "en_US.UTF-8"
211 result = RunCmd(["locale"])
212 for line in result.output.splitlines():
213 key, value = line.split("=", 1)
214 # Ignore these variables, they're overridden by LC_ALL
215 if key == "LANG" or key == "LANGUAGE":
217 self.failIf(value and value != "C" and value != '"C"',
218 "Variable %s is set to the invalid value '%s'" % (key, value))
222 def testDefaultCwd(self):
223 """Test default working directory"""
224 self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227 """Test default working directory"""
228 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
231 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234 class TestRemoveFile(unittest.TestCase):
235 """Test case for the RemoveFile function"""
238 """Create a temp dir and file for each case"""
239 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
244 if os.path.exists(self.tmpfile):
245 os.unlink(self.tmpfile)
246 os.rmdir(self.tmpdir)
249 def testIgnoreDirs(self):
250 """Test that RemoveFile() ignores directories"""
251 self.assertEqual(None, RemoveFile(self.tmpdir))
254 def testIgnoreNotExisting(self):
255 """Test that RemoveFile() ignores non-existing files"""
256 RemoveFile(self.tmpfile)
257 RemoveFile(self.tmpfile)
260 def testRemoveFile(self):
261 """Test that RemoveFile does remove a file"""
262 RemoveFile(self.tmpfile)
263 if os.path.exists(self.tmpfile):
264 self.fail("File '%s' not removed" % self.tmpfile)
267 def testRemoveSymlink(self):
268 """Test that RemoveFile does remove symlinks"""
269 symlink = self.tmpdir + "/symlink"
270 os.symlink("no-such-file", symlink)
272 if os.path.exists(symlink):
273 self.fail("File '%s' not removed" % symlink)
274 os.symlink(self.tmpfile, symlink)
276 if os.path.exists(symlink):
277 self.fail("File '%s' not removed" % symlink)
280 class TestCheckdict(unittest.TestCase):
281 """Test case for the CheckDict function"""
284 """Test that CheckDict adds a missing key with the correct value"""
289 if 'b' not in tgt or tgt['b'] != 2:
290 self.fail("Failed to update dict")
293 def testNoUpdate(self):
294 """Test that CheckDict does not overwrite an existing key"""
295 tgt = {'a':1, 'b': 3}
298 self.failUnlessEqual(tgt['b'], 3)
301 class TestMatchNameComponent(unittest.TestCase):
302 """Test case for the MatchNameComponent function"""
304 def testEmptyList(self):
305 """Test that there is no match against an empty list"""
307 self.failUnlessEqual(MatchNameComponent("", []), None)
308 self.failUnlessEqual(MatchNameComponent("test", []), None)
310 def testSingleMatch(self):
311 """Test that a single match is performed correctly"""
312 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
313 for key in "test2", "test2.example", "test2.example.com":
314 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
316 def testMultipleMatches(self):
317 """Test that a multiple match is returned as None"""
318 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
319 for key in "test1", "test1.example":
320 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
323 class TestFormatUnit(unittest.TestCase):
324 """Test case for the FormatUnit function"""
327 self.assertEqual(FormatUnit(1, 'h'), '1M')
328 self.assertEqual(FormatUnit(100, 'h'), '100M')
329 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
331 self.assertEqual(FormatUnit(1, 'm'), '1')
332 self.assertEqual(FormatUnit(100, 'm'), '100')
333 self.assertEqual(FormatUnit(1023, 'm'), '1023')
335 self.assertEqual(FormatUnit(1024, 'm'), '1024')
336 self.assertEqual(FormatUnit(1536, 'm'), '1536')
337 self.assertEqual(FormatUnit(17133, 'm'), '17133')
338 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
341 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
342 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
343 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
344 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
346 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
347 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
348 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
349 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
351 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
352 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
353 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
356 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
357 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
358 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
360 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
361 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
362 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
364 class TestParseUnit(unittest.TestCase):
365 """Test case for the ParseUnit function"""
368 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
369 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
370 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
372 def testRounding(self):
373 self.assertEqual(ParseUnit('0'), 0)
374 self.assertEqual(ParseUnit('1'), 4)
375 self.assertEqual(ParseUnit('2'), 4)
376 self.assertEqual(ParseUnit('3'), 4)
378 self.assertEqual(ParseUnit('124'), 124)
379 self.assertEqual(ParseUnit('125'), 128)
380 self.assertEqual(ParseUnit('126'), 128)
381 self.assertEqual(ParseUnit('127'), 128)
382 self.assertEqual(ParseUnit('128'), 128)
383 self.assertEqual(ParseUnit('129'), 132)
384 self.assertEqual(ParseUnit('130'), 132)
386 def testFloating(self):
387 self.assertEqual(ParseUnit('0'), 0)
388 self.assertEqual(ParseUnit('0.5'), 4)
389 self.assertEqual(ParseUnit('1.75'), 4)
390 self.assertEqual(ParseUnit('1.99'), 4)
391 self.assertEqual(ParseUnit('2.00'), 4)
392 self.assertEqual(ParseUnit('2.01'), 4)
393 self.assertEqual(ParseUnit('3.99'), 4)
394 self.assertEqual(ParseUnit('4.00'), 4)
395 self.assertEqual(ParseUnit('4.01'), 8)
396 self.assertEqual(ParseUnit('1.5G'), 1536)
397 self.assertEqual(ParseUnit('1.8G'), 1844)
398 self.assertEqual(ParseUnit('8.28T'), 8682212)
400 def testSuffixes(self):
401 for sep in ('', ' ', ' ', "\t", "\t "):
402 for suffix, scale in TestParseUnit.SCALES:
403 for func in (lambda x: x, str.lower, str.upper):
404 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
407 def testInvalidInput(self):
408 for sep in ('-', '_', ',', 'a'):
409 for suffix, _ in TestParseUnit.SCALES:
410 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
412 for suffix, _ in TestParseUnit.SCALES:
413 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
416 class TestSshKeys(testutils.GanetiTestCase):
417 """Test case for the AddAuthorizedKey function"""
419 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
420 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
421 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
424 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
426 handle = os.fdopen(fd, 'w')
428 handle.write("%s\n" % TestSshKeys.KEY_A)
429 handle.write("%s\n" % TestSshKeys.KEY_B)
433 utils.RemoveFile(self.tmpname)
437 utils.RemoveFile(self.tmpname)
440 def testAddingNewKey(self):
441 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
443 self.assertFileContent(self.tmpname,
444 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
445 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
446 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
447 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
449 def testAddingAlmostButNotCompletelyTheSameKey(self):
450 AddAuthorizedKey(self.tmpname,
451 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
453 self.assertFileContent(self.tmpname,
454 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
455 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
456 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
457 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
459 def testAddingExistingKeyWithSomeMoreSpaces(self):
460 AddAuthorizedKey(self.tmpname,
461 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
463 self.assertFileContent(self.tmpname,
464 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
465 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
466 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
468 def testRemovingExistingKeyWithSomeMoreSpaces(self):
469 RemoveAuthorizedKey(self.tmpname,
470 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
472 self.assertFileContent(self.tmpname,
473 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
474 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
476 def testRemovingNonExistingKey(self):
477 RemoveAuthorizedKey(self.tmpname,
478 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
480 self.assertFileContent(self.tmpname,
481 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
482 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
483 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
486 class TestEtcHosts(testutils.GanetiTestCase):
487 """Test functions modifying /etc/hosts"""
490 (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
492 handle = os.fdopen(fd, 'w')
494 handle.write('# This is a test file for /etc/hosts\n')
495 handle.write('127.0.0.1\tlocalhost\n')
496 handle.write('192.168.1.1 router gw\n')
500 utils.RemoveFile(self.tmpname)
504 utils.RemoveFile(self.tmpname)
507 def testSettingNewIp(self):
508 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
510 self.assertFileContent(self.tmpname,
511 "# This is a test file for /etc/hosts\n"
512 "127.0.0.1\tlocalhost\n"
513 "192.168.1.1 router gw\n"
514 "1.2.3.4\tmyhost.domain.tld myhost\n")
516 def testSettingExistingIp(self):
517 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
520 self.assertFileContent(self.tmpname,
521 "# This is a test file for /etc/hosts\n"
522 "127.0.0.1\tlocalhost\n"
523 "192.168.1.1\tmyhost.domain.tld myhost\n")
525 def testSettingDuplicateName(self):
526 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
528 self.assertFileContent(self.tmpname,
529 "# This is a test file for /etc/hosts\n"
530 "127.0.0.1\tlocalhost\n"
531 "192.168.1.1 router gw\n"
534 def testRemovingExistingHost(self):
535 RemoveEtcHostsEntry(self.tmpname, 'router')
537 self.assertFileContent(self.tmpname,
538 "# This is a test file for /etc/hosts\n"
539 "127.0.0.1\tlocalhost\n"
542 def testRemovingSingleExistingHost(self):
543 RemoveEtcHostsEntry(self.tmpname, 'localhost')
545 self.assertFileContent(self.tmpname,
546 "# This is a test file for /etc/hosts\n"
547 "192.168.1.1 router gw\n")
549 def testRemovingNonExistingHost(self):
550 RemoveEtcHostsEntry(self.tmpname, 'myhost')
552 self.assertFileContent(self.tmpname,
553 "# This is a test file for /etc/hosts\n"
554 "127.0.0.1\tlocalhost\n"
555 "192.168.1.1 router gw\n")
557 def testRemovingAlias(self):
558 RemoveEtcHostsEntry(self.tmpname, 'gw')
560 self.assertFileContent(self.tmpname,
561 "# This is a test file for /etc/hosts\n"
562 "127.0.0.1\tlocalhost\n"
563 "192.168.1.1 router\n")
566 class TestShellQuoting(unittest.TestCase):
567 """Test case for shell quoting functions"""
569 def testShellQuote(self):
570 self.assertEqual(ShellQuote('abc'), "abc")
571 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
572 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
573 self.assertEqual(ShellQuote("a b c"), "'a b c'")
574 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
576 def testShellQuoteArgs(self):
577 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
578 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
579 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
582 class TestTcpPing(unittest.TestCase):
583 """Testcase for TCP version of ping - against listen(2)ing port"""
586 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
587 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
588 self.listenerport = self.listener.getsockname()[1]
589 self.listener.listen(1)
592 self.listener.shutdown(socket.SHUT_RDWR)
594 del self.listenerport
596 def testTcpPingToLocalHostAccept(self):
597 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
600 live_port_needed=True,
601 source=constants.LOCALHOST_IP_ADDRESS,
603 "failed to connect to test listener")
605 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
608 live_port_needed=True,
610 "failed to connect to test listener (no source)")
613 class TestTcpPingDeaf(unittest.TestCase):
614 """Testcase for TCP version of ping - against non listen(2)ing port"""
617 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
618 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
619 self.deaflistenerport = self.deaflistener.getsockname()[1]
622 del self.deaflistener
623 del self.deaflistenerport
625 def testTcpPingToLocalHostAcceptDeaf(self):
626 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
627 self.deaflistenerport,
628 timeout=constants.TCP_PING_TIMEOUT,
629 live_port_needed=True,
630 source=constants.LOCALHOST_IP_ADDRESS,
631 ), # need successful connect(2)
632 "successfully connected to deaf listener")
634 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
635 self.deaflistenerport,
636 timeout=constants.TCP_PING_TIMEOUT,
637 live_port_needed=True,
638 ), # need successful connect(2)
639 "successfully connected to deaf listener (no source addr)")
641 def testTcpPingToLocalHostNoAccept(self):
642 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
643 self.deaflistenerport,
644 timeout=constants.TCP_PING_TIMEOUT,
645 live_port_needed=False,
646 source=constants.LOCALHOST_IP_ADDRESS,
647 ), # ECONNREFUSED is OK
648 "failed to ping alive host on deaf port")
650 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
651 self.deaflistenerport,
652 timeout=constants.TCP_PING_TIMEOUT,
653 live_port_needed=False,
654 ), # ECONNREFUSED is OK
655 "failed to ping alive host on deaf port (no source addr)")
658 class TestOwnIpAddress(unittest.TestCase):
659 """Testcase for OwnIpAddress"""
661 def testOwnLoopback(self):
662 """check having the loopback ip"""
663 self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
664 "Should own the loopback address")
666 def testNowOwnAddress(self):
667 """check that I don't own an address"""
669 # network 192.0.2.0/24 is reserved for test/documentation as per
670 # rfc 3330, so we *should* not have an address of this range... if
671 # this fails, we should extend the test to multiple addresses
673 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
676 class TestListVisibleFiles(unittest.TestCase):
677 """Test case for ListVisibleFiles"""
680 self.path = tempfile.mkdtemp()
683 shutil.rmtree(self.path)
685 def _test(self, files, expected):
687 expected = expected[:]
691 f = open(os.path.join(self.path, name), 'w')
697 found = ListVisibleFiles(self.path)
700 self.assertEqual(found, expected)
702 def testAllVisible(self):
703 files = ["a", "b", "c"]
705 self._test(files, expected)
707 def testNoneVisible(self):
708 files = [".a", ".b", ".c"]
710 self._test(files, expected)
712 def testSomeVisible(self):
713 files = ["a", "b", ".c"]
714 expected = ["a", "b"]
715 self._test(files, expected)
718 class TestNewUUID(unittest.TestCase):
719 """Test case for NewUUID"""
721 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
722 '[a-f0-9]{4}-[a-f0-9]{12}$')
725 self.failUnless(self._re_uuid.match(utils.NewUUID()))
728 class TestUniqueSequence(unittest.TestCase):
729 """Test case for UniqueSequence"""
731 def _test(self, input, expected):
732 self.assertEqual(utils.UniqueSequence(input), expected)
736 self._test([1, 2, 3], [1, 2, 3])
737 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
738 self._test([1, 2, 2, 3], [1, 2, 3])
739 self._test([1, 2, 3, 3], [1, 2, 3])
742 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
743 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
746 self._test(["a", "a"], ["a"])
747 self._test(["a", "b"], ["a", "b"])
748 self._test(["a", "b", "a"], ["a", "b"])
751 class TestFirstFree(unittest.TestCase):
752 """Test case for the FirstFree function"""
756 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
757 self.failUnlessEqual(FirstFree([]), None)
758 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
759 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
760 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
763 class TestFileLock(unittest.TestCase):
764 """Test case for the FileLock class"""
767 self.tmpfile = tempfile.NamedTemporaryFile()
768 self.lock = utils.FileLock(self.tmpfile.name)
770 def testSharedNonblocking(self):
771 self.lock.Shared(blocking=False)
774 def testExclusiveNonblocking(self):
775 self.lock.Exclusive(blocking=False)
778 def testUnlockNonblocking(self):
779 self.lock.Unlock(blocking=False)
782 def testSharedBlocking(self):
783 self.lock.Shared(blocking=True)
786 def testExclusiveBlocking(self):
787 self.lock.Exclusive(blocking=True)
790 def testUnlockBlocking(self):
791 self.lock.Unlock(blocking=True)
794 def testSharedExclusiveUnlock(self):
795 self.lock.Shared(blocking=False)
796 self.lock.Exclusive(blocking=False)
797 self.lock.Unlock(blocking=False)
800 def testExclusiveSharedUnlock(self):
801 self.lock.Exclusive(blocking=False)
802 self.lock.Shared(blocking=False)
803 self.lock.Unlock(blocking=False)
806 def testCloseShared(self):
808 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
810 def testCloseExclusive(self):
812 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
814 def testCloseUnlock(self):
816 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
819 class TestTimeFunctions(unittest.TestCase):
820 """Test case for time functions"""
823 self.assertEqual(utils.SplitTime(1), (1, 0))
824 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
825 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
826 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
827 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
828 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
829 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
830 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
832 self.assertRaises(AssertionError, utils.SplitTime, -1)
834 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
835 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
836 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
838 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
839 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
841 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
842 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
843 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
844 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
845 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
848 class FieldSetTestCase(unittest.TestCase):
849 """Test case for FieldSets"""
851 def testSimpleMatch(self):
852 f = utils.FieldSet("a", "b", "c", "def")
853 self.failUnless(f.Matches("a"))
854 self.failIf(f.Matches("d"), "Substring matched")
855 self.failIf(f.Matches("defghi"), "Prefix string matched")
856 self.failIf(f.NonMatching(["b", "c"]))
857 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
858 self.failUnless(f.NonMatching(["a", "d"]))
860 def testRegexMatch(self):
861 f = utils.FieldSet("a", "b([0-9]+)", "c")
862 self.failUnless(f.Matches("b1"))
863 self.failUnless(f.Matches("b99"))
864 self.failIf(f.Matches("b/1"))
865 self.failIf(f.NonMatching(["b12", "c"]))
866 self.failUnless(f.NonMatching(["a", "1"]))
869 if __name__ == '__main__':