Commit 1ac442e2 authored by Sofia Papagiannaki's avatar Sofia Papagiannaki Committed by Chrysostomos Nanakos

pithos: Update backend tests

parent 8ce5fb09
......@@ -18,14 +18,56 @@ from .quota import TestQuotaMixin
from .delete_by_uuid import TestDeleteByUUIDMixin
from .snapshots import TestSnapshotsMixin
from sqlalchemy import create_engine
import os
import time
class TestSQLAlchemyBackend(CommonMixin, TestDeleteByUUIDMixin,
TestQuotaMixin, TestSnapshotsMixin):
db_module = 'pithos.backends.lib.sqlalchemy'
db_connection = 'sqlite:////tmp/test_pithos_backend.db'
db_connection_str = '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s'
scheme = os.environ.get('DB_SCHEME', 'postgres')
user = os.environ.get('DB_USER', 'synnefo')
pwd = os.environ.get('DB_PWD', 'example_passw0rd')
host = os.environ.get('DB_HOST', 'db.synnefo.live')
port = os.environ.get('DB_PORT', 5432)
name = 'test_pithos_backend'
db_connection = db_connection_str % locals()
mapfile_prefix ='snf_test_pithos_backend_sqlalchemy_%s_' % time.time()
@classmethod
def create_db(cls):
db = cls.db_connection_str % {'scheme': cls.scheme, 'user': cls.user,
'pwd': cls.pwd, 'host': cls.host,
'port':cls.port, 'name': 'template1'}
e = create_engine(db)
c = e.connect()
c.connection.connection.set_isolation_level(0)
c.execute('create database %s' % cls.name)
c.connection.connection.set_isolation_level(1)
@classmethod
def destroy_db(cls):
db = cls.db_connection_str % {'scheme': cls.scheme, 'user': cls.user,
'pwd': cls.pwd, 'host': cls.host,
'port':cls.port, 'name': 'template1'}
e = create_engine(db)
c = e.connect()
c.connection.connection.set_isolation_level(0)
c.execute('drop database %s' % cls.name)
c.connection.connection.set_isolation_level(1)
class TestSQLiteBackend(CommonMixin, TestDeleteByUUIDMixin, TestQuotaMixin,
TestSnapshotsMixin):
db_module = 'pithos.backends.lib.sqlite'
db_connection = ':memory:'
db_module = 'pithos.backends.lib.sqlite'
db_connection = location = '/tmp/test_pithos_backend.db'
mapfile_prefix ='snf_test_pithos_backend_sqlite_%s_' % time.time()
@classmethod
def create_db(cls):
pass
@classmethod
def destroy_db(cls):
os.remove(cls.location)
......@@ -16,29 +16,34 @@
from mock import MagicMock
from pithos.backends.base import ItemNotExists
from pithos.backends.random_word import get_random_word
from pithos.backends.util import connect_backend
from .util import get_random_data
import random
import unittest
get_random_data = lambda length: get_random_word(length)[:length]
class CommonMixin(unittest.TestCase):
block_size = 1024
hash_algorithm = 'sha256'
block_path = '/tmp/data'
account = 'user'
free_versioning = True
@classmethod
def setUpClass(cls):
cls.create_db()
@classmethod
def tearDownClass(cls):
cls.destroy_db()
def setUp(self):
self.b = connect_backend(db_connection=self.db_connection,
db_module=self.db_module,
block_path=self.block_path,
block_size=self.block_size,
hash_algorithm=self.hash_algorithm,
free_versioning=self.free_versioning)
free_versioning=self.free_versioning,
mapfile_prefix=self.mapfile_prefix)
self.b.astakosclient = MagicMock()
self.b.astakosclient.issue_one_commission.return_value = 42
self.b.commission_serials = MagicMock()
......
......@@ -13,40 +13,58 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pithos.backends.base import (IllegalOperationError, NotAllowedError,
ItemNotExists)
class TestSnapshotsMixin(object):
def test_copy_snapshot(self):
name = 'snf-snap-1-1'
t = [self.account, self.account, 'snapshots', name]
mapfile = 'archip:%s' % name
self.b.register_object_map(*t, size=100,
type='application/octet-stream',
mapfile='archip:%s' % name)
mapfile=mapfile)
meta = self.b.get_object_meta(*t, include_user_defined=False)
self.assertTrue('available' in meta)
self.assertEqual(meta['available'], False)
self.assertTrue('mapfile' in meta)
self.assertEqual(meta['mapfile'], mapfile)
self.assertTrue('is_snapshot' in meta)
self.assertEqual(meta['is_snapshot'], True)
dest_name = 'snf-snap-1-2'
t2 = [self.account, self.account, 'snapshots', dest_name]
self.b.copy_object(*(t + t2[1:]), type='application/octet-stream',
domain='snapshots')
self.assertRaises(NotAllowedError, self.b.copy_object, *(t + t2[1:]),
type='application/octet-stream', domain='snapshots')
meta2 = self.b.get_object_meta(*t2, include_user_defined=False)
self.assertRaises(ItemNotExists, self.b.get_object_meta, *t2)
meta2 = self.b.get_object_meta(*t, include_user_defined=False)
self.assertTrue('available' in meta2)
self.assertEqual(meta['available'], meta2['available'])
self.assertTrue('mapfile' in meta2)
self.assertTrue(meta['mapfile'] == meta2['mapfile'])
self.assertTrue('is_snapshot' in meta2)
self.assertEqual(meta['is_snapshot'], meta2['is_snapshot'])
def test_move_snapshot(self):
name = 'snf-snap-1-1'
name = 'snf-snap-2-1'
t = [self.account, self.account, 'snapshots', name]
mapfile = 'archip:%s' % name
self.b.register_object_map(*t, size=100,
type='application/octet-stream',
mapfile='archip:%s' % name)
mapfile=mapfile)
meta = self.b.get_object_meta(*t, include_user_defined=False)
self.assertTrue('available' in meta)
self.assertEqual(meta['available'], False)
self.assertTrue('mapfile' in meta)
self.assertEqual(meta['mapfile'], mapfile)
self.assertTrue('is_snapshot' in meta)
self.assertEqual(meta['is_snapshot'], True)
dest_name = 'snf-snap-1-2'
dest_name = 'snf-snap-2-2'
t2 = [self.account, self.account, 'snapshots', dest_name]
self.b.move_object(*(t + t2[1:]), type='application/octet-stream',
domain='snapshots')
......@@ -54,3 +72,51 @@ class TestSnapshotsMixin(object):
meta2 = self.b.get_object_meta(*t2, include_user_defined=False)
self.assertTrue('available' in meta2)
self.assertEqual(meta['available'], meta2['available'])
self.assertTrue('mapfile' in meta2)
self.assertEqual(meta['mapfile'], meta2['mapfile'])
self.assertTrue('is_snapshot', meta2['is_snapshot'])
self.assertEqual(meta['is_snapshot'], meta2['is_snapshot'])
def test_update_snapshot(self):
name = 'snf-snap-3-1'
mapfile = 'archip:%s' % name
t = [self.account, self.account, 'snapshots', name]
self.b.register_object_map(*t, size=100,
type='application/octet-stream',
mapfile=mapfile)
meta = self.b.get_object_meta(*t, include_user_defined=False)
self.assertTrue('available' in meta)
self.assertEqual(meta['available'], False)
self.assertTrue('mapfile' in meta)
self.assertEqual(meta['mapfile'], mapfile)
self.assertTrue('is_snapshot' in meta)
self.assertEqual(meta['is_snapshot'], True)
domain = 'plankton'
self.b.update_object_meta(*t, domain=domain, meta={'foo': 'bar'})
meta2 = self.b.get_object_meta(*t, domain=domain,
include_user_defined=True)
self.assertTrue('available' in meta2)
self.assertEqual(meta2['available'], False)
self.assertTrue('mapfile' in meta2)
self.assertEqual(meta2['mapfile'], mapfile)
self.assertTrue('is_snapshot' in meta2)
self.assertEqual(meta2['is_snapshot'], True)
self.assertTrue('foo' in meta2)
self.assertTrue(meta2['foo'], 'bar')
try:
self.b.update_object_hashmap(*t, size=0,
type='application/octet-stream',
hashmap=(), checksum='',
domain='plankton')
except IllegalOperationError:
meta = self.b.get_object_meta(*t, include_user_defined=False)
self.assertTrue('available' in meta)
self.assertEqual(meta['available'], False)
self.assertTrue('mapfile' in meta)
self.assertEqual(meta['mapfile'], mapfile)
self.assertTrue('is_snapshot' in meta)
self.assertEqual(meta['is_snapshot'], True)
else:
self.fail('Update snapshot should not be allowed')
# Copyright (C) 2014 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
import string
get_random_data = lambda length: ''.join(random.choice(string.letters[:26])
for i in xrange(length))
get_random_name = lambda: get_random_data(length=8)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment