import pyparsing as pyp
import os
import logging
+import itertools
from ganeti import utils
from ganeti import errors
return False
+def _GetForbiddenFileStoragePaths():
+ """Builds a list of path prefixes which shouldn't be used for file storage.
+
+ @rtype: frozenset
+
+ """
+ paths = set([
+ "/boot",
+ "/dev",
+ "/etc",
+ "/home",
+ "/proc",
+ "/root",
+ "/sys",
+ ])
+
+ for prefix in ["", "/usr", "/usr/local"]:
+ paths.update(map(lambda s: "%s/%s" % (prefix, s),
+ ["bin", "lib", "lib32", "lib64", "sbin"]))
+
+ return frozenset(map(os.path.normpath, paths))
+
+
+def _ComputeWrongFileStoragePaths(paths,
+ _forbidden=_GetForbiddenFileStoragePaths()):
+ """Cross-checks a list of paths for prefixes considered bad.
+
+ Some paths, e.g. "/bin", should not be used for file storage.
+
+ @type paths: list
+ @param paths: List of paths to be checked
+ @rtype: list
+ @return: Sorted list of paths for which the user should be warned
+
+ """
+ def _Check(path):
+ return (not os.path.isabs(path) or
+ path in _forbidden or
+ filter(lambda p: utils.IsBelowDir(p, path), _forbidden))
+
+ return utils.NiceSort(filter(_Check, map(os.path.normpath, paths)))
+
+
+def ComputeWrongFileStoragePaths(_filename=pathutils.FILE_STORAGE_PATHS_FILE):
+ """Returns a list of file storage paths whose prefix is considered bad.
+
+ See L{_ComputeWrongFileStoragePaths}.
+
+ """
+ return _ComputeWrongFileStoragePaths(_LoadAllowedFileStoragePaths(_filename))
+
+
def _CheckFileStoragePath(path, allowed):
"""Checks if a path is in a list of allowed paths for file storage.
" storage" % path)
-def LoadAllowedFileStoragePaths(filename):
+def _LoadAllowedFileStoragePaths(filename):
"""Loads file containing allowed file storage paths.
@rtype: list
@raise errors.FileStoragePathError: If the path is not allowed
"""
- _CheckFileStoragePath(path, LoadAllowedFileStoragePaths(_filename))
+ allowed = _LoadAllowedFileStoragePaths(_filename)
+
+ if _ComputeWrongFileStoragePaths([path]):
+ raise errors.FileStoragePathError("Path '%s' uses a forbidden prefix" %
+ path)
+
+ _CheckFileStoragePath(path, allowed)
class BlockDev(object):
output_extra_matches, volume_name)
-class TestCheckFileStoragePath(unittest.TestCase):
+class TestComputeWrongFileStoragePathsInternal(unittest.TestCase):
+ def testPaths(self):
+ paths = bdev._GetForbiddenFileStoragePaths()
+
+ for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]:
+ self.assertTrue(path in paths)
+
+ self.assertEqual(set(map(os.path.normpath, paths)), paths)
+
+ def test(self):
+ vfsp = bdev._ComputeWrongFileStoragePaths
+ self.assertEqual(vfsp([]), [])
+ self.assertEqual(vfsp(["/tmp"]), [])
+ self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"])
+ self.assertEqual(vfsp(["/bin"]), ["/bin"])
+ self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]),
+ ["/usr/sbin/vim"])
+
+
+class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase):
+ def test(self):
+ tmpfile = self._CreateTempFile()
+
+ utils.WriteFile(tmpfile, data="""
+ /tmp
+ x/y///z/relative
+ # This is a test file
+ /srv/storage
+ /bin
+ /usr/local/lib32/
+ relative/path
+ """)
+
+ self.assertEqual(bdev.ComputeWrongFileStoragePaths(_filename=tmpfile), [
+ "/bin",
+ "/usr/local/lib32",
+ "relative/path",
+ "x/y/z/relative",
+ ])
+
+
+class TestCheckFileStoragePathInternal(unittest.TestCase):
def testNonAbsolute(self):
for i in ["", "tmp", "foo/bar/baz"]:
self.assertRaises(errors.FileStoragePathError,
bdev._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
+class TestCheckFileStoragePath(testutils.GanetiTestCase):
+ def testNonExistantFile(self):
+ filename = "/tmp/this/file/does/not/exist"
+ assert not os.path.exists(filename)
+ self.assertRaises(errors.FileStoragePathError,
+ bdev.CheckFileStoragePath, "/bin/", _filename=filename)
+ self.assertRaises(errors.FileStoragePathError,
+ bdev.CheckFileStoragePath, "/srv/file-storage",
+ _filename=filename)
+
+ def testAllowedPath(self):
+ tmpfile = self._CreateTempFile()
+
+ utils.WriteFile(tmpfile, data="""
+ /srv/storage
+ """)
+
+ bdev.CheckFileStoragePath("/srv/storage/inst1", _filename=tmpfile)
+
+ # No additional path component
+ self.assertRaises(errors.FileStoragePathError,
+ bdev.CheckFileStoragePath, "/srv/storage",
+ _filename=tmpfile)
+
+ # Forbidden path
+ self.assertRaises(errors.FileStoragePathError,
+ bdev.CheckFileStoragePath, "/usr/lib64/xyz",
+ _filename=tmpfile)
+
+
class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
def testDevNull(self):
- self.assertEqual(bdev.LoadAllowedFileStoragePaths("/dev/null"), [])
+ self.assertEqual(bdev._LoadAllowedFileStoragePaths("/dev/null"), [])
def testNonExistantFile(self):
filename = "/tmp/this/file/does/not/exist"
assert not os.path.exists(filename)
- self.assertEqual(bdev.LoadAllowedFileStoragePaths(filename), [])
+ self.assertEqual(bdev._LoadAllowedFileStoragePaths(filename), [])
def test(self):
tmpfile = self._CreateTempFile()
relative/path
""")
- self.assertEqual(bdev.LoadAllowedFileStoragePaths(tmpfile), [
+ self.assertEqual(bdev._LoadAllowedFileStoragePaths(tmpfile), [
"/tmp",
"/srv/storage",
"relative/path",