# Copyright (C) 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the ganeti.storage.filestorage module"""
29 from ganeti import errors
30 from ganeti.storage import filestorage
31 from ganeti import utils
class TestFileStorageSpaceInfo(unittest.TestCase):
  """Tests for filestorage.GetFileStorageSpaceInfo."""

  def testSpaceInfoPathInvalid(self):
    """Checks that a path which does not exist raises CommandError.

    """
    missing_path = "/path/does/not/exist/"
    self.assertRaises(errors.CommandError,
                      filestorage.GetFileStorageSpaceInfo, missing_path)

  def testSpaceInfoPathValid(self):
    """Smoke test: querying the root directory must not raise.

    """
    root_dir = "/"
    filestorage.GetFileStorageSpaceInfo(root_dir)
52 class TestCheckFileStoragePath(unittest.TestCase):
def _WriteAllowedFile(self, allowed_paths_filename, allowed_paths):
  """Writes the allowed paths to a file, one path per line.

  The entries of C{allowed_paths} are joined with newlines; no trailing
  newline is written. An existing file of that name is overwritten.

  allowed_paths_filename: name of the file to write
  allowed_paths: list of path strings to store in the file

  """
  # The context manager closes the handle even when the write raises, so a
  # failing test does not leak an open file into the tests that follow.
  with open(allowed_paths_filename, "w") as allowed_paths_file:
    allowed_paths_file.write("\n".join(allowed_paths))
59 self.tmpdir = tempfile.mkdtemp()
60 self.allowed_paths = [os.path.join(self.tmpdir, "allowed")]
61 for path in self.allowed_paths:
63 self.allowed_paths_filename = os.path.join(self.tmpdir, "allowed-path-file")
64 self._WriteAllowedFile(self.allowed_paths_filename, self.allowed_paths)
67 shutil.rmtree(self.tmpdir)
def testCheckFileStoragePathExistance(self):
  """The existing temporary directory must pass the existence check."""
  check_fn = filestorage._CheckFileStoragePathExistance
  check_fn(self.tmpdir)
def testCheckFileStoragePathExistanceFail(self):
  """Expects FileStoragePathError for a path that was never created."""
  missing = os.path.join(self.tmpdir, "does/not/exist")
  check_fn = filestorage._CheckFileStoragePathExistance
  self.assertRaises(errors.FileStoragePathError, check_fn, missing)
77 def testCheckFileStoragePathNotWritable(self):
78 path = os.path.join(self.tmpdir, "isnotwritable/")
81 self.assertRaises(errors.FileStoragePathError,
82 filestorage._CheckFileStoragePathExistance, path)
85 def testCheckFileStoragePath(self):
86 path = os.path.join(self.allowed_paths[0], "allowedsubdir")
88 result = filestorage.CheckFileStoragePath(
89 path, _allowed_paths_file=self.allowed_paths_filename)
90 self.assertEqual(None, result)
def testCheckFileStoragePathNotAllowed(self):
  """A path outside the allowed directory is reported as not acceptable."""
  forbidden = os.path.join(self.tmpdir, "notallowed")
  message = filestorage.CheckFileStoragePath(
    forbidden, _allowed_paths_file=self.allowed_paths_filename)
  # The check reports problems through its return value rather than raising
  self.assertTrue("not acceptable" in message)
99 class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
def testDevNull(self):
  """Loading /dev/null must give an empty list of allowed paths."""
  loaded = filestorage._LoadAllowedFileStoragePaths("/dev/null")
  self.assertEqual(loaded, [])
def testNonExistantFile(self):
  """A file that does not exist must give an empty list of allowed paths."""
  filename = "/tmp/this/file/does/not/exist"
  # Guard the test's own precondition with a TestCase assertion: a bare
  # "assert" statement is removed when Python runs with -O, which would let
  # the test pass for the wrong reason if the path ever existed.
  self.assertFalse(os.path.exists(filename))
  self.assertEqual(filestorage._LoadAllowedFileStoragePaths(filename), [])
109 tmpfile = self._CreateTempFile()
111 utils.WriteFile(tmpfile, data="""
112 # This is a test file
118 self.assertEqual(filestorage._LoadAllowedFileStoragePaths(tmpfile), [
125 class TestComputeWrongFileStoragePathsInternal(unittest.TestCase):
127 paths = filestorage._GetForbiddenFileStoragePaths()
129 for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]:
130 self.assertTrue(path in paths)
132 self.assertEqual(set(map(os.path.normpath, paths)), paths)
135 vfsp = filestorage._ComputeWrongFileStoragePaths
136 self.assertEqual(vfsp([]), [])
137 self.assertEqual(vfsp(["/tmp"]), [])
138 self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"])
139 self.assertEqual(vfsp(["/bin"]), ["/bin"])
140 self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]),
144 class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase):
146 tmpfile = self._CreateTempFile()
148 utils.WriteFile(tmpfile, data="""
151 # This is a test file
159 filestorage.ComputeWrongFileStoragePaths(_filename=tmpfile),
167 class TestCheckFileStoragePathInternal(unittest.TestCase):
def testNonAbsolute(self):
  """Expects FileStoragePathError when a non-absolute path is involved.

  Covers non-absolute candidate paths as well as a list of allowed paths
  made up of non-absolute entries.

  """
  check_fn = filestorage._CheckFileStoragePath

  for relative in ["", "tmp", "foo/bar/baz"]:
    self.assertRaises(errors.FileStoragePathError,
                      check_fn, relative, ["/tmp"])

  self.assertRaises(errors.FileStoragePathError,
                    check_fn, "/tmp", ["tmp", "xyz"])
def testNoAllowed(self):
  """With an empty list of allowed paths the check must raise."""
  no_allowed_paths = []
  self.assertRaises(errors.FileStoragePathError,
                    filestorage._CheckFileStoragePath, "/tmp",
                    no_allowed_paths)
180 def testNoAdditionalPathComponent(self):
181 self.assertRaises(errors.FileStoragePathError,
182 filestorage._CheckFileStoragePath, "/tmp/foo",
def testAllowed(self):
  """Paths nested below an allowed directory must not raise."""
  for candidate in ("/tmp/foo/a", "/tmp/foo/a/x"):
    filestorage._CheckFileStoragePath(candidate, ["/tmp/foo"])
190 class TestCheckFileStoragePathExistance(testutils.GanetiTestCase):
191 def testNonExistantFile(self):
192 filename = "/tmp/this/file/does/not/exist"
193 assert not os.path.exists(filename)
194 self.assertRaises(errors.FileStoragePathError,
195 filestorage.CheckFileStoragePathAcceptance, "/bin/",
197 self.assertRaises(errors.FileStoragePathError,
198 filestorage.CheckFileStoragePathAcceptance,
199 "/srv/file-storage", _filename=filename)
201 def testAllowedPath(self):
202 tmpfile = self._CreateTempFile()
204 utils.WriteFile(tmpfile, data="""
208 filestorage.CheckFileStoragePathAcceptance(
209 "/srv/storage/inst1", _filename=tmpfile)
211 # No additional path component
212 self.assertRaises(errors.FileStoragePathError,
213 filestorage.CheckFileStoragePathAcceptance,
214 "/srv/storage", _filename=tmpfile)
217 self.assertRaises(errors.FileStoragePathError,
218 filestorage.CheckFileStoragePathAcceptance,
219 "/usr/lib64/xyz", _filename=tmpfile)
# When run directly as a script (not imported), hand control over to
# testutils.GanetiTestProgram.
if __name__ == "__main__":
  testutils.GanetiTestProgram()