X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/fb17bebdd6bbe60dda0f1bacee80b2078d7e9602..43c16a8a1adfd543751fcaf60ad4c8e04cf83688:/test/ganeti.utils.io_unittest.py diff --git a/test/ganeti.utils.io_unittest.py b/test/ganeti.utils.io_unittest.py index 8b2cc8b..109232a 100755 --- a/test/ganeti.utils.io_unittest.py +++ b/test/ganeti.utils.io_unittest.py @@ -28,6 +28,8 @@ import shutil import glob import time import signal +import stat +import errno from ganeti import constants from ganeti import utils @@ -237,9 +239,31 @@ class TestListVisibleFiles(unittest.TestCase): self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles, "/bin/../tmp") + def testMountpoint(self): + lvfmp_fn = compat.partial(utils.ListVisibleFiles, + _is_mountpoint=lambda _: True) + self.assertEqual(lvfmp_fn(self.path), []) + + # Create "lost+found" as a regular file + self._CreateFiles(["foo", "bar", ".baz", "lost+found"]) + self.assertEqual(set(lvfmp_fn(self.path)), + set(["foo", "bar", "lost+found"])) + + # Replace "lost+found" with a directory + laf_path = utils.PathJoin(self.path, "lost+found") + utils.RemoveFile(laf_path) + os.mkdir(laf_path) + self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"])) + + def testLostAndFoundNoMountpoint(self): + files = ["foo", "bar", ".Hello World", "lost+found"] + expected = ["foo", "bar", "lost+found"] + self._test(files, expected) + -class TestWriteFile(unittest.TestCase): +class TestWriteFile(testutils.GanetiTestCase): def setUp(self): + testutils.GanetiTestCase.setUp(self) self.tmpdir = None self.tfile = tempfile.NamedTemporaryFile() self.did_pre = False @@ -247,6 +271,7 @@ class TestWriteFile(unittest.TestCase): self.did_write = False def tearDown(self): + testutils.GanetiTestCase.tearDown(self) if self.tmpdir: shutil.rmtree(self.tmpdir) @@ -275,6 +300,14 @@ class TestWriteFile(unittest.TestCase): self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name) self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, data="test", atime=0) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + uid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + gid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, uid=0, keep_perms=utils.KP_ALWAYS) def testPreWrite(self): utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre) @@ -371,6 +404,28 @@ class TestWriteFile(unittest.TestCase): self.assertTrue("test" in os.listdir(self.tmpdir)) self.assertEqual(len(os.listdir(self.tmpdir)), 2) + def testFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + self.assertRaises(OSError, utils.WriteFile, target, data="data", + keep_perms=utils.KP_ALWAYS) + # All masks have only user bits set, to avoid interactions with umask + utils.WriteFile(target, data="data", mode=0200) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0700) + self.assertFileMode(target, 0700) + + def testNewFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0400) class TestFileID(testutils.GanetiTestCase): def testEquality(self): @@ -381,7 +436,6 @@ class TestFileID(testutils.GanetiTestCase): def testUpdate(self): name = self._CreateTempFile() oldi = utils.GetFileID(path=name) - os.utime(name, None) fd = os.open(name, os.O_RDWR) try: newi = utils.GetFileID(fd=fd) @@ -506,12 +560,15 @@ class TestRename(unittest.TestCase): self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test"))) self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz"))) - utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"), - os.path.join(self.tmpdir, "test/foo/bar/baz"), - mkdir=True) - self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test"))) - self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar"))) - self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz"))) + self.assertRaises(EnvironmentError, utils.RenameFile, + os.path.join(self.tmpdir, "test/xyz"), + os.path.join(self.tmpdir, "test/foo/bar/baz"), + mkdir=True) + + self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz"))) + self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar"))) + self.assertFalse(os.path.exists(os.path.join(self.tmpdir, + "test/foo/bar/baz"))) class TestMakedirs(unittest.TestCase): @@ -686,7 +743,7 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, os.getpid()) self.failUnless(utils.IsProcessAlive(read_pid)) - self.failUnlessRaises(errors.LockError, utils.WritePidFile, + self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile, self.f_dpn('test')) os.close(fd) utils.RemoveFile(self.f_dpn("test")) @@ -722,11 +779,28 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, new_pid) self.failUnless(utils.IsProcessAlive(new_pid)) + + # Try writing to locked file + try: + utils.WritePidFile(pid_file) + except errors.PidFileLockError, err: + errmsg = str(err) + self.assertTrue(errmsg.endswith(" %s" % new_pid), + msg=("Error message ('%s') didn't contain correct" + " PID (%s)" % (errmsg, new_pid))) + else: + self.fail("Writing to locked file didn't fail") + utils.KillProcess(new_pid, waitpid=True) self.failIf(utils.IsProcessAlive(new_pid)) utils.RemoveFile(self.f_dpn('child')) self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0) + def testExceptionType(self): + # Make sure the PID lock error is a subclass of LockError in case some code + # depends on it + self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError)) + def tearDown(self): shutil.rmtree(self.dir) @@ -802,5 +876,127 @@ class TestNewUUID(unittest.TestCase): self.failUnless(utils.UUID_RE.match(utils.NewUUID())) +def _MockStatResult(cb, mode, uid, gid): + def _fn(path): + if cb: + cb() + return { + stat.ST_MODE: mode, + stat.ST_UID: uid, + stat.ST_GID: gid, + } + return _fn + + +def _RaiseNoEntError(): + raise EnvironmentError(errno.ENOENT, "not found") + + +def _OtherStatRaise(): + raise EnvironmentError() + + +class TestPermissionEnforcements(unittest.TestCase): + UID_A = 16024 + UID_B = 25850 + GID_A = 14028 + GID_B = 29801 + + def setUp(self): + self._chown_calls = [] + self._chmod_calls = [] + self._mkdir_calls = [] + + def tearDown(self): + self.assertRaises(IndexError, self._mkdir_calls.pop) + self.assertRaises(IndexError, self._chmod_calls.pop) + self.assertRaises(IndexError, self._chown_calls.pop) + + def _FakeMkdir(self, path): + self._mkdir_calls.append(path) + + def _FakeChown(self, path, uid, gid): + self._chown_calls.append((path, uid, gid)) + + def _ChmodWrapper(self, cb): + def _fn(path, mode): + self._chmod_calls.append((path, mode)) + if cb: + cb() + return _fn + + def _VerifyPerm(self, path, mode, uid=-1, gid=-1): + self.assertEqual(path, "/ganeti-qa-non-test") + self.assertEqual(mode, 0700) + self.assertEqual(uid, self.UID_A) + self.assertEqual(gid, self.GID_A) + + def testMakeDirWithPerm(self): + is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0) + utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm) + + def testDirErrors(self): + self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, + "/ganeti-qa-non-test", 0700, 0, 0, + _lstat_fn=_MockStatResult(None, 0, 0, 0)) + self.assertRaises(IndexError, self._mkdir_calls.pop) + + other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0) + self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, + "/ganeti-qa-non-test", 0700, 0, 0, + _lstat_fn=other_stat_raise) + self.assertRaises(IndexError, self._mkdir_calls.pop) + + non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0) + utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir, + _perm_fn=self._VerifyPerm) + self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test") + + def testEnforcePermissionNoEnt(self): + self.assertRaises(errors.GenericError, utils.EnforcePermission, + "/ganeti-qa-non-test", 0600, + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0)) + + def testEnforcePermissionNoEntMustNotExist(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False, + _chmod_fn=NotImplemented, + _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, + 0, 0, 0)) + + def testEnforcePermissionOtherErrorMustNotExist(self): + self.assertRaises(errors.GenericError, utils.EnforcePermission, + "/ganeti-qa-non-test", 0600, must_exist=False, + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0)) + + def testEnforcePermissionNoChanges(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + + def testEnforcePermissionChangeMode(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0444, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444)) + + def testEnforcePermissionSetUidGid(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, + uid=self.UID_B, gid=self.GID_B, + _stat_fn=_MockStatResult(None, 0600, + self.UID_A, + self.GID_A), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chown_calls.pop(0), + ("/ganeti-qa-non-test", self.UID_B, self.GID_B)) + + if __name__ == "__main__": testutils.GanetiTestProgram()