Commit 4172d729 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

pithos: Update view tests

parent 01dc82e0
......@@ -37,6 +37,7 @@
from urlparse import urlunsplit, urlsplit, urlparse
from xml.dom import minidom
from urllib import quote, unquote
from mock import patch, PropertyMock
from snf_django.utils.testing import with_settings, astakos_user
......@@ -46,7 +47,9 @@ from pithos.backends.migrate import initialize_db
from synnefo.lib.services import get_service_path
from synnefo.lib import join_urls
from synnefo.util import text
from django.core.urlresolvers import reverse
from django.test import TestCase
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
from django.test.simple import DjangoTestSuiteRunner
......@@ -56,6 +59,7 @@ from django.db.backends.creation import TEST_DATABASE_PREFIX
import django.utils.simplejson as json
import random
import functools
......@@ -202,6 +206,12 @@ class PithosTestClient(Client):
class PithosAPITest(TestCase):
def create_patch(self, name, new_callable=None):
patcher = patch(name, new_callable=new_callable)
thing = patcher.start()
self.addCleanup(patcher.stop)
return thing
def setUp(self):
self.client = PithosTestClient()
......@@ -213,6 +223,29 @@ class PithosAPITest(TestCase):
self.pithos_path = join_urls(get_service_path(
pithos_settings.pithos_services, 'object-store'))
# patch astakosclient.AstakosClient.validate_token
mock_validate_token = self.create_patch(
'astakosclient.AstakosClient.validate_token')
mock_validate_token.return_value = {
'access': {'user': {'id': text.udec(self.user, 'utf8')}}}
# patch astakosclient.AstakosClient.get_token
mock_get_token = self.create_patch(
'astakosclient.AstakosClient.get_token')
mock_get_token.return_value = {'access_token': 'valid_token'}
# patch astakosclient.AstakosClient.api_oa2_auth
mock_api_oa2_auth = self.create_patch(
'astakosclient.AstakosClient.api_oa2_auth',
new_callable=PropertyMock)
mock_api_oa2_auth.return_value = reverse('oa2_authenticate')
# patch astakosclient.AstakosClient.api_oa2_token
mock_api_oa2_token = self.create_patch(
'astakosclient.AstakosClient.api_oa2_token',
new_callable=PropertyMock)
mock_api_oa2_token.return_value = reverse('oa2_token')
def tearDown(self):
#delete additionally created metadata
meta = self.get_account_meta()
......
......@@ -42,49 +42,31 @@ from pithos.api.test.objects import merkle
from synnefo.lib.services import get_service_path
from synnefo.lib import join_urls
from mock import patch
#from mock import patch
from urllib import quote
from urlparse import urlsplit, parse_qs
import django.utils.simplejson as json
from django.core.urlresolvers import reverse
import re
import datetime
import time as _time
import random
import urllib
import urlparse
class NotAllowedView(PithosAPITest):
def head(self, url, *args, **kwargs):
with patch("pithos.api.util.get_token_from_cookie") as m:
m.return_value = 'token'
return super(NotAllowedView, self).head(url, *args, **kwargs)
def delete(self, url, *args, **kwargs):
with patch("pithos.api.util.get_token_from_cookie") as m:
m.return_value = 'token'
return super(NotAllowedView, self).delete(url, *args, **kwargs)
def post(self, url, *args, **kwargs):
with patch("pithos.api.util.get_token_from_cookie") as m:
m.return_value = 'token'
return super(NotAllowedView, self).post(url, *args, **kwargs)
def put(self, url, *args, **kwargs):
with patch("pithos.api.util.get_token_from_cookie") as m:
m.return_value = 'token'
return super(NotAllowedView, self).put(url, *args, **kwargs)
def copy(self, url, *args, **kwargs):
with patch("pithos.api.util.get_token_from_cookie") as m:
m.return_value = 'token'
return super(NotAllowedView, self).copy(url, *args, **kwargs)
def move(self, url, *args, **kwargs):
with patch("pithos.api.util.get_token_from_cookie") as m:
m.return_value = 'token'
return super(NotAllowedView, self).move(url, *args, **kwargs)
def add_url_params(url, **kwargs):
if not kwargs:
return url
parts = list(urlparse.urlsplit(url))
params = dict(urlparse.parse_qsl(parts[3], keep_blank_values=True))
params.update(kwargs)
parts[3] = urllib.urlencode(params)
return urlparse.urlunsplit(parts)
class NotAllowedView(PithosAPITest):
def test_not_allowed(self):
self.view_path = join_urls(get_service_path(
pithos_settings.pithos_services, 'pithos_ui'), 'view')
......@@ -93,8 +75,6 @@ class NotAllowedView(PithosAPITest):
r = self.head(self.view_url)
self.assertEqual(r.status_code, 405)
self.assertTrue('Allow' in r)
self.assertEqual(r['Allow'], 'GET')
r = self.delete(self.view_url)
self.assertEqual(r.status_code, 405)
......@@ -135,21 +115,68 @@ class ObjectGetView(PithosAPITest):
self.api_url = join_urls(self.pithos_path, self.user, self.cname,
self.oname)
def get(self, url, user='user', *args, **kwargs):
with patch("pithos.api.util.get_token_from_cookie") as m:
m.return_value = 'token'
return super(ObjectGetView, self).get(url, user='user', *args,
**kwargs)
def view(self, url, access_token='valid_token', user='user', *args,
**kwargs):
params = {}
if access_token is not None:
params['access_token'] = access_token
return self.get(add_url_params(url, **params), user, *args,
**kwargs)
def test_no_cookie_redirect(self):
r = super(ObjectGetView, self).get(self.view_url)
def test_no_authorization_granted(self):
r = self.get(self.view_url)
self.assertEqual(r.status_code, 302)
self.assertTrue('Location' in r)
parts = list(urlsplit(r['Location']))
qs = parse_qs(parts[3])
self.assertTrue('next' in qs)
self.assertEqual(qs['next'][0], join_urls(pithos_settings.BASE_HOST,
self.view_url))
p = urlparse.urlparse(r['Location'])
self.assertEqual(p.netloc, 'testserver')
self.assertEqual(p.path, reverse('oa2_authenticate'))
r = self.get(add_url_params(self.view_url, code='valid_code'),
follow=True)
self.assertEqual(r.status_code, 200)
self.assertTrue(r.content, self.odata)
intermidiate_url = r.redirect_chain[0][0]
p = urlparse.urlparse(intermidiate_url)
params = urlparse.parse_qs(p.query)
self.assertTrue('access_token' in params)
r = self.get(add_url_params(self.view_url, access_token='valid_token'))
self.assertEqual(r.status_code, 200)
self.assertTrue(r.content, self.odata)
def test_forbidden(self):
container = self.create_container(user='alice')[0]
obj = self.upload_object(container, user='alice')[0]
url = join_urls(self.view_path, 'alice', container, obj)
r = self.view(url)
self.assertEqual(r.status_code, 403)
def test_shared_with_me(self):
container = self.create_container(user='alice')[0]
obj, data = self.upload_object(container, user='alice')[:-1]
# share object
url = join_urls(self.pithos_path, 'alice', container, obj)
self.post(url, user='alice', content_type='',
HTTP_CONTENT_RANGE='bytes */*',
HTTP_X_OBJECT_SHARING='read=user')
url = join_urls(self.view_path, 'alice', container, obj)
r = self.view(url)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content, data)
def test_view(self):
r = self.view(self.view_url)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content, self.odata)
def test_not_existing(self):
url = self.view_url[:-1]
r = self.view(url)
self.assertEqual(r.status_code, 404)
def test_versions(self):
c = self.cname
......@@ -159,7 +186,7 @@ class ObjectGetView(PithosAPITest):
r = self.post(self.api_url, content_type='', **meta)
self.assertEqual(r.status_code, 202)
r = self.get('%s?version=list&format=json' % self.view_url)
r = self.view('%s?version=list&format=json' % self.view_url)
self.assertEqual(r.status_code, 200)
l1 = json.loads(r.content)['versions']
self.assertEqual(len(l1), 2)
......@@ -171,7 +198,7 @@ class ObjectGetView(PithosAPITest):
self.assertEqual(r.status_code, 202)
# assert a newly created version has been created
r = self.get('%s?version=list&format=json' % self.view_url)
r = self.view('%s?version=list&format=json' % self.view_url)
self.assertEqual(r.status_code, 200)
l2 = json.loads(r.content)['versions']
self.assertEqual(len(l2), len(l1) + 1)
......@@ -185,7 +212,7 @@ class ObjectGetView(PithosAPITest):
self.append_object_data(c, o)
# assert a newly created version has been created
r = self.get('%s?version=list&format=json' % self.view_url)
r = self.view('%s?version=list&format=json' % self.view_url)
self.assertEqual(r.status_code, 200)
l3 = json.loads(r.content)['versions']
self.assertEqual(len(l3), len(l2) + 1)
......@@ -194,29 +221,29 @@ class ObjectGetView(PithosAPITest):
def test_objects_with_trailing_spaces(self):
cname = self.cname
r = self.get(quote('%s ' % self.view_url))
r = self.view(quote('%s ' % self.view_url))
self.assertEqual(r.status_code, 404)
# delete object
self.delete(self.api_url)
r = self.get(self.view_url)
r = self.view(self.view_url)
self.assertEqual(r.status_code, 404)
# upload object with trailing space
oname = self.upload_object(cname, quote('%s ' % get_random_name()))[0]
view_url = join_urls(self.view_path, self.user, cname, oname)
r = self.get(view_url)
r = self.view(view_url)
self.assertEqual(r.status_code, 200)
view_url = join_urls(self.view_path, self.user, cname, oname[:-1])
r = self.get(view_url)
r = self.view(view_url)
self.assertEqual(r.status_code, 404)
def test_get_partial(self):
limit = pithos_settings.BACKEND_BLOCK_SIZE + 1
r = self.get(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
r = self.view(self.view_url, HTTP_RANGE='bytes=0-%d' % limit)
self.assertEqual(r.status_code, 206)
self.assertEqual(r.content, self.odata[:limit + 1])
self.assertTrue('Content-Range' in r)
......@@ -227,17 +254,17 @@ class ObjectGetView(PithosAPITest):
def test_get_range_not_satisfiable(self):
# TODO
#r = self.get(self.view_url, HTTP_RANGE='bytes=50-10')
#r = self.view(self.view_url, HTTP_RANGE='bytes=50-10')
#self.assertEqual(r.status_code, 416)
offset = len(self.odata) + 1
r = self.get(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
r = self.view(self.view_url, HTTP_RANGE='bytes=0-%s' % offset)
self.assertEqual(r.status_code, 416)
def test_multiple_range(self):
l = ['0-499', '-500', '1000-']
ranges = 'bytes=%s' % ','.join(l)
r = self.get(self.view_url, HTTP_RANGE=ranges)
r = self.view(self.view_url, HTTP_RANGE=ranges)
self.assertEqual(r.status_code, 206)
self.assertTrue('content-type' in r)
p = re.compile(
......@@ -283,7 +310,7 @@ class ObjectGetView(PithosAPITest):
out_of_range = len(self.odata) + 1
l = ['0-499', '-500', '%d-' % out_of_range]
ranges = 'bytes=%s' % ','.join(l)
r = self.get(self.view_url, HTTP_RANGE=ranges)
r = self.view(self.view_url, HTTP_RANGE=ranges)
self.assertEqual(r.status_code, 416)
def test_get_if_match(self):
......@@ -292,7 +319,7 @@ class ObjectGetView(PithosAPITest):
else:
etag = merkle(self.odata)
r = self.get(self.view_url, HTTP_IF_MATCH=etag)
r = self.view(self.view_url, HTTP_IF_MATCH=etag)
# assert get success
self.assertEqual(r.status_code, 200)
......@@ -301,7 +328,7 @@ class ObjectGetView(PithosAPITest):
self.assertEqual(r.content, self.odata)
def test_get_if_match_star(self):
r = self.get(self.view_url, HTTP_IF_MATCH='*')
r = self.view(self.view_url, HTTP_IF_MATCH='*')
# assert get success
self.assertEqual(r.status_code, 200)
......@@ -316,7 +343,7 @@ class ObjectGetView(PithosAPITest):
etag = merkle(self.odata)
quoted = lambda s: '"%s"' % s
r = self.get(self.view_url, HTTP_IF_MATCH=','.join(
r = self.view(self.view_url, HTTP_IF_MATCH=','.join(
[quoted(etag), quoted(get_random_data(64))]))
# assert get success
......@@ -326,7 +353,7 @@ class ObjectGetView(PithosAPITest):
self.assertEqual(r.content, self.odata)
def test_if_match_precondition_failed(self):
r = self.get(self.view_url, HTTP_IF_MATCH=get_random_name())
r = self.view(self.view_url, HTTP_IF_MATCH=get_random_name())
self.assertEqual(r.status_code, 412)
def test_if_none_match(self):
......@@ -336,7 +363,7 @@ class ObjectGetView(PithosAPITest):
etag = merkle(self.odata)
# perform get with If-None-Match
r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
# assert precondition_failed
self.assertEqual(r.status_code, 304)
......@@ -346,14 +373,14 @@ class ObjectGetView(PithosAPITest):
self.assertTrue(etag != r['ETag'])
# perform get with If-None-Match
r = self.get(self.view_url, HTTP_IF_NONE_MATCH=etag)
r = self.view(self.view_url, HTTP_IF_NONE_MATCH=etag)
# assert get success
self.assertEqual(r.status_code, 200)
def test_if_none_match_star(self):
# perform get with If-None-Match with star
r = self.get(self.view_url, HTTP_IF_NONE_MATCH='*')
r = self.view(self.view_url, HTTP_IF_NONE_MATCH='*')
self.assertEqual(r.status_code, 304)
def test_if_modified_since(self):
......@@ -365,7 +392,7 @@ class ObjectGetView(PithosAPITest):
# Check not modified since
for t in t1_formats:
r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
self.assertEqual(r.status_code, 304)
_time.sleep(1)
......@@ -375,12 +402,12 @@ class ObjectGetView(PithosAPITest):
# Check modified since
for t in t1_formats:
r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE=t)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content, self.odata + appended_data)
def test_if_modified_since_invalid_date(self):
r = self.get(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
r = self.view(self.view_url, HTTP_IF_MODIFIED_SINCE='Monday')
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content, self.odata)
......@@ -393,7 +420,7 @@ class ObjectGetView(PithosAPITest):
t1 = t + datetime.timedelta(seconds=1)
t1_formats = map(t1.strftime, DATE_FORMATS)
for t in t1_formats:
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content, self.odata)
......@@ -409,7 +436,7 @@ class ObjectGetView(PithosAPITest):
# check modified
for t in t2_formats:
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
self.assertEqual(r.status_code, 412)
# modify account: update object meta
......@@ -424,7 +451,7 @@ class ObjectGetView(PithosAPITest):
# check modified
for t in t3_formats:
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=t)
self.assertEqual(r.status_code, 412)
def test_if_unmodified_since(self):
......@@ -435,7 +462,7 @@ class ObjectGetView(PithosAPITest):
t_formats = map(t.strftime, DATE_FORMATS)
for tf in t_formats:
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content, self.odata)
......@@ -447,7 +474,7 @@ class ObjectGetView(PithosAPITest):
t_formats = map(t.strftime, DATE_FORMATS)
for tf in t_formats:
r = self.get(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
r = self.view(self.view_url, HTTP_IF_UNMODIFIED_SINCE=tf)
self.assertEqual(r.status_code, 412)
def test_hashes(self):
......@@ -456,7 +483,7 @@ class ObjectGetView(PithosAPITest):
size = len(odata)
view_url = join_urls(self.view_path, self.user, self.cname, oname)
r = self.get('%s?format=json&hashmap' % view_url)
r = self.view('%s?format=json&hashmap' % view_url)
self.assertEqual(r.status_code, 200)
body = json.loads(r.content)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment