Merge branch 'stable-2.6'
[ganeti-local] / test / ganeti.utils.io_unittest.py
index 4e6dce1..109232a 100755 (executable)
@@ -239,9 +239,31 @@ class TestListVisibleFiles(unittest.TestCase):
     self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
                           "/bin/../tmp")
 
+  def testMountpoint(self):
+    lvfmp_fn = compat.partial(utils.ListVisibleFiles,
+                              _is_mountpoint=lambda _: True)
+    self.assertEqual(lvfmp_fn(self.path), [])
+
+    # Create "lost+found" as a regular file
+    self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
+    self.assertEqual(set(lvfmp_fn(self.path)),
+                     set(["foo", "bar", "lost+found"]))
+
+    # Replace "lost+found" with a directory
+    laf_path = utils.PathJoin(self.path, "lost+found")
+    utils.RemoveFile(laf_path)
+    os.mkdir(laf_path)
+    self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
+
+  def testLostAndFoundNoMountpoint(self):
+    files = ["foo", "bar", ".Hello World", "lost+found"]
+    expected = ["foo", "bar", "lost+found"]
+    self._test(files, expected)
+
 
-class TestWriteFile(unittest.TestCase):
+class TestWriteFile(testutils.GanetiTestCase):
   def setUp(self):
+    testutils.GanetiTestCase.setUp(self)
     self.tmpdir = None
     self.tfile = tempfile.NamedTemporaryFile()
     self.did_pre = False
@@ -249,6 +271,7 @@ class TestWriteFile(unittest.TestCase):
     self.did_write = False
 
   def tearDown(self):
+    testutils.GanetiTestCase.tearDown(self)
     if self.tmpdir:
       shutil.rmtree(self.tmpdir)
 
@@ -277,6 +300,14 @@ class TestWriteFile(unittest.TestCase):
     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
                       self.tfile.name, data="test", atime=0)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      mode=0400, keep_perms=utils.KP_ALWAYS)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      uid=0, keep_perms=utils.KP_ALWAYS)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      gid=0, keep_perms=utils.KP_ALWAYS)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
 
   def testPreWrite(self):
     utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
@@ -373,6 +404,28 @@ class TestWriteFile(unittest.TestCase):
     self.assertTrue("test" in os.listdir(self.tmpdir))
     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
 
+  def testFileMode(self):
+    self.tmpdir = tempfile.mkdtemp()
+    target = utils.PathJoin(self.tmpdir, "target")
+    self.assertRaises(OSError, utils.WriteFile, target, data="data",
+                      keep_perms=utils.KP_ALWAYS)
+    # All masks have only user bits set, to avoid interactions with umask
+    utils.WriteFile(target, data="data", mode=0200)
+    self.assertFileMode(target, 0200)
+    utils.WriteFile(target, data="data", mode=0400,
+                    keep_perms=utils.KP_IF_EXISTS)
+    self.assertFileMode(target, 0200)
+    utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
+    self.assertFileMode(target, 0200)
+    utils.WriteFile(target, data="data", mode=0700)
+    self.assertFileMode(target, 0700)
+
+  def testNewFileMode(self):
+    self.tmpdir = tempfile.mkdtemp()
+    target = utils.PathJoin(self.tmpdir, "target")
+    utils.WriteFile(target, data="data", mode=0400,
+                    keep_perms=utils.KP_IF_EXISTS)
+    self.assertFileMode(target, 0400)
 
 class TestFileID(testutils.GanetiTestCase):
   def testEquality(self):
@@ -383,7 +436,6 @@ class TestFileID(testutils.GanetiTestCase):
   def testUpdate(self):
     name = self._CreateTempFile()
     oldi = utils.GetFileID(path=name)
-    os.utime(name, None)
     fd = os.open(name, os.O_RDWR)
     try:
       newi = utils.GetFileID(fd=fd)
@@ -508,12 +560,15 @@ class TestRename(unittest.TestCase):
     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
 
-    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
-                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
-                     mkdir=True)
-    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
-    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
-    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
+    self.assertRaises(EnvironmentError, utils.RenameFile,
+                      os.path.join(self.tmpdir, "test/xyz"),
+                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
+                      mkdir=True)
+
+    self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
+    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
+    self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
+                                                 "test/foo/bar/baz")))
 
 
 class TestMakedirs(unittest.TestCase):
@@ -688,7 +743,7 @@ class TestPidFileFunctions(unittest.TestCase):
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, os.getpid())
     self.failUnless(utils.IsProcessAlive(read_pid))
-    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
+    self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
                           self.f_dpn('test'))
     os.close(fd)
     utils.RemoveFile(self.f_dpn("test"))
@@ -724,11 +779,28 @@ class TestPidFileFunctions(unittest.TestCase):
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, new_pid)
     self.failUnless(utils.IsProcessAlive(new_pid))
+
+    # Try writing to locked file
+    try:
+      utils.WritePidFile(pid_file)
+    except errors.PidFileLockError, err:
+      errmsg = str(err)
+      self.assertTrue(errmsg.endswith(" %s" % new_pid),
+                      msg=("Error message ('%s') didn't contain correct"
+                           " PID (%s)" % (errmsg, new_pid)))
+    else:
+      self.fail("Writing to locked file didn't fail")
+
     utils.KillProcess(new_pid, waitpid=True)
     self.failIf(utils.IsProcessAlive(new_pid))
     utils.RemoveFile(self.f_dpn('child'))
     self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
 
+  def testExceptionType(self):
+    # Make sure the PID lock error is a subclass of LockError in case some code
+    # depends on it
+    self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
+
   def tearDown(self):
     shutil.rmtree(self.dir)