4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti import errors
42 from ganeti.utils import IsProcessAlive, RunCmd, \
43 RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
44 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
45 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
46 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
47 TailFile, ForceDictType, IsNormAbsPath
49 from ganeti.errors import LockError, UnitParseError, GenericError, \
53 class TestIsProcessAlive(unittest.TestCase):
54 """Testing case for IsProcessAlive"""
58 self.assert_(IsProcessAlive(mypid),
59 "can't find myself running")
61 def testNotExisting(self):
62 pid_non_existing = os.fork()
63 if pid_non_existing == 0:
65 elif pid_non_existing < 0:
66 raise SystemError("can't fork")
67 os.waitpid(pid_non_existing, 0)
68 self.assert_(not IsProcessAlive(pid_non_existing),
69 "nonexisting process detected")
72 class TestPidFileFunctions(unittest.TestCase):
73 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
76 self.dir = tempfile.mkdtemp()
77 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
78 utils.DaemonPidFileName = self.f_dpn
80 def testPidFileFunctions(self):
81 pid_file = self.f_dpn('test')
82 utils.WritePidFile('test')
83 self.failUnless(os.path.exists(pid_file),
84 "PID file should have been created")
85 read_pid = utils.ReadPidFile(pid_file)
86 self.failUnlessEqual(read_pid, os.getpid())
87 self.failUnless(utils.IsProcessAlive(read_pid))
88 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
89 utils.RemovePidFile('test')
90 self.failIf(os.path.exists(pid_file),
91 "PID file should not exist anymore")
92 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
93 "ReadPidFile should return 0 for missing pid file")
94 fh = open(pid_file, "w")
97 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
98 "ReadPidFile should return 0 for invalid pid file")
99 utils.RemovePidFile('test')
100 self.failIf(os.path.exists(pid_file),
101 "PID file should not exist anymore")
104 pid_file = self.f_dpn('child')
105 r_fd, w_fd = os.pipe()
107 if new_pid == 0: #child
108 utils.WritePidFile('child')
113 # else we are in the parent
114 # wait until the child has written the pid file
116 read_pid = utils.ReadPidFile(pid_file)
117 self.failUnlessEqual(read_pid, new_pid)
118 self.failUnless(utils.IsProcessAlive(new_pid))
119 utils.KillProcess(new_pid, waitpid=True)
120 self.failIf(utils.IsProcessAlive(new_pid))
121 utils.RemovePidFile('child')
122 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
125 for name in os.listdir(self.dir):
126 os.unlink(os.path.join(self.dir, name))
130 class TestRunCmd(testutils.GanetiTestCase):
131 """Testing case for the RunCmd function"""
134 testutils.GanetiTestCase.setUp(self)
135 self.magic = time.ctime() + " ganeti test"
136 self.fname = self._CreateTempFile()
139 """Test successful exit code"""
140 result = RunCmd("/bin/sh -c 'exit 0'")
141 self.assertEqual(result.exit_code, 0)
142 self.assertEqual(result.output, "")
145 """Test fail exit code"""
146 result = RunCmd("/bin/sh -c 'exit 1'")
147 self.assertEqual(result.exit_code, 1)
148 self.assertEqual(result.output, "")
def testStdout(self):
  """Check that standard output is captured, in memory and in a file.

  The magic string is echoed through /bin/sh twice. The first run must
  expose the text via the result's stdout attribute; the second run passes
  output= so the combined in-memory output must be empty and the text must
  be found in the temporary file instead.

  """
  shell_cmd = "/bin/sh -c '%s'" % ('echo -n "%s"' % self.magic)

  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.stdout, self.magic)

  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testStderr(self):
  """Check that standard error is captured, in memory and in a file.

  The echo is redirected to file descriptor 2 by the shell. Without output=
  the text must appear in the result's stderr attribute; with output= the
  in-memory output must be empty and the file must contain the text.

  """
  shell_cmd = "/bin/sh -c '%s' 1>&2" % ('echo -n "%s"' % self.magic)

  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.stderr, self.magic)

  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testCombined(self):
  """Check the combined stdout/stderr output of a single command.

  The shell script writes "A" plus the magic string to stdout and then "B"
  plus the magic string to stderr; the combined output (in memory, or in
  the file when output= is given) must contain both parts in that order.

  """
  script = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
  shell_cmd = "/bin/sh -c '%s'" % script
  wanted = "".join(["A", self.magic, "B", self.magic])

  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.output, wanted)

  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, wanted)
178 def testSignal(self):
180 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
181 self.assertEqual(result.signal, 15)
182 self.assertEqual(result.output, "")
184 def testListRun(self):
186 result = RunCmd(["true"])
187 self.assertEqual(result.signal, None)
188 self.assertEqual(result.exit_code, 0)
189 result = RunCmd(["/bin/sh", "-c", "exit 1"])
190 self.assertEqual(result.signal, None)
191 self.assertEqual(result.exit_code, 1)
192 result = RunCmd(["echo", "-n", self.magic])
193 self.assertEqual(result.signal, None)
194 self.assertEqual(result.exit_code, 0)
195 self.assertEqual(result.stdout, self.magic)
def testFileEmptyOutput(self):
  """Check file output for a command that prints nothing.

  Running "true" with output= must end without a signal, with exit code 0,
  and must leave the output file empty.

  """
  outcome = RunCmd(["true"], output=self.fname)

  self.assertEqual(outcome.signal, None)
  self.assertEqual(outcome.exit_code, 0)
  self.assertFileContent(self.fname, "")
205 """Test locale environment"""
206 old_env = os.environ.copy()
208 os.environ["LANG"] = "en_US.UTF-8"
209 os.environ["LC_ALL"] = "en_US.UTF-8"
210 result = RunCmd(["locale"])
211 for line in result.output.splitlines():
212 key, value = line.split("=", 1)
213 # Ignore these variables, they're overridden by LC_ALL
214 if key == "LANG" or key == "LANGUAGE":
216 self.failIf(value and value != "C" and value != '"C"',
217 "Variable %s is set to the invalid value '%s'" % (key, value))
def testDefaultCwd(self):
  """Check the working directory used when no cwd argument is given.

  "pwd" is run without a cwd= argument and its (stripped) standard output
  must be the root directory.

  """
  reported_dir = RunCmd(["pwd"]).stdout.strip()
  # assertEqual instead of the deprecated failUnlessEqual alias, which no
  # longer exists in current unittest versions
  self.assertEqual(reported_dir, "/")
226 """Test explicitly specified working directory"""
227 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
228 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
233 class TestRemoveFile(unittest.TestCase):
234 """Test case for the RemoveFile function"""
237 """Create a temp dir and file for each case"""
238 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
239 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
243 if os.path.exists(self.tmpfile):
244 os.unlink(self.tmpfile)
245 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
  """Calling RemoveFile() on a directory must simply return None."""
  outcome = RemoveFile(self.tmpdir)
  self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
  """RemoveFile() must not complain when called again on the same path.

  The same temporary path is passed twice in a row; neither call may raise.

  """
  for _ in range(2):
    RemoveFile(self.tmpfile)
def testRemoveFile(self):
  """After RemoveFile() the temporary file must no longer exist."""
  RemoveFile(self.tmpfile)

  still_present = os.path.exists(self.tmpfile)
  if still_present:
    self.fail("File '%s' not removed" % self.tmpfile)
266 def testRemoveSymlink(self):
267 """Test that RemoveFile does remove symlinks"""
268 symlink = self.tmpdir + "/symlink"
269 os.symlink("no-such-file", symlink)
271 if os.path.exists(symlink):
272 self.fail("File '%s' not removed" % symlink)
273 os.symlink(self.tmpfile, symlink)
275 if os.path.exists(symlink):
276 self.fail("File '%s' not removed" % symlink)
279 class TestRename(unittest.TestCase):
280 """Test case for RenameFile"""
283 """Create a temporary directory"""
284 self.tmpdir = tempfile.mkdtemp()
285 self.tmpfile = os.path.join(self.tmpdir, "test1")
288 open(self.tmpfile, "w").close()
291 """Remove temporary directory"""
292 shutil.rmtree(self.tmpdir)
def testSimpleRename1(self):
  """Rename the temporary file to "xyz" inside the same directory."""
  destination = os.path.join(self.tmpdir, "xyz")
  utils.RenameFile(self.tmpfile, destination)
298 def testSimpleRename2(self):
299 """Simple rename 2"""
300 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
303 def testRenameMkdir(self):
304 """Rename with mkdir"""
305 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
309 class TestCheckdict(unittest.TestCase):
310 """Test case for the CheckDict function"""
313 """Test that CheckDict adds a missing key with the correct value"""
318 if 'b' not in tgt or tgt['b'] != 2:
319 self.fail("Failed to update dict")
322 def testNoUpdate(self):
323 """Test that CheckDict does not overwrite an existing key"""
324 tgt = {'a':1, 'b': 3}
327 self.failUnlessEqual(tgt['b'], 3)
class TestMatchNameComponent(unittest.TestCase):
  """Test case for the MatchNameComponent function

  All checks use assertEqual rather than the deprecated failUnlessEqual
  alias, which has been removed from current unittest versions.

  """

  def testEmptyList(self):
    """Test that there is no match against an empty list"""
    for key in ("", "test"):
      self.assertEqual(MatchNameComponent(key, []), None)

  def testSingleMatch(self):
    """Test that a single match is performed correctly

    The short name, a partial domain name and the full name must all
    resolve to the second list entry.

    """
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
    for key in ("test2", "test2.example", "test2.example.com"):
      self.assertEqual(MatchNameComponent(key, mlist), mlist[1])

  def testMultipleMatches(self):
    """Test that a multiple match is returned as None

    Every list entry starts with the key, so no unique answer exists.

    """
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
    for key in ("test1", "test1.example"):
      self.assertEqual(MatchNameComponent(key, mlist), None)
352 class TestFormatUnit(unittest.TestCase):
353 """Test case for the FormatUnit function"""
356 self.assertEqual(FormatUnit(1, 'h'), '1M')
357 self.assertEqual(FormatUnit(100, 'h'), '100M')
358 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
360 self.assertEqual(FormatUnit(1, 'm'), '1')
361 self.assertEqual(FormatUnit(100, 'm'), '100')
362 self.assertEqual(FormatUnit(1023, 'm'), '1023')
364 self.assertEqual(FormatUnit(1024, 'm'), '1024')
365 self.assertEqual(FormatUnit(1536, 'm'), '1536')
366 self.assertEqual(FormatUnit(17133, 'm'), '17133')
367 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
370 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
371 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
372 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
373 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
375 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
376 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
377 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
378 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
380 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
381 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
382 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
385 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
386 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
387 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
389 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
390 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
391 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
393 class TestParseUnit(unittest.TestCase):
394 """Test case for the ParseUnit function"""
397 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
398 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
399 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
  """Check the values ParseUnit returns for plain integer strings.

  Each pair holds the input string and the value ParseUnit must return;
  apart from 0, the expected values are all multiples of four.

  """
  cases = (
    ('0', 0),
    ('1', 4),
    ('2', 4),
    ('3', 4),
    ('124', 124),
    ('125', 128),
    ('126', 128),
    ('127', 128),
    ('128', 128),
    ('129', 132),
    ('130', 132),
    )

  for text, wanted in cases:
    self.assertEqual(ParseUnit(text), wanted)
def testFloating(self):
  """Check the values ParseUnit returns for fractional inputs.

  Covers plain fractional numbers as well as fractional values carrying a
  G or T suffix; each pair is (input string, expected return value).

  """
  cases = (
    ('0', 0),
    ('0.5', 4),
    ('1.75', 4),
    ('1.99', 4),
    ('2.00', 4),
    ('2.01', 4),
    ('3.99', 4),
    ('4.00', 4),
    ('4.01', 8),
    ('1.5G', 1536),
    ('1.8G', 1844),
    ('8.28T', 8682212),
    )

  for text, wanted in cases:
    self.assertEqual(ParseUnit(text), wanted)
429 def testSuffixes(self):
430 for sep in ('', ' ', ' ', "\t", "\t "):
431 for suffix, scale in TestParseUnit.SCALES:
432 for func in (lambda x: x, str.lower, str.upper):
433 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
def testInvalidInput(self):
  """Check that malformed size strings raise UnitParseError.

  Two families of input are tried for every suffix in SCALES: a "1"
  joined to the suffix by an invalid separator, and the number "1,3"
  (comma instead of a dot) directly followed by the suffix.

  """
  suffixes = [suffix for (suffix, _) in TestParseUnit.SCALES]

  for sep in ('-', '_', ',', 'a'):
    for suffix in suffixes:
      self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)

  for suffix in suffixes:
    self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
445 class TestSshKeys(testutils.GanetiTestCase):
446 """Test case for the AddAuthorizedKey function"""
448 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
449 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
450 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
453 testutils.GanetiTestCase.setUp(self)
454 self.tmpname = self._CreateTempFile()
455 handle = open(self.tmpname, 'w')
457 handle.write("%s\n" % TestSshKeys.KEY_A)
458 handle.write("%s\n" % TestSshKeys.KEY_B)
def testAddingNewKey(self):
  """Add a key that is not in the file yet.

  Afterwards the file must hold the two pre-existing entries followed by
  the new key, each terminated by a newline.

  """
  new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
  AddAuthorizedKey(self.tmpname, new_key)

  wanted_lines = [
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a",
    ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
     " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b"),
    "ssh-dss AAAAB3NzaC1kc3MAAACB root@test",
    ]
  self.assertFileContent(self.tmpname, "\n".join(wanted_lines) + "\n")
def testAddingAlmostButNotCompletelyTheSameKey(self):
  """Add a key that differs from an existing entry only in its comment.

  The key material equals that of the first entry but the trailing comment
  is "root@test", so the key must be appended as a third line.

  """
  similar_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
  AddAuthorizedKey(self.tmpname, similar_key)

  wanted_lines = [
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a",
    ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
     " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b"),
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test",
    ]
  self.assertFileContent(self.tmpname, "\n".join(wanted_lines) + "\n")
def testAddingExistingKeyWithSomeMoreSpaces(self):
  """Add a key that the file already contains.

  The file must be left with exactly the two original entries, i.e. no
  duplicate line may be appended.

  """
  known_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  AddAuthorizedKey(self.tmpname, known_key)

  wanted_lines = [
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a",
    ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
     " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b"),
    ]
  self.assertFileContent(self.tmpname, "\n".join(wanted_lines) + "\n")
def testRemovingExistingKeyWithSomeMoreSpaces(self):
  """Remove a key that the file contains.

  Only the entry for root@key-b (with its command/from options) may remain
  in the file afterwards.

  """
  doomed_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  RemoveAuthorizedKey(self.tmpname, doomed_key)

  remaining = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
               " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
  self.assertFileContent(self.tmpname, remaining)
def testRemovingNonExistingKey(self):
  """Remove a key that is not part of the file.

  Both original entries must still be present, unchanged and in their
  original order.

  """
  unknown_key = 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test'
  RemoveAuthorizedKey(self.tmpname, unknown_key)

  wanted_lines = [
    "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a",
    ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
     " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b"),
    ]
  self.assertFileContent(self.tmpname, "\n".join(wanted_lines) + "\n")
508 class TestEtcHosts(testutils.GanetiTestCase):
509 """Test functions modifying /etc/hosts"""
512 testutils.GanetiTestCase.setUp(self)
513 self.tmpname = self._CreateTempFile()
514 handle = open(self.tmpname, 'w')
516 handle.write('# This is a test file for /etc/hosts\n')
517 handle.write('127.0.0.1\tlocalhost\n')
518 handle.write('192.168.1.1 router gw\n')
def testSettingNewIp(self):
  """Add an entry for an IP address the file does not list yet.

  The new line (IP, a tab, then full name and alias) must be appended after
  the existing content and the file mode must be 0644.

  """
  SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])

  wanted = "".join([
    "# This is a test file for /etc/hosts\n",
    "127.0.0.1\tlocalhost\n",
    "192.168.1.1 router gw\n",
    "1.2.3.4\tmyhost.domain.tld myhost\n",
    ])
  self.assertFileContent(self.tmpname, wanted)
  # octal 644 (rw-r--r--); spelled via int() so it parses on any Python
  self.assertFileMode(self.tmpname, int("644", 8))
532 def testSettingExistingIp(self):
533 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
536 self.assertFileContent(self.tmpname,
537 "# This is a test file for /etc/hosts\n"
538 "127.0.0.1\tlocalhost\n"
539 "192.168.1.1\tmyhost.domain.tld myhost\n")
540 self.assertFileMode(self.tmpname, 0644)
542 def testSettingDuplicateName(self):
543 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
545 self.assertFileContent(self.tmpname,
546 "# This is a test file for /etc/hosts\n"
547 "127.0.0.1\tlocalhost\n"
548 "192.168.1.1 router gw\n"
550 self.assertFileMode(self.tmpname, 0644)
552 def testRemovingExistingHost(self):
553 RemoveEtcHostsEntry(self.tmpname, 'router')
555 self.assertFileContent(self.tmpname,
556 "# This is a test file for /etc/hosts\n"
557 "127.0.0.1\tlocalhost\n"
559 self.assertFileMode(self.tmpname, 0644)
def testRemovingSingleExistingHost(self):
  """Remove a host name that is the only name on its line.

  After removing "localhost" the 127.0.0.1 line must be gone entirely,
  the other lines must be untouched and the file mode must be 0644.

  """
  RemoveEtcHostsEntry(self.tmpname, 'localhost')

  wanted = "".join([
    "# This is a test file for /etc/hosts\n",
    "192.168.1.1 router gw\n",
    ])
  self.assertFileContent(self.tmpname, wanted)
  # octal 644 (rw-r--r--); spelled via int() so it parses on any Python
  self.assertFileMode(self.tmpname, int("644", 8))
def testRemovingNonExistingHost(self):
  """Remove a host name that does not occur in the file.

  The content must stay exactly as it was and the file mode must be 0644.

  """
  RemoveEtcHostsEntry(self.tmpname, 'myhost')

  wanted = "".join([
    "# This is a test file for /etc/hosts\n",
    "127.0.0.1\tlocalhost\n",
    "192.168.1.1 router gw\n",
    ])
  self.assertFileContent(self.tmpname, wanted)
  # octal 644 (rw-r--r--); spelled via int() so it parses on any Python
  self.assertFileMode(self.tmpname, int("644", 8))
def testRemovingAlias(self):
  """Remove a name that is only an alias on its line.

  Dropping "gw" must keep the 192.168.1.1 line with its remaining name
  "router"; the file mode must be 0644.

  """
  RemoveEtcHostsEntry(self.tmpname, 'gw')

  wanted = "".join([
    "# This is a test file for /etc/hosts\n",
    "127.0.0.1\tlocalhost\n",
    "192.168.1.1 router\n",
    ])
  self.assertFileContent(self.tmpname, wanted)
  # octal 644 (rw-r--r--); spelled via int() so it parses on any Python
  self.assertFileMode(self.tmpname, int("644", 8))
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Check ShellQuote on single strings.

    Each pair is (raw string, expected quoted form): a plain word is passed
    through, anything containing quotes, spaces or backslashes is wrapped
    in single quotes.

    """
    cases = (
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      )
    for raw, quoted in cases:
      self.assertEqual(ShellQuote(raw), quoted)

  def testShellQuoteArgs(self):
    """Check ShellQuoteArgs on argument lists.

    Each pair is (argument list, expected command line string).

    """
    cases = (
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      )
    for args, quoted in cases:
      self.assertEqual(ShellQuoteArgs(args), quoted)
604 class TestTcpPing(unittest.TestCase):
605 """Testcase for TCP version of ping - against listen(2)ing port"""
608 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
609 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
610 self.listenerport = self.listener.getsockname()[1]
611 self.listener.listen(1)
614 self.listener.shutdown(socket.SHUT_RDWR)
616 del self.listenerport
618 def testTcpPingToLocalHostAccept(self):
619 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
622 live_port_needed=True,
623 source=constants.LOCALHOST_IP_ADDRESS,
625 "failed to connect to test listener")
627 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
630 live_port_needed=True,
632 "failed to connect to test listener (no source)")
635 class TestTcpPingDeaf(unittest.TestCase):
636 """Testcase for TCP version of ping - against non listen(2)ing port"""
639 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
640 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
641 self.deaflistenerport = self.deaflistener.getsockname()[1]
644 del self.deaflistener
645 del self.deaflistenerport
def testTcpPingToLocalHostAcceptDeaf(self):
  """Ping the deaf port while requiring a live port.

  With live_port_needed=True a successful connect(2) is needed, so both
  pings -- with and without an explicit source address -- must return a
  false value.

  Uses assertFalse instead of the deprecated failIf alias, which has been
  removed from current unittest versions.

  """
  with_source = TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True,
                        source=constants.LOCALHOST_IP_ADDRESS)
  self.assertFalse(with_source, "successfully connected to deaf listener")

  without_source = TcpPing(constants.LOCALHOST_IP_ADDRESS,
                           self.deaflistenerport,
                           timeout=constants.TCP_PING_TIMEOUT,
                           live_port_needed=True)
  self.assertFalse(without_source,
                   "successfully connected to deaf listener (no source addr)")
def testTcpPingToLocalHostNoAccept(self):
  """Ping the deaf port without requiring a live port.

  With live_port_needed=False a refused connection (ECONNREFUSED) is
  acceptable, so both pings -- with and without an explicit source address
  -- must return a true value.

  Uses assertTrue instead of the deprecated assert_ alias, which has been
  removed from current unittest versions.

  """
  with_source = TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=False,
                        source=constants.LOCALHOST_IP_ADDRESS)
  self.assertTrue(with_source, "failed to ping alive host on deaf port")

  without_source = TcpPing(constants.LOCALHOST_IP_ADDRESS,
                           self.deaflistenerport,
                           timeout=constants.TCP_PING_TIMEOUT,
                           live_port_needed=False)
  self.assertTrue(without_source,
                  "failed to ping alive host on deaf port (no source addr)")
680 class TestOwnIpAddress(unittest.TestCase):
681 """Testcase for OwnIpAddress"""
def testOwnLoopback(self):
  """Check that the loopback address is reported as owned.

  Uses assertTrue instead of the deprecated failUnless alias, which has
  been removed from current unittest versions.

  """
  owns_loopback = OwnIpAddress(constants.LOCALHOST_IP_ADDRESS)
  self.assertTrue(owns_loopback, "Should own the loopback address")
688 def testNowOwnAddress(self):
689 """check that I don't own an address"""
691 # network 192.0.2.0/24 is reserved for test/documentation as per
692 # rfc 3330, so we *should* not have an address of this range... if
693 # this fails, we should extend the test to multiple addresses
695 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
698 class TestListVisibleFiles(unittest.TestCase):
699 """Test case for ListVisibleFiles"""
702 self.path = tempfile.mkdtemp()
705 shutil.rmtree(self.path)
707 def _test(self, files, expected):
709 expected = expected[:]
713 f = open(os.path.join(self.path, name), 'w')
719 found = ListVisibleFiles(self.path)
722 self.assertEqual(found, expected)
724 def testAllVisible(self):
725 files = ["a", "b", "c"]
727 self._test(files, expected)
729 def testNoneVisible(self):
730 files = [".a", ".b", ".c"]
732 self._test(files, expected)
def testSomeVisible(self):
  """Run the shared check with a mix of plain and dot-prefixed names.

  Of the names "a", "b" and ".c" only the two without a leading dot are
  expected back.

  """
  self._test(["a", "b", ".c"], ["a", "b"])
740 class TestNewUUID(unittest.TestCase):
741 """Test case for NewUUID"""
743 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
744 '[a-f0-9]{4}-[a-f0-9]{12}$')
747 self.failUnless(self._re_uuid.match(utils.NewUUID()))
750 class TestUniqueSequence(unittest.TestCase):
751 """Test case for UniqueSequence"""
def _test(self, input, expected):
  """Assert that utils.UniqueSequence(input) equals expected."""
  actual = utils.UniqueSequence(input)
  self.assertEqual(actual, expected)
758 self._test([1, 2, 3], [1, 2, 3])
759 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
760 self._test([1, 2, 2, 3], [1, 2, 3])
761 self._test([1, 2, 3, 3], [1, 2, 3])
764 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
765 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
768 self._test(["a", "a"], ["a"])
769 self._test(["a", "b"], ["a", "b"])
770 self._test(["a", "b", "a"], ["a", "b"])
773 class TestFirstFree(unittest.TestCase):
774 """Test case for the FirstFree function"""
778 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
779 self.failUnlessEqual(FirstFree([]), None)
780 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
781 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
782 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
785 class TestTailFile(testutils.GanetiTestCase):
786 """Test case for the TailFile function"""
789 fname = self._CreateTempFile()
790 self.failUnlessEqual(TailFile(fname), [])
791 self.failUnlessEqual(TailFile(fname, lines=25), [])
793 def testAllLines(self):
794 data = ["test %d" % i for i in range(30)]
796 fname = self._CreateTempFile()
797 fd = open(fname, "w")
798 fd.write("\n".join(data[:i]))
802 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
804 def testPartialLines(self):
805 data = ["test %d" % i for i in range(30)]
806 fname = self._CreateTempFile()
807 fd = open(fname, "w")
808 fd.write("\n".join(data))
811 for i in range(1, 30):
812 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
814 def testBigFile(self):
815 data = ["test %d" % i for i in range(30)]
816 fname = self._CreateTempFile()
817 fd = open(fname, "w")
818 fd.write("X" * 1048576)
820 fd.write("\n".join(data))
823 for i in range(1, 30):
824 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
827 class TestFileLock(unittest.TestCase):
828 """Test case for the FileLock class"""
831 self.tmpfile = tempfile.NamedTemporaryFile()
832 self.lock = utils.FileLock(self.tmpfile.name)
834 def testSharedNonblocking(self):
835 self.lock.Shared(blocking=False)
838 def testExclusiveNonblocking(self):
839 self.lock.Exclusive(blocking=False)
842 def testUnlockNonblocking(self):
843 self.lock.Unlock(blocking=False)
846 def testSharedBlocking(self):
847 self.lock.Shared(blocking=True)
850 def testExclusiveBlocking(self):
851 self.lock.Exclusive(blocking=True)
854 def testUnlockBlocking(self):
855 self.lock.Unlock(blocking=True)
858 def testSharedExclusiveUnlock(self):
859 self.lock.Shared(blocking=False)
860 self.lock.Exclusive(blocking=False)
861 self.lock.Unlock(blocking=False)
864 def testExclusiveSharedUnlock(self):
865 self.lock.Exclusive(blocking=False)
866 self.lock.Shared(blocking=False)
867 self.lock.Unlock(blocking=False)
870 def testCloseShared(self):
872 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
874 def testCloseExclusive(self):
876 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
878 def testCloseUnlock(self):
880 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
883 class TestTimeFunctions(unittest.TestCase):
884 """Test case for time functions"""
887 self.assertEqual(utils.SplitTime(1), (1, 0))
888 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
889 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
890 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
891 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
892 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
893 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
894 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
896 self.assertRaises(AssertionError, utils.SplitTime, -1)
898 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
899 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
900 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
902 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
903 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
905 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
906 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
907 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
908 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
909 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets

  The checks use assertTrue/assertFalse rather than the deprecated
  failUnless/failIf aliases, which have been removed from current unittest
  versions.

  """

  def testSimpleMatch(self):
    """Check Matches and NonMatching for literal field names."""
    f = utils.FieldSet("a", "b", "c", "def")

    self.assertTrue(f.Matches("a"))
    # a field must match as a whole, not as a part of a longer/shorter name
    self.assertFalse(f.Matches("d"), "Substring matched")
    self.assertFalse(f.Matches("defghi"), "Prefix string matched")

    # NonMatching must be false when every name is known...
    self.assertFalse(f.NonMatching(["b", "c"]))
    self.assertFalse(f.NonMatching(["a", "b", "c", "def"]))
    # ...and true as soon as one unknown name ("d") is included
    self.assertTrue(f.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    """Check Matches and NonMatching when a field is a regular expression."""
    f = utils.FieldSet("a", "b([0-9]+)", "c")

    for name in ("b1", "b99"):
      self.assertTrue(f.Matches(name))
    self.assertFalse(f.Matches("b/1"))

    self.assertFalse(f.NonMatching(["b12", "c"]))
    self.assertTrue(f.NonMatching(["a", "1"]))
932 class TestForceDictType(unittest.TestCase):
933 """Test case for ForceDictType"""
937 'a': constants.VTYPE_INT,
938 'b': constants.VTYPE_BOOL,
939 'c': constants.VTYPE_STRING,
940 'd': constants.VTYPE_SIZE,
943 def _fdt(self, dict, allowed_values=None):
944 if allowed_values is None:
945 ForceDictType(dict, self.key_types)
947 ForceDictType(dict, self.key_types, allowed_values=allowed_values)
def testSimpleDict(self):
  """Check the results of the _fdt helper for valid dictionaries.

  Each pair is (dictionary passed to the helper, dictionary expected back);
  the cases cover integer, boolean, string and size typed keys.

  """
  cases = (
    ({}, {}),
    ({'a': 1}, {'a': 1}),
    ({'a': '1'}, {'a': 1}),
    ({'a': 1, 'b': 1}, {'a': 1, 'b': True}),
    ({'b': 1, 'c': 'foo'}, {'b': True, 'c': 'foo'}),
    ({'b': 1, 'c': False}, {'b': True, 'c': ''}),
    ({'b': 'false'}, {'b': False}),
    ({'b': 'False'}, {'b': False}),
    ({'b': 'true'}, {'b': True}),
    ({'b': 'True'}, {'b': True}),
    ({'d': '4'}, {'d': 4}),
    ({'d': '4M'}, {'d': 4}),
    )

  for given, wanted in cases:
    self.assertEqual(self._fdt(given), wanted)
def testErrors(self):
  """Check that invalid values raise errors.TypeEnforcementError.

  Every dictionary holds one value that does not fit the type declared for
  its key, so the _fdt helper must raise for each of them.

  """
  bad_dicts = (
    {'a': 'astring'},
    {'c': True},
    {'d': 'astring'},
    {'d': '4 L'},
    )

  for bad in bad_dicts:
    self.assertRaises(errors.TypeEnforcementError, self._fdt, bad)
972 class TestIsAbsNormPath(unittest.TestCase):
973 """Testing case for IsNormAbsPath"""
975 def _pathTestHelper(self, path, result):
977 self.assert_(IsNormAbsPath(path),
978 "Path %s should result absolute and normalized" % path)
980 self.assert_(not IsNormAbsPath(path),
981 "Path %s should not result absolute and normalized" % path)
984 self._pathTestHelper('/etc', True)
985 self._pathTestHelper('/srv', True)
986 self._pathTestHelper('etc', False)
987 self._pathTestHelper('/etc/../root', False)
988 self._pathTestHelper('/etc/', False)
990 if __name__ == '__main__':