import glob
import time
import signal
+import stat
+import errno
from ganeti import constants
from ganeti import utils
h.update(data)
self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
+ def testCallback(self):  # ReadFile called with a "preread" callback
+ def _Cb(fh):
+ self.assertEqual(fh.tell(), 0)  # handle given to the callback must be at offset 0
+ data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
+ self.assertEqual(len(data), 814)  # expected length of the cert1.pem fixture
+
def testError(self):
self.assertRaises(EnvironmentError, utils.ReadFile,
"/dev/null/does-not-exist")
self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
"/bin/../tmp")
+ def testMountpoint(self):  # listing when the directory is treated as a mountpoint
+ lvfmp_fn = compat.partial(utils.ListVisibleFiles,
+ _is_mountpoint=lambda _: True)  # stub: every path counts as a mountpoint
+ self.assertEqual(lvfmp_fn(self.path), [])  # presumably self.path is empty here (set up outside this view)
+
+ # Create "lost+found" as a regular file
+ self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
+ self.assertEqual(set(lvfmp_fn(self.path)),
+ set(["foo", "bar", "lost+found"]))  # ".baz" is not expected in the result
+
+ # Replace "lost+found" with a directory
+ laf_path = utils.PathJoin(self.path, "lost+found")
+ utils.RemoveFile(laf_path)
+ os.mkdir(laf_path)
+ self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))  # as a directory it is no longer listed
+
+ def testLostAndFoundNoMountpoint(self):  # no _is_mountpoint override is passed here
+ files = ["foo", "bar", ".Hello World", "lost+found"]
+ expected = ["foo", "bar", "lost+found"]  # "lost+found" stays; ".Hello World" is dropped
+ self._test(files, expected)  # _test helper is defined outside this view
+
-class TestWriteFile(unittest.TestCase):
+class TestWriteFile(testutils.GanetiTestCase):
def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
+ self.tmpdir = None  # assigned by tests that call tempfile.mkdtemp(); removed in tearDown
self.tfile = tempfile.NamedTemporaryFile()
self.did_pre = False  # flags start False; tests assert on them after calling WriteFile
self.did_post = False
self.did_write = False
+ def tearDown(self):  # base-class teardown first, then remove the temp dir if one was made
+ testutils.GanetiTestCase.tearDown(self)
+ if self.tmpdir:  # stays None unless a test created a directory
+ shutil.rmtree(self.tmpdir)
+
def markPre(self, fd):
self.did_pre = True
utils.WriteFile(self.tfile.name, data=data)
self.assertEqual(utils.ReadFile(self.tfile.name), data)
+ def testWriteSimpleUnicode(self):  # round-trip of a unicode literal through WriteFile/ReadFile
+ data = u"abc"
+ utils.WriteFile(self.tfile.name, data=data)
+ self.assertEqual(utils.ReadFile(self.tfile.name), data)
+
def testErrors(self):
self.assertRaises(errors.ProgrammerError, utils.WriteFile,
self.tfile.name, data="test", fn=lambda fd: None)
self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
self.assertRaises(errors.ProgrammerError, utils.WriteFile,
self.tfile.name, data="test", atime=0)
-
- def testCalls(self):
- utils.WriteFile(self.tfile.name, fn=self.markWrite,
- prewrite=self.markPre, postwrite=self.markPost)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ mode=0400, keep_perms=utils.KP_ALWAYS)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ uid=0, keep_perms=utils.KP_ALWAYS)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ gid=0, keep_perms=utils.KP_ALWAYS)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
+
+ def testPreWrite(self):  # only the prewrite hook is supplied
+ utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
self.assertTrue(self.did_pre)
+ self.assertFalse(self.did_post)  # the other flags must keep their setUp value
+ self.assertFalse(self.did_write)
+
+ def testPostWrite(self):  # only the postwrite hook is supplied
+ utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
+ self.assertFalse(self.did_pre)
self.assertTrue(self.did_post)
+ self.assertFalse(self.did_write)
+
+ def testWriteFunction(self):  # passes fn= rather than data=
+ utils.WriteFile(self.tfile.name, fn=self.markWrite)
+ self.assertFalse(self.did_pre)
+ self.assertFalse(self.did_post)
self.assertTrue(self.did_write)
def testDryRun(self):
finally:
os.close(fd)
+ def testNoLeftovers(self):  # after a write the directory holds only the target file
+ self.tmpdir = tempfile.mkdtemp()
+ self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
+ data="abc"),
+ None)
+ self.assertEqual(os.listdir(self.tmpdir), ["test"])
+
+ def testFailRename(self):  # the target path is an existing directory
+ self.tmpdir = tempfile.mkdtemp()
+ target = utils.PathJoin(self.tmpdir, "target")
+ os.mkdir(target)
+ self.assertRaises(OSError, utils.WriteFile, target, data="abc")
+ self.assertTrue(os.path.isdir(target))
+ self.assertEqual(os.listdir(self.tmpdir), ["target"])  # nothing else is left next to the target
+ self.assertFalse(os.listdir(target))  # and nothing was written inside it
+
+ def testFailRenameDryRun(self):  # same layout as testFailRename, but with dry_run=True
+ self.tmpdir = tempfile.mkdtemp()
+ target = utils.PathJoin(self.tmpdir, "target")
+ os.mkdir(target)
+ self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)  # no exception expected in dry-run
+ self.assertTrue(os.path.isdir(target))
+ self.assertEqual(os.listdir(self.tmpdir), ["target"])
+ self.assertFalse(os.listdir(target))
+
+ def testBackup(self):  # three writes with backup=True: new file, overwrite, dry-run overwrite
+ self.tmpdir = tempfile.mkdtemp()
+ testfile = utils.PathJoin(self.tmpdir, "test")
+
+ self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
+ self.assertEqual(utils.ReadFile(testfile), "foo")
+ self.assertEqual(os.listdir(self.tmpdir), ["test"])  # first write: only "test" is present
+
+ # Write again
+ assert os.path.isfile(testfile)
+ self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
+ self.assertEqual(utils.ReadFile(testfile), "bar")
+ self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)  # exactly one "test.backup*" file
+ self.assertTrue("test" in os.listdir(self.tmpdir))
+ self.assertEqual(len(os.listdir(self.tmpdir)), 2)
+
+ # Write again as dry-run
+ assert os.path.isfile(testfile)
+ self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
+ dry_run=True),
+ None)
+ self.assertEqual(utils.ReadFile(testfile), "bar")  # content is unchanged by the dry-run
+ self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)  # and no second backup appears
+ self.assertTrue("test" in os.listdir(self.tmpdir))
+ self.assertEqual(len(os.listdir(self.tmpdir)), 2)
+
+ def testFileMode(self):  # interaction of mode= with the keep_perms settings
+ self.tmpdir = tempfile.mkdtemp()
+ target = utils.PathJoin(self.tmpdir, "target")
+ self.assertRaises(OSError, utils.WriteFile, target, data="data",
+ keep_perms=utils.KP_ALWAYS)  # target has not been created yet at this point
+ # All masks have only user bits set, to avoid interactions with umask
+ utils.WriteFile(target, data="data", mode=0200)
+ self.assertFileMode(target, 0200)
+ utils.WriteFile(target, data="data", mode=0400,
+ keep_perms=utils.KP_IF_EXISTS)
+ self.assertFileMode(target, 0200)  # file exists, so 0200 is kept and mode=0400 is not applied
+ utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
+ self.assertFileMode(target, 0200)  # KP_ALWAYS also keeps the existing 0200
+ utils.WriteFile(target, data="data", mode=0700)
+ self.assertFileMode(target, 0700)  # without keep_perms the given mode is applied
+
+ def testNewFileMode(self):  # KP_IF_EXISTS when the target does not exist yet
+ self.tmpdir = tempfile.mkdtemp()
+ target = utils.PathJoin(self.tmpdir, "target")
+ utils.WriteFile(target, data="data", mode=0400,
+ keep_perms=utils.KP_IF_EXISTS)
+ self.assertFileMode(target, 0400)  # new file: the given mode is applied
class TestFileID(testutils.GanetiTestCase):
def testEquality(self):
def testUpdate(self):
name = self._CreateTempFile()
oldi = utils.GetFileID(path=name)
- os.utime(name, None)
fd = os.open(name, os.O_RDWR)
try:
newi = utils.GetFileID(fd=fd)
self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
- utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
- os.path.join(self.tmpdir, "test/foo/bar/baz"),
- mkdir=True)
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
+ self.assertRaises(EnvironmentError, utils.RenameFile,
+ os.path.join(self.tmpdir, "test/xyz"),
+ os.path.join(self.tmpdir, "test/foo/bar/baz"),
+ mkdir=True)
+
+ self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
+ self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
+ self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
+ "test/foo/bar/baz")))
class TestMakedirs(unittest.TestCase):
self._pathTestHelper("/etc/", False)
+class TestIsBelowDir(unittest.TestCase):
+ """Testing case for IsBelowDir"""
+
+ def testSamePrefix(self):  # second path lies under the first, with or without a trailing slash
+ self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
+ self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
+
+ def testSamePrefixButDifferentDir(self):  # "/a/bc" shares a string prefix with "/a/b" only
+ self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
+ self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
+
+ def testSamePrefixButDirTraversal(self):  # ".." leads out of "/a/b"
+ self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
+ self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
+
+ def testSamePrefixAndTraversal(self):  # "." and ".." segments that still end up under "/a/b"
+ self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
+ self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
+ self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
+
+ def testBothAbsPath(self):  # ValueError expected if either argument is a relative path
+ self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
+ self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
+ self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
+
+
class TestPathJoin(unittest.TestCase):
"""Testing case for PathJoin"""
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, os.getpid())
self.failUnless(utils.IsProcessAlive(read_pid))
- self.failUnlessRaises(errors.LockError, utils.WritePidFile,
+ self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
self.f_dpn('test'))
os.close(fd)
utils.RemoveFile(self.f_dpn("test"))
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, new_pid)
self.failUnless(utils.IsProcessAlive(new_pid))
+
+ # Try writing to locked file
+ try:
+ utils.WritePidFile(pid_file)
+ except errors.PidFileLockError, err:
+ errmsg = str(err)
+ self.assertTrue(errmsg.endswith(" %s" % new_pid),
+ msg=("Error message ('%s') didn't contain correct"
+ " PID (%s)" % (errmsg, new_pid)))
+ else:
+ self.fail("Writing to locked file didn't fail")
+
utils.KillProcess(new_pid, waitpid=True)
self.failIf(utils.IsProcessAlive(new_pid))
utils.RemoveFile(self.f_dpn('child'))
self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
+ def testExceptionType(self):  # pins the class hierarchy of PidFileLockError
+ # Make sure the PID lock error is a subclass of LockError in case some code
+ # depends on it
+ self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
+
def tearDown(self):
shutil.rmtree(self.dir)
self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
+def _MockStatResult(cb, mode, uid, gid):  # factory for a fake stat-like function
+ def _fn(path):  # "path" is accepted but not used
+ if cb:
+ cb()  # lets a test raise from inside the fake stat call
+ return {
+ stat.ST_MODE: mode,  # result is a dict keyed by the stat.ST_* index constants
+ stat.ST_UID: uid,
+ stat.ST_GID: gid,
+ }
+ return _fn
+
+
+def _RaiseNoEntError():  # callback for _MockStatResult: fails with errno ENOENT
+ raise EnvironmentError(errno.ENOENT, "not found")
+
+
+def _OtherStatRaise():  # callback for _MockStatResult: fails without any errno set
+ raise EnvironmentError()
+
+
+class TestPermissionEnforcements(unittest.TestCase):  # MakeDirWithPerm/EnforcePermission with injected fakes
+ UID_A = 16024  # uid/gid values used as test inputs and expected results
+ UID_B = 25850
+ GID_A = 14028
+ GID_B = 29801
+
+ def setUp(self):  # fresh call logs for the fake chown/chmod/mkdir functions
+ self._chown_calls = []
+ self._chmod_calls = []
+ self._mkdir_calls = []
+
+ def tearDown(self):  # every log must be empty: pop() on an empty list raises IndexError
+ self.assertRaises(IndexError, self._mkdir_calls.pop)
+ self.assertRaises(IndexError, self._chmod_calls.pop)
+ self.assertRaises(IndexError, self._chown_calls.pop)
+
+ def _FakeMkdir(self, path):  # records the path instead of creating a directory
+ self._mkdir_calls.append(path)
+
+ def _FakeChown(self, path, uid, gid):  # records the call instead of changing ownership
+ self._chown_calls.append((path, uid, gid))
+
+ def _ChmodWrapper(self, cb):  # returns a fake chmod that logs the call, then runs cb if given
+ def _fn(path, mode):
+ self._chmod_calls.append((path, mode))
+ if cb:
+ cb()
+ return _fn
+
+ def _VerifyPerm(self, path, mode, uid=-1, gid=-1):  # fake permission function asserting fixed arguments
+ self.assertEqual(path, "/ganeti-qa-non-test")
+ self.assertEqual(mode, 0700)
+ self.assertEqual(uid, self.UID_A)
+ self.assertEqual(gid, self.GID_A)
+
+ def testMakeDirWithPerm(self):  # fake lstat reports a directory; no _mkdir_fn is passed
+ is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
+ utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
+ _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
+
+ def testDirErrors(self):  # three lstat outcomes: not a directory, unknown error, ENOENT
+ self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
+ "/ganeti-qa-non-test", 0700, 0, 0,
+ _lstat_fn=_MockStatResult(None, 0, 0, 0))  # mode 0 carries no S_IFDIR bit
+ self.assertRaises(IndexError, self._mkdir_calls.pop)  # no mkdir was recorded
+
+ other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
+ self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
+ "/ganeti-qa-non-test", 0700, 0, 0,
+ _lstat_fn=other_stat_raise)
+ self.assertRaises(IndexError, self._mkdir_calls.pop)
+
+ non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
+ utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
+ _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
+ _perm_fn=self._VerifyPerm)
+ self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")  # ENOENT case: mkdir was called once
+
+ def testEnforcePermissionNoEnt(self):  # stat fails with ENOENT and must_exist is not overridden
+ self.assertRaises(errors.GenericError, utils.EnforcePermission,
+ "/ganeti-qa-non-test", 0600,
+ _chmod_fn=NotImplemented, _chown_fn=NotImplemented,  # placeholders, not callable
+ _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
+
+ def testEnforcePermissionNoEntMustNotExist(self):  # with must_exist=False the ENOENT raises nothing
+ utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
+ _chmod_fn=NotImplemented,
+ _chown_fn=NotImplemented,
+ _stat_fn=_MockStatResult(_RaiseNoEntError,
+ 0, 0, 0))
+
+ def testEnforcePermissionOtherErrorMustNotExist(self):  # a non-ENOENT error still fails with must_exist=False
+ self.assertRaises(errors.GenericError, utils.EnforcePermission,
+ "/ganeti-qa-non-test", 0600, must_exist=False,
+ _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
+ _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
+
+ def testEnforcePermissionNoChanges(self):  # fake stat already reports 0600; tearDown checks nothing was logged
+ utils.EnforcePermission("/ganeti-qa-non-test", 0600,
+ _stat_fn=_MockStatResult(None, 0600, 0, 0),
+ _chmod_fn=self._ChmodWrapper(None),
+ _chown_fn=self._FakeChown)
+
+ def testEnforcePermissionChangeMode(self):  # fake stat reports 0600, requested mode is 0444
+ utils.EnforcePermission("/ganeti-qa-non-test", 0444,
+ _stat_fn=_MockStatResult(None, 0600, 0, 0),
+ _chmod_fn=self._ChmodWrapper(None),
+ _chown_fn=self._FakeChown)
+ self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))  # one chmod with the new mode
+
+ def testEnforcePermissionSetUidGid(self):  # mode already matches; the owner differs (A reported, B requested)
+ utils.EnforcePermission("/ganeti-qa-non-test", 0600,
+ uid=self.UID_B, gid=self.GID_B,
+ _stat_fn=_MockStatResult(None, 0600,
+ self.UID_A,
+ self.GID_A),
+ _chmod_fn=self._ChmodWrapper(None),
+ _chown_fn=self._FakeChown)
+ self.assertEqual(self._chown_calls.pop(0),
+ ("/ganeti-qa-non-test", self.UID_B, self.GID_B))  # one chown with the requested ids
+
+
if __name__ == "__main__":  # runs only when the file is executed as a script
testutils.GanetiTestProgram()