Commit b81b3c96 authored by René Nussbaumer's avatar René Nussbaumer
Browse files

ensure_dirs: Move some useful functions into utils.



With this change we can easily reuse this functionality where it makes
sense on other parts of Ganeti.
Signed-off-by: default avatarRené Nussbaumer <rn@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent af5af644
......@@ -22,12 +22,10 @@
"""
import errno
import os
import os.path
import optparse
import sys
import stat
import logging
from ganeti import constants
......@@ -49,79 +47,6 @@ ALL_TYPES = frozenset([
])
class EnsureError(errors.GenericError):
"""Top-level error class related to this script.
"""
def EnsurePermission(path, mode, uid=-1, gid=-1, must_exist=True,
_chmod_fn=os.chmod, _chown_fn=os.chown, _stat_fn=os.stat):
"""Ensures that given path has given mode.
@param path: The path to the file
@param mode: The mode of the file
@param uid: The uid of the owner of this file
@param gid: The gid of the owner of this file
@param must_exist: Specifies if non-existance of path will be an error
@param _chmod_fn: chmod function to use (unittest only)
@param _chown_fn: chown function to use (unittest only)
"""
logging.debug("Checking %s", path)
try:
st = _stat_fn(path)
fmode = stat.S_IMODE(st[stat.ST_MODE])
if fmode != mode:
logging.debug("Changing mode of %s from %#o to %#o", path, fmode, mode)
_chmod_fn(path, mode)
if max(uid, gid) > -1:
fuid = st[stat.ST_UID]
fgid = st[stat.ST_GID]
if fuid != uid or fgid != gid:
logging.debug("Changing owner of %s from UID %s/GID %s to"
" UID %s/GID %s", path, fuid, fgid, uid, gid)
_chown_fn(path, uid, gid)
except EnvironmentError, err:
if err.errno == errno.ENOENT:
if must_exist:
raise EnsureError("Path %s should exist, but does not" % path)
else:
raise EnsureError("Error while changing permissions on %s: %s" %
(path, err))
def EnsureDir(path, mode, uid, gid, _lstat_fn=os.lstat, _mkdir_fn=os.mkdir,
_ensure_fn=EnsurePermission):
"""Ensures that given path is a dir and has given mode, uid and gid set.
@param path: The path to the file
@param mode: The mode of the file
@param uid: The uid of the owner of this file
@param gid: The gid of the owner of this file
@param _lstat_fn: Stat function to use (unittest only)
@param _mkdir_fn: mkdir function to use (unittest only)
@param _ensure_fn: ensure function to use (unittest only)
"""
logging.debug("Checking directory %s", path)
try:
# We don't want to follow symlinks
st = _lstat_fn(path)
except EnvironmentError, err:
if err.errno != errno.ENOENT:
raise EnsureError("stat(2) on %s failed: %s" % (path, err))
_mkdir_fn(path)
else:
if not stat.S_ISDIR(st[stat.ST_MODE]):
raise EnsureError("Path %s is expected to be a directory, but isn't" %
path)
_ensure_fn(path, mode, uid=uid, gid=gid)
def RecursiveEnsure(path, uid, gid, dir_perm, file_perm):
"""Ensures permissions recursively down a directory.
......@@ -141,11 +66,12 @@ def RecursiveEnsure(path, uid, gid, dir_perm, file_perm):
for root, dirs, files in os.walk(path):
for subdir in dirs:
EnsurePermission(os.path.join(root, subdir), dir_perm, uid=uid, gid=gid)
utils.EnforcePermission(os.path.join(root, subdir), dir_perm, uid=uid,
gid=gid)
for filename in files:
EnsurePermission(os.path.join(root, filename), file_perm, uid=uid,
gid=gid)
utils.EnforcePermission(os.path.join(root, filename), file_perm, uid=uid,
gid=gid)
def EnsureQueueDir(path, mode, uid, gid):
......@@ -159,7 +85,8 @@ def EnsureQueueDir(path, mode, uid, gid):
"""
for filename in utils.ListVisibleFiles(path):
if constants.JOB_FILE_RE.match(filename):
EnsurePermission(utils.PathJoin(path, filename), mode, uid=uid, gid=gid)
utils.EnforcePermission(utils.PathJoin(path, filename), mode, uid=uid,
gid=gid)
def ProcessPath(path):
......@@ -176,12 +103,13 @@ def ProcessPath(path):
# No additional parameters
assert len(path[5:]) == 0
if pathtype == DIR:
EnsureDir(pathname, mode, uid, gid)
utils.MakeDirWithPerm(pathname, mode, uid, gid)
elif pathtype == QUEUE_DIR:
EnsureQueueDir(pathname, mode, uid, gid)
elif pathtype == FILE:
(must_exist, ) = path[5:]
EnsurePermission(pathname, mode, uid=uid, gid=gid, must_exist=must_exist)
utils.EnforcePermission(pathname, mode, uid=uid, gid=gid,
must_exist=must_exist)
def GetPaths():
......@@ -323,7 +251,7 @@ def Main():
if opts.full_run:
RecursiveEnsure(constants.JOB_QUEUE_ARCHIVE_DIR, getent.masterd_uid,
getent.masterd_gid, 0700, 0600)
except EnsureError, err:
except errors.GenericError, err:
logging.error("An error occurred while setting permissions: %s", err)
return constants.EXIT_FAILURE
......
......@@ -28,6 +28,7 @@ import shutil
import tempfile
import errno
import time
import stat
from ganeti import errors
from ganeti import constants
......@@ -331,6 +332,73 @@ def RenameFile(old, new, mkdir=False, mkdir_mode=0750, dir_uid=None,
raise
def EnforcePermission(path, mode, uid=-1, gid=-1, must_exist=True,
_chmod_fn=os.chmod, _chown_fn=os.chown, _stat_fn=os.stat):
"""Enforces that given path has given permissions.
@param path: The path to the file
@param mode: The mode of the file
@param uid: The uid of the owner of this file
@param gid: The gid of the owner of this file
@param must_exist: Specifies if non-existance of path will be an error
@param _chmod_fn: chmod function to use (unittest only)
@param _chown_fn: chown function to use (unittest only)
"""
logging.debug("Checking %s", path)
try:
st = _stat_fn(path)
fmode = stat.S_IMODE(st[stat.ST_MODE])
if fmode != mode:
logging.debug("Changing mode of %s from %#o to %#o", path, fmode, mode)
_chmod_fn(path, mode)
if max(uid, gid) > -1:
fuid = st[stat.ST_UID]
fgid = st[stat.ST_GID]
if fuid != uid or fgid != gid:
logging.debug("Changing owner of %s from UID %s/GID %s to"
" UID %s/GID %s", path, fuid, fgid, uid, gid)
_chown_fn(path, uid, gid)
except EnvironmentError, err:
if err.errno == errno.ENOENT:
if must_exist:
raise errors.GenericError("Path %s should exist, but does not" % path)
else:
raise errors.GenericError("Error while changing permissions on %s: %s" %
(path, err))
def MakeDirWithPerm(path, mode, uid, gid, _lstat_fn=os.lstat,
_mkdir_fn=os.mkdir, _perm_fn=EnforcePermission):
"""Enforces that given path is a dir and has given mode, uid and gid set.
@param path: The path to the file
@param mode: The mode of the file
@param uid: The uid of the owner of this file
@param gid: The gid of the owner of this file
@param _lstat_fn: Stat function to use (unittest only)
@param _mkdir_fn: mkdir function to use (unittest only)
@param _perm_fn: permission setter function to use (unittest only)
"""
logging.debug("Checking directory %s", path)
try:
# We don't want to follow symlinks
st = _lstat_fn(path)
except EnvironmentError, err:
if err.errno != errno.ENOENT:
raise errors.GenericError("stat(2) on %s failed: %s" % (path, err))
_mkdir_fn(path)
else:
if not stat.S_ISDIR(st[stat.ST_MODE]):
raise errors.GenericError(("Path %s is expected to be a directory, but "
"isn't") % path)
_perm_fn(path, mode, uid=uid, gid=gid)
def Makedirs(path, mode=0750):
"""Super-mkdir; create a leaf directory and all intermediate ones.
......
......@@ -21,8 +21,6 @@
"""Script for testing ganeti.tools.ensure_dirs"""
import errno
import stat
import unittest
import os.path
......@@ -31,127 +29,7 @@ from ganeti.tools import ensure_dirs
import testutils
def _MockStatResult(cb, mode, uid, gid):
def _fn(path):
if cb:
cb()
return {
stat.ST_MODE: mode,
stat.ST_UID: uid,
stat.ST_GID: gid,
}
return _fn
def _RaiseNoEntError():
raise EnvironmentError(errno.ENOENT, "not found")
def _OtherStatRaise():
raise EnvironmentError()
class TestEnsureDirsFunctions(unittest.TestCase):
UID_A = 16024
UID_B = 25850
GID_A = 14028
GID_B = 29801
def setUp(self):
self._chown_calls = []
self._chmod_calls = []
self._mkdir_calls = []
def tearDown(self):
self.assertRaises(IndexError, self._mkdir_calls.pop)
self.assertRaises(IndexError, self._chmod_calls.pop)
self.assertRaises(IndexError, self._chown_calls.pop)
def _FakeMkdir(self, path):
self._mkdir_calls.append(path)
def _FakeChown(self, path, uid, gid):
self._chown_calls.append((path, uid, gid))
def _ChmodWrapper(self, cb):
def _fn(path, mode):
self._chmod_calls.append((path, mode))
if cb:
cb()
return _fn
def _VerifyEnsure(self, path, mode, uid=-1, gid=-1):
self.assertEqual(path, "/ganeti-qa-non-test")
self.assertEqual(mode, 0700)
self.assertEqual(uid, self.UID_A)
self.assertEqual(gid, self.GID_A)
def testEnsureDir(self):
is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
_lstat_fn=is_dir_stat, _ensure_fn=self._VerifyEnsure)
def testEnsureDirErrors(self):
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir,
"/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=_MockStatResult(None, 0, 0, 0))
self.assertRaises(IndexError, self._mkdir_calls.pop)
other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir,
"/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=other_stat_raise)
self.assertRaises(IndexError, self._mkdir_calls.pop)
non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
_lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
_ensure_fn=self._VerifyEnsure)
self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
def testEnsurePermissionNoEnt(self):
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission,
"/ganeti-qa-non-test", 0600,
_chmod_fn=NotImplemented, _chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
def testEnsurePermissionNoEntMustNotExist(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, must_exist=False,
_chmod_fn=NotImplemented,
_chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_RaiseNoEntError,
0, 0, 0))
def testEnsurePermissionOtherErrorMustNotExist(self):
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission,
"/ganeti-qa-non-test", 0600, must_exist=False,
_chmod_fn=NotImplemented, _chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
def testEnsurePermissionNoChanges(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600,
_stat_fn=_MockStatResult(None, 0600, 0, 0),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
def testEnsurePermissionChangeMode(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0444,
_stat_fn=_MockStatResult(None, 0600, 0, 0),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
def testEnsurePermissionSetUidGid(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600,
uid=self.UID_B, gid=self.GID_B,
_stat_fn=_MockStatResult(None, 0600,
self.UID_A,
self.GID_A),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
self.assertEqual(self._chown_calls.pop(0),
("/ganeti-qa-non-test", self.UID_B, self.GID_B))
def testPaths(self):
paths = [(path[0], path[1]) for path in ensure_dirs.GetPaths()]
......
......@@ -28,6 +28,8 @@ import shutil
import glob
import time
import signal
import stat
import errno
from ganeti import constants
from ganeti import utils
......@@ -802,5 +804,127 @@ class TestNewUUID(unittest.TestCase):
self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
def _MockStatResult(cb, mode, uid, gid):
def _fn(path):
if cb:
cb()
return {
stat.ST_MODE: mode,
stat.ST_UID: uid,
stat.ST_GID: gid,
}
return _fn
def _RaiseNoEntError():
raise EnvironmentError(errno.ENOENT, "not found")
def _OtherStatRaise():
raise EnvironmentError()
class TestPermissionEnforcements(unittest.TestCase):
UID_A = 16024
UID_B = 25850
GID_A = 14028
GID_B = 29801
def setUp(self):
self._chown_calls = []
self._chmod_calls = []
self._mkdir_calls = []
def tearDown(self):
self.assertRaises(IndexError, self._mkdir_calls.pop)
self.assertRaises(IndexError, self._chmod_calls.pop)
self.assertRaises(IndexError, self._chown_calls.pop)
def _FakeMkdir(self, path):
self._mkdir_calls.append(path)
def _FakeChown(self, path, uid, gid):
self._chown_calls.append((path, uid, gid))
def _ChmodWrapper(self, cb):
def _fn(path, mode):
self._chmod_calls.append((path, mode))
if cb:
cb()
return _fn
def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
self.assertEqual(path, "/ganeti-qa-non-test")
self.assertEqual(mode, 0700)
self.assertEqual(uid, self.UID_A)
self.assertEqual(gid, self.GID_A)
def testMakeDirWithPerm(self):
is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
_lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
def testDirErrors(self):
self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
"/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=_MockStatResult(None, 0, 0, 0))
self.assertRaises(IndexError, self._mkdir_calls.pop)
other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
"/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=other_stat_raise)
self.assertRaises(IndexError, self._mkdir_calls.pop)
non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
_lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
_perm_fn=self._VerifyPerm)
self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
def testEnforcePermissionNoEnt(self):
self.assertRaises(errors.GenericError, utils.EnforcePermission,
"/ganeti-qa-non-test", 0600,
_chmod_fn=NotImplemented, _chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
def testEnforcePermissionNoEntMustNotExist(self):
utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
_chmod_fn=NotImplemented,
_chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_RaiseNoEntError,
0, 0, 0))
def testEnforcePermissionOtherErrorMustNotExist(self):
self.assertRaises(errors.GenericError, utils.EnforcePermission,
"/ganeti-qa-non-test", 0600, must_exist=False,
_chmod_fn=NotImplemented, _chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
def testEnforcePermissionNoChanges(self):
utils.EnforcePermission("/ganeti-qa-non-test", 0600,
_stat_fn=_MockStatResult(None, 0600, 0, 0),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
def testEnforcePermissionChangeMode(self):
utils.EnforcePermission("/ganeti-qa-non-test", 0444,
_stat_fn=_MockStatResult(None, 0600, 0, 0),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
def testEnforcePermissionSetUidGid(self):
utils.EnforcePermission("/ganeti-qa-non-test", 0600,
uid=self.UID_B, gid=self.GID_B,
_stat_fn=_MockStatResult(None, 0600,
self.UID_A,
self.GID_A),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
self.assertEqual(self._chown_calls.pop(0),
("/ganeti-qa-non-test", self.UID_B, self.GID_B))
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment