X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e5392d79cda3c56ca7af5229627a086e028a6d5d..ba55d062da8dfb89a37afc2f13f2e689d0094829:/test/ganeti.utils_unittest.py?ds=sidebyside diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index e7b93c0..846b4b4 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -32,114 +32,154 @@ import signal import socket import shutil import re +import select import ganeti import testutils from ganeti import constants from ganeti import utils +from ganeti import errors from ganeti.utils import IsProcessAlive, RunCmd, \ RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \ ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \ ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \ - SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree -from ganeti.errors import LockError, UnitParseError + SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \ + TailFile, ForceDictType, IsNormAbsPath + +from ganeti.errors import LockError, UnitParseError, GenericError, \ + ProgrammerError -def _ChildHandler(signal, stack): - global _ChildFlag - _ChildFlag = True class TestIsProcessAlive(unittest.TestCase): """Testing case for IsProcessAlive""" - def setUp(self): - global _ChildFlag - # create a (most probably) non-existing process-id - self.pid_non_existing = os.fork() - if self.pid_non_existing == 0: - os._exit(0) - elif self.pid_non_existing > 0: - os.waitpid(self.pid_non_existing, 0) - else: - raise SystemError("can't fork") - _ChildFlag = False - # Use _ChildHandler for SIGCHLD - self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler) - # create a zombie - self.pid_zombie = os.fork() - if self.pid_zombie == 0: - os._exit(0) - elif self.pid_zombie < 0: - raise SystemError("can't fork") - - def tearDown(self): - signal.signal(signal.SIGCHLD, self.chldOrig) - def testExists(self): mypid = os.getpid() self.assert_(IsProcessAlive(mypid), "can't find myself running") - def testZombie(self): - global _ChildFlag - timeout = 10 + def testNotExisting(self): + pid_non_existing = os.fork() + if pid_non_existing == 0: + os._exit(0) + elif pid_non_existing < 0: + raise SystemError("can't fork") + os.waitpid(pid_non_existing, 0) + self.assert_(not IsProcessAlive(pid_non_existing), + "nonexisting process detected") - while not _ChildFlag: - if timeout >= 0: - time.sleep(0.2) - timeout -= 0.2 - else: - self.fail("timed out waiting for child's signal") - break # not executed... - self.assert_(not IsProcessAlive(self.pid_zombie), - "zombie not detected as zombie") +class TestPidFileFunctions(unittest.TestCase): + """Tests for WritePidFile, RemovePidFile and ReadPidFile""" - def testNotExisting(self): - self.assert_(not IsProcessAlive(self.pid_non_existing), - "noexisting process detected") + def setUp(self): + self.dir = tempfile.mkdtemp() + self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name) + utils.DaemonPidFileName = self.f_dpn + + def testPidFileFunctions(self): + pid_file = self.f_dpn('test') + utils.WritePidFile('test') + self.failUnless(os.path.exists(pid_file), + "PID file should have been created") + read_pid = utils.ReadPidFile(pid_file) + self.failUnlessEqual(read_pid, os.getpid()) + self.failUnless(utils.IsProcessAlive(read_pid)) + self.failUnlessRaises(GenericError, utils.WritePidFile, 'test') + utils.RemovePidFile('test') + self.failIf(os.path.exists(pid_file), + "PID file should not exist anymore") + self.failUnlessEqual(utils.ReadPidFile(pid_file), 0, + "ReadPidFile should return 0 for missing pid file") + fh = open(pid_file, "w") + fh.write("blah\n") + fh.close() + self.failUnlessEqual(utils.ReadPidFile(pid_file), 0, + "ReadPidFile should return 0 for invalid pid file") + utils.RemovePidFile('test') + self.failIf(os.path.exists(pid_file), + "PID file should not exist anymore") + + def testKill(self): + pid_file = self.f_dpn('child') + r_fd, w_fd = os.pipe() + new_pid = os.fork() + if new_pid == 0: #child + utils.WritePidFile('child') + os.write(w_fd, 'a') + signal.pause() + os._exit(0) + return + # else we are in the parent + # wait until the child has written the pid file + os.read(r_fd, 1) + read_pid = utils.ReadPidFile(pid_file) + self.failUnlessEqual(read_pid, new_pid) + self.failUnless(utils.IsProcessAlive(new_pid)) + utils.KillProcess(new_pid, waitpid=True) + self.failIf(utils.IsProcessAlive(new_pid)) + utils.RemovePidFile('child') + self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0) + + def tearDown(self): + for name in os.listdir(self.dir): + os.unlink(os.path.join(self.dir, name)) + os.rmdir(self.dir) -class TestRunCmd(unittest.TestCase): +class TestRunCmd(testutils.GanetiTestCase): """Testing case for the RunCmd function""" def setUp(self): + testutils.GanetiTestCase.setUp(self) self.magic = time.ctime() + " ganeti test" + self.fname = self._CreateTempFile() def testOk(self): """Test successful exit code""" result = RunCmd("/bin/sh -c 'exit 0'") self.assertEqual(result.exit_code, 0) + self.assertEqual(result.output, "") def testFail(self): """Test fail exit code""" result = RunCmd("/bin/sh -c 'exit 1'") self.assertEqual(result.exit_code, 1) - + self.assertEqual(result.output, "") def testStdout(self): """Test standard output""" cmd = 'echo -n "%s"' % self.magic result = RunCmd("/bin/sh -c '%s'" % cmd) self.assertEqual(result.stdout, self.magic) - + result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, self.magic) def testStderr(self): """Test standard error""" cmd = 'echo -n "%s"' % self.magic result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd) self.assertEqual(result.stderr, self.magic) - + result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, self.magic) def testCombined(self): """Test combined output""" cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic) + expected = "A" + self.magic + "B" + self.magic result = RunCmd("/bin/sh -c '%s'" % cmd) - self.assertEqual(result.output, "A" + self.magic + "B" + self.magic) + self.assertEqual(result.output, expected) + result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, expected) def testSignal(self): """Test signal""" result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"]) self.assertEqual(result.signal, 15) + self.assertEqual(result.output, "") def testListRun(self): """Test list runs""" @@ -154,6 +194,13 @@ class TestRunCmd(unittest.TestCase): self.assertEqual(result.exit_code, 0) self.assertEqual(result.stdout, self.magic) + def testFileEmptyOutput(self): + """Test file output""" + result = RunCmd(["true"], output=self.fname) + self.assertEqual(result.signal, None) + self.assertEqual(result.exit_code, 0) + self.assertFileContent(self.fname, "") + def testLang(self): """Test locale environment""" old_env = os.environ.copy() @@ -171,6 +218,17 @@ class TestRunCmd(unittest.TestCase): finally: os.environ = old_env + def testDefaultCwd(self): + """Test default working directory""" + self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/") + + def testCwd(self): + """Test default working directory""" + self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/") + self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp") + cwd = os.getcwd() + self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd) + class TestRemoveFile(unittest.TestCase): """Test case for the RemoveFile function""" @@ -218,6 +276,36 @@ class TestRemoveFile(unittest.TestCase): self.fail("File '%s' not removed" % symlink) +class TestRename(unittest.TestCase): + """Test case for RenameFile""" + + def setUp(self): + """Create a temporary directory""" + self.tmpdir = tempfile.mkdtemp() + self.tmpfile = os.path.join(self.tmpdir, "test1") + + # Touch the file + open(self.tmpfile, "w").close() + + def tearDown(self): + """Remove temporary directory""" + shutil.rmtree(self.tmpdir) + + def testSimpleRename1(self): + """Simple rename 1""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz")) + + def testSimpleRename2(self): + """Simple rename 2""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"), + mkdir=True) + + def testRenameMkdir(self): + """Rename with mkdir""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"), + mkdir=True) + + class TestCheckdict(unittest.TestCase): """Test case for the CheckDict function""" @@ -265,21 +353,42 @@ class TestFormatUnit(unittest.TestCase): """Test case for the FormatUnit function""" def testMiB(self): - self.assertEqual(FormatUnit(1), '1M') - self.assertEqual(FormatUnit(100), '100M') - self.assertEqual(FormatUnit(1023), '1023M') + self.assertEqual(FormatUnit(1, 'h'), '1M') + self.assertEqual(FormatUnit(100, 'h'), '100M') + self.assertEqual(FormatUnit(1023, 'h'), '1023M') + + self.assertEqual(FormatUnit(1, 'm'), '1') + self.assertEqual(FormatUnit(100, 'm'), '100') + self.assertEqual(FormatUnit(1023, 'm'), '1023') + + self.assertEqual(FormatUnit(1024, 'm'), '1024') + self.assertEqual(FormatUnit(1536, 'm'), '1536') + self.assertEqual(FormatUnit(17133, 'm'), '17133') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575') def testGiB(self): - self.assertEqual(FormatUnit(1024), '1.0G') - self.assertEqual(FormatUnit(1536), '1.5G') - self.assertEqual(FormatUnit(17133), '16.7G') - self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G') + self.assertEqual(FormatUnit(1024, 'h'), '1.0G') + self.assertEqual(FormatUnit(1536, 'h'), '1.5G') + self.assertEqual(FormatUnit(17133, 'h'), '16.7G') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G') + + self.assertEqual(FormatUnit(1024, 'g'), '1.0') + self.assertEqual(FormatUnit(1536, 'g'), '1.5') + self.assertEqual(FormatUnit(17133, 'g'), '16.7') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0') + + self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0') + self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0') + self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0') def testTiB(self): - self.assertEqual(FormatUnit(1024 * 1024), '1.0T') - self.assertEqual(FormatUnit(5120 * 1024), '5.0T') - self.assertEqual(FormatUnit(29829 * 1024), '29.1T') + self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T') + self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T') + self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T') + self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0') + self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0') + self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1') class TestParseUnit(unittest.TestCase): """Test case for the ParseUnit function""" @@ -341,21 +450,14 @@ class TestSshKeys(testutils.GanetiTestCase): 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b') def setUp(self): - (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - handle = os.fdopen(fd, 'w') - try: - handle.write("%s\n" % TestSshKeys.KEY_A) - handle.write("%s\n" % TestSshKeys.KEY_B) - finally: - handle.close() - except: - utils.RemoveFile(self.tmpname) - raise - - def tearDown(self): - utils.RemoveFile(self.tmpname) - del self.tmpname + handle.write("%s\n" % TestSshKeys.KEY_A) + handle.write("%s\n" % TestSshKeys.KEY_B) + finally: + handle.close() def testAddingNewKey(self): AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') @@ -407,22 +509,15 @@ class TestEtcHosts(testutils.GanetiTestCase): """Test functions modifying /etc/hosts""" def setUp(self): - (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - handle = os.fdopen(fd, 'w') - try: - handle.write('# This is a test file for /etc/hosts\n') - handle.write('127.0.0.1\tlocalhost\n') - handle.write('192.168.1.1 router gw\n') - finally: - handle.close() - except: - utils.RemoveFile(self.tmpname) - raise - - def tearDown(self): - utils.RemoveFile(self.tmpname) - del self.tmpname + handle.write('# This is a test file for /etc/hosts\n') + handle.write('127.0.0.1\tlocalhost\n') + handle.write('192.168.1.1 router gw\n') + finally: + handle.close() def testSettingNewIp(self): SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) @@ -432,6 +527,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n" "1.2.3.4\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) def testSettingExistingIp(self): SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld', @@ -441,6 +537,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) def testSettingDuplicateName(self): SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost']) @@ -450,6 +547,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n" "1.2.3.4\tmyhost\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'router') @@ -458,6 +556,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingSingleExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'localhost') @@ -465,6 +564,7 @@ class TestEtcHosts(testutils.GanetiTestCase): self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingNonExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'myhost') @@ -473,6 +573,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingAlias(self): RemoveEtcHostsEntry(self.tmpname, 'gw') @@ -481,6 +582,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 router\n") + self.assertFileMode(self.tmpname, 0644) class TestShellQuoting(unittest.TestCase): @@ -575,6 +677,24 @@ class TestTcpPingDeaf(unittest.TestCase): "failed to ping alive host on deaf port (no source addr)") +class TestOwnIpAddress(unittest.TestCase): + """Testcase for OwnIpAddress""" + + def testOwnLoopback(self): + """check having the loopback ip""" + self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS), + "Should own the loopback address") + + def testNowOwnAddress(self): + """check that I don't own an address""" + + # network 192.0.2.0/24 is reserved for test/documentation as per + # rfc 3330, so we *should* not have an address of this range... if + # this fails, we should extend the test to multiple addresses + DST_IP = "192.0.2.1" + self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP) + + class TestListVisibleFiles(unittest.TestCase): """Test case for ListVisibleFiles""" @@ -649,6 +769,7 @@ class TestUniqueSequence(unittest.TestCase): self._test(["a", "b"], ["a", "b"]) self._test(["a", "b", "a"], ["a", "b"]) + class TestFirstFree(unittest.TestCase): """Test case for the FirstFree function""" @@ -660,5 +781,211 @@ class TestFirstFree(unittest.TestCase): self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5) self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3) + +class TestTailFile(testutils.GanetiTestCase): + """Test case for the TailFile function""" + + def testEmpty(self): + fname = self._CreateTempFile() + self.failUnlessEqual(TailFile(fname), []) + self.failUnlessEqual(TailFile(fname, lines=25), []) + + def testAllLines(self): + data = ["test %d" % i for i in range(30)] + for i in range(30): + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data[:i])) + if i > 0: + fd.write("\n") + fd.close() + self.failUnlessEqual(TailFile(fname, lines=i), data[:i]) + + def testPartialLines(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + def testBigFile(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("X" * 1048576) + fd.write("\n") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + +class TestFileLock(unittest.TestCase): + """Test case for the FileLock class""" + + def setUp(self): + self.tmpfile = tempfile.NamedTemporaryFile() + self.lock = utils.FileLock(self.tmpfile.name) + + def testSharedNonblocking(self): + self.lock.Shared(blocking=False) + self.lock.Close() + + def testExclusiveNonblocking(self): + self.lock.Exclusive(blocking=False) + self.lock.Close() + + def testUnlockNonblocking(self): + self.lock.Unlock(blocking=False) + self.lock.Close() + + def testSharedBlocking(self): + self.lock.Shared(blocking=True) + self.lock.Close() + + def testExclusiveBlocking(self): + self.lock.Exclusive(blocking=True) + self.lock.Close() + + def testUnlockBlocking(self): + self.lock.Unlock(blocking=True) + self.lock.Close() + + def testSharedExclusiveUnlock(self): + self.lock.Shared(blocking=False) + self.lock.Exclusive(blocking=False) + self.lock.Unlock(blocking=False) + self.lock.Close() + + def testExclusiveSharedUnlock(self): + self.lock.Exclusive(blocking=False) + self.lock.Shared(blocking=False) + self.lock.Unlock(blocking=False) + self.lock.Close() + + def testCloseShared(self): + self.lock.Close() + self.assertRaises(AssertionError, self.lock.Shared, blocking=False) + + def testCloseExclusive(self): + self.lock.Close() + self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False) + + def testCloseUnlock(self): + self.lock.Close() + self.assertRaises(AssertionError, self.lock.Unlock, blocking=False) + + +class TestTimeFunctions(unittest.TestCase): + """Test case for time functions""" + + def runTest(self): + self.assertEqual(utils.SplitTime(1), (1, 0)) + self.assertEqual(utils.SplitTime(1.5), (1, 500000)) + self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915)) + self.assertEqual(utils.SplitTime(123.48012), (123, 480120)) + self.assertEqual(utils.SplitTime(123.9996), (123, 999600)) + self.assertEqual(utils.SplitTime(123.9995), (123, 999500)) + self.assertEqual(utils.SplitTime(123.9994), (123, 999400)) + self.assertEqual(utils.SplitTime(123.999999999), (123, 999999)) + + self.assertRaises(AssertionError, utils.SplitTime, -1) + + self.assertEqual(utils.MergeTime((1, 0)), 1.0) + self.assertEqual(utils.MergeTime((1, 500000)), 1.5) + self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5) + + self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481) + self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801) + + self.assertRaises(AssertionError, utils.MergeTime, (0, -1)) + self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000)) + self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999)) + self.assertRaises(AssertionError, utils.MergeTime, (-1, 0)) + self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0)) + + +class FieldSetTestCase(unittest.TestCase): + """Test case for FieldSets""" + + def testSimpleMatch(self): + f = utils.FieldSet("a", "b", "c", "def") + self.failUnless(f.Matches("a")) + self.failIf(f.Matches("d"), "Substring matched") + self.failIf(f.Matches("defghi"), "Prefix string matched") + self.failIf(f.NonMatching(["b", "c"])) + self.failIf(f.NonMatching(["a", "b", "c", "def"])) + self.failUnless(f.NonMatching(["a", "d"])) + + def testRegexMatch(self): + f = utils.FieldSet("a", "b([0-9]+)", "c") + self.failUnless(f.Matches("b1")) + self.failUnless(f.Matches("b99")) + self.failIf(f.Matches("b/1")) + self.failIf(f.NonMatching(["b12", "c"])) + self.failUnless(f.NonMatching(["a", "1"])) + +class TestForceDictType(unittest.TestCase): + """Test case for ForceDictType""" + + def setUp(self): + self.key_types = { + 'a': constants.VTYPE_INT, + 'b': constants.VTYPE_BOOL, + 'c': constants.VTYPE_STRING, + 'd': constants.VTYPE_SIZE, + } + + def _fdt(self, dict, allowed_values=None): + if allowed_values is None: + ForceDictType(dict, self.key_types) + else: + ForceDictType(dict, self.key_types, allowed_values=allowed_values) + + return dict + + def testSimpleDict(self): + self.assertEqual(self._fdt({}), {}) + self.assertEqual(self._fdt({'a': 1}), {'a': 1}) + self.assertEqual(self._fdt({'a': '1'}), {'a': 1}) + self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True}) + self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'}) + self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''}) + self.assertEqual(self._fdt({'b': 'false'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'False'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'true'}), {'b': True}) + self.assertEqual(self._fdt({'b': 'True'}), {'b': True}) + self.assertEqual(self._fdt({'d': '4'}), {'d': 4}) + self.assertEqual(self._fdt({'d': '4M'}), {'d': 4}) + + def testErrors(self): + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) + + +class TestIsAbsNormPath(unittest.TestCase): + """Testing case for IsProcessAlive""" + + def _pathTestHelper(self, path, result): + if result: + self.assert_(IsNormAbsPath(path), + "Path %s should result absolute and normalized" % path) + else: + self.assert_(not IsNormAbsPath(path), + "Path %s should not result absolute and normalized" % path) + + def testBase(self): + self._pathTestHelper('/etc', True) + self._pathTestHelper('/srv', True) + self._pathTestHelper('etc', False) + self._pathTestHelper('/etc/../root', False) + self._pathTestHelper('/etc/', False) + if __name__ == '__main__': unittest.main()