Commit fcc62e5d authored by Sofia Papagiannaki's avatar Sofia Papagiannaki
Browse files

pithos: Update/extend tests

parent 1639759e
......@@ -60,3 +60,5 @@ PITHOS_BASE_URL = 'http://storage.example.synnefo.org/pithos/'
CLOUDBAR_LOCATION = '/static/im/cloudbar/'
CLOUDBAR_SERVICES_URL = '/ui/get_services'
CLOUDBAR_MENU_URL = '/ui/get_menu'
TEST_RUNNER = 'pithos.api.test.PithosTestSuiteRunner'
......@@ -40,19 +40,21 @@ from xml.dom import minidom
from snf_django.utils.testing import with_settings, astakos_user
from pithos.api import settings as pithos_settings
from pithos.api.test.util import is_date, get_random_data
from pithos.api.test.util import is_date, get_random_data, get_random_name
from pithos.backends.migrate import initialize_db
from synnefo.lib.services import get_service_path
from synnefo.lib import join_urls
from django.test import TestCase
from django.test.simple import DjangoTestSuiteRunner
from django.conf import settings
from django.utils.http import urlencode
from django.db.backends.creation import TEST_DATABASE_PREFIX
import django.utils.simplejson as json
import random
import threading
import functools
......@@ -83,26 +85,31 @@ details = {'container': ('name', 'count', 'bytes', 'last_modified',
'object': ('name', 'hash', 'bytes', 'content_type',
'content_encoding', 'last_modified',)}
return_codes = (400, 401, 403, 404, 503)
TEST_BLOCK_SIZE = 1024
TEST_HASH_ALGORITHM = 'sha256'
BACKEND_DB_CONNECTION = None
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
print 'update md5:', pithos_settings.UPDATE_MD5
django_sqlalchemy_engines = {
'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
'django.db.backends.postgresql': 'postgresql',
'django.db.backends.mysql': '',
'django.db.backends.sqlite3': 'mssql',
'django.db.backends.oracle': 'oracle'}
def django_to_sqlalchemy():
"""Convert the django default database to sqlalchemy connection string"""
global BACKEND_DB_CONNECTION
if BACKEND_DB_CONNECTION:
return BACKEND_DB_CONNECTION
def prepate_db_connection():
"""Build pithos backend connection string from django default database"""
# TODO support for more complex configuration
db = settings.DATABASES['default']
name = db.get('TEST_NAME', 'test_%s' % db['NAME'])
name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
if db['ENGINE'] == 'django.db.backends.sqlite3':
BACKEND_DB_CONNECTION = 'sqlite:///%s' % name
db_connection = 'sqlite:///%s' % name
else:
d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
user=db['USER'],
......@@ -110,18 +117,26 @@ def django_to_sqlalchemy():
host=db['HOST'].lower(),
port=int(db['PORT']) if db['PORT'] != '' else '',
name=name)
BACKEND_DB_CONNECTION = (
db_connection = (
'%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
return BACKEND_DB_CONNECTION
# initialize pithos database
initialize_db(db_connection)
else:
db_connection = name
pithos_settings.BACKEND_DB_CONNECTION = db_connection
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
def setup_databases(self, **kwargs):
old_names, mirrors = super(PithosTestSuiteRunner,
self).setup_databases(**kwargs)
prepate_db_connection()
return old_names, mirrors
class PithosAPITest(TestCase):
def setUp(self):
if (pithos_settings.BACKEND_DB_MODULE ==
'pithos.backends.lib.sqlalchemy'):
pithos_settings.BACKEND_DB_CONNECTION = django_to_sqlalchemy()
pithos_settings.BACKEND_POOL_SIZE = 1
# Override default block size to spead up tests
pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
......@@ -318,7 +333,7 @@ class PithosAPITest(TestCase):
def upload_object(self, cname, oname=None, length=None, verify=True,
**meta):
oname = oname or get_random_data(8)
oname = oname or get_random_name()
length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
data = get_random_data(length=length)
headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
......@@ -332,7 +347,7 @@ class PithosAPITest(TestCase):
def update_object_data(self, cname, oname=None, length=None,
content_type=None, content_range=None,
verify=True, **meta):
oname = oname or get_random_data(8)
oname = oname or get_random_name()
length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
content_type = content_type or 'application/octet-stream'
data = get_random_data(length=length)
......@@ -354,7 +369,7 @@ class PithosAPITest(TestCase):
content_range='bytes */*')
def create_folder(self, cname, oname=None, **headers):
oname = oname or get_random_data(8)
oname = oname or get_random_name()
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.put(url, data='', content_type='application/directory',
**headers)
......@@ -408,14 +423,6 @@ class PithosAPITest(TestCase):
(self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
k, v in meta.items())
def assert_status(self, status, codes):
l = [elem for elem in return_codes]
if isinstance(codes, list):
l.extend(codes)
else:
l.append(codes)
self.assertTrue(status in l)
def assert_extended(self, data, format, type, size=10000):
if format == 'xml':
self._assert_xml(data, type, size)
......@@ -484,45 +491,3 @@ class AssertUUidInvariant(object):
assert('x-object-uuid' in self.map)
uuid = map['x-object-uuid']
assert(uuid == self.uuid)
django_sqlalchemy_engines = {
'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
'django.db.backends.postgresql': 'postgresql',
'django.db.backends.mysql': '',
'django.db.backends.sqlite3': 'mssql',
'django.db.backends.oracle': 'oracle'}
def test_concurrently(times=2):
"""
Add this decorator to small pieces of code that you want to test
concurrently to make sure they don't raise exceptions when run at the
same time. E.g., some Django views that do a SELECT and then a subsequent
INSERT might fail when the INSERT assumes that the data has not changed
since the SELECT.
"""
def test_concurrently_decorator(test_func):
def wrapper(*args, **kwargs):
exceptions = []
def call_test_func():
try:
test_func(*args, **kwargs)
except Exception, e:
exceptions.append(e)
raise
threads = []
for i in range(times):
threads.append(threading.Thread())
for t in threads:
t.start()
for t in threads:
t.join()
if exceptions:
raise Exception(
('test_concurrently intercepted %s',
'exceptions: %s') % (len(exceptions), exceptions))
return wrapper
return test_concurrently_decorator
......@@ -34,8 +34,8 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from pithos.api.test import PithosAPITest, AssertMappingInvariant,\
DATE_FORMATS
from pithos.api.test import (PithosAPITest, AssertMappingInvariant,
DATE_FORMATS)
from synnefo.lib import join_urls
......
......@@ -36,7 +36,7 @@
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
pithos_settings, pithos_test_settings)
from pithos.api.test.util import strnextling, get_random_data
from pithos.api.test.util import strnextling, get_random_data, get_random_name
from synnefo.lib import join_urls
......@@ -170,7 +170,7 @@ class ContainerGet(PithosAPITest):
# check folder inheritance
oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
# create child object
descendant = '%s/%s' % (oname, get_random_data(8))
descendant = '%s/%s' % (oname, get_random_name())
self.upload_object(cname, descendant)
# request shared
url = join_urls(self.pithos_path, self.user, cname)
......@@ -261,7 +261,7 @@ class ContainerGet(PithosAPITest):
# test folder inheritance
oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
# create child object
descendant = '%s/%s' % (oname, get_random_data(8))
descendant = '%s/%s' % (oname, get_random_name())
self.upload_object(cname, descendant)
# request public
r = self.get('%s?public=' % url)
......@@ -322,7 +322,7 @@ class ContainerGet(PithosAPITest):
# test folder inheritance
oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
# create child object
descendant = '%s/%s' % (oname, get_random_data(8))
descendant = '%s/%s' % (oname, get_random_name())
self.upload_object(cname, descendant)
# request public
r = self.get('%s?shared=&public=' % container_url)
......
......@@ -42,7 +42,8 @@ from pithos.api.test import (PithosAPITest, pithos_settings,
AssertMappingInvariant, AssertUUidInvariant,
TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
DATE_FORMATS)
from pithos.api.test.util import md5_hash, merkle, strnextling, get_random_data
from pithos.api.test.util import (md5_hash, merkle, strnextling,
get_random_data, get_random_name)
from synnefo.lib import join_urls
......@@ -128,7 +129,7 @@ class ObjectGet(PithosAPITest):
self.assertEqual(r.status_code, 404)
# upload object with trailing space
oname = self.upload_object('c1', quote('%s ' % get_random_data(8)))[0]
oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
url = join_urls(self.pithos_path, self.user, 'c1', oname)
r = self.get(url)
......@@ -314,7 +315,7 @@ class ObjectGet(PithosAPITest):
# perform get with If-Match
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.get(url, HTTP_IF_MATCH=get_random_data(8))
r = self.get(url, HTTP_IF_MATCH=get_random_name())
self.assertEqual(r.status_code, 412)
def test_if_none_match(self):
......@@ -494,12 +495,12 @@ class ObjectGet(PithosAPITest):
class ObjectPut(PithosAPITest):
def setUp(self):
PithosAPITest.setUp(self)
self.container = get_random_data(8)
self.container = get_random_name()
self.create_container(self.container)
def test_upload(self):
cname = self.container
oname = get_random_data(8)
oname = get_random_name()
data = get_random_data()
meta = {'test': 'test1'}
headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
......@@ -526,7 +527,7 @@ class ObjectPut(PithosAPITest):
def test_maximum_upload_size_exceeds(self):
cname = self.container
oname = get_random_data(8)
oname = get_random_name()
# set container quota to 100
url = join_urls(self.pithos_path, self.user, cname)
......@@ -543,7 +544,7 @@ class ObjectPut(PithosAPITest):
def test_upload_with_name_containing_slash(self):
cname = self.container
oname = '/%s' % get_random_data(8)
oname = '/%s' % get_random_name()
data = get_random_data()
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.put(url, data=data)
......@@ -557,7 +558,7 @@ class ObjectPut(PithosAPITest):
def test_upload_unprocessable_entity(self):
cname = self.container
oname = get_random_data(8)
oname = get_random_name()
data = get_random_data()
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.put(url, data=data, HTTP_ETAG='123')
......@@ -565,7 +566,7 @@ class ObjectPut(PithosAPITest):
# def test_chunked_transfer(self):
# cname = self.container
# oname = '/%s' % get_random_data(8)
# oname = '/%s' % get_random_name()
# data = get_random_data()
# url = join_urls(self.pithos_path, self.user, cname, oname)
# r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
......@@ -582,7 +583,7 @@ class ObjectPut(PithosAPITest):
data += self.upload_object(cname, oname=part)[1]
manifest = '%s/%s' % (cname, prefix)
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
self.assertEqual(r.status_code, 201)
......@@ -595,14 +596,14 @@ class ObjectPut(PithosAPITest):
self.assertEqual(r.content, data)
# invalid manifestation
invalid_manifestation = '%s/%s' % (cname, get_random_data(8))
invalid_manifestation = '%s/%s' % (cname, get_random_name())
self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
r = self.get(url)
self.assertEqual(r.content, '')
def test_create_zero_length_object(self):
cname = self.container
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.put(url, data='')
self.assertEqual(r.status_code, 201)
......@@ -629,7 +630,7 @@ class ObjectPut(PithosAPITest):
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.get('%s?hashmap=&format=json' % url)
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.put('%s?hashmap=' % url, data=r.content)
self.assertEqual(r.status_code, 201)
......@@ -655,7 +656,7 @@ class ObjectCopy(PithosAPITest):
with AssertMappingInvariant(self.get_object_info, self.container,
self.object):
# copy object
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, self.container, oname)
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
HTTP_X_COPY_FROM='/%s/%s' % (
......@@ -679,7 +680,7 @@ class ObjectCopy(PithosAPITest):
self.create_container(cname)
with AssertMappingInvariant(self.get_object_info, self.container,
self.object):
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
HTTP_X_COPY_FROM='/%s/%s' % (
......@@ -700,32 +701,32 @@ class ObjectCopy(PithosAPITest):
def test_copy_invalid(self):
# copy from non-existent object
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, self.container, oname)
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
HTTP_X_COPY_FROM='/%s/%s' % (
self.container, get_random_data(8)))
self.container, get_random_name()))
self.assertEqual(r.status_code, 404)
# copy from non-existent container
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, self.container, oname)
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
HTTP_X_COPY_FROM='/%s/%s' % (
get_random_data(8), self.object))
get_random_name(), self.object))
self.assertEqual(r.status_code, 404)
def test_copy_dir(self):
folder = self.create_folder(self.container)[0]
subfolder = self.create_folder(
self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
self.container, oname='%s/%s' % (folder, get_random_name()))[0]
objects = [subfolder]
append = objects.append
append(self.upload_object(self.container,
'%s/%s' % (folder, get_random_data(8)),
'%s/%s' % (folder, get_random_name()),
HTTP_X_OBJECT_META_DEPTH='1')[0])
append(self.upload_object(self.container,
'%s/%s' % (subfolder, get_random_data(8)),
'%s/%s' % (subfolder, get_random_name()),
HTTP_X_OBJECT_META_DEPTH='2')[0])
other = self.upload_object(self.container, strnextling(folder))[0]
......@@ -772,7 +773,7 @@ class ObjectMove(PithosAPITest):
def test_move(self):
# move object
oname = get_random_data(8)
oname = get_random_name()
url = join_urls(self.pithos_path, self.user, self.container, oname)
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
HTTP_X_MOVE_FROM='/%s/%s' % (
......@@ -799,17 +800,17 @@ class ObjectMove(PithosAPITest):
def test_move_dir(self):
folder = self.create_folder(self.container)[0]
subfolder = self.create_folder(
self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
self.container, oname='%s/%s' % (folder, get_random_name()))[0]
objects = [subfolder]
append = objects.append
meta = {}
meta[objects[0]] = {}
append(self.upload_object(self.container,
'%s/%s' % (folder, get_random_data(8)),
'%s/%s' % (folder, get_random_name()),
HTTP_X_OBJECT_META_DEPTH='1')[0])
meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
append(self.upload_object(self.container,
'%s/%s' % (subfolder, get_random_data(8)),
'%s/%s' % (subfolder, get_random_name()),
HTTP_X_OBJECT_META_DEPTH='2')[0])
meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
other = self.upload_object(self.container, strnextling(folder))[0]
......@@ -1081,7 +1082,7 @@ class ObjectPost(PithosAPITest):
def test_update_from_other_object(self):
src = self.object
dest = get_random_data(8)
dest = get_random_name()
url = join_urls(self.pithos_path, self.user, self.container, src)
r = self.get(url)
......@@ -1111,7 +1112,7 @@ class ObjectPost(PithosAPITest):
def test_update_range_from_other_object(self):
src = self.object
dest = get_random_data(8)
dest = get_random_name()
url = join_urls(self.pithos_path, self.user, self.container, src)
r = self.get(url)
......@@ -1156,24 +1157,24 @@ class ObjectDelete(PithosAPITest):
def test_delete_non_existent(self):
url = join_urls(self.pithos_path, self.user, self.container,
get_random_data(8))
get_random_name())
r = self.delete(url)
self.assertEqual(r.status_code, 404)
def test_delete_dir(self):
folder = self.create_folder(self.container)[0]
subfolder = self.create_folder(
self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
self.container, oname='%s/%s' % (folder, get_random_name()))[0]
objects = [subfolder]
append = objects.append
meta = {}
meta[objects[0]] = {}
append(self.upload_object(self.container,
'%s/%s' % (folder, get_random_data(8)),
'%s/%s' % (folder, get_random_name()),
HTTP_X_OBJECT_META_DEPTH='1')[0])
meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
append(self.upload_object(self.container,
'%s/%s' % (subfolder, get_random_data(8)),
'%s/%s' % (subfolder, get_random_name()),
HTTP_X_OBJECT_META_DEPTH='2')[0])
meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
other = self.upload_object(self.container, strnextling(folder))[0]
......
......@@ -35,7 +35,7 @@
# or implied, of GRNET S.A.
from pithos.api.test import PithosAPITest
from pithos.api.test.util import get_random_data
from pithos.api.test.util import get_random_data, get_random_name
from synnefo.lib import join_urls
......@@ -53,7 +53,7 @@ class TestPermissions(PithosAPITest):
r = self.post(url, **kwargs)
self.assertEqual(r.status_code, 202)
self.container = get_random_data(8)
self.container = get_random_name()
self.create_container(self.container)
self.object = self.upload_object(self.container)[0]
l = [self.object + '/', self.object + '/a', self.object + 'a',
......
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import random
import datetime
import time as _time
from synnefo.lib import join_urls
import django.utils.simplejson as json
from pithos.api.test import PithosAPITest
from pithos.api.test.util import get_random_name
from pithos.api import settings as pithos_settings
class TestPublic(PithosAPITest):
def _assert_not_public_object(self, cname, oname):
info = self.get_object_info(cname, oname)
self.assertTrue('X-Object-Public' not in info)
def _assert_public_object(self, cname, oname, odata):
info = self.get_object_info(cname, oname)
self.assertTrue('X-Object-Public' in info)
public = info['X-Object-Public']
self.assertTrue(len(public) >= pithos_settings.PUBLIC_URL_SECURITY)
(self.assertTrue(l in pithos_settings.PUBLIC_URL_ALPHABET) for
l in public)
r = self.get(public, user='user2')
self.assertEqual(r.status_code, 200)
self.assertTrue('X-Object-Public' not in r)
self.assertEqual(r.content, odata)
# assert other users cannot access the object using the priavate path
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.head(url, user='user2')
self.assertEqual(r.status_code, 403)
r = self.get(url, user='user2')
self.assertEqual(r.status_code, 403)
return public
def test_set_object_public(self):
cname = get_random_name()
self.create_container(cname)
oname, odata = self.upload_object(cname)[:-1]
self._assert_not_public_object(cname, oname)
# set public
url = join_urls(self.pithos_path, self.user, cname, oname)
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
self.assertEqual(r.status_code, 202)
self._assert_public_object(cname, oname, odata)
def test_set_twice(self):
cname = get_random_name()
self.create_container(cname)
oname, odata = self.upload_object(cname)[:-1]
self._assert_not_public_object(cname, oname)