X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/6e797216e7bcd411e00f9f669895848f00eeb891..ba55d062da8dfb89a37afc2f13f2e689d0094829:/test/ganeti.utils_unittest.py diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 29fddfd..846b4b4 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -38,11 +38,14 @@ import ganeti import testutils from ganeti import constants from ganeti import utils +from ganeti import errors from ganeti.utils import IsProcessAlive, RunCmd, \ RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \ ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \ ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \ - SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress + SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \ + TailFile, ForceDictType, IsNormAbsPath + from ganeti.errors import LockError, UnitParseError, GenericError, \ ProgrammerError @@ -128,13 +131,9 @@ class TestRunCmd(testutils.GanetiTestCase): """Testing case for the RunCmd function""" def setUp(self): + testutils.GanetiTestCase.setUp(self) self.magic = time.ctime() + " ganeti test" - fh, self.fname = tempfile.mkstemp() - os.close(fh) - - def tearDown(self): - if self.fname: - utils.RemoveFile(self.fname) + self.fname = self._CreateTempFile() def testOk(self): """Test successful exit code""" @@ -451,21 +450,14 @@ class TestSshKeys(testutils.GanetiTestCase): 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b') def setUp(self): - (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - handle = os.fdopen(fd, 'w') - try: - handle.write("%s\n" % TestSshKeys.KEY_A) - handle.write("%s\n" % TestSshKeys.KEY_B) - finally: - handle.close() - except: - utils.RemoveFile(self.tmpname) - raise - - def tearDown(self): - utils.RemoveFile(self.tmpname) - del self.tmpname + handle.write("%s\n" % TestSshKeys.KEY_A) + handle.write("%s\n" % TestSshKeys.KEY_B) + finally: + handle.close() def testAddingNewKey(self): AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') @@ -517,22 +509,15 @@ class TestEtcHosts(testutils.GanetiTestCase): """Test functions modifying /etc/hosts""" def setUp(self): - (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - handle = os.fdopen(fd, 'w') - try: - handle.write('# This is a test file for /etc/hosts\n') - handle.write('127.0.0.1\tlocalhost\n') - handle.write('192.168.1.1 router gw\n') - finally: - handle.close() - except: - utils.RemoveFile(self.tmpname) - raise - - def tearDown(self): - utils.RemoveFile(self.tmpname) - del self.tmpname + handle.write('# This is a test file for /etc/hosts\n') + handle.write('127.0.0.1\tlocalhost\n') + handle.write('192.168.1.1 router gw\n') + finally: + handle.close() def testSettingNewIp(self): SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) @@ -542,6 +527,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n" "1.2.3.4\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) def testSettingExistingIp(self): SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld', @@ -551,6 +537,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) def testSettingDuplicateName(self): SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost']) @@ -560,6 +547,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n" "1.2.3.4\tmyhost\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'router') @@ -568,6 +556,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingSingleExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'localhost') @@ -575,6 +564,7 @@ class TestEtcHosts(testutils.GanetiTestCase): self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingNonExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'myhost') @@ -583,6 +573,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingAlias(self): RemoveEtcHostsEntry(self.tmpname, 'gw') @@ -591,6 +582,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 router\n") + self.assertFileMode(self.tmpname, 0644) class TestShellQuoting(unittest.TestCase): @@ -790,6 +782,48 @@ class TestFirstFree(unittest.TestCase): self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3) +class TestTailFile(testutils.GanetiTestCase): + """Test case for the TailFile function""" + + def testEmpty(self): + fname = self._CreateTempFile() + self.failUnlessEqual(TailFile(fname), []) + self.failUnlessEqual(TailFile(fname, lines=25), []) + + def testAllLines(self): + data = ["test %d" % i for i in range(30)] + for i in range(30): + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data[:i])) + if i > 0: + fd.write("\n") + fd.close() + self.failUnlessEqual(TailFile(fname, lines=i), data[:i]) + + def testPartialLines(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + def testBigFile(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("X" * 1048576) + fd.write("\n") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + class TestFileLock(unittest.TestCase): """Test case for the FileLock class""" @@ -895,6 +929,63 @@ class FieldSetTestCase(unittest.TestCase): self.failIf(f.NonMatching(["b12", "c"])) self.failUnless(f.NonMatching(["a", "1"])) +class TestForceDictType(unittest.TestCase): + """Test case for ForceDictType""" + + def setUp(self): + self.key_types = { + 'a': constants.VTYPE_INT, + 'b': constants.VTYPE_BOOL, + 'c': constants.VTYPE_STRING, + 'd': constants.VTYPE_SIZE, + } + + def _fdt(self, dict, allowed_values=None): + if allowed_values is None: + ForceDictType(dict, self.key_types) + else: + ForceDictType(dict, self.key_types, allowed_values=allowed_values) + + return dict + + def testSimpleDict(self): + self.assertEqual(self._fdt({}), {}) + self.assertEqual(self._fdt({'a': 1}), {'a': 1}) + self.assertEqual(self._fdt({'a': '1'}), {'a': 1}) + self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True}) + self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'}) + self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''}) + self.assertEqual(self._fdt({'b': 'false'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'False'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'true'}), {'b': True}) + self.assertEqual(self._fdt({'b': 'True'}), {'b': True}) + self.assertEqual(self._fdt({'d': '4'}), {'d': 4}) + self.assertEqual(self._fdt({'d': '4M'}), {'d': 4}) + + def testErrors(self): + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) + + +class TestIsAbsNormPath(unittest.TestCase): + """Testing case for IsProcessAlive""" + + def _pathTestHelper(self, path, result): + if result: + self.assert_(IsNormAbsPath(path), + "Path %s should result absolute and normalized" % path) + else: + self.assert_(not IsNormAbsPath(path), + "Path %s should not result absolute and normalized" % path) + + def testBase(self): + self._pathTestHelper('/etc', True) + self._pathTestHelper('/srv', True) + self._pathTestHelper('etc', False) + self._pathTestHelper('/etc/../root', False) + self._pathTestHelper('/etc/', False) if __name__ == '__main__': unittest.main()