Commit be36a7d9 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

pithos: Reduce database interactions for access control.

Each frontend method calls several backend methods.
Each backend method checks whether the user has permission to access the
specific path.
This results to several identical queries to the database.
This commit introduces the following optimization:
We keep 2 dictionaries:
* one for the allowed paths for read per user and
* another for the allowed paths for write per user
The lifespan of these dictionaries is a database transaction.
The dictionaries are updated appropriately after each successful query for
permissions and after actions that affect permissions
(delete & update object permissions).
Especially in the latter case the dictionaries are reset because it was
estimated that this is less expensive than identifying the affected paths.

Conflicts:
	snf-pithos-backend/pithos/backends/modular.py
parent 5fc0a98c
......@@ -37,6 +37,7 @@ import logging
import hashlib
import binascii
from collections import defaultdict
from functools import wraps, partial
from traceback import format_exc
......@@ -164,6 +165,53 @@ def debug_method(func):
return wrapper
def check_allowed_paths(action):
"""Decorator for backend methods checking path access granted to user.
The 1st argument of the decorated method is expected to be a
ModularBackend instance, the 2nd the user performing the request and
the path join of the rest arguments is supposed to be the requested path.
The decorator checks whether the requested path is among the user's allowed
cached paths.
If this is the case, the decorator returns immediately to reduce the
interactions with the database.
Otherwise, it proceeds with the execution of the decorated method and if
the method returns successfully (no exceptions are raised), the requested
path is added to the user's cached allowed paths.
:param action: (int) 0 for reads / 1 for writes
:raises NotAllowedError: the user does not have access to the path
"""
def decorator(func):
@wraps(func)
def wrapper(self, *args):
user = args[0]
if action == self.READ:
d = self.read_allowed_paths
else:
d = self.write_allowed_paths
path = '/'.join(args[1:])
if path in d.get(user, []):
return # access is already checked
else:
func(self, *args) # proceed with access check
d[user].add(path) # add path in the allowed user paths
return wrapper
return decorator
def list_method(func):
@wraps(func)
def wrapper(self, *args, **kw):
marker = kw.get('marker')
limit = kw.get('limit')
result = func(self, *args, **kw)
start, limit = self._list_limits(result, marker, limit)
return result[start:start + limit]
return wrapper
class ModularBackend(BaseBackend):
"""A modular backend.
......@@ -282,10 +330,13 @@ class ModularBackend(BaseBackend):
self.in_transaction = False
self._reset_allowed_paths()
def pre_exec(self, lock_container_path=False):
self.lock_container_path = lock_container_path
self.wrapper.execute()
self.serials = []
self._reset_allowed_paths()
self.in_transaction = True
def post_exec(self, success_status=True):
......@@ -353,10 +404,10 @@ class ModularBackend(BaseBackend):
include_user_defined=True):
"""Return a dictionary with the account metadata for the domain."""
self._can_read_account(user, account)
path, node = self._lookup_account(account, user == account)
if user != account:
if until or (node is None) or (account not
in self._allowed_accounts(user)):
if until or (node is None):
raise NotAllowedError
try:
props = self._get_properties(node, until)
......@@ -394,8 +445,7 @@ class ModularBackend(BaseBackend):
def update_account_meta(self, user, account, domain, meta, replace=False):
"""Update the metadata associated with the account for the domain."""
if user != account:
raise NotAllowedError
self._can_write_account(user, account)
path, node = self._lookup_account(account, True)
self._put_metadata(user, node, domain, meta, replace,
update_statistics_ancestors_depth=-1)
......@@ -405,9 +455,8 @@ class ModularBackend(BaseBackend):
def get_account_groups(self, user, account):
"""Return a dictionary with the user groups defined for the account."""
self._can_read_account(user, account)
if user != account:
if account not in self._allowed_accounts(user):
raise NotAllowedError
return {}
self._lookup_account(account, True)
return self.permissions.group_dict(account)
......@@ -417,8 +466,7 @@ class ModularBackend(BaseBackend):
def update_account_groups(self, user, account, groups, replace=False):
"""Update the groups associated with the account."""
if user != account:
raise NotAllowedError
self._can_write_account(user, account)
self._lookup_account(account, True)
self._check_groups(groups)
if replace:
......@@ -434,9 +482,8 @@ class ModularBackend(BaseBackend):
def get_account_policy(self, user, account):
"""Return a dictionary with the account policy."""
self._can_read_account(user, account)
if user != account:
if account not in self._allowed_accounts(user):
raise NotAllowedError
return {}
path, node = self._lookup_account(account, True)
policy = self._get_policy(node, is_account_policy=True)
......@@ -450,8 +497,7 @@ class ModularBackend(BaseBackend):
def update_account_policy(self, user, account, policy, replace=False):
"""Update the policy associated with the account."""
if user != account:
raise NotAllowedError
self._can_write_account(user, account)
path, node = self._lookup_account(account, True)
self._check_policy(policy, is_account_policy=True)
self._put_policy(node, policy, replace, is_account_policy=True)
......@@ -462,8 +508,7 @@ class ModularBackend(BaseBackend):
"""Create a new account with the given name."""
policy = policy or {}
if user != account:
raise NotAllowedError
self._can_write_account(user, account)
node = self.node.node_lookup(account)
if node is not None:
raise AccountExists('Account already exists')
......@@ -478,8 +523,7 @@ class ModularBackend(BaseBackend):
def delete_account(self, user, account):
"""Delete the account with the given name."""
if user != account:
raise NotAllowedError
self._can_write_account(user, account)
node = self.node.node_lookup(account)
if node is None:
return
......@@ -488,14 +532,19 @@ class ModularBackend(BaseBackend):
raise AccountNotEmpty('Account is not empty')
self.permissions.group_destroy(account)
# remove all the cached allowed paths
# removing the specific path could be more expensive
self._reset_allowed_paths()
@debug_method
@backend_method
def list_containers(self, user, account, marker=None, limit=10000,
shared=False, until=None, public=False):
"""Return a list of containers existing under an account."""
self._can_read_account(user, account)
if user != account:
if until or account not in self._allowed_accounts(user):
if until:
raise NotAllowedError
allowed = self._allowed_containers(user, account)
start, limit = self._list_limits(allowed, marker, limit)
......@@ -524,14 +573,11 @@ class ModularBackend(BaseBackend):
until=None):
"""Return a list of the container's object meta keys for a domain."""
self._can_read_container(user, account, container)
allowed = []
if user != account:
if until:
raise NotAllowedError
allowed = self.permissions.access_list_paths(
user, '/'.join((account, container)))
if not allowed:
raise NotAllowedError
path, node = self._lookup_container(account, container)
before = until if until is not None else inf
allowed = self._get_formatted_paths(allowed)
......@@ -544,9 +590,9 @@ class ModularBackend(BaseBackend):
include_user_defined=True):
"""Return a dictionary with the container metadata for the domain."""
self._can_read_container(user, account, container)
if user != account:
if until or container not in self._allowed_containers(user,
account):
if until:
raise NotAllowedError
path, node = self._lookup_container(account, container)
props = self._get_properties(node, until)
......@@ -579,8 +625,7 @@ class ModularBackend(BaseBackend):
replace=False):
"""Update the metadata associated with the container for the domain."""
if user != account:
raise NotAllowedError
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
src_version_id, dest_version_id = self._put_metadata(
user, node, domain, meta, replace,
......@@ -597,9 +642,8 @@ class ModularBackend(BaseBackend):
def get_container_policy(self, user, account, container):
"""Return a dictionary with the container policy."""
self._can_read_container(user, account, container)
if user != account:
if container not in self._allowed_containers(user, account):
raise NotAllowedError
return {}
path, node = self._lookup_container(account, container)
return self._get_policy(node, is_account_policy=False)
......@@ -610,8 +654,7 @@ class ModularBackend(BaseBackend):
replace=False):
"""Update the policy associated with the container."""
if user != account:
raise NotAllowedError
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
self._check_policy(policy, is_account_policy=False)
self._put_policy(node, policy, replace, is_account_policy=False)
......@@ -622,8 +665,7 @@ class ModularBackend(BaseBackend):
"""Create a new container with the given name."""
policy = policy or {}
if user != account:
raise NotAllowedError
self._can_write_container(user, account, container)
try:
path, node = self._lookup_container(account, container)
except NameError:
......@@ -644,8 +686,7 @@ class ModularBackend(BaseBackend):
delimiter=None):
"""Delete/purge the container with the given name."""
if user != account:
raise NotAllowedError
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
if until is not None:
......@@ -714,6 +755,10 @@ class ModularBackend(BaseBackend):
paths.append(path)
self.permissions.access_clear_bulk(paths)
# remove all the cached allowed paths
# removing the specific path could be more expensive
self._reset_allowed_paths()
def _list_objects(self, user, account, container, prefix, delimiter,
marker, limit, virtual, domain, keys, shared, until,
size_range, all_props, public):
......@@ -878,7 +923,7 @@ class ModularBackend(BaseBackend):
version=None, include_user_defined=True):
"""Return a dictionary with the object metadata for the domain."""
self._can_read(user, account, container, name)
self._can_read_object(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
if version is None:
......@@ -916,7 +961,7 @@ class ModularBackend(BaseBackend):
replace=False):
"""Update object metadata for a domain and return the new version."""
self._can_write(user, account, container, name)
self._can_write_object(user, account, container, name)
path, node = self._lookup_object(account, container, name,
lock_container=True)
......@@ -999,12 +1044,16 @@ class ModularBackend(BaseBackend):
self._report_sharing_change(user, account, path, {'members':
self.permissions.access_members(path)})
# remove all the cached allowed paths
# filtering out only those affected could be more expensive
self._reset_allowed_paths()
@debug_method
@backend_method
def get_object_public(self, user, account, container, name):
"""Return the public id of the object if applicable."""
self._can_read(user, account, container, name)
self._can_read_object(user, account, container, name)
path = self._lookup_object(account, container, name)[0]
p = self.permissions.public_get(path)
return p
......@@ -1014,7 +1063,7 @@ class ModularBackend(BaseBackend):
def update_object_public(self, user, account, container, name, public):
"""Update the public status of the object."""
self._can_write(user, account, container, name)
self._can_write_object(user, account, container, name)
path = self._lookup_object(account, container, name,
lock_container=True)[0]
if not public:
......@@ -1028,7 +1077,7 @@ class ModularBackend(BaseBackend):
def get_object_hashmap(self, user, account, container, name, version=None):
"""Return the object's size and a list with partial hashes."""
self._can_read(user, account, container, name)
self._can_read_object(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
if props[self.HASH] is None:
......@@ -1042,7 +1091,7 @@ class ModularBackend(BaseBackend):
is_copy=False, report_size_change=True):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
self._can_write_object(user, account, container, name)
if permissions is not None:
path = '/'.join((account, container, name))
self._check_permissions(path, permissions)
......@@ -1142,7 +1191,7 @@ class ModularBackend(BaseBackend):
# Update objects with greater version and same hashmap
# and size (fix metadata updates).
self._can_write(user, account, container, name)
self._can_write_object(user, account, container, name)
path, node = self._lookup_object(account, container, name,
lock_container=True)
props = self._get_version(node, version)
......@@ -1163,7 +1212,7 @@ class ModularBackend(BaseBackend):
report_size_change = not is_move
dest_meta = dest_meta or {}
dest_version_ids = []
self._can_read(user, src_account, src_container, src_name)
self._can_read_object(user, src_account, src_container, src_name)
src_container_path = '/'.join((src_account, src_container))
dest_container_path = '/'.join((dest_account, dest_container))
......@@ -1353,6 +1402,10 @@ class ModularBackend(BaseBackend):
paths.append(path)
self.permissions.access_clear_bulk(paths)
# remove all the cached allowed paths
# removing the specific path could be more expensive
self._reset_allowed_paths()
@debug_method
@backend_method
def delete_object(self, user, account, container, name, until=None,
......@@ -1366,7 +1419,7 @@ class ModularBackend(BaseBackend):
def list_versions(self, user, account, container, name):
"""Return a list of all object (version, version_timestamp) tuples."""
self._can_read(user, account, container, name)
self._can_read_object(user, account, container, name)
path, node = self._lookup_object(account, container, name)
versions = self.node.node_get_versions(node)
return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
......@@ -1383,7 +1436,7 @@ class ModularBackend(BaseBackend):
path, serial = info
account, container, name = path.split('/', 2)
if check_permissions:
self._can_read(user, account, container, name)
self._can_read_object(user, account, container, name)
return (account, container, name)
@debug_method
......@@ -1395,7 +1448,7 @@ class ModularBackend(BaseBackend):
if path is None:
raise NameError
account, container, name = path.split('/', 2)
self._can_read(user, account, container, name)
self._can_read_object(user, account, container, name)
return (account, container, name)
def get_block(self, hash):
......@@ -1807,7 +1860,34 @@ class ModularBackend(BaseBackend):
return None
def _can_read(self, user, account, container, name):
def _reset_allowed_paths(self):
self.read_allowed_paths = defaultdict(set)
self.write_allowed_paths = defaultdict(set)
@check_allowed_paths(action=0)
def _can_read_account(self, user, account):
if user != account:
if account not in self._allowed_accounts(user):
raise NotAllowedError
@check_allowed_paths(action=1)
def _can_write_account(self, user, account):
if user != account:
raise NotAllowedError
@check_allowed_paths(action=0)
def _can_read_container(self, user, account, container):
if user != account:
if container not in self._allowed_containers(user, account):
raise NotAllowedError
@check_allowed_paths(action=1)
def _can_write_container(self, user, account, container):
if user != account:
raise NotAllowedError
@check_allowed_paths(action=0)
def _can_read_object(self, user, account, container, name):
if user == account:
return True
path = '/'.join((account, container, name))
......@@ -1820,7 +1900,8 @@ class ModularBackend(BaseBackend):
self.permissions.access_check(path, self.WRITE, user)):
raise NotAllowedError
def _can_write(self, user, account, container, name):
@check_allowed_paths(action=1)
def _can_write_object(self, user, account, container, name):
if user == account:
return True
path = '/'.join((account, container, name))
......@@ -1833,13 +1914,17 @@ class ModularBackend(BaseBackend):
def _allowed_accounts(self, user):
allow = set()
for path in self.permissions.access_list_paths(user):
allow.add(path.split('/', 1)[0])
p = path.split('/', 1)[0]
allow.add(p)
self.read_allowed_paths[user] |= allow
return sorted(allow)
def _allowed_containers(self, user, account):
allow = set()
for path in self.permissions.access_list_paths(user, account):
allow.add(path.split('/', 2)[1])
p = path.split('/', 2)[1]
allow.add(p)
self.read_allowed_paths[user] |= allow
return sorted(allow)
# Domain functions
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment