X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/0e5084eeb39f75fc5910e1d4d4ad2449e471d603..48aaca91efa214b37dba94f28582be73f3c90dbd:/test/ganeti.utils.io_unittest.py?ds=sidebyside diff --git a/test/ganeti.utils.io_unittest.py b/test/ganeti.utils.io_unittest.py index be1b4ab..8cee74b 100755 --- a/test/ganeti.utils.io_unittest.py +++ b/test/ganeti.utils.io_unittest.py @@ -28,6 +28,8 @@ import shutil import glob import time import signal +import stat +import errno from ganeti import constants from ganeti import utils @@ -237,9 +239,31 @@ class TestListVisibleFiles(unittest.TestCase): self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles, "/bin/../tmp") + def testMountpoint(self): + lvfmp_fn = compat.partial(utils.ListVisibleFiles, + _is_mountpoint=lambda _: True) + self.assertEqual(lvfmp_fn(self.path), []) + + # Create "lost+found" as a regular file + self._CreateFiles(["foo", "bar", ".baz", "lost+found"]) + self.assertEqual(set(lvfmp_fn(self.path)), + set(["foo", "bar", "lost+found"])) + + # Replace "lost+found" with a directory + laf_path = utils.PathJoin(self.path, "lost+found") + utils.RemoveFile(laf_path) + os.mkdir(laf_path) + self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"])) + + def testLostAndFoundNoMountpoint(self): + files = ["foo", "bar", ".Hello World", "lost+found"] + expected = ["foo", "bar", "lost+found"] + self._test(files, expected) + -class TestWriteFile(unittest.TestCase): +class TestWriteFile(testutils.GanetiTestCase): def setUp(self): + testutils.GanetiTestCase.setUp(self) self.tmpdir = None self.tfile = tempfile.NamedTemporaryFile() self.did_pre = False @@ -247,6 +271,7 @@ class TestWriteFile(unittest.TestCase): self.did_write = False def tearDown(self): + testutils.GanetiTestCase.tearDown(self) if self.tmpdir: shutil.rmtree(self.tmpdir) @@ -275,6 +300,14 @@ class TestWriteFile(unittest.TestCase): self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name) self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, data="test", atime=0) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + uid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + gid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, uid=0, keep_perms=utils.KP_ALWAYS) def testPreWrite(self): utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre) @@ -371,6 +404,28 @@ class TestWriteFile(unittest.TestCase): self.assertTrue("test" in os.listdir(self.tmpdir)) self.assertEqual(len(os.listdir(self.tmpdir)), 2) + def testFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + self.assertRaises(OSError, utils.WriteFile, target, data="data", + keep_perms=utils.KP_ALWAYS) + # All masks have only user bits set, to avoid interactions with umask + utils.WriteFile(target, data="data", mode=0200) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0700) + self.assertFileMode(target, 0700) + + def testNewFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0400) class TestFileID(testutils.GanetiTestCase): def testEquality(self): @@ -381,7 +436,6 @@ class TestFileID(testutils.GanetiTestCase): def testUpdate(self): name = self._CreateTempFile() oldi = utils.GetFileID(path=name) - os.utime(name, None) fd = os.open(name, os.O_RDWR) try: newi = utils.GetFileID(fd=fd) @@ -416,8 +470,8 @@ class TestRemoveFile(unittest.TestCase): def setUp(self): """Create a temp dir and file for each case""" - self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-') - fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir) + self.tmpdir = tempfile.mkdtemp("", "ganeti-unittest-") + fd, self.tmpfile = tempfile.mkstemp("", "", self.tmpdir) os.close(fd) def tearDown(self): @@ -506,12 +560,15 @@ class TestRename(unittest.TestCase): self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test"))) self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz"))) - utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"), - os.path.join(self.tmpdir, "test/foo/bar/baz"), - mkdir=True) - self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test"))) - self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar"))) - self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz"))) + self.assertRaises(EnvironmentError, utils.RenameFile, + os.path.join(self.tmpdir, "test/xyz"), + os.path.join(self.tmpdir, "test/foo/bar/baz"), + mkdir=True) + + self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz"))) + self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar"))) + self.assertFalse(os.path.exists(os.path.join(self.tmpdir, + "test/foo/bar/baz"))) class TestMakedirs(unittest.TestCase): @@ -573,10 +630,10 @@ class TestIsNormAbsPath(unittest.TestCase): def _pathTestHelper(self, path, result): if result: self.assert_(utils.IsNormAbsPath(path), - "Path %s should result absolute and normalized" % path) + msg="Path %s should result absolute and normalized" % path) else: self.assertFalse(utils.IsNormAbsPath(path), - "Path %s should not result absolute and normalized" % path) + msg="Path %s should not result absolute and normalized" % path) def testBase(self): self._pathTestHelper("/etc", True) @@ -585,6 +642,68 @@ class TestIsNormAbsPath(unittest.TestCase): self._pathTestHelper("/etc/../root", False) self._pathTestHelper("/etc/", False) + def testSlashes(self): + # Root directory + self._pathTestHelper("/", True) + + # POSIX' "implementation-defined" double slashes + self._pathTestHelper("//", True) + + # Three and more slashes count as one, so the path is not normalized + for i in range(3, 10): + self._pathTestHelper("/" * i, False) + + +class TestIsBelowDir(unittest.TestCase): + """Testing case for IsBelowDir""" + + def testExactlyTheSame(self): + self.assertFalse(utils.IsBelowDir("/a/b", "/a/b")) + self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/")) + self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b")) + self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/")) + + def testSamePrefix(self): + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c")) + self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e")) + + def testSamePrefixButDifferentDir(self): + self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d")) + self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e")) + + def testSamePrefixButDirTraversal(self): + self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c")) + self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d")) + + def testSamePrefixAndTraversal(self): + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d")) + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e")) + self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e")) + + def testBothAbsPath(self): + self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d") + self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d") + self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d") + self.assertRaises(ValueError, utils.IsBelowDir, "", "/") + self.assertRaises(ValueError, utils.IsBelowDir, "/", "") + + def testRoot(self): + self.assertFalse(utils.IsBelowDir("/", "/")) + + for i in ["/a", "/tmp", "/tmp/foo/bar", "/tmp/"]: + self.assertTrue(utils.IsBelowDir("/", i)) + + def testSlashes(self): + # In POSIX a double slash is "implementation-defined". + self.assertFalse(utils.IsBelowDir("//", "//")) + self.assertFalse(utils.IsBelowDir("//", "/tmp")) + self.assertTrue(utils.IsBelowDir("//tmp", "//tmp/x")) + + # Three (or more) slashes count as one + self.assertFalse(utils.IsBelowDir("/", "///")) + self.assertTrue(utils.IsBelowDir("/", "///tmp")) + self.assertTrue(utils.IsBelowDir("/tmp", "///tmp/a/b")) + class TestPathJoin(unittest.TestCase): """Testing case for PathJoin""" @@ -653,15 +772,15 @@ class TestPidFileFunctions(unittest.TestCase): self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name) def testPidFileFunctions(self): - pid_file = self.f_dpn('test') - fd = utils.WritePidFile(self.f_dpn('test')) + pid_file = self.f_dpn("test") + fd = utils.WritePidFile(self.f_dpn("test")) self.failUnless(os.path.exists(pid_file), "PID file should have been created") read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, os.getpid()) self.failUnless(utils.IsProcessAlive(read_pid)) - self.failUnlessRaises(errors.LockError, utils.WritePidFile, - self.f_dpn('test')) + self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile, + self.f_dpn("test")) os.close(fd) utils.RemoveFile(self.f_dpn("test")) self.failIf(os.path.exists(pid_file), @@ -674,19 +793,19 @@ class TestPidFileFunctions(unittest.TestCase): self.failUnlessEqual(utils.ReadPidFile(pid_file), 0, "ReadPidFile should return 0 for invalid pid file") # but now, even with the file existing, we should be able to lock it - fd = utils.WritePidFile(self.f_dpn('test')) + fd = utils.WritePidFile(self.f_dpn("test")) os.close(fd) utils.RemoveFile(self.f_dpn("test")) self.failIf(os.path.exists(pid_file), "PID file should not exist anymore") def testKill(self): - pid_file = self.f_dpn('child') + pid_file = self.f_dpn("child") r_fd, w_fd = os.pipe() new_pid = os.fork() if new_pid == 0: #child - utils.WritePidFile(self.f_dpn('child')) - os.write(w_fd, 'a') + utils.WritePidFile(self.f_dpn("child")) + os.write(w_fd, "a") signal.pause() os._exit(0) return @@ -696,11 +815,28 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, new_pid) self.failUnless(utils.IsProcessAlive(new_pid)) + + # Try writing to locked file + try: + utils.WritePidFile(pid_file) + except errors.PidFileLockError, err: + errmsg = str(err) + self.assertTrue(errmsg.endswith(" %s" % new_pid), + msg=("Error message ('%s') didn't contain correct" + " PID (%s)" % (errmsg, new_pid))) + else: + self.fail("Writing to locked file didn't fail") + utils.KillProcess(new_pid, waitpid=True) self.failIf(utils.IsProcessAlive(new_pid)) - utils.RemoveFile(self.f_dpn('child')) + utils.RemoveFile(self.f_dpn("child")) self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0) + def testExceptionType(self): + # Make sure the PID lock error is a subclass of LockError in case some code + # depends on it + self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError)) + def tearDown(self): shutil.rmtree(self.dir) @@ -708,14 +844,14 @@ class TestPidFileFunctions(unittest.TestCase): class TestSshKeys(testutils.GanetiTestCase): """Test case for the AddAuthorizedKey function""" - KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a' + KEY_A = "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a" KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" ' - 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b') + "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b") def setUp(self): testutils.GanetiTestCase.setUp(self) self.tmpname = self._CreateTempFile() - handle = open(self.tmpname, 'w') + handle = open(self.tmpname, "w") try: handle.write("%s\n" % TestSshKeys.KEY_A) handle.write("%s\n" % TestSshKeys.KEY_B) @@ -724,7 +860,7 @@ class TestSshKeys(testutils.GanetiTestCase): def testAddingNewKey(self): utils.AddAuthorizedKey(self.tmpname, - 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test') + "ssh-dss AAAAB3NzaC1kc3MAAACB root@test") self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" @@ -734,26 +870,30 @@ class TestSshKeys(testutils.GanetiTestCase): def testAddingAlmostButNotCompletelyTheSameKey(self): utils.AddAuthorizedKey(self.tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test') + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test") + # Only significant fields are compared, therefore the key won't be + # updated/added self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' - " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" - "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n") + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") def testAddingExistingKeyWithSomeMoreSpaces(self): utils.AddAuthorizedKey(self.tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a") + utils.AddAuthorizedKey(self.tmpname, + "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22") self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' - " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n") + " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n" + "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22\n") def testRemovingExistingKeyWithSomeMoreSpaces(self): utils.RemoveAuthorizedKey(self.tmpname, - 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a') + "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a") self.assertFileContent(self.tmpname, 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"' @@ -761,7 +901,7 @@ class TestSshKeys(testutils.GanetiTestCase): def testRemovingNonExistingKey(self): utils.RemoveAuthorizedKey(self.tmpname, - 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test') + "ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test") self.assertFileContent(self.tmpname, "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n" @@ -776,5 +916,127 @@ class TestNewUUID(unittest.TestCase): self.failUnless(utils.UUID_RE.match(utils.NewUUID())) +def _MockStatResult(cb, mode, uid, gid): + def _fn(path): + if cb: + cb() + return { + stat.ST_MODE: mode, + stat.ST_UID: uid, + stat.ST_GID: gid, + } + return _fn + + +def _RaiseNoEntError(): + raise EnvironmentError(errno.ENOENT, "not found") + + +def _OtherStatRaise(): + raise EnvironmentError() + + +class TestPermissionEnforcements(unittest.TestCase): + UID_A = 16024 + UID_B = 25850 + GID_A = 14028 + GID_B = 29801 + + def setUp(self): + self._chown_calls = [] + self._chmod_calls = [] + self._mkdir_calls = [] + + def tearDown(self): + self.assertRaises(IndexError, self._mkdir_calls.pop) + self.assertRaises(IndexError, self._chmod_calls.pop) + self.assertRaises(IndexError, self._chown_calls.pop) + + def _FakeMkdir(self, path): + self._mkdir_calls.append(path) + + def _FakeChown(self, path, uid, gid): + self._chown_calls.append((path, uid, gid)) + + def _ChmodWrapper(self, cb): + def _fn(path, mode): + self._chmod_calls.append((path, mode)) + if cb: + cb() + return _fn + + def _VerifyPerm(self, path, mode, uid=-1, gid=-1): + self.assertEqual(path, "/ganeti-qa-non-test") + self.assertEqual(mode, 0700) + self.assertEqual(uid, self.UID_A) + self.assertEqual(gid, self.GID_A) + + def testMakeDirWithPerm(self): + is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0) + utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm) + + def testDirErrors(self): + self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, + "/ganeti-qa-non-test", 0700, 0, 0, + _lstat_fn=_MockStatResult(None, 0, 0, 0)) + self.assertRaises(IndexError, self._mkdir_calls.pop) + + other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0) + self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, + "/ganeti-qa-non-test", 0700, 0, 0, + _lstat_fn=other_stat_raise) + self.assertRaises(IndexError, self._mkdir_calls.pop) + + non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0) + utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir, + _perm_fn=self._VerifyPerm) + self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test") + + def testEnforcePermissionNoEnt(self): + self.assertRaises(errors.GenericError, utils.EnforcePermission, + "/ganeti-qa-non-test", 0600, + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0)) + + def testEnforcePermissionNoEntMustNotExist(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False, + _chmod_fn=NotImplemented, + _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, + 0, 0, 0)) + + def testEnforcePermissionOtherErrorMustNotExist(self): + self.assertRaises(errors.GenericError, utils.EnforcePermission, + "/ganeti-qa-non-test", 0600, must_exist=False, + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0)) + + def testEnforcePermissionNoChanges(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + + def testEnforcePermissionChangeMode(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0444, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444)) + + def testEnforcePermissionSetUidGid(self): + utils.EnforcePermission("/ganeti-qa-non-test", 0600, + uid=self.UID_B, gid=self.GID_B, + _stat_fn=_MockStatResult(None, 0600, + self.UID_A, + self.GID_A), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chown_calls.pop(0), + ("/ganeti-qa-non-test", self.UID_B, self.GID_B)) + + if __name__ == "__main__": testutils.GanetiTestProgram()