X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/90e234a6b1bc3638304c4563c37a019eb882f882..43c16a8a1adfd543751fcaf60ad4c8e04cf83688:/test/ganeti.utils.io_unittest.py diff --git a/test/ganeti.utils.io_unittest.py b/test/ganeti.utils.io_unittest.py index 43b4634..109232a 100755 --- a/test/ganeti.utils.io_unittest.py +++ b/test/ganeti.utils.io_unittest.py @@ -28,6 +28,8 @@ import shutil import glob import time import signal +import stat +import errno from ganeti import constants from ganeti import utils @@ -55,6 +57,12 @@ class TestReadFile(testutils.GanetiTestCase): h.update(data) self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7") + def testCallback(self): + def _Cb(fh): + self.assertEqual(fh.tell(), 0) + data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb) + self.assertEqual(len(data), 814) + def testError(self): self.assertRaises(EnvironmentError, utils.ReadFile, "/dev/null/does-not-exist") @@ -231,14 +239,42 @@ class TestListVisibleFiles(unittest.TestCase): self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles, "/bin/../tmp") + def testMountpoint(self): + lvfmp_fn = compat.partial(utils.ListVisibleFiles, + _is_mountpoint=lambda _: True) + self.assertEqual(lvfmp_fn(self.path), []) + + # Create "lost+found" as a regular file + self._CreateFiles(["foo", "bar", ".baz", "lost+found"]) + self.assertEqual(set(lvfmp_fn(self.path)), + set(["foo", "bar", "lost+found"])) + + # Replace "lost+found" with a directory + laf_path = utils.PathJoin(self.path, "lost+found") + utils.RemoveFile(laf_path) + os.mkdir(laf_path) + self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"])) + + def testLostAndFoundNoMountpoint(self): + files = ["foo", "bar", ".Hello World", "lost+found"] + expected = ["foo", "bar", "lost+found"] + self._test(files, expected) + -class TestWriteFile(unittest.TestCase): +class TestWriteFile(testutils.GanetiTestCase): def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.tmpdir = None self.tfile = tempfile.NamedTemporaryFile() self.did_pre = False self.did_post = False self.did_write = False + def tearDown(self): + testutils.GanetiTestCase.tearDown(self) + if self.tmpdir: + shutil.rmtree(self.tmpdir) + def markPre(self, fd): self.did_pre = True @@ -253,18 +289,42 @@ class TestWriteFile(unittest.TestCase): utils.WriteFile(self.tfile.name, data=data) self.assertEqual(utils.ReadFile(self.tfile.name), data) + def testWriteSimpleUnicode(self): + data = u"abc" + utils.WriteFile(self.tfile.name, data=data) + self.assertEqual(utils.ReadFile(self.tfile.name), data) + def testErrors(self): self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, data="test", fn=lambda fd: None) self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name) self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, data="test", atime=0) - - def testCalls(self): - utils.WriteFile(self.tfile.name, fn=self.markWrite, - prewrite=self.markPre, postwrite=self.markPost) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + uid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + gid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, uid=0, keep_perms=utils.KP_ALWAYS) + + def testPreWrite(self): + utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre) self.assertTrue(self.did_pre) + self.assertFalse(self.did_post) + self.assertFalse(self.did_write) + + def testPostWrite(self): + utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost) + self.assertFalse(self.did_pre) self.assertTrue(self.did_post) + self.assertFalse(self.did_write) + + def testWriteFunction(self): + utils.WriteFile(self.tfile.name, fn=self.markWrite) + self.assertFalse(self.did_pre) + self.assertFalse(self.did_post) self.assertTrue(self.did_write) def testDryRun(self): @@ -293,6 +353,79 @@ class TestWriteFile(unittest.TestCase): finally: os.close(fd) + def testNoLeftovers(self): + self.tmpdir = tempfile.mkdtemp() + self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"), + data="abc"), + None) + self.assertEqual(os.listdir(self.tmpdir), ["test"]) + + def testFailRename(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + os.mkdir(target) + self.assertRaises(OSError, utils.WriteFile, target, data="abc") + self.assertTrue(os.path.isdir(target)) + self.assertEqual(os.listdir(self.tmpdir), ["target"]) + self.assertFalse(os.listdir(target)) + + def testFailRenameDryRun(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + os.mkdir(target) + self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None) + self.assertTrue(os.path.isdir(target)) + self.assertEqual(os.listdir(self.tmpdir), ["target"]) + self.assertFalse(os.listdir(target)) + + def testBackup(self): + self.tmpdir = tempfile.mkdtemp() + testfile = utils.PathJoin(self.tmpdir, "test") + + self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None) + self.assertEqual(utils.ReadFile(testfile), "foo") + self.assertEqual(os.listdir(self.tmpdir), ["test"]) + + # Write again + assert os.path.isfile(testfile) + self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None) + self.assertEqual(utils.ReadFile(testfile), "bar") + self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1) + self.assertTrue("test" in os.listdir(self.tmpdir)) + self.assertEqual(len(os.listdir(self.tmpdir)), 2) + + # Write again as dry-run + assert os.path.isfile(testfile) + self.assertEqual(utils.WriteFile(testfile, data="000", backup=True, + dry_run=True), + None) + self.assertEqual(utils.ReadFile(testfile), "bar") + self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1) + self.assertTrue("test" in os.listdir(self.tmpdir)) + self.assertEqual(len(os.listdir(self.tmpdir)), 2) + + def testFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + self.assertRaises(OSError, utils.WriteFile, target, data="data", + keep_perms=utils.KP_ALWAYS) + # All masks have only user bits set, to avoid interactions with umask + utils.WriteFile(target, data="data", mode=0200) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0700) + self.assertFileMode(target, 0700) + + def testNewFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0400) class TestFileID(testutils.GanetiTestCase): def testEquality(self): @@ -303,7 +436,6 @@ class TestFileID(testutils.GanetiTestCase): def testUpdate(self): name = self._CreateTempFile() oldi = utils.GetFileID(path=name) - os.utime(name, None) fd = os.open(name, os.O_RDWR) try: newi = utils.GetFileID(fd=fd) @@ -428,12 +560,15 @@ class TestRename(unittest.TestCase): self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test"))) self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz"))) - utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"), - os.path.join(self.tmpdir, "test/foo/bar/baz"), - mkdir=True) - self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test"))) - self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar"))) - self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz"))) + self.assertRaises(EnvironmentError, utils.RenameFile, + os.path.join(self.tmpdir, "test/xyz"), + os.path.join(self.tmpdir, "test/foo/bar/baz"), + mkdir=True) + + self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz"))) + self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar"))) + self.assertFalse(os.path.exists(os.path.join(self.tmpdir, + "test/foo/bar/baz"))) class TestMakedirs(unittest.TestCase): @@ -508,6 +643,32 @@ class TestIsNormAbsPath(unittest.TestCase): self._pathTestHelper("/etc/", False) +class TestIsBelowDir(unittest.TestCase): + """Testing case for IsBelowDir""" + + def testSamePrefix(self): + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c")) + self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e")) + + def testSamePrefixButDifferentDir(self): + self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d")) + self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e")) + + def testSamePrefixButDirTraversal(self): + self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c")) + self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d")) + + def testSamePrefixAndTraversal(self): + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d")) + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e")) + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e")) + + def testBothAbsPath(self): + self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d") + self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d") + self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d") + + class TestPathJoin(unittest.TestCase): """Testing case for PathJoin""" @@ -568,7 +729,7 @@ class TestTailFile(testutils.GanetiTestCase): class TestPidFileFunctions(unittest.TestCase): - """Tests for WritePidFile, RemovePidFile and ReadPidFile""" + """Tests for WritePidFile and ReadPidFile""" def setUp(self): self.dir = tempfile.mkdtemp() @@ -582,10 +743,10 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, os.getpid()) self.failUnless(utils.IsProcessAlive(read_pid)) - self.failUnlessRaises(errors.LockError, utils.WritePidFile, + self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile, self.f_dpn('test')) os.close(fd) - utils.RemovePidFile(self.f_dpn("test")) + utils.RemoveFile(self.f_dpn("test")) self.failIf(os.path.exists(pid_file), "PID file should not exist anymore") self.failUnlessEqual(utils.ReadPidFile(pid_file), 0, @@ -598,7 +759,7 @@ class TestPidFileFunctions(unittest.TestCase): # but now, even with the file existing, we should be able to lock it fd = utils.WritePidFile(self.f_dpn('test')) os.close(fd) - utils.RemovePidFile(self.f_dpn("test")) + utils.RemoveFile(self.f_dpn("test")) self.failIf(os.path.exists(pid_file), "PID file should not exist anymore") @@ -618,11 +779,28 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, new_pid) self.failUnless(utils.IsProcessAlive(new_pid)) + + # Try writing to locked file + try: + utils.WritePidFile(pid_file) + except errors.PidFileLockError, err: + errmsg = str(err) + self.assertTrue(errmsg.endswith(" %s" % new_pid), + msg=("Error message ('%s') didn't contain correct" + " PID (%s)" % (errmsg, new_pid))) + else: + self.fail("Writing to locked file didn't fail") + utils.KillProcess(new_pid, waitpid=True) self.failIf(utils.IsProcessAlive(new_pid)) - utils.RemovePidFile(self.f_dpn('child')) + utils.RemoveFile(self.f_dpn('child')) self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0) + def testExceptionType(self): + # Make sure the PID lock error is a subclass of LockError in case some code + # depends on it + self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError)) + def tearDown(self): shutil.rmtree(self.dir) @@ -698,5 +876,127 @@ class TestNewUUID(unittest.TestCase): self.failUnless(utils.UUID_RE.match(utils.NewUUID())) +def _MockStatResult(cb, mode, uid, gid): + def _fn(path): + if cb: + cb() + return { + stat.ST_MODE: mode, + stat.ST_UID: uid, + stat.ST_GID: gid, + } + return _fn + + +def _RaiseNoEntError(): + raise EnvironmentError(errno.ENOENT, "not found") + + +def _OtherStatRaise(): + raise EnvironmentError() + + +class TestPermissionEnforcements(unittest.TestCase): + UID_A = 16024 + UID_B = 25850 + GID_A = 14028 + GID_B = 29801 + + def setUp(self): + self._chown_calls = [] + self._chmod_calls = [] + self._mkdir_calls = [] + + def tearDown(self): + self.assertRaises(IndexError, self._mkdir_calls.pop) + self.assertRaises(IndexError, self._chmod_calls.pop) + self.assertRaises(IndexError, self._chown_calls.pop) + + def _FakeMkdir(self, path): + self._mkdir_calls.append(path) + + def _FakeChown(self, path, uid, gid): + self._chown_calls.append((path, uid, gid)) + + def _ChmodWrapper(self, cb): + def _fn(path, mode): + self._chmod_calls.append((path, mode)) + if cb: + cb() + return _fn + + def _VerifyPerm(self, path, mode, uid=-1, gid=-1): + self.assertEqual(path, "/ganeti-qa-non-test") + self.assertEqual(mode, 0700) + self.assertEqual(uid, self.UID_A) + self.assertEqual(gid, self.GID_A) + + def testMakeDirWithPerm(self): + is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0) + utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm) + + def testDirErrors(self): + self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, + "/ganeti-qa-non-test", 0700, 0, 0, + _lstat_fn=_MockStatResult(None, 0, 0, 0)) + self.assertRaises(IndexError, self._mkdir_calls.pop) + + other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0) + self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, + "/ganeti-qa-non-test", 0700, 0, 0, + _lstat_fn=other_stat_raise) + self.assertRaises(IndexError, self._mkdir_calls.pop) + + non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0) + utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir, + _perm_fn=self._VerifyPerm) + self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test") + + def testEnforcePermissionNoEnt(self): + self.assertRaises(errors.GenericError, utils.EnforcePermission, + "/ganeti-qa-non-test", 0600, + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0)) + + def testEnforcePermissionNoEntMustNotExist(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False, + _chmod_fn=NotImplemented, + _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, + 0, 0, 0)) + + def testEnforcePermissionOtherErrorMustNotExist(self): + self.assertRaises(errors.GenericError, utils.EnforcePermission, + "/ganeti-qa-non-test", 0600, must_exist=False, + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0)) + + def testEnforcePermissionNoChanges(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + + def testEnforcePermissionChangeMode(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0444, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444)) + + def testEnforcePermissionSetUidGid(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, + uid=self.UID_B, gid=self.GID_B, + _stat_fn=_MockStatResult(None, 0600, + self.UID_A, + self.GID_A), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chown_calls.pop(0), + ("/ganeti-qa-non-test", self.UID_B, self.GID_B)) + + if __name__ == "__main__": testutils.GanetiTestProgram()