-
Sofia Papagiannaki authored83859000
modular.py 77.62 KiB
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import sys
import uuid as uuidlib
import logging
import hashlib
import binascii
from collections import defaultdict
from functools import wraps, partial
from traceback import format_exc
try:
from astakosclient import AstakosClient
except ImportError:
AstakosClient = None
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
ContainerNotEmpty, ItemNotExists, VersionNotExists,
InvalidHash)
class DisabledAstakosClient(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def __getattr__(self, name):
m = ("AstakosClient has been disabled, "
"yet an attempt to access it was made")
raise AssertionError(m)
# Stripped-down version of the HashMap class found in tools.
class HashMap(list):
def __init__(self, blocksize, blockhash):
super(HashMap, self).__init__()
self.blocksize = blocksize
self.blockhash = blockhash
def _hash_raw(self, v):
h = hashlib.new(self.blockhash)
h.update(v)
return h.digest()
def hash(self):
if len(self) == 0:
return self._hash_raw('')
if len(self) == 1:
return self.__getitem__(0)
h = list(self)
s = 2
while s < len(h):
s = s * 2
h += [('\x00' * len(h[0]))] * (s - len(h))
while len(h) > 1:
h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
return h[0]
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024 # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
'abcdefghijklmnopqrstuvwxyz'
'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
inf = float('inf')
ULTIMATE_ANSWER = 42
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
logger = logging.getLogger(__name__)
def backend_method(func):
@wraps(func)
def wrapper(self, *args, **kw):
# if we are inside a database transaction
# just proceed with the method execution
# otherwise manage a new transaction
if self.in_transaction:
return func(self, *args, **kw)
try:
self.pre_exec()
result = func(self, *args, **kw)
success_status = True
return result
except:
success_status = False
raise
finally:
self.post_exec(success_status)
return wrapper
def debug_method(func):
@wraps(func)
def wrapper(self, *args, **kw):
try:
result = func(self, *args, **kw)
return result
except:
result = format_exc()
raise
finally:
all_args = map(repr, args)
map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
logger.debug(">>> %s(%s) <<< %s" % (
func.__name__, ', '.join(all_args).rstrip(', '), result))
return wrapper
def check_allowed_paths(action):
"""Decorator for backend methods checking path access granted to user.
The 1st argument of the decorated method is expected to be a
ModularBackend instance, the 2nd the user performing the request and
the path join of the rest arguments is supposed to be the requested path.
The decorator checks whether the requested path is among the user's allowed
cached paths.
If this is the case, the decorator returns immediately to reduce the
interactions with the database.
Otherwise, it proceeds with the execution of the decorated method and if
the method returns successfully (no exceptions are raised), the requested
path is added to the user's cached allowed paths.
:param action: (int) 0 for reads / 1 for writes
:raises NotAllowedError: the user does not have access to the path
"""
def decorator(func):
@wraps(func)
def wrapper(self, *args):
user = args[0]
if action == self.READ:
d = self.read_allowed_paths
else:
d = self.write_allowed_paths
path = '/'.join(args[1:])
if path in d.get(user, []):
return # access is already checked
else:
func(self, *args) # proceed with access check
d[user].add(path) # add path in the allowed user paths
return wrapper
return decorator
def list_method(func):
@wraps(func)
def wrapper(self, *args, **kw):
marker = kw.get('marker')
limit = kw.get('limit')
result = func(self, *args, **kw)
start, limit = self._list_limits(result, marker, limit)
return result[start:start + limit]
return wrapper
class ModularBackend(BaseBackend):
"""A modular backend.
Uses modules for SQL functions and storage.
"""
def __init__(self, db_module=None, db_connection=None,
block_module=None, block_path=None, block_umask=None,
block_size=None, hash_algorithm=None,
queue_module=None, queue_hosts=None, queue_exchange=None,
astakos_auth_url=None, service_token=None,
astakosclient_poolsize=None,
free_versioning=True, block_params=None,
public_url_security=None,
public_url_alphabet=None,
account_quota_policy=None,
container_quota_policy=None,
container_versioning_policy=None):
db_module = db_module or DEFAULT_DB_MODULE
db_connection = db_connection or DEFAULT_DB_CONNECTION
block_module = block_module or DEFAULT_BLOCK_MODULE
block_path = block_path or DEFAULT_BLOCK_PATH
block_umask = block_umask or DEFAULT_BLOCK_UMASK
block_params = block_params or DEFAULT_BLOCK_PARAMS
block_size = block_size or DEFAULT_BLOCK_SIZE
hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
#queue_module = queue_module or DEFAULT_QUEUE_MODULE
account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
container_quota_policy = container_quota_policy \
or DEFAULT_CONTAINER_QUOTA
container_versioning_policy = container_versioning_policy \
or DEFAULT_CONTAINER_VERSIONING
self.default_account_policy = {'quota': account_quota_policy}
self.default_container_policy = {
'quota': container_quota_policy,
'versioning': container_versioning_policy
}
#queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
#queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
self.public_url_security = (public_url_security or
DEFAULT_PUBLIC_URL_SECURITY)
self.public_url_alphabet = (public_url_alphabet or
DEFAULT_PUBLIC_URL_ALPHABET)
self.hash_algorithm = hash_algorithm
self.block_size = block_size
self.free_versioning = free_versioning
def load_module(m):
__import__(m)
return sys.modules[m]
self.db_module = load_module(db_module)
self.wrapper = self.db_module.DBWrapper(db_connection)
params = {'wrapper': self.wrapper}
self.permissions = self.db_module.Permissions(**params)
self.config = self.db_module.Config(**params)
self.commission_serials = self.db_module.QuotaholderSerial(**params)
for x in ['READ', 'WRITE']:
setattr(self, x, getattr(self.db_module, x))
self.node = self.db_module.Node(**params)
for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
'MATCH_PREFIX', 'MATCH_EXACT']:
setattr(self, x, getattr(self.db_module, x))
self.ALLOWED = ['read', 'write']
self.block_module = load_module(block_module)
self.block_params = block_params
params = {'path': block_path,
'block_size': self.block_size,
'hash_algorithm': self.hash_algorithm,
'umask': block_umask}
params.update(self.block_params)
self.store = self.block_module.Store(**params)
if queue_module and queue_hosts:
self.queue_module = load_module(queue_module)
params = {'hosts': queue_hosts,
'exchange': queue_exchange,
'client_id': QUEUE_CLIENT_ID}
self.queue = self.queue_module.Queue(**params)
else:
class NoQueue:
def send(self, *args):
pass
def close(self):
pass
self.queue = NoQueue()
self.astakos_auth_url = astakos_auth_url
self.service_token = service_token
if not astakos_auth_url or not AstakosClient:
self.astakosclient = DisabledAstakosClient(
service_token, astakos_auth_url,
use_pool=True,
pool_size=astakosclient_poolsize)
else:
self.astakosclient = AstakosClient(
service_token, astakos_auth_url,
use_pool=True,
pool_size=astakosclient_poolsize)
self.serials = []
self.messages = []
self._move_object = partial(self._copy_object, is_move=True)
self.lock_container_path = False
self.in_transaction = False
self._reset_allowed_paths()
def pre_exec(self, lock_container_path=False):
self.lock_container_path = lock_container_path
self.wrapper.execute()
self.serials = []
self._reset_allowed_paths()
self.in_transaction = True
def post_exec(self, success_status=True):
if success_status:
# send messages produced
for m in self.messages:
self.queue.send(*m)
# register serials
if self.serials:
self.commission_serials.insert_many(
self.serials)
# commit to ensure that the serials are registered
# even if resolve commission fails
self.wrapper.commit()
# start new transaction
self.wrapper.execute()
r = self.astakosclient.resolve_commissions(
accept_serials=self.serials,
reject_serials=[])
self.commission_serials.delete_many(
r['accepted'])
self.wrapper.commit()
else:
if self.serials:
r = self.astakosclient.resolve_commissions(
accept_serials=[],
reject_serials=self.serials)
self.commission_serials.delete_many(
r['rejected'])
self.wrapper.rollback()
self.in_transaction = False
def close(self):
self.wrapper.close()
self.queue.close()
@property
def using_external_quotaholder(self):
return not isinstance(self.astakosclient, DisabledAstakosClient)
@debug_method
@backend_method
@list_method
def list_accounts(self, user, marker=None, limit=10000):
"""Return a list of accounts the user can access."""
return self._allowed_accounts(user)
def _get_account_quotas(self, account):
"""Get account usage from astakos."""
quotas = self.astakosclient.service_get_quotas(account)[account]
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
{})
def _get_account_quotas(self, account):
"""Get account usage from astakos."""
quotas = self.astakosclient.service_get_quotas(account)[account]
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
{})
@debug_method
@backend_method
def get_account_meta(self, user, account, domain, until=None,
include_user_defined=True):
"""Return a dictionary with the account metadata for the domain."""
self._can_read_account(user, account)
path, node = self._lookup_account(account, user == account)
if user != account:
if until or (node is None):
raise NotAllowedError
try:
props = self._get_properties(node, until)
mtime = props[self.MTIME]
except NameError:
props = None
mtime = until
count, bytes, tstamp = self._get_statistics(node, until, compute=True)
tstamp = max(tstamp, mtime)
if until is None:
modified = tstamp
else:
modified = self._get_statistics(
node, compute=True)[2] # Overall last modification.
modified = max(modified, mtime)
if user != account:
meta = {'name': account}
else:
meta = {}
if props is not None and include_user_defined:
meta.update(
dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': account, 'count': count, 'bytes': bytes})
if self.using_external_quotaholder:
external_quota = self._get_account_quotas(account)
meta['bytes'] = external_quota.get('usage', 0)
meta.update({'modified': modified})
return meta
@debug_method
@backend_method
def update_account_meta(self, user, account, domain, meta, replace=False):
"""Update the metadata associated with the account for the domain."""
self._can_write_account(user, account)
path, node = self._lookup_account(account, True)
self._put_metadata(user, node, domain, meta, replace,
update_statistics_ancestors_depth=-1)
@debug_method
@backend_method
def get_account_groups(self, user, account):
"""Return a dictionary with the user groups defined for the account."""
self._can_read_account(user, account)
if user != account:
return {}
self._lookup_account(account, True)
return self.permissions.group_dict(account)
@debug_method
@backend_method
def update_account_groups(self, user, account, groups, replace=False):
"""Update the groups associated with the account."""
self._can_write_account(user, account)
self._lookup_account(account, True)
self._check_groups(groups)
if replace:
self.permissions.group_destroy(account)
for k, v in groups.iteritems():
if not replace: # If not already deleted.
self.permissions.group_delete(account, k)
if v:
self.permissions.group_addmany(account, k, v)
@debug_method
@backend_method
def get_account_policy(self, user, account):
"""Return a dictionary with the account policy."""
self._can_read_account(user, account)
if user != account:
return {}
path, node = self._lookup_account(account, True)
policy = self._get_policy(node, is_account_policy=True)
if self.using_external_quotaholder:
external_quota = self._get_account_quotas(account)
policy['quota'] = external_quota.get('limit', 0)
return policy
@debug_method
@backend_method
def update_account_policy(self, user, account, policy, replace=False):
"""Update the policy associated with the account."""
self._can_write_account(user, account)
path, node = self._lookup_account(account, True)
self._check_policy(policy, is_account_policy=True)
self._put_policy(node, policy, replace, is_account_policy=True)
@debug_method
@backend_method
def put_account(self, user, account, policy=None):
"""Create a new account with the given name."""
policy = policy or {}
self._can_write_account(user, account)
node = self.node.node_lookup(account)
if node is not None:
raise AccountExists('Account already exists')
if policy:
self._check_policy(policy, is_account_policy=True)
node = self._put_path(user, self.ROOTNODE, account,
update_statistics_ancestors_depth=-1)
self._put_policy(node, policy, True, is_account_policy=True)
@debug_method
@backend_method
def delete_account(self, user, account):
"""Delete the account with the given name."""
self._can_write_account(user, account)
node = self.node.node_lookup(account)
if node is None:
return
if not self.node.node_remove(node,
update_statistics_ancestors_depth=-1):
raise AccountNotEmpty('Account is not empty')
self.permissions.group_destroy(account)
# remove all the cached allowed paths
# removing the specific path could be more expensive
self._reset_allowed_paths()
@debug_method
@backend_method
@list_method
def list_containers(self, user, account, marker=None, limit=10000,
shared=False, until=None, public=False):
"""Return a list of containers existing under an account."""
self._can_read_account(user, account)
if user != account:
if until:
raise NotAllowedError
return self._allowed_containers(user, account)
if shared or public:
allowed = set()
if shared:
allowed.update([x.split('/', 2)[1] for x in
self.permissions.access_list_shared(account)])
if public:
allowed.update([x[0].split('/', 2)[1] for x in
self.permissions.public_list(account)])
return sorted(allowed)
node = self.node.node_lookup(account)
return [x[0] for x in self._list_object_properties(
node, account, '', '/', marker, limit, False, None, [], until)]
@debug_method
@backend_method
def list_container_meta(self, user, account, container, domain,
until=None):
"""Return a list of the container's object meta keys for a domain."""
self._can_read_container(user, account, container)
allowed = []
if user != account:
if until:
raise NotAllowedError
path, node = self._lookup_container(account, container)
before = until if until is not None else inf
allowed = self._get_formatted_paths(allowed)
return self.node.latest_attribute_keys(node, domain, before,
CLUSTER_DELETED, allowed)
@debug_method
@backend_method
def get_container_meta(self, user, account, container, domain, until=None,
include_user_defined=True):
"""Return a dictionary with the container metadata for the domain."""
self._can_read_container(user, account, container)
if user != account:
if until:
raise NotAllowedError
path, node = self._lookup_container(account, container)
props = self._get_properties(node, until)
mtime = props[self.MTIME]
count, bytes, tstamp = self._get_statistics(node, until)
tstamp = max(tstamp, mtime)
if until is None:
modified = tstamp
else:
modified = self._get_statistics(
node)[2] # Overall last modification.
modified = max(modified, mtime)
if user != account:
meta = {'name': container}
else:
meta = {}
if include_user_defined:
meta.update(
dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': container, 'count': count, 'bytes': bytes})
meta.update({'modified': modified})
return meta
@debug_method
@backend_method
def update_container_meta(self, user, account, container, domain, meta,
replace=False):
"""Update the metadata associated with the container for the domain."""
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
src_version_id, dest_version_id = self._put_metadata(
user, node, domain, meta, replace,
update_statistics_ancestors_depth=0)
if src_version_id is not None:
versioning = self._get_policy(
node, is_account_policy=False)['versioning']
if versioning != 'auto':
self.node.version_remove(src_version_id,
update_statistics_ancestors_depth=0)
@debug_method
@backend_method
def get_container_policy(self, user, account, container):
"""Return a dictionary with the container policy."""
self._can_read_container(user, account, container)
if user != account:
return {}
path, node = self._lookup_container(account, container)
return self._get_policy(node, is_account_policy=False)
@debug_method
@backend_method
def update_container_policy(self, user, account, container, policy,
replace=False):
"""Update the policy associated with the container."""
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
self._check_policy(policy, is_account_policy=False)
self._put_policy(node, policy, replace, is_account_policy=False)
@debug_method
@backend_method
def put_container(self, user, account, container, policy=None):
"""Create a new container with the given name."""
policy = policy or {}
self._can_write_container(user, account, container)
try:
path, node = self._lookup_container(account, container)
except NameError:
pass
else:
raise ContainerExists('Container already exists')
if policy:
self._check_policy(policy, is_account_policy=False)
path = '/'.join((account, container))
node = self._put_path(
user, self._lookup_account(account, True)[1], path,
update_statistics_ancestors_depth=-1)
self._put_policy(node, policy, True, is_account_policy=False)
@debug_method
@backend_method
def delete_container(self, user, account, container, until=None, prefix='',
delimiter=None):
"""Delete/purge the container with the given name."""
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
if until is not None:
hashes, size, serials = self.node.node_purge_children(
node, until, CLUSTER_HISTORY,
update_statistics_ancestors_depth=0)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED,
update_statistics_ancestors_depth=0)
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
'action': 'container purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
}
)
return
if not delimiter:
if self._get_statistics(node)[0] > 0:
raise ContainerNotEmpty('Container is not empty')
hashes, size, serials = self.node.node_purge_children(
node, inf, CLUSTER_HISTORY,
update_statistics_ancestors_depth=0)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED,
update_statistics_ancestors_depth=0)
self.node.node_remove(node, update_statistics_ancestors_depth=0)
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
'action': 'container purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
}
)
else:
# remove only contents
src_names = self._list_objects_no_limit(
user, account, container, prefix='', delimiter=None,
virtual=False, domain=None, keys=[], shared=False, until=None,
size_range=None, all_props=True, public=False)
paths = []
for t in src_names:
path = '/'.join((account, container, t[0]))
node = t[2]
if not self._exists(node):
continue
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED,
update_statistics_ancestors_depth=1)
del_size = self._apply_versioning(
account, container, src_version_id,
update_statistics_ancestors_depth=1)
self._report_size_change(
user, account, -del_size, {
'action': 'object delete',
'path': path,
'versions': ','.join([str(dest_version_id)])})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
paths.append(path)
self.permissions.access_clear_bulk(paths)
# remove all the cached allowed paths
# removing the specific path could be more expensive
self._reset_allowed_paths()
def _list_objects(self, user, account, container, prefix, delimiter,
marker, limit, virtual, domain, keys, shared, until,
size_range, all_props, public):
if user != account and until:
raise NotAllowedError
objects = set()
if shared and public:
# get shared first
shared_paths = self._list_object_permissions(
user, account, container, prefix, shared=True, public=False)
if shared_paths:
path, node = self._lookup_container(account, container)
shared_paths = self._get_formatted_paths(shared_paths)
objects = set(self._list_object_properties(
node, path, prefix, delimiter, marker, limit, virtual,
domain, keys, until, size_range, shared_paths, all_props))
# get public
objects |= set(self._list_public_object_properties(
user, account, container, prefix, all_props))
objects = list(objects)
objects.sort(key=lambda x: x[0])
elif public:
objects = self._list_public_object_properties(
user, account, container, prefix, all_props)
else:
allowed = self._list_object_permissions(
user, account, container, prefix, shared, public=False)
if shared and not allowed:
return []
path, node = self._lookup_container(account, container)
allowed = self._get_formatted_paths(allowed)
objects = self._list_object_properties(
node, path, prefix, delimiter, marker, limit, virtual, domain,
keys, until, size_range, allowed, all_props)
# apply limits
start, limit = self._list_limits(objects, marker, limit)
return objects[start:start + limit]
def _list_public_object_properties(self, user, account, container, prefix,
all_props):
public = self._list_object_permissions(
user, account, container, prefix, shared=False, public=True)
paths, nodes = self._lookup_objects(public)
path = '/'.join((account, container))
cont_prefix = path + '/'
paths = [x[len(cont_prefix):] for x in paths]
objects = [(p,) + props for p, props in
zip(paths, self.node.version_lookup_bulk(
nodes, all_props=all_props, order_by_path=True))]
return objects
def _list_objects_no_limit(self, user, account, container, prefix,
delimiter, virtual, domain, keys, shared, until,
size_range, all_props, public):
objects = []
while True:
marker = objects[-1] if objects else None
limit = 10000
l = self._list_objects(
user, account, container, prefix, delimiter, marker, limit,
virtual, domain, keys, shared, until, size_range, all_props,
public)
objects.extend(l)
if not l or len(l) < limit:
break
return objects
def _list_object_permissions(self, user, account, container, prefix,
shared, public):
allowed = []
path = '/'.join((account, container, prefix)).rstrip('/')
if user != account:
allowed = self.permissions.access_list_paths(user, path)
if not allowed:
raise NotAllowedError
else:
allowed = set()
if shared:
allowed.update(self.permissions.access_list_shared(path))
if public:
allowed.update(
[x[0] for x in self.permissions.public_list(path)])
allowed = sorted(allowed)
if not allowed:
return []
return allowed
@debug_method
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None,
marker=None, limit=10000, virtual=True, domain=None,
keys=None, shared=False, until=None, size_range=None,
public=False):
"""List (object name, object version_id) under a container."""
keys = keys or []
return self._list_objects(
user, account, container, prefix, delimiter, marker, limit,
virtual, domain, keys, shared, until, size_range, False, public)
@debug_method
@backend_method
def list_object_meta(self, user, account, container, prefix='',
delimiter=None, marker=None, limit=10000,
virtual=True, domain=None, keys=None, shared=False,
until=None, size_range=None, public=False):
"""Return a list of metadata dicts of objects under a container."""
keys = keys or []
props = self._list_objects(
user, account, container, prefix, delimiter, marker, limit,
virtual, domain, keys, shared, until, size_range, True, public)
objects = []
for p in props:
if len(p) == 2:
objects.append({'subdir': p[0]})
else:
objects.append({
'name': p[0],
'bytes': p[self.SIZE + 1],
'type': p[self.TYPE + 1],
'hash': p[self.HASH + 1],
'version': p[self.SERIAL + 1],
'version_timestamp': p[self.MTIME + 1],
'modified': p[self.MTIME + 1] if until is None else None,
'modified_by': p[self.MUSER + 1],
'uuid': p[self.UUID + 1],
'checksum': p[self.CHECKSUM + 1]})
return objects
@debug_method
@backend_method
def list_object_permissions(self, user, account, container, prefix=''):
"""Return a list of paths enforce permissions under a container."""
return self._list_object_permissions(user, account, container, prefix,
True, False)
@debug_method
@backend_method
def list_object_public(self, user, account, container, prefix=''):
"""Return a mapping of object paths to public ids under a container."""
public = {}
for path, p in self.permissions.public_list('/'.join((account,
container,
prefix))):
public[path] = p
return public
@debug_method
@backend_method
def get_object_meta(self, user, account, container, name, domain,
version=None, include_user_defined=True):
"""Return a dictionary with the object metadata for the domain."""
self._can_read_object(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
if version is None:
modified = props[self.MTIME]
else:
try:
modified = self._get_version(
node)[self.MTIME] # Overall last modification.
except NameError: # Object may be deleted.
del_props = self.node.version_lookup(
node, inf, CLUSTER_DELETED)
if del_props is None:
raise ItemNotExists('Object does not exist')
modified = del_props[self.MTIME]
meta = {}
if include_user_defined:
meta.update(
dict(self.node.attribute_get(props[self.SERIAL], domain)))
meta.update({'name': name,
'bytes': props[self.SIZE],
'type': props[self.TYPE],
'hash': props[self.HASH],
'version': props[self.SERIAL],
'version_timestamp': props[self.MTIME],
'modified': modified,
'modified_by': props[self.MUSER],
'uuid': props[self.UUID],
'checksum': props[self.CHECKSUM]})
return meta
@debug_method
@backend_method
def update_object_meta(self, user, account, container, name, domain, meta,
replace=False):
"""Update object metadata for a domain and return the new version."""
self._can_write_object(user, account, container, name)
path, node = self._lookup_object(account, container, name,
lock_container=True)
src_version_id, dest_version_id = self._put_metadata(
user, node, domain, meta, replace,
update_statistics_ancestors_depth=1)
self._apply_versioning(account, container, src_version_id,
update_statistics_ancestors_depth=1)
return dest_version_id
@debug_method
@backend_method
def get_object_permissions_bulk(self, user, account, container, names):
"""Return the action allowed on the object, the path
from which the object gets its permissions from,
along with a dictionary containing the permissions."""
permissions_path = self._get_permissions_path_bulk(account, container,
names)
access_objects = self.permissions.access_check_bulk(permissions_path,
user)
#group_parents = access_objects['group_parents']
nobject_permissions = {}
cpath = '/'.join((account, container, ''))
cpath_idx = len(cpath)
for path in permissions_path:
allowed = 1
name = path[cpath_idx:]
if user != account:
try:
allowed = access_objects[path]
except KeyError:
raise NotAllowedError
access_dict, allowed = \
self.permissions.access_get_for_bulk(access_objects[path])
nobject_permissions[name] = (self.ALLOWED[allowed], path,
access_dict)
self._lookup_objects(permissions_path)
return nobject_permissions
@debug_method
@backend_method
def get_object_permissions(self, user, account, container, name):
"""Return the action allowed on the object, the path
from which the object gets its permissions from,
along with a dictionary containing the permissions."""
allowed = 'write'
permissions_path = self._get_permissions_path(account, container, name)
if user != account:
if self.permissions.access_check(permissions_path, self.WRITE,
user):
allowed = 'write'
elif self.permissions.access_check(permissions_path, self.READ,
user):
allowed = 'read'
else:
raise NotAllowedError
self._lookup_object(account, container, name)
return (allowed,
permissions_path,
self.permissions.access_get(permissions_path))
@debug_method
@backend_method
def update_object_permissions(self, user, account, container, name,
permissions):
"""Update the permissions associated with the object."""
if user != account:
raise NotAllowedError
path = self._lookup_object(account, container, name,
lock_container=True)[0]
self._check_permissions(path, permissions)
try:
self.permissions.access_set(path, permissions)
except:
raise ValueError
else:
self._report_sharing_change(user, account, path, {'members':
self.permissions.access_members(path)})
# remove all the cached allowed paths
# filtering out only those affected could be more expensive
self._reset_allowed_paths()
@debug_method
@backend_method
def get_object_public(self, user, account, container, name):
"""Return the public id of the object if applicable."""
self._can_read_object(user, account, container, name)
path = self._lookup_object(account, container, name)[0]
p = self.permissions.public_get(path)
return p
@debug_method
@backend_method
def update_object_public(self, user, account, container, name, public):
"""Update the public status of the object."""
self._can_write_object(user, account, container, name)
path = self._lookup_object(account, container, name,
lock_container=True)[0]
if not public:
self.permissions.public_unset(path)
else:
self.permissions.public_set(
path, self.public_url_security, self.public_url_alphabet)
@debug_method
@backend_method
def get_object_hashmap(self, user, account, container, name, version=None):
"""Return the object's size and a list with partial hashes."""
self._can_read_object(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
if props[self.HASH] is None:
return 0, ()
hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
def _update_object_hash(self, user, account, container, name, size, type,
hash, checksum, domain, meta, replace_meta,
permissions, src_node=None, src_version_id=None,
is_copy=False, report_size_change=True):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write_object(user, account, container, name)
if permissions is not None:
path = '/'.join((account, container, name))
self._check_permissions(path, permissions)
account_path, account_node = self._lookup_account(account, True)
container_path, container_node = self._lookup_container(
account, container)
path, node = self._put_object_node(
container_path, container_node, name)
pre_version_id, dest_version_id = self._put_version_duplicate(
user, node, src_node=src_node, size=size, type=type, hash=hash,
checksum=checksum, is_copy=is_copy,
update_statistics_ancestors_depth=1)
# Handle meta.
if src_version_id is None:
src_version_id = pre_version_id
self._put_metadata_duplicate(
src_version_id, dest_version_id, domain, node, meta, replace_meta)
del_size = self._apply_versioning(account, container, pre_version_id,
update_statistics_ancestors_depth=1)
size_delta = size - del_size
if size_delta > 0:
# Check account quota.
if not self.using_external_quotaholder:
account_quota = long(self._get_policy(
account_node, is_account_policy=True)['quota'])
account_usage = self._get_statistics(account_node,
compute=True)[1]
if (account_quota > 0 and account_usage > account_quota):
raise QuotaError(
'Account quota exceeded: limit: %s, usage: %s' % (
account_quota, account_usage))
# Check container quota.
container_quota = long(self._get_policy(
container_node, is_account_policy=False)['quota'])
container_usage = self._get_statistics(container_node)[1]
if (container_quota > 0 and container_usage > container_quota):
# This must be executed in a transaction, so the version is
# never created if it fails.
raise QuotaError(
'Container quota exceeded: limit: %s, usage: %s' % (
container_quota, container_usage
)
)
if report_size_change:
self._report_size_change(
user, account, size_delta,
{'action': 'object update', 'path': path,
'versions': ','.join([str(dest_version_id)])})
if permissions is not None:
self.permissions.access_set(path, permissions)
self._report_sharing_change(
user, account, path,
{'members': self.permissions.access_members(path)})
self._report_object_change(
user, account, path,
details={'version': dest_version_id, 'action': 'object update'})
return dest_version_id
@debug_method
def update_object_hashmap(self, user, account, container, name, size, type,
hashmap, checksum, domain, meta=None,
replace_meta=False, permissions=None):
"""Create/update an object's hashmap and return the new version."""
meta = meta or {}
if size == 0: # No such thing as an empty hashmap.
hashmap = [self.put_block('')]
map = HashMap(self.block_size, self.hash_algorithm)
map.extend([self._unhexlify_hash(x) for x in hashmap])
missing = self.store.block_search(map)
if missing:
ie = IndexError()
ie.data = [binascii.hexlify(x) for x in missing]
raise ie
hash = map.hash()
hexlified = binascii.hexlify(hash)
# _update_object_hash() locks destination path
dest_version_id = self._update_object_hash(
user, account, container, name, size, type, hexlified, checksum,
domain, meta, replace_meta, permissions)
self.store.map_put(hash, map)
return dest_version_id, hexlified
@debug_method
@backend_method
def update_object_checksum(self, user, account, container, name, version,
checksum):
"""Update an object's checksum."""
# Update objects with greater version and same hashmap
# and size (fix metadata updates).
self._can_write_object(user, account, container, name)
path, node = self._lookup_object(account, container, name,
lock_container=True)
props = self._get_version(node, version)
versions = self.node.node_get_versions(node)
for x in versions:
if (x[self.SERIAL] >= int(version) and
x[self.HASH] == props[self.HASH] and
x[self.SIZE] == props[self.SIZE]):
self.node.version_put_property(
x[self.SERIAL], 'checksum', checksum)
def _copy_object(self, user, src_account, src_container, src_name,
dest_account, dest_container, dest_name, type,
dest_domain=None, dest_meta=None, replace_meta=False,
permissions=None, src_version=None, is_move=False,
delimiter=None):
report_size_change = not is_move
dest_meta = dest_meta or {}
dest_version_ids = []
self._can_read_object(user, src_account, src_container, src_name)
src_container_path = '/'.join((src_account, src_container))
dest_container_path = '/'.join((dest_account, dest_container))
# Lock container paths in alphabetical order
if src_container_path < dest_container_path:
self._lookup_container(src_account, src_container)
self._lookup_container(dest_account, dest_container)
else:
self._lookup_container(dest_account, dest_container)
self._lookup_container(src_account, src_container)
path, node = self._lookup_object(src_account, src_container, src_name)
# TODO: Will do another fetch of the properties in duplicate version...
props = self._get_version(
node, src_version) # Check to see if source exists.
src_version_id = props[self.SERIAL]
hash = props[self.HASH]
size = props[self.SIZE]
is_copy = not is_move and (src_account, src_container, src_name) != (
dest_account, dest_container, dest_name) # New uuid.
dest_version_ids.append(self._update_object_hash(
user, dest_account, dest_container, dest_name, size, type, hash,
None, dest_domain, dest_meta, replace_meta, permissions,
src_node=node, src_version_id=src_version_id, is_copy=is_copy,
report_size_change=report_size_change))
if is_move and ((src_account, src_container, src_name) !=
(dest_account, dest_container, dest_name)):
self._delete_object(user, src_account, src_container, src_name,
report_size_change=report_size_change)
if delimiter:
prefix = (src_name + delimiter if not
src_name.endswith(delimiter) else src_name)
src_names = self._list_objects_no_limit(
user, src_account, src_container, prefix, delimiter=None,
virtual=False, domain=None, keys=[], shared=False, until=None,
size_range=None, all_props=True, public=False)
src_names.sort(key=lambda x: x[2]) # order by nodes
paths = [elem[0] for elem in src_names]
nodes = [elem[2] for elem in src_names]
# TODO: Will do another fetch of the properties
# in duplicate version...
props = self._get_versions(nodes) # Check to see if source exists.
for prop, path, node in zip(props, paths, nodes):
src_version_id = prop[self.SERIAL]
hash = prop[self.HASH]
vtype = prop[self.TYPE]
size = prop[self.SIZE]
dest_prefix = dest_name + delimiter if not dest_name.endswith(
delimiter) else dest_name
vdest_name = path.replace(prefix, dest_prefix, 1)
# _update_object_hash() locks destination path
dest_version_ids.append(self._update_object_hash(
user, dest_account, dest_container, vdest_name, size,
vtype, hash, None, dest_domain, meta={},
replace_meta=False, permissions=None, src_node=node,
src_version_id=src_version_id, is_copy=is_copy,
report_size_change=report_size_change))
if is_move and ((src_account, src_container, src_name) !=
(dest_account, dest_container, dest_name)):
self._delete_object(user, src_account, src_container, path,
report_size_change=report_size_change)
return (dest_version_ids[0] if len(dest_version_ids) == 1 else
dest_version_ids)
@debug_method
@backend_method
def copy_object(self, user, src_account, src_container, src_name,
dest_account, dest_container, dest_name, type, domain,
meta=None, replace_meta=False, permissions=None,
src_version=None, delimiter=None):
"""Copy an object's data and metadata."""
meta = meta or {}
dest_version_id = self._copy_object(
user, src_account, src_container, src_name, dest_account,
dest_container, dest_name, type, domain, meta, replace_meta,
permissions, src_version, False, delimiter)
return dest_version_id
@debug_method
@backend_method
def move_object(self, user, src_account, src_container, src_name,
dest_account, dest_container, dest_name, type, domain,
meta=None, replace_meta=False, permissions=None,
delimiter=None):
"""Move an object's data and metadata."""
meta = meta or {}
if user != src_account:
raise NotAllowedError
dest_version_id = self._move_object(
user, src_account, src_container, src_name, dest_account,
dest_container, dest_name, type, domain, meta, replace_meta,
permissions, None, delimiter=delimiter)
return dest_version_id
def _delete_object(self, user, account, container, name, until=None,
delimiter=None, report_size_change=True):
if user != account:
raise NotAllowedError
# lookup object and lock container path also
path, node = self._lookup_object(account, container, name,
lock_container=True)
if until is not None:
if node is None:
return
hashes = []
size = 0
serials = []
h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
update_statistics_ancestors_depth=1)
hashes += h
size += s
serials += v
h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
update_statistics_ancestors_depth=1)
hashes += h
if not self.free_versioning:
size += s
serials += v
for h in hashes:
self.store.map_delete(h)
self.node.node_purge(node, until, CLUSTER_DELETED,
update_statistics_ancestors_depth=1)
try:
self._get_version(node)
except NameError:
self.permissions.access_clear(path)
self._report_size_change(
user, account, -size, {
'action': 'object purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
}
)
return
if not self._exists(node):
raise ItemNotExists('Object is deleted.')
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
del_size = self._apply_versioning(account, container, src_version_id,
update_statistics_ancestors_depth=1)
if report_size_change:
self._report_size_change(
user, account, -del_size,
{'action': 'object delete',
'path': path,
'versions': ','.join([str(dest_version_id)])})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
self.permissions.access_clear(path)
if delimiter:
prefix = name + delimiter if not name.endswith(delimiter) else name
src_names = self._list_objects_no_limit(
user, account, container, prefix, delimiter=None,
virtual=False, domain=None, keys=[], shared=False, until=None,
size_range=None, all_props=True, public=False)
paths = []
for t in src_names:
path = '/'.join((account, container, t[0]))
node = t[2]
if not self._exists(node):
continue
src_version_id, dest_version_id = self._put_version_duplicate(
user, node, size=0, type='', hash=None, checksum='',
cluster=CLUSTER_DELETED,
update_statistics_ancestors_depth=1)
del_size = self._apply_versioning(
account, container, src_version_id,
update_statistics_ancestors_depth=1)
if report_size_change:
self._report_size_change(
user, account, -del_size,
{'action': 'object delete',
'path': path,
'versions': ','.join([str(dest_version_id)])})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
paths.append(path)
self.permissions.access_clear_bulk(paths)
# remove all the cached allowed paths
# removing the specific path could be more expensive
self._reset_allowed_paths()
@debug_method
@backend_method
def delete_object(self, user, account, container, name, until=None,
prefix='', delimiter=None):
"""Delete/purge an object."""
self._delete_object(user, account, container, name, until, delimiter)
@debug_method
@backend_method
def list_versions(self, user, account, container, name):
"""Return a list of all object (version, version_timestamp) tuples."""
self._can_read_object(user, account, container, name)
path, node = self._lookup_object(account, container, name)
versions = self.node.node_get_versions(node)
return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
x[self.CLUSTER] != CLUSTER_DELETED]
@debug_method
@backend_method
def get_uuid(self, user, uuid, check_permissions=True):
"""Return the (account, container, name) for the UUID given."""
info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
if info is None:
raise NameError
path, serial = info
account, container, name = path.split('/', 2)
if check_permissions:
self._can_read_object(user, account, container, name)
return (account, container, name)
@debug_method
@backend_method
def get_public(self, user, public):
"""Return the (account, container, name) for the public id given."""
path = self.permissions.public_path(public)
if path is None:
raise NameError
account, container, name = path.split('/', 2)
self._can_read_object(user, account, container, name)
return (account, container, name)
def get_block(self, hash):
"""Return a block's data."""
logger.debug("get_block: %s", hash)
block = self.store.block_get(self._unhexlify_hash(hash))
if not block:
raise ItemNotExists('Block does not exist')
return block
def put_block(self, data):
"""Store a block and return the hash."""
logger.debug("put_block: %s", len(data))
return binascii.hexlify(self.store.block_put(data))
def update_block(self, hash, data, offset=0):
"""Update a known block and return the hash."""
logger.debug("update_block: %s %s %s", hash, len(data), offset)
if offset == 0 and len(data) == self.block_size:
return self.put_block(data)
h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
return binascii.hexlify(h)
# Path functions.
def _generate_uuid(self):
return str(uuidlib.uuid4())
def _put_object_node(self, path, parent, name):
path = '/'.join((path, name))
node = self.node.node_lookup(path)
if node is None:
node = self.node.node_create(parent, path)
return path, node
def _put_path(self, user, parent, path,
update_statistics_ancestors_depth=None):
node = self.node.node_create(parent, path)
self.node.version_create(node, None, 0, '', None, user,
self._generate_uuid(), '', CLUSTER_NORMAL,
update_statistics_ancestors_depth)
return node
def _lookup_account(self, account, create=True):
node = self.node.node_lookup(account)
if node is None and create:
node = self._put_path(
account, self.ROOTNODE, account,
update_statistics_ancestors_depth=-1) # User is account.
return account, node
def _lookup_container(self, account, container):
for_update = True if self.lock_container_path else False
path = '/'.join((account, container))
node = self.node.node_lookup(path, for_update)
if node is None:
raise ItemNotExists('Container does not exist')
return path, node
def _lookup_object(self, account, container, name, lock_container=False):
if lock_container:
self._lookup_container(account, container)
path = '/'.join((account, container, name))
node = self.node.node_lookup(path)
if node is None:
raise ItemNotExists('Object does not exist')
return path, node
def _lookup_objects(self, paths):
nodes = self.node.node_lookup_bulk(paths)
return paths, nodes
def _get_properties(self, node, until=None):
"""Return properties until the timestamp given."""
before = until if until is not None else inf
props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
if props is None and until is not None:
props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
if props is None:
raise ItemNotExists('Path does not exist')
return props
def _get_statistics(self, node, until=None, compute=False):
"""Return (count, sum of size, timestamp) of everything under node."""
if until is not None:
stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
elif compute:
stats = self.node.statistics_latest(node,
except_cluster=CLUSTER_DELETED)
else:
stats = self.node.statistics_get(node, CLUSTER_NORMAL)
if stats is None:
stats = (0, 0, 0)
return stats
def _get_version(self, node, version=None):
if version is None:
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is None:
raise ItemNotExists('Object does not exist')
else:
try:
version = int(version)
except ValueError:
raise VersionNotExists('Version does not exist')
props = self.node.version_get_properties(version, node=node)
if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
raise VersionNotExists('Version does not exist')
return props
def _get_versions(self, nodes):
return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
def _put_version_duplicate(self, user, node, src_node=None, size=None,
type=None, hash=None, checksum=None,
cluster=CLUSTER_NORMAL, is_copy=False,
update_statistics_ancestors_depth=None):
"""Create a new version of the node."""
props = self.node.version_lookup(
node if src_node is None else src_node, inf, CLUSTER_NORMAL)
if props is not None:
src_version_id = props[self.SERIAL]
src_hash = props[self.HASH]
src_size = props[self.SIZE]
src_type = props[self.TYPE]
src_checksum = props[self.CHECKSUM]
else:
src_version_id = None
src_hash = None
src_size = 0
src_type = ''
src_checksum = ''
if size is None: # Set metadata.
hash = src_hash # This way hash can be set to None
# (account or container).
size = src_size
if type is None:
type = src_type
if checksum is None:
checksum = src_checksum
uuid = self._generate_uuid(
) if (is_copy or src_version_id is None) else props[self.UUID]
if src_node is None:
pre_version_id = src_version_id
else:
pre_version_id = None
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is not None:
pre_version_id = props[self.SERIAL]
if pre_version_id is not None:
self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
update_statistics_ancestors_depth)
dest_version_id, mtime = self.node.version_create(
node, hash, size, type, src_version_id, user, uuid, checksum,
cluster, update_statistics_ancestors_depth)
self.node.attribute_unset_is_latest(node, dest_version_id)
return pre_version_id, dest_version_id
def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
node, meta, replace=False):
if src_version_id is not None:
self.node.attribute_copy(src_version_id, dest_version_id)
if not replace:
self.node.attribute_del(dest_version_id, domain, (
k for k, v in meta.iteritems() if v == ''))
self.node.attribute_set(dest_version_id, domain, node, (
(k, v) for k, v in meta.iteritems() if v != ''))
else:
self.node.attribute_del(dest_version_id, domain)
self.node.attribute_set(dest_version_id, domain, node, ((
k, v) for k, v in meta.iteritems()))
def _put_metadata(self, user, node, domain, meta, replace=False,
update_statistics_ancestors_depth=None):
"""Create a new version and store metadata."""
src_version_id, dest_version_id = self._put_version_duplicate(
user, node,
update_statistics_ancestors_depth=
update_statistics_ancestors_depth)
self._put_metadata_duplicate(
src_version_id, dest_version_id, domain, node, meta, replace)
return src_version_id, dest_version_id
def _list_limits(self, listing, marker, limit):
start = 0
if marker:
try:
start = listing.index(marker) + 1
except ValueError:
pass
if not limit or limit > 10000:
limit = 10000
return start, limit
def _list_object_properties(self, parent, path, prefix='', delimiter=None,
marker=None, limit=10000, virtual=True,
domain=None, keys=None, until=None,
size_range=None, allowed=None,
all_props=False):
keys = keys or []
allowed = allowed or []
cont_prefix = path + '/'
prefix = cont_prefix + prefix
start = cont_prefix + marker if marker else None
before = until if until is not None else inf
filterq = keys if domain else []
sizeq = size_range
objects, prefixes = self.node.latest_version_list(
parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
allowed, domain, filterq, sizeq, all_props)
objects.extend([(p, None) for p in prefixes] if virtual else [])
objects.sort(key=lambda x: x[0])
objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
return objects
# Reporting functions.
@debug_method
@backend_method
def _report_size_change(self, user, account, size, details=None):
details = details or {}
if size == 0:
return
account_node = self._lookup_account(account, True)[1]
total = self._get_statistics(account_node, compute=True)[1]
details.update({'user': user, 'total': total})
self.messages.append(
(QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
if not self.using_external_quotaholder:
return
try:
name = details['path'] if 'path' in details else ''
serial = self.astakosclient.issue_one_commission(
holder=account,
source=DEFAULT_SOURCE,
provisions={'pithos.diskspace': size},
name=name)
except BaseException, e:
raise QuotaError(e)
else:
self.serials.append(serial)
@debug_method
@backend_method
def _report_object_change(self, user, account, path, details=None):
details = details or {}
details.update({'user': user})
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
account, QUEUE_INSTANCE_ID, 'object', path,
details))
@debug_method
@backend_method
def _report_sharing_change(self, user, account, path, details=None):
details = details or {}
details.update({'user': user})
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
account, QUEUE_INSTANCE_ID, 'sharing', path,
details))
# Policy functions.
def _check_policy(self, policy, is_account_policy=True):
default_policy = self.default_account_policy \
if is_account_policy else self.default_container_policy
for k in policy.keys():
if policy[k] == '':
policy[k] = default_policy.get(k)
for k, v in policy.iteritems():
if k == 'quota':
q = int(v) # May raise ValueError.
if q < 0:
raise ValueError
elif k == 'versioning':
if v not in ['auto', 'none']:
raise ValueError
else:
raise ValueError
def _put_policy(self, node, policy, replace, is_account_policy=True):
default_policy = self.default_account_policy \
if is_account_policy else self.default_container_policy
if replace:
for k, v in default_policy.iteritems():
if k not in policy:
policy[k] = v
self.node.policy_set(node, policy)
def _get_policy(self, node, is_account_policy=True):
default_policy = self.default_account_policy \
if is_account_policy else self.default_container_policy
policy = default_policy.copy()
policy.update(self.node.policy_get(node))
return policy
def _apply_versioning(self, account, container, version_id,
update_statistics_ancestors_depth=None):
"""Delete the provided version if such is the policy.
Return size of object removed.
"""
if version_id is None:
return 0
path, node = self._lookup_container(account, container)
versioning = self._get_policy(
node, is_account_policy=False)['versioning']
if versioning != 'auto':
hash, size = self.node.version_remove(
version_id, update_statistics_ancestors_depth)
self.store.map_delete(hash)
return size
elif self.free_versioning:
return self.node.version_get_properties(
version_id, keys=('size',))[0]
return 0
# Access control functions.
def _check_groups(self, groups):
# raise ValueError('Bad characters in groups')
pass
def _check_permissions(self, path, permissions):
# raise ValueError('Bad characters in permissions')
pass
def _get_formatted_paths(self, paths):
formatted = []
if len(paths) == 0:
return formatted
props = self.node.get_props(paths)
if props:
for prop in props:
if prop[1].split(';', 1)[0].strip() in (
'application/directory', 'application/folder'):
formatted.append((prop[0].rstrip('/') + '/',
self.MATCH_PREFIX))
formatted.append((prop[0], self.MATCH_EXACT))
return formatted
def _get_permissions_path(self, account, container, name):
path = '/'.join((account, container, name))
permission_paths = self.permissions.access_inherit(path)
permission_paths.sort()
permission_paths.reverse()
for p in permission_paths:
if p == path:
return p
else:
if p.count('/') < 2:
continue
node = self.node.node_lookup(p)
props = None
if node is not None:
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is not None:
if props[self.TYPE].split(';', 1)[0].strip() in (
'application/directory', 'application/folder'):
return p
return None
def _get_permissions_path_bulk(self, account, container, names):
formatted_paths = []
for name in names:
path = '/'.join((account, container, name))
formatted_paths.append(path)
permission_paths = self.permissions.access_inherit_bulk(
formatted_paths)
permission_paths.sort()
permission_paths.reverse()
permission_paths_list = []
lookup_list = []
for p in permission_paths:
if p in formatted_paths:
permission_paths_list.append(p)
else:
if p.count('/') < 2:
continue
lookup_list.append(p)
if len(lookup_list) > 0:
props = self.node.get_props(lookup_list)
if props:
for prop in props:
if prop[1].split(';', 1)[0].strip() in (
'application/directory', 'application/folder'):
permission_paths_list.append(prop[0])
if len(permission_paths_list) > 0:
return permission_paths_list
return None
def _reset_allowed_paths(self):
self.read_allowed_paths = defaultdict(set)
self.write_allowed_paths = defaultdict(set)
@check_allowed_paths(action=0)
def _can_read_account(self, user, account):
if user != account:
if account not in self._allowed_accounts(user):
raise NotAllowedError
@check_allowed_paths(action=1)
def _can_write_account(self, user, account):
if user != account:
raise NotAllowedError
@check_allowed_paths(action=0)
def _can_read_container(self, user, account, container):
if user != account:
if container not in self._allowed_containers(user, account):
raise NotAllowedError
@check_allowed_paths(action=1)
def _can_write_container(self, user, account, container):
if user != account:
raise NotAllowedError
@check_allowed_paths(action=0)
def _can_read_object(self, user, account, container, name):
if user == account:
return True
path = '/'.join((account, container, name))
if self.permissions.public_get(path) is not None:
return True
path = self._get_permissions_path(account, container, name)
if not path:
raise NotAllowedError
if (not self.permissions.access_check(path, self.READ, user) and not
self.permissions.access_check(path, self.WRITE, user)):
raise NotAllowedError
@check_allowed_paths(action=1)
def _can_write_object(self, user, account, container, name):
if user == account:
return True
path = '/'.join((account, container, name))
path = self._get_permissions_path(account, container, name)
if not path:
raise NotAllowedError
if not self.permissions.access_check(path, self.WRITE, user):
raise NotAllowedError
def _allowed_accounts(self, user):
allow = set()
for path in self.permissions.access_list_paths(user):
p = path.split('/', 1)[0]
allow.add(p)
self.read_allowed_paths[user] |= allow
return sorted(allow)
def _allowed_containers(self, user, account):
allow = set()
for path in self.permissions.access_list_paths(user, account):
p = path.split('/', 2)[1]
allow.add(p)
self.read_allowed_paths[user] |= allow
return sorted(allow)
# Domain functions
@debug_method
@backend_method
def get_domain_objects(self, domain, user=None):
allowed_paths = self.permissions.access_list_paths(
user, include_owned=user is not None, include_containers=False)
if not allowed_paths:
return []
obj_list = self.node.domain_object_list(
domain, allowed_paths, CLUSTER_NORMAL)
return [(path,
self._build_metadata(props, user_defined_meta),
self.permissions.access_get(path)) for
path, props, user_defined_meta in obj_list]
# util functions
def _build_metadata(self, props, user_defined=None,
include_user_defined=True):
meta = {'bytes': props[self.SIZE],
'type': props[self.TYPE],
'hash': props[self.HASH],
'version': props[self.SERIAL],
'version_timestamp': props[self.MTIME],
'modified_by': props[self.MUSER],
'uuid': props[self.UUID],
'checksum': props[self.CHECKSUM]}
if include_user_defined and user_defined is not None:
meta.update(user_defined)
return meta
def _exists(self, node):
try:
self._get_version(node)
except ItemNotExists:
return False
else:
return True
def _unhexlify_hash(self, hash):
try:
return binascii.unhexlify(hash)
except TypeError:
raise InvalidHash(hash)