self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
"/bin/../tmp")
+ def testMountpoint(self):
+ lvfmp_fn = compat.partial(utils.ListVisibleFiles,
+ _is_mountpoint=lambda _: True)
+ self.assertEqual(lvfmp_fn(self.path), [])
+
+ # Create "lost+found" as a regular file
+ self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
+ self.assertEqual(set(lvfmp_fn(self.path)),
+ set(["foo", "bar", "lost+found"]))
+
+ # Replace "lost+found" with a directory
+ laf_path = utils.PathJoin(self.path, "lost+found")
+ utils.RemoveFile(laf_path)
+ os.mkdir(laf_path)
+ self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
+
+ def testLostAndFoundNoMountpoint(self):
+ files = ["foo", "bar", ".Hello World", "lost+found"]
+ expected = ["foo", "bar", "lost+found"]
+ self._test(files, expected)
+
-class TestWriteFile(unittest.TestCase):
+class TestWriteFile(testutils.GanetiTestCase):
def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
self.tmpdir = None
self.tfile = tempfile.NamedTemporaryFile()
self.did_pre = False
self.did_write = False
def tearDown(self):
+ testutils.GanetiTestCase.tearDown(self)
if self.tmpdir:
shutil.rmtree(self.tmpdir)
self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
self.assertRaises(errors.ProgrammerError, utils.WriteFile,
self.tfile.name, data="test", atime=0)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ mode=0400, keep_perms=utils.KP_ALWAYS)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ uid=0, keep_perms=utils.KP_ALWAYS)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ gid=0, keep_perms=utils.KP_ALWAYS)
+ self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+ mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
def testPreWrite(self):
utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
self.assertTrue("test" in os.listdir(self.tmpdir))
self.assertEqual(len(os.listdir(self.tmpdir)), 2)
+ def testFileMode(self):
+ self.tmpdir = tempfile.mkdtemp()
+ target = utils.PathJoin(self.tmpdir, "target")
+ self.assertRaises(OSError, utils.WriteFile, target, data="data",
+ keep_perms=utils.KP_ALWAYS)
+ # All masks have only user bits set, to avoid interactions with umask
+ utils.WriteFile(target, data="data", mode=0200)
+ self.assertFileMode(target, 0200)
+ utils.WriteFile(target, data="data", mode=0400,
+ keep_perms=utils.KP_IF_EXISTS)
+ self.assertFileMode(target, 0200)
+ utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
+ self.assertFileMode(target, 0200)
+ utils.WriteFile(target, data="data", mode=0700)
+ self.assertFileMode(target, 0700)
+
+ def testNewFileMode(self):
+ self.tmpdir = tempfile.mkdtemp()
+ target = utils.PathJoin(self.tmpdir, "target")
+ utils.WriteFile(target, data="data", mode=0400,
+ keep_perms=utils.KP_IF_EXISTS)
+ self.assertFileMode(target, 0400)
class TestFileID(testutils.GanetiTestCase):
def testEquality(self):
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, os.getpid())
self.failUnless(utils.IsProcessAlive(read_pid))
- self.failUnlessRaises(errors.LockError, utils.WritePidFile,
+ self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
self.f_dpn('test'))
os.close(fd)
utils.RemoveFile(self.f_dpn("test"))
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, new_pid)
self.failUnless(utils.IsProcessAlive(new_pid))
+
+ # Try writing to locked file
+ try:
+ utils.WritePidFile(pid_file)
+ except errors.PidFileLockError, err:
+ errmsg = str(err)
+ self.assertTrue(errmsg.endswith(" %s" % new_pid),
+ msg=("Error message ('%s') didn't contain correct"
+ " PID (%s)" % (errmsg, new_pid)))
+ else:
+ self.fail("Writing to locked file didn't fail")
+
utils.KillProcess(new_pid, waitpid=True)
self.failIf(utils.IsProcessAlive(new_pid))
utils.RemoveFile(self.f_dpn('child'))
self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
+ def testExceptionType(self):
+ # Make sure the PID lock error is a subclass of LockError in case some code
+ # depends on it
+ self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
+
def tearDown(self):
shutil.rmtree(self.dir)