Commit 2d1388ab authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

pithos: Fix shared listing for implicitly shared objects

Provide also the appropriate tests

Refs: #4131
parent 8b47bf37
......@@ -198,7 +198,8 @@ def authenticate(request):
return response
@api_method('GET', format_allowed=True, user_required=True, logger=logger)
@api_method('GET', format_allowed=True, user_required=True, logger=logger,
serializations=["text", "xml", "json"])
def account_list(request):
# Normal Response Codes: 200, 204
# Error Response Codes: internalServerError (500),
......
......@@ -174,18 +174,21 @@ class PithosAPITest(TestCase):
def head(self, url, user='user', data={}, follow=False, **extra):
with astakos_user(user):
extra = dict((quote(k), quote(v)) for k, v in extra.items())
extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
response = self.client.head(url, data, follow, **extra)
return response
def get(self, url, user='user', data={}, follow=False, **extra):
with astakos_user(user):
extra = dict((quote(k), quote(v)) for k, v in extra.items())
extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
response = self.client.get(url, data, follow, **extra)
return response
def delete(self, url, user='user', data={}, follow=False, **extra):
with astakos_user(user):
extra = dict((quote(k), quote(v)) for k, v in extra.items())
extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
response = self.client.delete(url, data, follow, **extra)
return response
......@@ -193,6 +196,7 @@ class PithosAPITest(TestCase):
content_type='application/octet-stream', follow=False, **extra):
with astakos_user(user):
extra = dict((quote(k), quote(v)) for k, v in extra.items())
extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
response = self.client.post(url, data, content_type, follow,
**extra)
return response
......@@ -201,6 +205,7 @@ class PithosAPITest(TestCase):
content_type='application/octet-stream', follow=False, **extra):
with astakos_user(user):
extra = dict((quote(k), quote(v)) for k, v in extra.items())
extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
response = self.client.put(url, data, content_type, follow,
**extra)
return response
......@@ -218,7 +223,8 @@ class PithosAPITest(TestCase):
def delete_account_meta(self, meta, user=None):
user = user or self.user
transform = lambda k: 'HTTP_X_ACCOUNT_META_%s' % k.replace('-', '_').upper()
transform = (lambda k: 'HTTP_X_ACCOUNT_META_%s' %
k.replace('-', '_').upper())
kwargs = dict((transform(k), '') for k, v in meta.items())
url = join_urls(self.pithos_path, user)
r = self.post('%s?update=' % url, user=user, **kwargs)
......@@ -230,7 +236,8 @@ class PithosAPITest(TestCase):
def delete_account_groups(self, groups, user=None):
user = user or self.user
url = join_urls(self.pithos_path, user)
transform = lambda k: 'HTTP_X_ACCOUNT_GROUP_%s' % k.replace('-', '_').upper()
transform = (lambda k: 'HTTP_X_ACCOUNT_GROUP_%s' %
k.replace('-', '_').upper())
kwargs = dict((transform(k), '') for k, v in groups.items())
r = self.post('%s?update=' % url, user=user, **kwargs)
self.assertEqual(r.status_code, 202)
......@@ -289,7 +296,7 @@ class PithosAPITest(TestCase):
url = join_urls(self.pithos_path, user, container)
r = self.post('%s?update=' % url, user=user, **kwargs)
self.assertEqual(r.status_code, 202)
container_meta = self.get_container_meta(container)
container_meta = self.get_container_meta(container, user=user)
(self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
k in meta.keys())
(self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
......@@ -437,7 +444,7 @@ class PithosAPITest(TestCase):
url = join_urls(self.pithos_path, user, container, object)
r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
self.assertEqual(r.status_code, 202)
object_meta = self.get_object_meta(container, object)
object_meta = self.get_object_meta(container, object, user=user)
(self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
k in meta.keys())
(self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
......
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from pithos.api.test import PithosAPITest
from synnefo.lib import join_urls
import django.utils.simplejson as json
class ListSharing(PithosAPITest):
def _build_structure(self, user=None):
user = user or self.user
for i in range(2):
cname = 'c%d' % i
self.create_container(cname, user=user)
self.upload_object(cname, 'obj', user=user)
self.create_folder(cname, 'f1', user=user)
self.create_folder(cname, 'f1/f2', user=user)
self.upload_object(cname, 'f1/f2/obj', user=user)
# share /c0/f1 path
url = join_urls(self.pithos_path, user, 'c0', 'f1')
r = self.post(url, user=user, content_type='',
HTTP_CONTENT_RANGE='bytes */*',
HTTP_X_OBJECT_SHARING='read=*')
self.assertEqual(r.status_code, 202)
r = self.get(url)
def test_list_share_with_me(self):
self._build_structure('alice')
url = join_urls(self.pithos_path)
r = self.get(url)
self.assertEqual(r.status_code, 200)
allowed_accounts = r.content.split('\n')
if '' in allowed_accounts:
allowed_accounts.remove('')
self.assertEqual(allowed_accounts, ['alice'])
r = self.get('%s?format=json' % url)
self.assertEqual(r.status_code, 200)
allowed_accounts = json.loads(r.content)
self.assertEqual([i['name'] for i in allowed_accounts], ['alice'])
url = join_urls(url, 'alice')
r = self.get(url)
self.assertEqual(r.status_code, 200)
allowed_containers = r.content.split('\n')
if '' in allowed_containers:
allowed_containers.remove('')
self.assertEqual(allowed_containers, ['c0'])
r = self.get('%s?format=json' % url)
self.assertEqual(r.status_code, 200)
allowed_containers = json.loads(r.content)
self.assertEqual([i['name'] for i in allowed_containers], ['c0'])
url = join_urls(url, 'c0')
r = self.get('%s?delimiter=/&shared=&format=json' % url)
self.assertEqual(r.status_code, 200)
shared_objects = [i.get('name', i.get('subdir')) for i in
json.loads(r.content)]
self.assertEqual(shared_objects, ['f1', 'f1/'])
r = self.get('%s?delimiter=/&prefix=f1&shared=&format=json' % url)
self.assertEqual(r.status_code, 200)
shared_objects = [i.get('name', i.get('subdir')) for i in
json.loads(r.content)]
self.assertEqual(shared_objects, ['f1/f2', 'f1/f2/'])
r = self.get('%s?delimiter=/&prefix=f1/f2&shared=&format=json' % url)
self.assertEqual(r.status_code, 200)
shared_objects = [i.get('name', i.get('subdir')) for i in
json.loads(r.content)]
self.assertEqual(shared_objects, ['f1/f2/obj'])
def test_list_shared_by_me(self):
self._build_structure()
url = join_urls(self.pithos_path, self.user)
r = self.get('%s?shared=' % url)
self.assertEqual(r.status_code, 200)
shared_containers = r.content.split('\n')
if '' in shared_containers:
shared_containers.remove('')
self.assertEqual(shared_containers, ['c0'])
r = self.get('%s?shared=&format=json' % url)
self.assertEqual(r.status_code, 200)
shared_containers = json.loads(r.content)
self.assertEqual([i['name'] for i in shared_containers], ['c0'])
url = join_urls(url, 'c0')
r = self.get('%s?delimiter=/&shared=&format=json' % url)
self.assertEqual(r.status_code, 200)
shared_objects = [i.get('name', i.get('subdir')) for i in
json.loads(r.content)]
self.assertEqual(shared_objects, ['f1', 'f1/'])
r = self.get('%s?delimiter=/&prefix=f1&shared=&format=json' % url)
self.assertEqual(r.status_code, 200)
shared_objects = [i.get('name', i.get('subdir')) for i in
json.loads(r.content)]
self.assertEqual(shared_objects, ['f1/f2', 'f1/f2/'])
r = self.get('%s?delimiter=/&prefix=f1/f2&shared=&format=json' % url)
self.assertEqual(r.status_code, 200)
shared_objects = [i.get('name', i.get('subdir')) for i in
json.loads(r.content)]
self.assertEqual(shared_objects, ['f1/f2/obj'])
......@@ -39,3 +39,4 @@ from pithos.api.test.permissions import *
from pithos.api.test.public import *
from pithos.api.test.views import *
from pithos.api.test.unicode import *
from pithos.api.test.listing import *
......@@ -184,9 +184,10 @@ class Permissions(XFeatures, Groups, Public, Node):
self.xfeaturevals.c.value == u.c.value)
s = select([self.xfeatures.c.path], from_obj=[inner_join]).distinct()
if prefix:
s = s.where(self.xfeatures.c.path.like(
self.escape_like(prefix) + '%', escape=ESCAPE_CHAR
))
like = lambda p: self.xfeatures.c.path.like(
self.escape_like(p) + '%', escape=ESCAPE_CHAR)
s = s.where(or_(*map(like,
self.access_inherit(prefix) or [prefix])))
r = self.conn.execute(s)
l = [row[0] for row in r.fetchall()]
r.close()
......@@ -208,11 +209,11 @@ class Permissions(XFeatures, Groups, Public, Node):
def access_list_shared(self, prefix=''):
"""Return the list of shared paths."""
s = select([self.xfeatures.c.path],
self.xfeatures.c.path.like(
self.escape_like(prefix) + '%',
escape=ESCAPE_CHAR)).order_by(
self.xfeatures.c.path.asc())
s = select([self.xfeatures.c.path])
like = lambda p: self.xfeatures.c.path.like(
self.escape_like(p) + '%', escape=ESCAPE_CHAR)
s = s.where(or_(*map(like, self.access_inherit(prefix) or [prefix])))
s = s.order_by(self.xfeatures.c.path.asc())
r = self.conn.execute(s)
l = [row[0] for row in r.fetchall()]
r.close()
......
......@@ -173,8 +173,10 @@ class Permissions(XFeatures, Groups, Public, Node):
"using (feature_id)")
p = (member, member)
if prefix:
q += " where path like ? escape '\\'"
p += (self.escape_like(prefix) + '%',)
q += " where"
for path in self.access_inherit(prefix) or [prefix]:
q += " path like ? escape '\\'"
p += (self.escape_like(path) + '%',)
self.execute(q, p)
l = [r[0] for r in self.fetchall()]
......@@ -194,6 +196,10 @@ class Permissions(XFeatures, Groups, Public, Node):
def access_list_shared(self, prefix=''):
"""Return the list of shared paths."""
q = "select path from xfeatures where path like ? escape '\\'"
self.execute(q, (self.escape_like(prefix) + '%',))
q = "select path from xfeatures where"
p = []
for path in self.access_inherit(prefix) or [prefix]:
q += " path like ? escape '\\'"
p += (self.escape_like(path) + '%',)
self.execute(q, p)
return [r[0] for r in self.fetchall()]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment