cfg auto update: match ipolicy with enabled disk templates
[ganeti-local] / test / py / ganeti.storage.filestorage_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the ganeti.storage.file module"""
23
24 import os
25 import shutil
26 import tempfile
27 import unittest
28
29 from ganeti import errors
30 from ganeti.storage import filestorage
31 from ganeti import utils
32
33 import testutils
34
35
36 class TestFileStorageSpaceInfo(unittest.TestCase):
37
38   def testSpaceInfoPathInvalid(self):
39     """Tests that an error is raised when the given path is not existing.
40
41     """
42     self.assertRaises(errors.CommandError, filestorage.GetFileStorageSpaceInfo,
43                       "/path/does/not/exist/")
44
45   def testSpaceInfoPathValid(self):
46     """Smoke test run on a directory that exists for sure.
47
48     """
49     filestorage.GetFileStorageSpaceInfo("/")
50
51
52 class TestCheckFileStoragePath(unittest.TestCase):
53   def _WriteAllowedFile(self, allowed_paths_filename, allowed_paths):
54     allowed_paths_file = open(allowed_paths_filename, 'w')
55     allowed_paths_file.write('\n'.join(allowed_paths))
56     allowed_paths_file.close()
57
58   def setUp(self):
59     self.tmpdir = tempfile.mkdtemp()
60     self.allowed_paths = [os.path.join(self.tmpdir, "allowed")]
61     for path in self.allowed_paths:
62       os.mkdir(path)
63     self.allowed_paths_filename = os.path.join(self.tmpdir, "allowed-path-file")
64     self._WriteAllowedFile(self.allowed_paths_filename, self.allowed_paths)
65
66   def tearDown(self):
67     shutil.rmtree(self.tmpdir)
68
69   def testCheckFileStoragePathExistance(self):
70     filestorage._CheckFileStoragePathExistance(self.tmpdir)
71
72   def testCheckFileStoragePathExistanceFail(self):
73     path = os.path.join(self.tmpdir, "does/not/exist")
74     self.assertRaises(errors.FileStoragePathError,
75         filestorage._CheckFileStoragePathExistance, path)
76
77   def testCheckFileStoragePathNotWritable(self):
78     path = os.path.join(self.tmpdir, "isnotwritable/")
79     os.mkdir(path)
80     os.chmod(path, 0)
81     self.assertRaises(errors.FileStoragePathError,
82         filestorage._CheckFileStoragePathExistance, path)
83     os.chmod(path, 777)
84
85   def testCheckFileStoragePath(self):
86     path = os.path.join(self.allowed_paths[0], "allowedsubdir")
87     os.mkdir(path)
88     result = filestorage.CheckFileStoragePath(
89         path, _allowed_paths_file=self.allowed_paths_filename)
90     self.assertEqual(None, result)
91
92   def testCheckFileStoragePathNotAllowed(self):
93     path = os.path.join(self.tmpdir, "notallowed")
94     result = filestorage.CheckFileStoragePath(
95         path, _allowed_paths_file=self.allowed_paths_filename)
96     self.assertTrue("not acceptable" in result)
97
98
99 class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
100   def testDevNull(self):
101     self.assertEqual(filestorage._LoadAllowedFileStoragePaths("/dev/null"), [])
102
103   def testNonExistantFile(self):
104     filename = "/tmp/this/file/does/not/exist"
105     assert not os.path.exists(filename)
106     self.assertEqual(filestorage._LoadAllowedFileStoragePaths(filename), [])
107
108   def test(self):
109     tmpfile = self._CreateTempFile()
110
111     utils.WriteFile(tmpfile, data="""
112       # This is a test file
113       /tmp
114       /srv/storage
115       relative/path
116       """)
117
118     self.assertEqual(filestorage._LoadAllowedFileStoragePaths(tmpfile), [
119       "/tmp",
120       "/srv/storage",
121       "relative/path",
122       ])
123
124
125 class TestComputeWrongFileStoragePathsInternal(unittest.TestCase):
126   def testPaths(self):
127     paths = filestorage._GetForbiddenFileStoragePaths()
128
129     for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]:
130       self.assertTrue(path in paths)
131
132     self.assertEqual(set(map(os.path.normpath, paths)), paths)
133
134   def test(self):
135     vfsp = filestorage._ComputeWrongFileStoragePaths
136     self.assertEqual(vfsp([]), [])
137     self.assertEqual(vfsp(["/tmp"]), [])
138     self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"])
139     self.assertEqual(vfsp(["/bin"]), ["/bin"])
140     self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]),
141                      ["/usr/sbin/vim"])
142
143
144 class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase):
145   def test(self):
146     tmpfile = self._CreateTempFile()
147
148     utils.WriteFile(tmpfile, data="""
149       /tmp
150       x/y///z/relative
151       # This is a test file
152       /srv/storage
153       /bin
154       /usr/local/lib32/
155       relative/path
156       """)
157
158     self.assertEqual(
159         filestorage.ComputeWrongFileStoragePaths(_filename=tmpfile),
160         ["/bin",
161          "/usr/local/lib32",
162          "relative/path",
163          "x/y/z/relative",
164         ])
165
166
167 class TestCheckFileStoragePathInternal(unittest.TestCase):
168   def testNonAbsolute(self):
169     for i in ["", "tmp", "foo/bar/baz"]:
170       self.assertRaises(errors.FileStoragePathError,
171                         filestorage._CheckFileStoragePath, i, ["/tmp"])
172
173     self.assertRaises(errors.FileStoragePathError,
174                       filestorage._CheckFileStoragePath, "/tmp", ["tmp", "xyz"])
175
176   def testNoAllowed(self):
177     self.assertRaises(errors.FileStoragePathError,
178                       filestorage._CheckFileStoragePath, "/tmp", [])
179
180   def testNoAdditionalPathComponent(self):
181     self.assertRaises(errors.FileStoragePathError,
182                       filestorage._CheckFileStoragePath, "/tmp/foo",
183                       ["/tmp/foo"])
184
185   def testAllowed(self):
186     filestorage._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"])
187     filestorage._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
188
189
190 class TestCheckFileStoragePathExistance(testutils.GanetiTestCase):
191   def testNonExistantFile(self):
192     filename = "/tmp/this/file/does/not/exist"
193     assert not os.path.exists(filename)
194     self.assertRaises(errors.FileStoragePathError,
195                       filestorage.CheckFileStoragePathAcceptance, "/bin/",
196                       _filename=filename)
197     self.assertRaises(errors.FileStoragePathError,
198                       filestorage.CheckFileStoragePathAcceptance,
199                       "/srv/file-storage", _filename=filename)
200
201   def testAllowedPath(self):
202     tmpfile = self._CreateTempFile()
203
204     utils.WriteFile(tmpfile, data="""
205       /srv/storage
206       """)
207
208     filestorage.CheckFileStoragePathAcceptance(
209         "/srv/storage/inst1", _filename=tmpfile)
210
211     # No additional path component
212     self.assertRaises(errors.FileStoragePathError,
213                       filestorage.CheckFileStoragePathAcceptance,
214                       "/srv/storage", _filename=tmpfile)
215
216     # Forbidden path
217     self.assertRaises(errors.FileStoragePathError,
218                       filestorage.CheckFileStoragePathAcceptance,
219                       "/usr/lib64/xyz", _filename=tmpfile)
220
221
222 if __name__ == "__main__":
223   testutils.GanetiTestProgram()