Commit dccf8009 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki Committed by Giorgos Korfiatis

pithos: Add support for project originated quota

Introduce ``project`` container policy. The value of this policy denotes
the project from which the container quota originate.

Further changes:
* domain argument in get_{account|container|object}_meta backend methods
  has become optional unless user defined metadata are requested
* the pithos frontend does not query anymore the astakosclient
  for the account usage; this is handled by the backend methods
parent ffafa697
......@@ -665,15 +665,17 @@ Available policy directives:
* ``versioning``: Set to ``auto`` or ``none`` (default is ``auto``)
* ``quota``: Size limit in KB (default is ``0`` - unlimited)
* ``project``: The project origin of the container quota
If the container already exists, the operation is equal to a ``POST`` with ``update`` defined.
================ ===============================
Return Code Description
================ ===============================
201 (Created) The container has been created
202 (Accepted) The request has been accepted
================ ===============================
============================== ===============================
Return Code Description
============================== ===============================
201 (Created) The container has been created
202 (Accepted) The request has been accepted
413 (Request Entity Too Large) Insufficient quota to complete the request
============================== ===============================
POST
......@@ -705,11 +707,12 @@ To change policy, include an ``X-Container-Policy-*`` header with the name in th
To upload blocks of data to the container, set ``Content-Type`` to ``application/octet-stream`` and ``Content-Length`` to a valid value (except if using ``chunked`` as the ``Transfer-Encoding``).
================ ===============================
Return Code Description
================ ===============================
202 (Accepted) The request has been accepted
================ ===============================
============================== ===============================
Return Code Description
============================== ===============================
202 (Accepted) The request has been accepted
413 (Request Entity Too Large) Insufficient quota to complete the request
============================== ===============================
DELETE
......
......@@ -150,13 +150,14 @@ def astakos_user(user):
"name": "Firstname Lastname"}}
}
with patch('astakosclient.AstakosClient.get_quotas') as m3:
m3.return_value = {
with patch('astakosclient.AstakosClient.service_get_quotas') as m2:
m2.return_value = {user: {
"system": {
"pithos.diskspace": {
"usage": 0,
"limit": 1073741824, # 1GB
"pending": 0
}
}
}
}
......
......@@ -294,6 +294,7 @@ def account_meta(request, v_account):
getattr(request, 'token', None), groups[k])
policy = request.backend.get_account_policy(
request.user_uniq, v_account)
logger.debug(policy)
except NotAllowedError:
raise faults.Forbidden('Not allowed')
......@@ -503,6 +504,8 @@ def container_create(request, v_account, v_container):
raise faults.ItemNotFound('Container does not exist')
except ValueError:
raise faults.BadRequest('Invalid policy header')
except QuotaError, e:
raise faults.RequestEntityTooLarge('Quota error: %s' % e)
if meta:
try:
request.backend.update_container_meta(request.user_uniq, v_account,
......@@ -540,6 +543,8 @@ def container_update(request, v_account, v_container):
raise faults.ItemNotFound('Container does not exist')
except ValueError:
raise faults.BadRequest('Invalid policy header')
except QuotaError, e:
raise faults.RequestEntityTooLarge('Quota error: %s' % e)
if meta or replace:
try:
request.backend.update_container_meta(request.user_uniq, v_account,
......
......@@ -112,8 +112,8 @@ class BaseBackend(object):
"""
return []
def get_account_meta(self, user, account, domain, until=None,
include_user_defined=True, external_quota=None):
def get_account_meta(self, user, account, domain=None, until=None,
include_user_defined=True):
"""Return a dictionary with the account metadata for the domain.
The keys returned are all user-defined, except:
......@@ -127,11 +127,10 @@ class BaseBackend(object):
'until_timestamp': Last modification until the timestamp provided
'external_quota': The quota computed from external quota holder
mechanism
Raises:
NotAllowedError: Operation not permitted
ValueError: if domain is None and include_user_defined==True
"""
return {}
......@@ -242,7 +241,7 @@ class BaseBackend(object):
"""
return []
def get_container_meta(self, user, account, container, domain, until=None,
def get_container_meta(self, user, account, container, domain=None, until=None,
include_user_defined=True):
"""Return a dictionary with the container metadata for the domain.
......@@ -261,6 +260,8 @@ class BaseBackend(object):
NotAllowedError: Operation not permitted
ItemNotExists: Container does not exist
ValueError: if domain is None and include_user_defined==True
"""
return {}
......@@ -411,7 +412,7 @@ class BaseBackend(object):
"""Return a mapping of object paths to public ids under a container."""
return {}
def get_object_meta(self, user, account, container, name, domain,
def get_object_meta(self, user, account, container, name, domain=None,
version=None, include_user_defined=True):
"""Return a dictionary with the object metadata for the domain.
......@@ -444,6 +445,8 @@ class BaseBackend(object):
ItemNotExists: Container/object does not exist
VersionNotExists: Version does not exist
ValueError: if domain is None and include_user_defined==True
"""
return {}
......
"""Set container quota source
Revision ID: 4451e165da19
Revises: 3b62b3f1bf6c
Create Date: 2013-09-27 13:36:27.477141
"""
# revision identifiers, used by Alembic.
revision = '4451e165da19'
down_revision = '54dbdde2d187'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select
ROOTNODE = 0
def upgrade():
connection = op.get_bind()
nodes = table('nodes',
column('path', sa.String(2048)),
column('node', sa.Integer),
column('parent', sa.Integer))
n1 = nodes.alias('n1')
n2 = nodes.alias('n2')
policy = table('policy',
column('node', sa.Integer),
column('key', sa.String(128)),
column('value', sa.String(256)))
s = select([n2.c.node, n1.c.path])
s = s.where(n2.c.parent == n1.c.node)
s = s.where(n1.c.parent == ROOTNODE)
s = s.where(n1.c.node != ROOTNODE)
r = connection.execute(s)
rows = r.fetchall()
op.bulk_insert(policy, [{'node': node,
'key': 'project',
'value': path} for node, path in rows])
def downgrade():
pass
......@@ -278,6 +278,22 @@ class Node(DBWorker):
r.close()
return l
def node_get_parent_path(self, node):
"""Return the node's parent path.
Return None if the node is not found.
"""
n1 = self.nodes.alias('n1')
n2 = self.nodes.alias('n2')
s = select([n2.c.path])
s = s.where(n2.c.node == n1.c.parent)
s = s.where(n1.c.node == node)
r = self.conn.execute(s)
l = r.fetchone()
r.close()
return l[0] if l is not None else None
def node_get_versions(self, node, keys=(), propnames=_propnames):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
......
......@@ -243,6 +243,17 @@ class Node(DBWorker):
self.execute(q, (node,))
return self.fetchone()
def node_get_parent_path(self, node):
"""Return the node's parent path.
Return None if the node is not found.
"""
q = ("select path from nodes as n1, nodes as n2 "
"where n2.node = n1.parent and n1.node = ?")
self.execute(q, (node,))
l = self.fetchone()
return l[0] if l is not None else None
def node_get_versions(self, node, keys=(), propnames=_propnames):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
......
......@@ -116,11 +116,14 @@ QUEUE_INSTANCE_ID = '1'
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
QUOTA_POLICY = 'quota'
VERSIONING_POLICY = 'versioning'
PROJECT = 'project'
inf = float('inf')
ULTIMATE_ANSWER = 42
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
logger = logging.getLogger(__name__)
......@@ -245,10 +248,11 @@ class ModularBackend(BaseBackend):
container_versioning_policy = container_versioning_policy \
or DEFAULT_CONTAINER_VERSIONING
self.default_account_policy = {'quota': account_quota_policy}
self.default_account_policy = {}
self.default_container_policy = {
'quota': container_quota_policy,
'versioning': container_versioning_policy
QUOTA_POLICY: container_quota_policy,
VERSIONING_POLICY: container_versioning_policy,
PROJECT: None
}
#queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
#queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
......@@ -390,23 +394,9 @@ class ModularBackend(BaseBackend):
return self._allowed_accounts(user)
def _get_account_quotas(self, account):
"""Get account usage from astakos."""
quotas = self.astakosclient.service_get_quotas(account)[account]
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
{})
def _get_account_quotas(self, account):
"""Get account usage from astakos."""
quotas = self.astakosclient.service_get_quotas(account)[account]
return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
{})
@debug_method
@backend_method
def get_account_meta(self, user, account, domain, until=None,
def get_account_meta(self, user, account, domain=None, until=None,
include_user_defined=True):
"""Return a dictionary with the account metadata for the domain."""
......@@ -435,14 +425,20 @@ class ModularBackend(BaseBackend):
else:
meta = {}
if props is not None and include_user_defined:
if domain is None:
raise ValueError(
'Domain argument is obligatory for getting '
'user defined metadata')
meta.update(
dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': account, 'count': count, 'bytes': bytes})
if self.using_external_quotaholder:
external_quota = self._get_account_quotas(account)
meta['bytes'] = external_quota.get('usage', 0)
external_quota = self.astakosclient.service_get_quotas(
account)[account]
meta['bytes'] = sum(d['pithos.diskspace']['usage'] for d in
external_quota.values())
meta.update({'modified': modified})
return meta
......@@ -494,8 +490,12 @@ class ModularBackend(BaseBackend):
path, node = self._lookup_account(account, True)
policy = self._get_policy(node, is_account_policy=True)
if self.using_external_quotaholder:
external_quota = self._get_account_quotas(account)
policy['quota'] = external_quota.get('limit', 0)
external_quota = self.astakosclient.service_get_quotas(
account)[account]
policy.update(dict(('%s-%s' % (QUOTA_POLICY, k),
v['pithos.diskspace']['limit']) for k, v in
external_quota.items()))
return policy
@debug_method
......@@ -505,8 +505,8 @@ class ModularBackend(BaseBackend):
self._can_write_account(user, account)
path, node = self._lookup_account(account, True)
self._check_policy(policy, is_account_policy=True)
self._put_policy(node, policy, replace, is_account_policy=True)
self._put_policy(node, policy, replace, is_account_policy=True,
check=True)
@debug_method
@backend_method
......@@ -518,11 +518,10 @@ class ModularBackend(BaseBackend):
node = self.node.node_lookup(account)
if node is not None:
raise AccountExists('Account already exists')
if policy:
self._check_policy(policy, is_account_policy=True)
node = self._put_path(user, self.ROOTNODE, account,
update_statistics_ancestors_depth=-1)
self._put_policy(node, policy, True, is_account_policy=True)
self._put_policy(node, policy, True, is_account_policy=True,
check=True if policy else False)
@debug_method
@backend_method
......@@ -586,8 +585,8 @@ class ModularBackend(BaseBackend):
@debug_method
@backend_method
def get_container_meta(self, user, account, container, domain, until=None,
include_user_defined=True):
def get_container_meta(self, user, account, container, domain=None,
until=None, include_user_defined=True):
"""Return a dictionary with the container metadata for the domain."""
self._can_read_container(user, account, container)
......@@ -611,6 +610,10 @@ class ModularBackend(BaseBackend):
else:
meta = {}
if include_user_defined:
if domain is None:
raise ValueError(
'Domain argument is obligatory for getting '
'user defined metadata')
meta.update(
dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
......@@ -632,7 +635,7 @@ class ModularBackend(BaseBackend):
update_statistics_ancestors_depth=0)
if src_version_id is not None:
versioning = self._get_policy(
node, is_account_policy=False)['versioning']
node, is_account_policy=False)[VERSIONING_POLICY]
if versioning != 'auto':
self.node.version_remove(src_version_id,
update_statistics_ancestors_depth=0)
......@@ -656,8 +659,24 @@ class ModularBackend(BaseBackend):
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
self._check_policy(policy, is_account_policy=False)
self._put_policy(node, policy, replace, is_account_policy=False)
if PROJECT in policy:
project = self._get_project(node)
try:
serial = self.astakosclient.issue_resource_reassignment(
holder=account,
from_source=project,
to_source=policy[PROJECT],
provisions={'pithos.diskspace': self.get_container_meta(
user, account, container,
include_user_defined=False)['bytes']})
except BaseException, e:
raise QuotaError(e)
else:
self.serials.append(serial)
self._put_policy(node, policy, replace, is_account_policy=False,
default_project=account, check=True)
@debug_method
@backend_method
......@@ -672,13 +691,13 @@ class ModularBackend(BaseBackend):
pass
else:
raise ContainerExists('Container already exists')
if policy:
self._check_policy(policy, is_account_policy=False)
path = '/'.join((account, container))
node = self._put_path(
user, self._lookup_account(account, True)[1], path,
update_statistics_ancestors_depth=-1)
self._put_policy(node, policy, True, is_account_policy=False)
self._put_policy(node, policy, True, is_account_policy=False,
default_project=account,
check=True if policy else False)
@debug_method
@backend_method
......@@ -688,6 +707,7 @@ class ModularBackend(BaseBackend):
self._can_write_container(user, account, container)
path, node = self._lookup_container(account, container)
project = self._get_project(node)
if until is not None:
hashes, size, serials = self.node.node_purge_children(
......@@ -699,7 +719,7 @@ class ModularBackend(BaseBackend):
update_statistics_ancestors_depth=0)
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
user, account, -size, project, {
'action': 'container purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
......@@ -720,7 +740,7 @@ class ModularBackend(BaseBackend):
self.node.node_remove(node, update_statistics_ancestors_depth=0)
if not self.free_versioning:
self._report_size_change(
user, account, -size, {
user, account, -size, project, {
'action': 'container purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
......@@ -746,7 +766,7 @@ class ModularBackend(BaseBackend):
account, container, src_version_id,
update_statistics_ancestors_depth=1)
self._report_size_change(
user, account, -del_size, {
user, account, -del_size, project, {
'action': 'object delete',
'path': path,
'versions': ','.join([str(dest_version_id)])})
......@@ -915,7 +935,7 @@ class ModularBackend(BaseBackend):
@debug_method
@backend_method
def get_object_meta(self, user, account, container, name, domain,
def get_object_meta(self, user, account, container, name, domain=None,
version=None, include_user_defined=True):
"""Return a dictionary with the object metadata for the domain."""
......@@ -937,6 +957,10 @@ class ModularBackend(BaseBackend):
meta = {}
if include_user_defined:
if domain is None:
raise ValueError(
'Domain argument is obligatory for getting '
'user defined metadata')
meta.update(
dict(self.node.attribute_get(props[self.SERIAL], domain)))
meta.update({'name': name,
......@@ -1095,6 +1119,7 @@ class ModularBackend(BaseBackend):
account_path, account_node = self._lookup_account(account, True)
container_path, container_node = self._lookup_container(
account, container)
project = self._get_project(container_node)
path, node = self._put_object_node(
container_path, container_node, name)
......@@ -1116,7 +1141,7 @@ class ModularBackend(BaseBackend):
# Check account quota.
if not self.using_external_quotaholder:
account_quota = long(self._get_policy(
account_node, is_account_policy=True)['quota'])
account_node, is_account_policy=True)[QUOTA_POLICY])
account_usage = self._get_statistics(account_node,
compute=True)[1]
if (account_quota > 0 and account_usage > account_quota):
......@@ -1126,7 +1151,7 @@ class ModularBackend(BaseBackend):
# Check container quota.
container_quota = long(self._get_policy(
container_node, is_account_policy=False)['quota'])
container_node, is_account_policy=False)[QUOTA_POLICY])
container_usage = self._get_statistics(container_node)[1]
if (container_quota > 0 and container_usage > container_quota):
# This must be executed in a transaction, so the version is
......@@ -1139,7 +1164,7 @@ class ModularBackend(BaseBackend):
if report_size_change:
self._report_size_change(
user, account, size_delta,
user, account, size_delta, project,
{'action': 'object update', 'path': path,
'versions': ','.join([str(dest_version_id)])})
if permissions is not None:
......@@ -1312,9 +1337,11 @@ class ModularBackend(BaseBackend):
if user != account:
raise NotAllowedError
# lookup object and lock container path also
path, node = self._lookup_object(account, container, name,
lock_container=True)
# lock container path
container_path, container_node = self._lookup_container(account,
container)
project = self._get_project(container_node)
path, node = self._lookup_object(account, container, name)
if until is not None:
if node is None:
......@@ -1342,7 +1369,7 @@ class ModularBackend(BaseBackend):
except NameError:
self.permissions.access_clear(path)
self._report_size_change(
user, account, -size, {
user, account, -size, project, {
'action': 'object purge',
'path': path,
'versions': ','.join(str(i) for i in serials)
......@@ -1360,7 +1387,7 @@ class ModularBackend(BaseBackend):
update_statistics_ancestors_depth=1)
if report_size_change:
self._report_size_change(
user, account, -del_size,
user, account, -del_size, project,
{'action': 'object delete',
'path': path,
'versions': ','.join([str(dest_version_id)])})
......@@ -1389,7 +1416,7 @@ class ModularBackend(BaseBackend):
update_statistics_ancestors_depth=1)
if report_size_change:
self._report_size_change(
user, account, -del_size,
user, account, -del_size, project,
{'action': 'object delete',
'path': path,
'versions': ','.join([str(dest_version_id)])})
......@@ -1677,7 +1704,7 @@ class ModularBackend(BaseBackend):
@debug_method
@backend_method
def _report_size_change(self, user, account, size, details=None):
def _report_size_change(self, user, account, size, source, details=None):
details = details or {}
if size == 0:
......@@ -1697,7 +1724,7 @@ class ModularBackend(BaseBackend):
name = details['path'] if 'path' in details else ''
serial = self.astakosclient.issue_one_commission(
holder=account,
source=DEFAULT_SOURCE,
source=source,
provisions={'pithos.diskspace': size},
name=name)
except BaseException, e:
......@@ -1725,39 +1752,64 @@ class ModularBackend(BaseBackend):
# Policy functions.
def _check_policy(self, policy, is_account_policy=True):
default_policy = self.default_account_policy \
if is_account_policy else self.default_container_policy
for k in policy.keys():
if policy[k] == '':
policy[k] = default_policy.get(k)
def _check_project(self, value):
# raise ValueError('Bad quota source policy')
pass
def _check_policy(self, policy):
for k, v in policy.iteritems():
if k == 'quota':
if k == QUOTA_POLICY:
q = int(v) # May raise ValueError.
if q < 0:
raise ValueError
elif k == 'versioning':
elif k == VERSIONING_POLICY:
if v not in ['auto', 'none']:
raise ValueError
elif k == PROJECT:
self._check_project(v)
else:
raise ValueError
def _put_policy(self, node, policy, replace, is_account_policy=True):
default_policy = self.default_account_policy \
if is_account_policy else self.default_container_policy
def _get_default_policy(self, node=None, is_account_policy=True,
default_project=None):