Commit fbdac0d9 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

bdev: Add functions to verify file storage paths



- LoadAllowedFileStoragePaths: Loads a list of allowed file storage
  paths from a file
- CheckFileStoragePath: Checks a path against the list of allowed paths

The unit test for “utils.IsBelowDir” is updated with cases which weren't
tested before.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 04569469
......@@ -36,6 +36,7 @@ from ganeti import constants
from ganeti import objects
from ganeti import compat
from ganeti import netutils
from ganeti import pathutils
# Size of reads in _CanReadDevice
......@@ -88,6 +89,59 @@ def _CanReadDevice(path):
return False
def _CheckFileStoragePath(path, allowed):
"""Checks if a path is in a list of allowed paths for file storage.
@type path: string
@param path: Path to check
@type allowed: list
@param allowed: List of allowed paths
@raise errors.FileStoragePathError: If the path is not allowed
"""
if not os.path.isabs(path):
raise errors.FileStoragePathError("File storage path must be absolute,"
" got '%s'" % path)
for i in allowed:
if not os.path.isabs(i):
logging.info("Ignoring relative path '%s' for file storage", i)
continue
if utils.IsBelowDir(i, path):
break
else:
raise errors.FileStoragePathError("Path '%s' is not acceptable for file"
" storage" % path)
def LoadAllowedFileStoragePaths(filename):
"""Loads file containing allowed file storage paths.
@rtype: list
@return: List of allowed paths (can be an empty list)
"""
try:
contents = utils.ReadFile(filename)
except EnvironmentError:
return []
else:
return utils.FilterEmptyLinesAndComments(contents)
def CheckFileStoragePath(path, _filename=pathutils.FILE_STORAGE_PATHS_FILE):
"""Checks if a path is allowed for file storage.
@type path: string
@param path: Path to check
@raise errors.FileStoragePathError: If the path is not allowed
"""
_CheckFileStoragePath(path, LoadAllowedFileStoragePaths(_filename))
class BlockDev(object):
"""Block device abstract class.
......
......@@ -430,6 +430,12 @@ class RapiTestResult(GenericError):
"""
class FileStoragePathError(GenericError):
"""Error from file storage path validation.
"""
# errors should be added above
......
......@@ -86,6 +86,7 @@ CONF_DIR = SYSCONFDIR + "/ganeti"
USER_SCRIPTS_DIR = CONF_DIR + "/scripts"
VNC_PASSWORD_FILE = CONF_DIR + "/vnc-cluster-password"
HOOKS_BASE_DIR = CONF_DIR + "/hooks"
FILE_STORAGE_PATHS_FILE = CONF_DIR + "/file-storage-paths"
#: Lock file for watcher, locked in shared mode by watcher; lock in exclusive
# mode to block watcher (see L{cli._RunWhileClusterStoppedHelper.Call}
......
......@@ -28,6 +28,7 @@ import unittest
from ganeti import bdev
from ganeti import errors
from ganeti import constants
from ganeti import utils
import testutils
......@@ -372,5 +373,53 @@ class TestRADOSBlockDevice(testutils.GanetiTestCase):
output_extra_matches, volume_name)
if __name__ == '__main__':
class TestCheckFileStoragePath(unittest.TestCase):
def testNonAbsolute(self):
for i in ["", "tmp", "foo/bar/baz"]:
self.assertRaises(errors.FileStoragePathError,
bdev._CheckFileStoragePath, i, ["/tmp"])
self.assertRaises(errors.FileStoragePathError,
bdev._CheckFileStoragePath, "/tmp", ["tmp", "xyz"])
def testNoAllowed(self):
self.assertRaises(errors.FileStoragePathError,
bdev._CheckFileStoragePath, "/tmp", [])
def testNoAdditionalPathComponent(self):
self.assertRaises(errors.FileStoragePathError,
bdev._CheckFileStoragePath, "/tmp/foo", ["/tmp/foo"])
def testAllowed(self):
bdev._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"])
bdev._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
def testDevNull(self):
self.assertEqual(bdev.LoadAllowedFileStoragePaths("/dev/null"), [])
def testNonExistantFile(self):
filename = "/tmp/this/file/does/not/exist"
assert not os.path.exists(filename)
self.assertEqual(bdev.LoadAllowedFileStoragePaths(filename), [])
def test(self):
tmpfile = self._CreateTempFile()
utils.WriteFile(tmpfile, data="""
# This is a test file
/tmp
/srv/storage
relative/path
""")
self.assertEqual(bdev.LoadAllowedFileStoragePaths(tmpfile), [
"/tmp",
"/srv/storage",
"relative/path",
])
if __name__ == "__main__":
testutils.GanetiTestProgram()
......@@ -646,6 +646,12 @@ class TestIsNormAbsPath(unittest.TestCase):
class TestIsBelowDir(unittest.TestCase):
"""Testing case for IsBelowDir"""
def testExactlyTheSame(self):
self.assertFalse(utils.IsBelowDir("/a/b", "/a/b"))
self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/"))
self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b"))
self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/"))
def testSamePrefix(self):
self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment