From fbdac0d912e794705b18637a611c9d87265c1501 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Thu, 4 Oct 2012 16:58:26 +0200
Subject: [PATCH] bdev: Add functions to verify file storage paths
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- LoadAllowedFileStoragePaths: Loads a list of allowed file storage
  paths from a file
- CheckFileStoragePath: Checks a path against the list of allowed paths

The unit test for β€œutils.IsBelowDir” is updated with cases which weren't
tested before.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/bdev.py                      | 54 ++++++++++++++++++++++++++++++++
 lib/errors.py                    |  6 ++++
 lib/pathutils.py                 |  1 +
 test/ganeti.bdev_unittest.py     | 51 +++++++++++++++++++++++++++++-
 test/ganeti.utils.io_unittest.py |  6 ++++
 5 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/lib/bdev.py b/lib/bdev.py
index d6c1a6659..9e25139aa 100644
--- a/lib/bdev.py
+++ b/lib/bdev.py
@@ -36,6 +36,7 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import compat
 from ganeti import netutils
+from ganeti import pathutils
 
 
 # Size of reads in _CanReadDevice
@@ -88,6 +89,59 @@ def _CanReadDevice(path):
     return False
 
 
+def _CheckFileStoragePath(path, allowed):
+  """Checks if a path is in a list of allowed paths for file storage.
+
+  @type path: string
+  @param path: Path to check
+  @type allowed: list
+  @param allowed: List of allowed paths
+  @raise errors.FileStoragePathError: If the path is not allowed
+
+  """
+  if not os.path.isabs(path):
+    raise errors.FileStoragePathError("File storage path must be absolute,"
+                                      " got '%s'" % path)
+
+  for i in allowed:
+    if not os.path.isabs(i):
+      logging.info("Ignoring relative path '%s' for file storage", i)
+      continue
+
+    if utils.IsBelowDir(i, path):
+      break
+  else:
+    raise errors.FileStoragePathError("Path '%s' is not acceptable for file"
+                                      " storage" % path)
+
+
+def LoadAllowedFileStoragePaths(filename):
+  """Loads file containing allowed file storage paths.
+
+  @rtype: list
+  @return: List of allowed paths (can be an empty list)
+
+  """
+  try:
+    contents = utils.ReadFile(filename)
+  except EnvironmentError:
+    return []
+  else:
+    return utils.FilterEmptyLinesAndComments(contents)
+
+
+def CheckFileStoragePath(path, _filename=pathutils.FILE_STORAGE_PATHS_FILE):
+  """Checks if a path is allowed for file storage.
+
+  @type path: string
+  @param path: Path to check
+  @raise errors.FileStoragePathError: If the path is not allowed
+
+  """
+  _CheckFileStoragePath(path, LoadAllowedFileStoragePaths(_filename))
+
+
+
 class BlockDev(object):
   """Block device abstract class.
 
diff --git a/lib/errors.py b/lib/errors.py
index b9d578f03..062b6583b 100644
--- a/lib/errors.py
+++ b/lib/errors.py
@@ -430,6 +430,12 @@ class RapiTestResult(GenericError):
   """
 
 
+class FileStoragePathError(GenericError):
+  """Error from file storage path validation.
+
+  """
+
+
 # errors should be added above
 
 
diff --git a/lib/pathutils.py b/lib/pathutils.py
index e60a7a468..457b2273c 100644
--- a/lib/pathutils.py
+++ b/lib/pathutils.py
@@ -86,6 +86,7 @@ CONF_DIR = SYSCONFDIR + "/ganeti"
 USER_SCRIPTS_DIR = CONF_DIR + "/scripts"
 VNC_PASSWORD_FILE = CONF_DIR + "/vnc-cluster-password"
 HOOKS_BASE_DIR = CONF_DIR + "/hooks"
+FILE_STORAGE_PATHS_FILE = CONF_DIR + "/file-storage-paths"
 
 #: Lock file for watcher, locked in shared mode by watcher; lock in exclusive
 # mode to block watcher (see L{cli._RunWhileClusterStoppedHelper.Call}
diff --git a/test/ganeti.bdev_unittest.py b/test/ganeti.bdev_unittest.py
index e1aeef6a2..e90071977 100755
--- a/test/ganeti.bdev_unittest.py
+++ b/test/ganeti.bdev_unittest.py
@@ -28,6 +28,7 @@ import unittest
 from ganeti import bdev
 from ganeti import errors
 from ganeti import constants
+from ganeti import utils
 
 import testutils
 
@@ -372,5 +373,53 @@ class TestRADOSBlockDevice(testutils.GanetiTestCase):
                       output_extra_matches, volume_name)
 
 
-if __name__ == '__main__':
+class TestCheckFileStoragePath(unittest.TestCase):
+  def testNonAbsolute(self):
+    for i in ["", "tmp", "foo/bar/baz"]:
+      self.assertRaises(errors.FileStoragePathError,
+                        bdev._CheckFileStoragePath, i, ["/tmp"])
+
+    self.assertRaises(errors.FileStoragePathError,
+                      bdev._CheckFileStoragePath, "/tmp", ["tmp", "xyz"])
+
+  def testNoAllowed(self):
+    self.assertRaises(errors.FileStoragePathError,
+                      bdev._CheckFileStoragePath, "/tmp", [])
+
+  def testNoAdditionalPathComponent(self):
+    self.assertRaises(errors.FileStoragePathError,
+                      bdev._CheckFileStoragePath, "/tmp/foo", ["/tmp/foo"])
+
+  def testAllowed(self):
+    bdev._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"])
+    bdev._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
+
+
+class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
+  def testDevNull(self):
+    self.assertEqual(bdev.LoadAllowedFileStoragePaths("/dev/null"), [])
+
+  def testNonExistantFile(self):
+    filename = "/tmp/this/file/does/not/exist"
+    assert not os.path.exists(filename)
+    self.assertEqual(bdev.LoadAllowedFileStoragePaths(filename), [])
+
+  def test(self):
+    tmpfile = self._CreateTempFile()
+
+    utils.WriteFile(tmpfile, data="""
+      # This is a test file
+      /tmp
+      /srv/storage
+      relative/path
+      """)
+
+    self.assertEqual(bdev.LoadAllowedFileStoragePaths(tmpfile), [
+      "/tmp",
+      "/srv/storage",
+      "relative/path",
+      ])
+
+
+if __name__ == "__main__":
   testutils.GanetiTestProgram()
diff --git a/test/ganeti.utils.io_unittest.py b/test/ganeti.utils.io_unittest.py
index 109232a39..bb9dc5e51 100755
--- a/test/ganeti.utils.io_unittest.py
+++ b/test/ganeti.utils.io_unittest.py
@@ -646,6 +646,12 @@ class TestIsNormAbsPath(unittest.TestCase):
 class TestIsBelowDir(unittest.TestCase):
   """Testing case for IsBelowDir"""
 
+  def testExactlyTheSame(self):
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b"))
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/"))
+
   def testSamePrefix(self):
     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
     self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
-- 
GitLab