Commit c3f54085 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

ensure-dirs: Check mode and owner before changing



This avoids many calls to chmod(2) and chown(2), and thereby ctime
updates.

Since I had to update the unittests anyway I untangled the code a bit,
split it into more separate functions and added some more tests.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarRené Nussbaumer <rn@google.com>
parent d00a730d
......@@ -56,7 +56,7 @@ class EnsureError(errors.GenericError):
def EnsurePermission(path, mode, uid=-1, gid=-1, must_exist=True,
_chmod_fn=os.chmod, _chown_fn=os.chown):
_chmod_fn=os.chmod, _chown_fn=os.chown, _stat_fn=os.stat):
"""Ensures that given path has given mode.
@param path: The path to the file
......@@ -70,10 +70,20 @@ def EnsurePermission(path, mode, uid=-1, gid=-1, must_exist=True,
"""
logging.debug("Checking %s", path)
try:
_chmod_fn(path, mode)
st = _stat_fn(path)
fmode = stat.S_IMODE(st[stat.ST_MODE])
if fmode != mode:
logging.debug("Changing mode of %s from %#o to %#o", path, fmode, mode)
_chmod_fn(path, mode)
if max(uid, gid) > -1:
_chown_fn(path, uid, gid)
fuid = st[stat.ST_UID]
fgid = st[stat.ST_GID]
if fuid != uid or fgid != gid:
logging.debug("Changing owner of %s from UID %s/GID %s to"
" UID %s/GID %s", path, fuid, fgid, uid, gid)
_chown_fn(path, uid, gid)
except EnvironmentError, err:
if err.errno == errno.ENOENT:
if must_exist:
......
......@@ -31,91 +31,126 @@ from ganeti.tools import ensure_dirs
import testutils
def _MockStatResult(cb, mode, uid, gid):
def _fn(path):
if cb:
cb()
return {
stat.ST_MODE: mode,
stat.ST_UID: uid,
stat.ST_GID: gid,
}
return _fn
def _RaiseNoEntError():
raise EnvironmentError(errno.ENOENT, "not found")
def _OtherStatRaise():
raise EnvironmentError()
class TestEnsureDirsFunctions(unittest.TestCase):
def _NoopMkdir(self, _):
self.mkdir_called = True
@staticmethod
def _MockStatResult(mode, pre_fn=lambda: 0):
def _fn(path):
pre_fn()
return {stat.ST_MODE: mode}
return _fn
UID_A = 16024
UID_B = 25850
GID_A = 14028
GID_B = 29801
def _VerifyEnsure(self, path, mode, uid=-1, gid=-1):
self.assertEqual(path, "/ganeti-qa-non-test")
self.assertEqual(mode, 0700)
self.assertEqual(uid, 0)
self.assertEqual(gid, 0)
def setUp(self):
self._chown_calls = []
self._chmod_calls = []
self._mkdir_calls = []
def tearDown(self):
self.assertRaises(IndexError, self._mkdir_calls.pop)
self.assertRaises(IndexError, self._chmod_calls.pop)
self.assertRaises(IndexError, self._chown_calls.pop)
@staticmethod
def _RaiseNoEntError():
noent_error = EnvironmentError()
noent_error.errno = errno.ENOENT
raise noent_error
def _FakeMkdir(self, path):
self._mkdir_calls.append(path)
@staticmethod
def _OtherStatRaise():
raise EnvironmentError()
def _FakeChown(self, path, uid, gid):
self._chown_calls.append((path, uid, gid))
def _ChmodWrapper(self, pre_fn=lambda: 0):
def _ChmodWrapper(self, cb):
def _fn(path, mode):
self.chmod_called = True
pre_fn()
self._chmod_calls.append((path, mode))
if cb:
cb()
return _fn
def _NoopChown(self, path, uid, gid):
self.chown_called = True
def _VerifyEnsure(self, path, mode, uid=-1, gid=-1):
self.assertEqual(path, "/ganeti-qa-non-test")
self.assertEqual(mode, 0700)
self.assertEqual(uid, self.UID_A)
self.assertEqual(gid, self.GID_A)
def testEnsureDir(self):
is_dir_stat = self._MockStatResult(stat.S_IFDIR)
not_dir_stat = self._MockStatResult(0)
non_exist_stat = self._MockStatResult(stat.S_IFDIR,
pre_fn=self._RaiseNoEntError)
other_stat_raise = self._MockStatResult(stat.S_IFDIR,
pre_fn=self._OtherStatRaise)
is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
_lstat_fn=is_dir_stat, _ensure_fn=self._VerifyEnsure)
def testEnsureDirErrors(self):
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir,
"/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=not_dir_stat)
_lstat_fn=_MockStatResult(None, 0, 0, 0))
self.assertRaises(IndexError, self._mkdir_calls.pop)
other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir,
"/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=other_stat_raise)
self.mkdir_called = False
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=non_exist_stat, _mkdir_fn=self._NoopMkdir,
self.assertRaises(IndexError, self._mkdir_calls.pop)
non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
_lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
_ensure_fn=self._VerifyEnsure)
self.assertTrue(self.mkdir_called)
self.mkdir_called = False
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, 0, 0,
_lstat_fn=is_dir_stat, _ensure_fn=self._VerifyEnsure)
self.assertFalse(self.mkdir_called)
self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
def testEnsurePermission(self):
noent_chmod_fn = self._ChmodWrapper(pre_fn=self._RaiseNoEntError)
def testEnsurePermissionNoEnt(self):
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission,
"/ganeti-qa-non-test", 0600,
_chmod_fn=noent_chmod_fn)
self.chmod_called = False
_chmod_fn=NotImplemented, _chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
def testEnsurePermissionNoEntMustNotExist(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, must_exist=False,
_chmod_fn=noent_chmod_fn)
self.assertTrue(self.chmod_called)
_chmod_fn=NotImplemented,
_chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_RaiseNoEntError,
0, 0, 0))
def testEnsurePermissionOtherErrorMustNotExist(self):
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission,
"/ganeti-qa-non-test", 0600, must_exist=False,
_chmod_fn=self._ChmodWrapper(pre_fn=self._OtherStatRaise))
self.chmod_called = False
self.chown_called = False
_chmod_fn=NotImplemented, _chown_fn=NotImplemented,
_stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
def testEnsurePermissionNoChanges(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600,
_stat_fn=_MockStatResult(None, 0600, 0, 0),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
def testEnsurePermissionChangeMode(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0444,
_stat_fn=_MockStatResult(None, 0600, 0, 0),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
def testEnsurePermissionSetUidGid(self):
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600,
_chmod_fn=self._ChmodWrapper(),
_chown_fn=self._NoopChown)
self.assertTrue(self.chmod_called)
self.assertFalse(self.chown_called)
self.chmod_called = False
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, uid=1, gid=1,
_chmod_fn=self._ChmodWrapper(),
_chown_fn=self._NoopChown)
self.assertTrue(self.chmod_called)
self.assertTrue(self.chown_called)
uid=self.UID_B, gid=self.GID_B,
_stat_fn=_MockStatResult(None, 0600,
self.UID_A,
self.GID_A),
_chmod_fn=self._ChmodWrapper(None),
_chown_fn=self._FakeChown)
self.assertEqual(self._chown_calls.pop(0),
("/ganeti-qa-non-test", self.UID_B, self.GID_B))
def testPaths(self):
paths = [(path[0], path[1]) for path in ensure_dirs.GetPaths()]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment