root / test / py / ganeti.storage.filestorage_unittest.py @ f3ebe73e
History | View | Annotate | Download (7.1 kB)
1 |
#!/usr/bin/python
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2013 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Script for unittesting the ganeti.storage.file module"""
|
23 |
|
24 |
import os |
25 |
import shutil |
26 |
import tempfile |
27 |
import unittest |
28 |
|
29 |
from ganeti import errors |
30 |
from ganeti.storage import filestorage |
31 |
from ganeti import utils |
32 |
|
33 |
import testutils |
34 |
|
35 |
|
36 |
class TestFileStorageSpaceInfo(unittest.TestCase): |
37 |
|
38 |
def testSpaceInfoPathInvalid(self): |
39 |
"""Tests that an error is raised when the given path is not existing.
|
40 |
|
41 |
"""
|
42 |
self.assertRaises(errors.CommandError, filestorage.GetFileStorageSpaceInfo,
|
43 |
"/path/does/not/exist/")
|
44 |
|
45 |
def testSpaceInfoPathValid(self): |
46 |
"""Smoke test run on a directory that exists for sure.
|
47 |
|
48 |
"""
|
49 |
filestorage.GetFileStorageSpaceInfo("/")
|
50 |
|
51 |
|
52 |
class TestCheckFileStoragePath(unittest.TestCase): |
53 |
def _WriteAllowedFile(self, allowed_paths_filename, allowed_paths): |
54 |
allowed_paths_file = open(allowed_paths_filename, 'w') |
55 |
allowed_paths_file.write('\n'.join(allowed_paths))
|
56 |
allowed_paths_file.close() |
57 |
|
58 |
def setUp(self): |
59 |
self.tmpdir = tempfile.mkdtemp()
|
60 |
self.allowed_paths = [os.path.join(self.tmpdir, "allowed")] |
61 |
for path in self.allowed_paths: |
62 |
os.mkdir(path) |
63 |
self.allowed_paths_filename = os.path.join(self.tmpdir, "allowed-path-file") |
64 |
self._WriteAllowedFile(self.allowed_paths_filename, self.allowed_paths) |
65 |
|
66 |
def tearDown(self): |
67 |
shutil.rmtree(self.tmpdir)
|
68 |
|
69 |
def testCheckFileStoragePathExistance(self): |
70 |
filestorage._CheckFileStoragePathExistance(self.tmpdir)
|
71 |
|
72 |
def testCheckFileStoragePathExistanceFail(self): |
73 |
path = os.path.join(self.tmpdir, "does/not/exist") |
74 |
self.assertRaises(errors.FileStoragePathError,
|
75 |
filestorage._CheckFileStoragePathExistance, path) |
76 |
|
77 |
def testCheckFileStoragePathNotWritable(self): |
78 |
path = os.path.join(self.tmpdir, "isnotwritable/") |
79 |
os.mkdir(path) |
80 |
os.chmod(path, 0)
|
81 |
self.assertRaises(errors.FileStoragePathError,
|
82 |
filestorage._CheckFileStoragePathExistance, path) |
83 |
os.chmod(path, 777)
|
84 |
|
85 |
def testCheckFileStoragePath(self): |
86 |
path = os.path.join(self.allowed_paths[0], "allowedsubdir") |
87 |
os.mkdir(path) |
88 |
result = filestorage.CheckFileStoragePath( |
89 |
path, _allowed_paths_file=self.allowed_paths_filename)
|
90 |
self.assertEqual(None, result) |
91 |
|
92 |
def testCheckFileStoragePathNotAllowed(self): |
93 |
path = os.path.join(self.tmpdir, "notallowed") |
94 |
result = filestorage.CheckFileStoragePath( |
95 |
path, _allowed_paths_file=self.allowed_paths_filename)
|
96 |
self.assertTrue("not acceptable" in result) |
97 |
|
98 |
|
99 |
class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase): |
100 |
def testDevNull(self): |
101 |
self.assertEqual(filestorage._LoadAllowedFileStoragePaths("/dev/null"), []) |
102 |
|
103 |
def testNonExistantFile(self): |
104 |
filename = "/tmp/this/file/does/not/exist"
|
105 |
assert not os.path.exists(filename) |
106 |
self.assertEqual(filestorage._LoadAllowedFileStoragePaths(filename), [])
|
107 |
|
108 |
def test(self): |
109 |
tmpfile = self._CreateTempFile()
|
110 |
|
111 |
utils.WriteFile(tmpfile, data="""
|
112 |
# This is a test file
|
113 |
/tmp
|
114 |
/srv/storage
|
115 |
relative/path
|
116 |
""")
|
117 |
|
118 |
self.assertEqual(filestorage._LoadAllowedFileStoragePaths(tmpfile), [
|
119 |
"/tmp",
|
120 |
"/srv/storage",
|
121 |
"relative/path",
|
122 |
]) |
123 |
|
124 |
|
125 |
class TestComputeWrongFileStoragePathsInternal(unittest.TestCase): |
126 |
def testPaths(self): |
127 |
paths = filestorage._GetForbiddenFileStoragePaths() |
128 |
|
129 |
for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]: |
130 |
self.assertTrue(path in paths) |
131 |
|
132 |
self.assertEqual(set(map(os.path.normpath, paths)), paths) |
133 |
|
134 |
def test(self): |
135 |
vfsp = filestorage._ComputeWrongFileStoragePaths |
136 |
self.assertEqual(vfsp([]), [])
|
137 |
self.assertEqual(vfsp(["/tmp"]), []) |
138 |
self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"]) |
139 |
self.assertEqual(vfsp(["/bin"]), ["/bin"]) |
140 |
self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]), |
141 |
["/usr/sbin/vim"])
|
142 |
|
143 |
|
144 |
class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase): |
145 |
def test(self): |
146 |
tmpfile = self._CreateTempFile()
|
147 |
|
148 |
utils.WriteFile(tmpfile, data="""
|
149 |
/tmp
|
150 |
x/y///z/relative
|
151 |
# This is a test file
|
152 |
/srv/storage
|
153 |
/bin
|
154 |
/usr/local/lib32/
|
155 |
relative/path
|
156 |
""")
|
157 |
|
158 |
self.assertEqual(
|
159 |
filestorage.ComputeWrongFileStoragePaths(_filename=tmpfile), |
160 |
["/bin",
|
161 |
"/usr/local/lib32",
|
162 |
"relative/path",
|
163 |
"x/y/z/relative",
|
164 |
]) |
165 |
|
166 |
|
167 |
class TestCheckFileStoragePathInternal(unittest.TestCase): |
168 |
def testNonAbsolute(self): |
169 |
for i in ["", "tmp", "foo/bar/baz"]: |
170 |
self.assertRaises(errors.FileStoragePathError,
|
171 |
filestorage._CheckFileStoragePath, i, ["/tmp"])
|
172 |
|
173 |
self.assertRaises(errors.FileStoragePathError,
|
174 |
filestorage._CheckFileStoragePath, "/tmp", ["tmp", "xyz"]) |
175 |
|
176 |
def testNoAllowed(self): |
177 |
self.assertRaises(errors.FileStoragePathError,
|
178 |
filestorage._CheckFileStoragePath, "/tmp", [])
|
179 |
|
180 |
def testNoAdditionalPathComponent(self): |
181 |
self.assertRaises(errors.FileStoragePathError,
|
182 |
filestorage._CheckFileStoragePath, "/tmp/foo",
|
183 |
["/tmp/foo"])
|
184 |
|
185 |
def testAllowed(self): |
186 |
filestorage._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"]) |
187 |
filestorage._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"]) |
188 |
|
189 |
|
190 |
class TestCheckFileStoragePathExistance(testutils.GanetiTestCase): |
191 |
def testNonExistantFile(self): |
192 |
filename = "/tmp/this/file/does/not/exist"
|
193 |
assert not os.path.exists(filename) |
194 |
self.assertRaises(errors.FileStoragePathError,
|
195 |
filestorage.CheckFileStoragePathAcceptance, "/bin/",
|
196 |
_filename=filename) |
197 |
self.assertRaises(errors.FileStoragePathError,
|
198 |
filestorage.CheckFileStoragePathAcceptance, |
199 |
"/srv/file-storage", _filename=filename)
|
200 |
|
201 |
def testAllowedPath(self): |
202 |
tmpfile = self._CreateTempFile()
|
203 |
|
204 |
utils.WriteFile(tmpfile, data="""
|
205 |
/srv/storage
|
206 |
""")
|
207 |
|
208 |
filestorage.CheckFileStoragePathAcceptance( |
209 |
"/srv/storage/inst1", _filename=tmpfile)
|
210 |
|
211 |
# No additional path component
|
212 |
self.assertRaises(errors.FileStoragePathError,
|
213 |
filestorage.CheckFileStoragePathAcceptance, |
214 |
"/srv/storage", _filename=tmpfile)
|
215 |
|
216 |
# Forbidden path
|
217 |
self.assertRaises(errors.FileStoragePathError,
|
218 |
filestorage.CheckFileStoragePathAcceptance, |
219 |
"/usr/lib64/xyz", _filename=tmpfile)
|
220 |
|
221 |
|
222 |
if __name__ == "__main__": |
223 |
testutils.GanetiTestProgram() |