X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/2dbc685719001fbd44cde48a0eab5d07763b8179..43c16a8a1adfd543751fcaf60ad4c8e04cf83688:/test/ganeti.utils.io_unittest.py diff --git a/test/ganeti.utils.io_unittest.py b/test/ganeti.utils.io_unittest.py index 46bb009..109232a 100755 --- a/test/ganeti.utils.io_unittest.py +++ b/test/ganeti.utils.io_unittest.py @@ -261,8 +261,9 @@ class TestListVisibleFiles(unittest.TestCase): self._test(files, expected) -class TestWriteFile(unittest.TestCase): +class TestWriteFile(testutils.GanetiTestCase): def setUp(self): + testutils.GanetiTestCase.setUp(self) self.tmpdir = None self.tfile = tempfile.NamedTemporaryFile() self.did_pre = False @@ -270,6 +271,7 @@ class TestWriteFile(unittest.TestCase): self.did_write = False def tearDown(self): + testutils.GanetiTestCase.tearDown(self) if self.tmpdir: shutil.rmtree(self.tmpdir) @@ -298,6 +300,14 @@ class TestWriteFile(unittest.TestCase): self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name) self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, data="test", atime=0) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + uid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + gid=0, keep_perms=utils.KP_ALWAYS) + self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name, + mode=0400, uid=0, keep_perms=utils.KP_ALWAYS) def testPreWrite(self): utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre) @@ -394,6 +404,28 @@ class TestWriteFile(unittest.TestCase): self.assertTrue("test" in os.listdir(self.tmpdir)) self.assertEqual(len(os.listdir(self.tmpdir)), 2) + def testFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + self.assertRaises(OSError, utils.WriteFile, target, data="data", + keep_perms=utils.KP_ALWAYS) + # All masks have only user bits set, to avoid interactions with umask + utils.WriteFile(target, data="data", mode=0200) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS) + self.assertFileMode(target, 0200) + utils.WriteFile(target, data="data", mode=0700) + self.assertFileMode(target, 0700) + + def testNewFileMode(self): + self.tmpdir = tempfile.mkdtemp() + target = utils.PathJoin(self.tmpdir, "target") + utils.WriteFile(target, data="data", mode=0400, + keep_perms=utils.KP_IF_EXISTS) + self.assertFileMode(target, 0400) class TestFileID(testutils.GanetiTestCase): def testEquality(self): @@ -711,7 +743,7 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, os.getpid()) self.failUnless(utils.IsProcessAlive(read_pid)) - self.failUnlessRaises(errors.LockError, utils.WritePidFile, + self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile, self.f_dpn('test')) os.close(fd) utils.RemoveFile(self.f_dpn("test")) @@ -747,11 +779,28 @@ class TestPidFileFunctions(unittest.TestCase): read_pid = utils.ReadPidFile(pid_file) self.failUnlessEqual(read_pid, new_pid) self.failUnless(utils.IsProcessAlive(new_pid)) + + # Try writing to locked file + try: + utils.WritePidFile(pid_file) + except errors.PidFileLockError, err: + errmsg = str(err) + self.assertTrue(errmsg.endswith(" %s" % new_pid), + msg=("Error message ('%s') didn't contain correct" + " PID (%s)" % (errmsg, new_pid))) + else: + self.fail("Writing to locked file didn't fail") + utils.KillProcess(new_pid, waitpid=True) self.failIf(utils.IsProcessAlive(new_pid)) utils.RemoveFile(self.f_dpn('child')) self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0) + def testExceptionType(self): + # Make sure the PID lock error is a subclass of LockError in case some code + # depends on it + self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError)) + def tearDown(self): shutil.rmtree(self.dir)