Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.storage.filestorage_unittest.py @ 560ef132

History | View | Annotate | Download (10.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the ganeti.storage.filestorage module"""
23

    
24
import os
25
import shutil
26
import tempfile
27
import unittest
28

    
29
from ganeti import errors
30
from ganeti.storage import filestorage
31
from ganeti.utils import io
32
from ganeti import utils
33
from ganeti import constants
34

    
35
import testutils
36

    
37

    
38
class TestFileStorageSpaceInfo(unittest.TestCase):
39

    
40
  def testSpaceInfoPathInvalid(self):
41
    """Tests that an error is raised when the given path is not existing.
42

43
    """
44
    self.assertRaises(errors.CommandError, filestorage.GetFileStorageSpaceInfo,
45
                      "/path/does/not/exist/")
46

    
47
  def testSpaceInfoPathValid(self):
48
    """Smoke test run on a directory that exists for sure.
49

50
    """
51
    filestorage.GetFileStorageSpaceInfo("/")
52

    
53

    
54
class TestCheckFileStoragePath(unittest.TestCase):
55
  def _WriteAllowedFile(self, allowed_paths_filename, allowed_paths):
56
    allowed_paths_file = open(allowed_paths_filename, 'w')
57
    allowed_paths_file.write('\n'.join(allowed_paths))
58
    allowed_paths_file.close()
59

    
60
  def setUp(self):
61
    self.tmpdir = tempfile.mkdtemp()
62
    self.allowed_paths = [os.path.join(self.tmpdir, "allowed")]
63
    for path in self.allowed_paths:
64
      os.mkdir(path)
65
    self.allowed_paths_filename = os.path.join(self.tmpdir, "allowed-path-file")
66
    self._WriteAllowedFile(self.allowed_paths_filename, self.allowed_paths)
67

    
68
  def tearDown(self):
69
    shutil.rmtree(self.tmpdir)
70

    
71
  def testCheckFileStoragePathExistance(self):
72
    filestorage._CheckFileStoragePathExistance(self.tmpdir)
73

    
74
  def testCheckFileStoragePathExistanceFail(self):
75
    path = os.path.join(self.tmpdir, "does/not/exist")
76
    self.assertRaises(errors.FileStoragePathError,
77
        filestorage._CheckFileStoragePathExistance, path)
78

    
79
  def testCheckFileStoragePathNotWritable(self):
80
    path = os.path.join(self.tmpdir, "isnotwritable/")
81
    os.mkdir(path)
82
    os.chmod(path, 0)
83
    self.assertRaises(errors.FileStoragePathError,
84
        filestorage._CheckFileStoragePathExistance, path)
85
    os.chmod(path, 777)
86

    
87
  def testCheckFileStoragePath(self):
88
    path = os.path.join(self.allowed_paths[0], "allowedsubdir")
89
    os.mkdir(path)
90
    result = filestorage.CheckFileStoragePath(
91
        path, _allowed_paths_file=self.allowed_paths_filename)
92
    self.assertEqual(None, result)
93

    
94
  def testCheckFileStoragePathNotAllowed(self):
95
    path = os.path.join(self.tmpdir, "notallowed")
96
    result = filestorage.CheckFileStoragePath(
97
        path, _allowed_paths_file=self.allowed_paths_filename)
98
    self.assertTrue("not acceptable" in result)
99

    
100

    
101
class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
102
  def testDevNull(self):
103
    self.assertEqual(filestorage._LoadAllowedFileStoragePaths("/dev/null"), [])
104

    
105
  def testNonExistantFile(self):
106
    filename = "/tmp/this/file/does/not/exist"
107
    assert not os.path.exists(filename)
108
    self.assertEqual(filestorage._LoadAllowedFileStoragePaths(filename), [])
109

    
110
  def test(self):
111
    tmpfile = self._CreateTempFile()
112

    
113
    utils.WriteFile(tmpfile, data="""
114
      # This is a test file
115
      /tmp
116
      /srv/storage
117
      relative/path
118
      """)
119

    
120
    self.assertEqual(filestorage._LoadAllowedFileStoragePaths(tmpfile), [
121
      "/tmp",
122
      "/srv/storage",
123
      "relative/path",
124
      ])
125

    
126

    
127
class TestComputeWrongFileStoragePathsInternal(unittest.TestCase):
128
  def testPaths(self):
129
    paths = filestorage._GetForbiddenFileStoragePaths()
130

    
131
    for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]:
132
      self.assertTrue(path in paths)
133

    
134
    self.assertEqual(set(map(os.path.normpath, paths)), paths)
135

    
136
  def test(self):
137
    vfsp = filestorage._ComputeWrongFileStoragePaths
138
    self.assertEqual(vfsp([]), [])
139
    self.assertEqual(vfsp(["/tmp"]), [])
140
    self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"])
141
    self.assertEqual(vfsp(["/bin"]), ["/bin"])
142
    self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]),
143
                     ["/usr/sbin/vim"])
144

    
145

    
146
class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase):
147
  def test(self):
148
    tmpfile = self._CreateTempFile()
149

    
150
    utils.WriteFile(tmpfile, data="""
151
      /tmp
152
      x/y///z/relative
153
      # This is a test file
154
      /srv/storage
155
      /bin
156
      /usr/local/lib32/
157
      relative/path
158
      """)
159

    
160
    self.assertEqual(
161
        filestorage.ComputeWrongFileStoragePaths(_filename=tmpfile),
162
        ["/bin",
163
         "/usr/local/lib32",
164
         "relative/path",
165
         "x/y/z/relative",
166
        ])
167

    
168

    
169
class TestCheckFileStoragePathInternal(unittest.TestCase):
170
  def testNonAbsolute(self):
171
    for i in ["", "tmp", "foo/bar/baz"]:
172
      self.assertRaises(errors.FileStoragePathError,
173
                        filestorage._CheckFileStoragePath, i, ["/tmp"])
174

    
175
    self.assertRaises(errors.FileStoragePathError,
176
                      filestorage._CheckFileStoragePath, "/tmp", ["tmp", "xyz"])
177

    
178
  def testNoAllowed(self):
179
    self.assertRaises(errors.FileStoragePathError,
180
                      filestorage._CheckFileStoragePath, "/tmp", [])
181

    
182
  def testNoAdditionalPathComponent(self):
183
    self.assertRaises(errors.FileStoragePathError,
184
                      filestorage._CheckFileStoragePath, "/tmp/foo",
185
                      ["/tmp/foo"])
186

    
187
  def testAllowed(self):
188
    filestorage._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"])
189
    filestorage._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
190

    
191

    
192
class TestCheckFileStoragePathExistance(testutils.GanetiTestCase):
193
  def testNonExistantFile(self):
194
    filename = "/tmp/this/file/does/not/exist"
195
    assert not os.path.exists(filename)
196
    self.assertRaises(errors.FileStoragePathError,
197
                      filestorage.CheckFileStoragePathAcceptance, "/bin/",
198
                      _filename=filename)
199
    self.assertRaises(errors.FileStoragePathError,
200
                      filestorage.CheckFileStoragePathAcceptance,
201
                      "/srv/file-storage", _filename=filename)
202

    
203
  def testAllowedPath(self):
204
    tmpfile = self._CreateTempFile()
205

    
206
    utils.WriteFile(tmpfile, data="""
207
      /srv/storage
208
      """)
209

    
210
    filestorage.CheckFileStoragePathAcceptance(
211
        "/srv/storage/inst1", _filename=tmpfile)
212

    
213
    # No additional path component
214
    self.assertRaises(errors.FileStoragePathError,
215
                      filestorage.CheckFileStoragePathAcceptance,
216
                      "/srv/storage", _filename=tmpfile)
217

    
218
    # Forbidden path
219
    self.assertRaises(errors.FileStoragePathError,
220
                      filestorage.CheckFileStoragePathAcceptance,
221
                      "/usr/lib64/xyz", _filename=tmpfile)
222

    
223

    
224
class TestFileDeviceHelper(testutils.GanetiTestCase):
225

    
226
  @staticmethod
227
  def _Make(path, create_with_size=None, create_folders=False):
228
    skip_checks = lambda path: None
229
    if create_with_size:
230
      return filestorage.FileDeviceHelper.CreateFile(
231
        path, create_with_size, create_folders=create_folders,
232
        _file_path_acceptance_fn=skip_checks
233
      )
234
    else:
235
      return filestorage.FileDeviceHelper(path,
236
                                          _file_path_acceptance_fn=skip_checks)
237

    
238
  class TempEnvironment(object):
239

    
240
    def __init__(self, create_file=False, delete_file=True):
241
      self.create_file = create_file
242
      self.delete_file = delete_file
243

    
244
    def __enter__(self):
245
      self.directory = tempfile.mkdtemp()
246
      self.subdirectory = io.PathJoin(self.directory, "pinky")
247
      os.mkdir(self.subdirectory)
248
      self.path = io.PathJoin(self.subdirectory, "bunny")
249
      self.volume = TestFileDeviceHelper._Make(self.path)
250
      if self.create_file:
251
        open(self.path, mode="w").close()
252
      return self
253

    
254
    def __exit__(self, *args):
255
      if self.delete_file:
256
        os.unlink(self.path)
257
      os.rmdir(self.subdirectory)
258
      os.rmdir(self.directory)
259
      return False #don't swallow exceptions
260

    
261
  def testOperationsOnNonExistingFiles(self):
262
    path = "/e/no/ent"
263
    volume = TestFileDeviceHelper._Make(path)
264

    
265
    # These should fail horribly.
266
    volume.Exists(assert_exists=False)
267
    self.assertRaises(errors.BlockDeviceError, lambda: \
268
      volume.Exists(assert_exists=True))
269
    self.assertRaises(errors.BlockDeviceError, lambda: \
270
      volume.Size())
271
    self.assertRaises(errors.BlockDeviceError, lambda: \
272
      volume.Grow(0.020, True, False, None))
273

    
274
    # Removing however fails silently.
275
    volume.Remove()
276

    
277
    # Make sure we don't create all directories for you unless we ask for it
278
    self.assertRaises(errors.BlockDeviceError, lambda: \
279
      TestFileDeviceHelper._Make(path, create_with_size=1))
280

    
281
  def testFileCreation(self):
282
    with TestFileDeviceHelper.TempEnvironment() as env:
283
      TestFileDeviceHelper._Make(env.path, create_with_size=1)
284

    
285
      self.assertTrue(env.volume.Exists())
286
      env.volume.Exists(assert_exists=True)
287
      self.assertRaises(errors.BlockDeviceError, lambda: \
288
        env.volume.Exists(assert_exists=False))
289

    
290
    self.assertRaises(errors.BlockDeviceError, lambda: \
291
      TestFileDeviceHelper._Make("/enoent", create_with_size=0.042))
292

    
293
  def testFailSizeDirectory(self):
294
  # This should still fail.
295
   with TestFileDeviceHelper.TempEnvironment(delete_file=False) as env:
296
    self.assertRaises(errors.BlockDeviceError, lambda: \
297
      TestFileDeviceHelper._Make(env.subdirectory).Size())
298

    
299
  def testGrowFile(self):
300
    with TestFileDeviceHelper.TempEnvironment(create_file=True) as env:
301
      self.assertRaises(errors.BlockDeviceError, lambda: \
302
        env.volume.Grow(-1, False, True, None))
303

    
304
      env.volume.Grow(2, False, True, None)
305
      self.assertEqual(2.0, env.volume.Size() / 1024.0**2)
306

    
307
  def testRemoveFile(self):
308
    with TestFileDeviceHelper.TempEnvironment(create_file=True,
309
                                              delete_file=False) as env:
310
      env.volume.Remove()
311
      env.volume.Exists(assert_exists=False)
312

    
313
if __name__ == "__main__":
314
  testutils.GanetiTestProgram()