X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/caad16e24a639597fb04e2daa40a42e526b63d36..637b8d7ec2a2267d99ac213812ed529a2fd1fb30:/test/ganeti.utils_unittest.py?ds=sidebyside diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 2412328..415ffa7 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -32,73 +32,42 @@ import signal import socket import shutil import re +import select +import string import ganeti import testutils from ganeti import constants from ganeti import utils +from ganeti import errors from ganeti.utils import IsProcessAlive, RunCmd, \ - RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \ + RemoveFile, MatchNameComponent, FormatUnit, \ ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \ ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \ - SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress + SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \ + TailFile, ForceDictType, SafeEncode, IsNormAbsPath + from ganeti.errors import LockError, UnitParseError, GenericError, \ ProgrammerError -def _ChildHandler(signal, stack): - global _ChildFlag - _ChildFlag = True - class TestIsProcessAlive(unittest.TestCase): """Testing case for IsProcessAlive""" - def setUp(self): - global _ChildFlag - # create a (most probably) non-existing process-id - self.pid_non_existing = os.fork() - if self.pid_non_existing == 0: - os._exit(0) - elif self.pid_non_existing > 0: - os.waitpid(self.pid_non_existing, 0) - else: - raise SystemError("can't fork") - _ChildFlag = False - # Use _ChildHandler for SIGCHLD - self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler) - # create a zombie - self.pid_zombie = os.fork() - if self.pid_zombie == 0: - os._exit(0) - elif self.pid_zombie < 0: - raise SystemError("can't fork") - - def tearDown(self): - signal.signal(signal.SIGCHLD, self.chldOrig) - def testExists(self): mypid = os.getpid() self.assert_(IsProcessAlive(mypid), "can't find myself running") - def testZombie(self): - global _ChildFlag - timeout = 10 - - while not _ChildFlag: - if timeout >= 0: - time.sleep(0.2) - timeout -= 0.2 - else: - self.fail("timed out waiting for child's signal") - break # not executed... - - self.assert_(not IsProcessAlive(self.pid_zombie), - "zombie not detected as zombie") - def testNotExisting(self): - self.assert_(not IsProcessAlive(self.pid_non_existing), - "noexisting process detected") + pid_non_existing = os.fork() + if pid_non_existing == 0: + os._exit(0) + elif pid_non_existing < 0: + raise SystemError("can't fork") + os.waitpid(pid_non_existing, 0) + self.assert_(not IsProcessAlive(pid_non_existing), + "nonexisting process detected") class TestPidFileFunctions(unittest.TestCase): @@ -148,7 +117,7 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, new_pid) self.failUnless(utils.IsProcessAlive(new_pid)) - utils.KillProcess(new_pid) + utils.KillProcess(new_pid, waitpid=True) self.failIf(utils.IsProcessAlive(new_pid)) utils.RemovePidFile('child') self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0) @@ -159,47 +128,59 @@ class TestPidFileFunctions(unittest.TestCase): os.rmdir(self.dir) -class TestRunCmd(unittest.TestCase): +class TestRunCmd(testutils.GanetiTestCase): """Testing case for the RunCmd function""" def setUp(self): + testutils.GanetiTestCase.setUp(self) self.magic = time.ctime() + " ganeti test" + self.fname = self._CreateTempFile() def testOk(self): """Test successful exit code""" result = RunCmd("/bin/sh -c 'exit 0'") self.assertEqual(result.exit_code, 0) + self.assertEqual(result.output, "") def testFail(self): """Test fail exit code""" result = RunCmd("/bin/sh -c 'exit 1'") self.assertEqual(result.exit_code, 1) - + self.assertEqual(result.output, "") def testStdout(self): """Test standard output""" cmd = 'echo -n "%s"' % self.magic result = RunCmd("/bin/sh -c '%s'" % cmd) self.assertEqual(result.stdout, self.magic) - + result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, self.magic) def testStderr(self): """Test standard error""" cmd = 'echo -n "%s"' % self.magic result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd) self.assertEqual(result.stderr, self.magic) - + result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, self.magic) def testCombined(self): """Test combined output""" cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic) + expected = "A" + self.magic + "B" + self.magic result = RunCmd("/bin/sh -c '%s'" % cmd) - self.assertEqual(result.output, "A" + self.magic + "B" + self.magic) + self.assertEqual(result.output, expected) + result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) + self.assertEqual(result.output, "") + self.assertFileContent(self.fname, expected) def testSignal(self): """Test signal""" result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"]) self.assertEqual(result.signal, 15) + self.assertEqual(result.output, "") def testListRun(self): """Test list runs""" @@ -214,6 +195,13 @@ class TestRunCmd(unittest.TestCase): self.assertEqual(result.exit_code, 0) self.assertEqual(result.stdout, self.magic) + def testFileEmptyOutput(self): + """Test file output""" + result = RunCmd(["true"], output=self.fname) + self.assertEqual(result.signal, None) + self.assertEqual(result.exit_code, 0) + self.assertFileContent(self.fname, "") + def testLang(self): """Test locale environment""" old_env = os.environ.copy() @@ -231,6 +219,17 @@ class TestRunCmd(unittest.TestCase): finally: os.environ = old_env + def testDefaultCwd(self): + """Test default working directory""" + self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/") + + def testCwd(self): + """Test default working directory""" + self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/") + self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp") + cwd = os.getcwd() + self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd) + class TestRemoveFile(unittest.TestCase): """Test case for the RemoveFile function""" @@ -278,25 +277,34 @@ class TestRemoveFile(unittest.TestCase): self.fail("File '%s' not removed" % symlink) -class TestCheckdict(unittest.TestCase): - """Test case for the CheckDict function""" +class TestRename(unittest.TestCase): + """Test case for RenameFile""" - def testAdd(self): - """Test that CheckDict adds a missing key with the correct value""" + def setUp(self): + """Create a temporary directory""" + self.tmpdir = tempfile.mkdtemp() + self.tmpfile = os.path.join(self.tmpdir, "test1") + + # Touch the file + open(self.tmpfile, "w").close() + + def tearDown(self): + """Remove temporary directory""" + shutil.rmtree(self.tmpdir) - tgt = {'a':1} - tmpl = {'b': 2} - CheckDict(tgt, tmpl) - if 'b' not in tgt or tgt['b'] != 2: - self.fail("Failed to update dict") + def testSimpleRename1(self): + """Simple rename 1""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz")) + def testSimpleRename2(self): + """Simple rename 2""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"), + mkdir=True) - def testNoUpdate(self): - """Test that CheckDict does not overwrite an existing key""" - tgt = {'a':1, 'b': 3} - tmpl = {'b': 2} - CheckDict(tgt, tmpl) - self.failUnlessEqual(tgt['b'], 3) + def testRenameMkdir(self): + """Rename with mkdir""" + utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"), + mkdir=True) class TestMatchNameComponent(unittest.TestCase): @@ -325,21 +333,42 @@ class TestFormatUnit(unittest.TestCase): """Test case for the FormatUnit function""" def testMiB(self): - self.assertEqual(FormatUnit(1), '1M') - self.assertEqual(FormatUnit(100), '100M') - self.assertEqual(FormatUnit(1023), '1023M') + self.assertEqual(FormatUnit(1, 'h'), '1M') + self.assertEqual(FormatUnit(100, 'h'), '100M') + self.assertEqual(FormatUnit(1023, 'h'), '1023M') + + self.assertEqual(FormatUnit(1, 'm'), '1') + self.assertEqual(FormatUnit(100, 'm'), '100') + self.assertEqual(FormatUnit(1023, 'm'), '1023') + + self.assertEqual(FormatUnit(1024, 'm'), '1024') + self.assertEqual(FormatUnit(1536, 'm'), '1536') + self.assertEqual(FormatUnit(17133, 'm'), '17133') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575') def testGiB(self): - self.assertEqual(FormatUnit(1024), '1.0G') - self.assertEqual(FormatUnit(1536), '1.5G') - self.assertEqual(FormatUnit(17133), '16.7G') - self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G') + self.assertEqual(FormatUnit(1024, 'h'), '1.0G') + self.assertEqual(FormatUnit(1536, 'h'), '1.5G') + self.assertEqual(FormatUnit(17133, 'h'), '16.7G') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G') + + self.assertEqual(FormatUnit(1024, 'g'), '1.0') + self.assertEqual(FormatUnit(1536, 'g'), '1.5') + self.assertEqual(FormatUnit(17133, 'g'), '16.7') + self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0') + + self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0') + self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0') + self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0') def testTiB(self): - self.assertEqual(FormatUnit(1024 * 1024), '1.0T') - self.assertEqual(FormatUnit(5120 * 1024), '5.0T') - self.assertEqual(FormatUnit(29829 * 1024), '29.1T') + self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T') + self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T') + self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T') + self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0') + self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0') + self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1') class TestParseUnit(unittest.TestCase): """Test case for the ParseUnit function""" @@ -401,21 +430,14 @@ class TestSshKeys(testutils.GanetiTestCase): 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b') def setUp(self): - (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - handle = os.fdopen(fd, 'w') - try: - handle.write("%s\n" % TestSshKeys.KEY_A) - handle.write("%s\n" % TestSshKeys.KEY_B) - finally: - handle.close() - except: - utils.RemoveFile(self.tmpname) - raise - - def tearDown(self): - utils.RemoveFile(self.tmpname) - del self.tmpname + handle.write("%s\n" % TestSshKeys.KEY_A) + handle.write("%s\n" % TestSshKeys.KEY_B) + finally: + handle.close() def testAddingNewKey(self): AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') @@ -467,22 +489,15 @@ class TestEtcHosts(testutils.GanetiTestCase): """Test functions modifying /etc/hosts""" def setUp(self): - (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test') + testutils.GanetiTestCase.setUp(self) + self.tmpname = self._CreateTempFile() + handle = open(self.tmpname, 'w') try: - handle = os.fdopen(fd, 'w') - try: - handle.write('# This is a test file for /etc/hosts\n') - handle.write('127.0.0.1\tlocalhost\n') - handle.write('192.168.1.1 router gw\n') - finally: - handle.close() - except: - utils.RemoveFile(self.tmpname) - raise - - def tearDown(self): - utils.RemoveFile(self.tmpname) - del self.tmpname + handle.write('# This is a test file for /etc/hosts\n') + handle.write('127.0.0.1\tlocalhost\n') + handle.write('192.168.1.1 router gw\n') + finally: + handle.close() def testSettingNewIp(self): SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost']) @@ -492,6 +507,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n" "1.2.3.4\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) def testSettingExistingIp(self): SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld', @@ -501,6 +517,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1\tmyhost.domain.tld myhost\n") + self.assertFileMode(self.tmpname, 0644) def testSettingDuplicateName(self): SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost']) @@ -510,6 +527,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n" "1.2.3.4\tmyhost\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'router') @@ -518,6 +536,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingSingleExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'localhost') @@ -525,6 +544,7 @@ class TestEtcHosts(testutils.GanetiTestCase): self.assertFileContent(self.tmpname, "# This is a test file for /etc/hosts\n" "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingNonExistingHost(self): RemoveEtcHostsEntry(self.tmpname, 'myhost') @@ -533,6 +553,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 router gw\n") + self.assertFileMode(self.tmpname, 0644) def testRemovingAlias(self): RemoveEtcHostsEntry(self.tmpname, 'gw') @@ -541,6 +562,7 @@ class TestEtcHosts(testutils.GanetiTestCase): "# This is a test file for /etc/hosts\n" "127.0.0.1\tlocalhost\n" "192.168.1.1 router\n") + self.assertFileMode(self.tmpname, 0644) class TestShellQuoting(unittest.TestCase): @@ -740,6 +762,48 @@ class TestFirstFree(unittest.TestCase): self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3) +class TestTailFile(testutils.GanetiTestCase): + """Test case for the TailFile function""" + + def testEmpty(self): + fname = self._CreateTempFile() + self.failUnlessEqual(TailFile(fname), []) + self.failUnlessEqual(TailFile(fname, lines=25), []) + + def testAllLines(self): + data = ["test %d" % i for i in range(30)] + for i in range(30): + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data[:i])) + if i > 0: + fd.write("\n") + fd.close() + self.failUnlessEqual(TailFile(fname, lines=i), data[:i]) + + def testPartialLines(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + def testBigFile(self): + data = ["test %d" % i for i in range(30)] + fname = self._CreateTempFile() + fd = open(fname, "w") + fd.write("X" * 1048576) + fd.write("\n") + fd.write("\n".join(data)) + fd.write("\n") + fd.close() + for i in range(1, 30): + self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) + + class TestFileLock(unittest.TestCase): """Test case for the FileLock class""" @@ -825,5 +889,103 @@ class TestTimeFunctions(unittest.TestCase): self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0)) +class FieldSetTestCase(unittest.TestCase): + """Test case for FieldSets""" + + def testSimpleMatch(self): + f = utils.FieldSet("a", "b", "c", "def") + self.failUnless(f.Matches("a")) + self.failIf(f.Matches("d"), "Substring matched") + self.failIf(f.Matches("defghi"), "Prefix string matched") + self.failIf(f.NonMatching(["b", "c"])) + self.failIf(f.NonMatching(["a", "b", "c", "def"])) + self.failUnless(f.NonMatching(["a", "d"])) + + def testRegexMatch(self): + f = utils.FieldSet("a", "b([0-9]+)", "c") + self.failUnless(f.Matches("b1")) + self.failUnless(f.Matches("b99")) + self.failIf(f.Matches("b/1")) + self.failIf(f.NonMatching(["b12", "c"])) + self.failUnless(f.NonMatching(["a", "1"])) + +class TestForceDictType(unittest.TestCase): + """Test case for ForceDictType""" + + def setUp(self): + self.key_types = { + 'a': constants.VTYPE_INT, + 'b': constants.VTYPE_BOOL, + 'c': constants.VTYPE_STRING, + 'd': constants.VTYPE_SIZE, + } + + def _fdt(self, dict, allowed_values=None): + if allowed_values is None: + ForceDictType(dict, self.key_types) + else: + ForceDictType(dict, self.key_types, allowed_values=allowed_values) + + return dict + + def testSimpleDict(self): + self.assertEqual(self._fdt({}), {}) + self.assertEqual(self._fdt({'a': 1}), {'a': 1}) + self.assertEqual(self._fdt({'a': '1'}), {'a': 1}) + self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True}) + self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'}) + self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''}) + self.assertEqual(self._fdt({'b': 'false'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'False'}), {'b': False}) + self.assertEqual(self._fdt({'b': 'true'}), {'b': True}) + self.assertEqual(self._fdt({'b': 'True'}), {'b': True}) + self.assertEqual(self._fdt({'d': '4'}), {'d': 4}) + self.assertEqual(self._fdt({'d': '4M'}), {'d': 4}) + + def testErrors(self): + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'}) + self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'}) + + +class TestIsAbsNormPath(unittest.TestCase): + """Testing case for IsProcessAlive""" + + def _pathTestHelper(self, path, result): + if result: + self.assert_(IsNormAbsPath(path), + "Path %s should result absolute and normalized" % path) + else: + self.assert_(not IsNormAbsPath(path), + "Path %s should not result absolute and normalized" % path) + + def testBase(self): + self._pathTestHelper('/etc', True) + self._pathTestHelper('/srv', True) + self._pathTestHelper('etc', False) + self._pathTestHelper('/etc/../root', False) + self._pathTestHelper('/etc/', False) + + +class TestSafeEncode(unittest.TestCase): + """Test case for SafeEncode""" + + def testAscii(self): + for txt in [string.digits, string.letters, string.punctuation]: + self.failUnlessEqual(txt, SafeEncode(txt)) + + def testDoubleEncode(self): + for i in range(255): + txt = SafeEncode(chr(i)) + self.failUnlessEqual(txt, SafeEncode(txt)) + + def testUnicode(self): + # 1024 is high enough to catch non-direct ASCII mappings + for i in range(1024): + txt = SafeEncode(unichr(i)) + self.failUnlessEqual(txt, SafeEncode(txt)) + + if __name__ == '__main__': unittest.main()