Commit 400ce4ea authored by Sofia Papagiannaki's avatar Sofia Papagiannaki

Migration Tools: Progress IV

Refs #1171
parents 051c7450 739c2e9d
......@@ -124,6 +124,56 @@ Useful alias to add in ``~/.bashrc``::
alias pithos-sync='cd /pithos && git pull && python setup.py build_sphinx && /etc/init.d/apache2 restart'
Gunicorn Setup
--------------
Add in ``/etc/apt/sources.list``::
deb http://backports.debian.org/debian-backports squeeze-backports main
Then::
apt-get update
apt-get -t squeeze-backports install gunicorn
apt-get -t squeeze-backports install python-gevent
Create ``/etc/gunicorn.d/pithos``::
CONFIG = {
'mode': 'django',
'working_dir': '/pithos/pithos',
'user': 'www-data',
'group': 'www-data',
'args': (
'--bind=[::]:8080',
'--worker-class=egg:gunicorn#gevent',
'--workers=4',
'--log-level=debug',
'/pithos/pithos/settings.py',
),
}
Replace the ``WSGI*`` directives in ``/etc/apache2/sites-available/pithos`` and ``/etc/apache2/sites-available/pithos-ssl`` with::
<Proxy *>
Order allow,deny
Allow from all
</Proxy>
SetEnv proxy-sendchunked
SSLProxyEngine off
ProxyErrorOverride off
ProxyPass /api http://localhost:8080 retry=0
ProxyPassReverse /api http://localhost:8080
Enable the proxy modules and restart the services::
/etc/init.d/gunicorn restart
a2enmod proxy
a2enmod proxy_http
/etc/init.d/apache2 restart
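With the proxy in place, a quick way to confirm that Apache is actually forwarding ``/api`` to Gunicorn is to request it and look for any HTTP response from the backend. A minimal sketch, assuming the default virtual host answers on localhost; the exact endpoint and expected status depend on the deployment::

    import urllib2

    # Any HTTP response, even 401 or 404, proves the Apache -> Gunicorn
    # chain is wired up; a URLError means the proxy or backend is down.
    try:
        urllib2.urlopen('http://localhost/api', timeout=5)
        print 'proxy OK'
    except urllib2.HTTPError, e:
        print 'proxy OK (backend answered with HTTP %d)' % e.code
    except urllib2.URLError, e:
        print 'proxy unreachable:', e.reason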
Shibboleth Setup
----------------
......@@ -135,15 +185,15 @@ Setup the files in ``/etc/shibboleth``.
Add in ``/etc/apache2/sites-available/pithos-ssl``::
ShibConfig /etc/shibboleth/shibboleth2.xml
Alias /shibboleth-sp /usr/share/shibboleth
ShibConfig /etc/shibboleth/shibboleth2.xml
Alias /shibboleth-sp /usr/share/shibboleth
<Location /api/login>
AuthType shibboleth
ShibRequireSession On
ShibUseHeaders On
require valid-user
</Location>
<Location /api/login>
AuthType shibboleth
ShibRequireSession On
ShibUseHeaders On
require valid-user
</Location>
Configure and run Apache::
......
......@@ -500,7 +500,7 @@ def socket_read_iterator(request, length=0, blocksize=4096):
sock = raw_input_socket(request)
if length < 0: # Chunked transfers
# Small version (server does the dechunking).
if request.environ.get('mod_wsgi.input_chunked', None):
if request.environ.get('mod_wsgi.input_chunked', None) or request.META['SERVER_SOFTWARE'].startswith('gunicorn'):
while length < MAX_UPLOAD_SIZE:
data = sock.read(blocksize)
if data == '':
......
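The extra ``SERVER_SOFTWARE`` check makes the "server does the dechunking" branch fire under Gunicorn as well, since Gunicorn, like mod_wsgi with input chunking enabled, hands the application an already-dechunked input stream. For context, a minimal sketch of how a caller would consume the iterator for a chunked upload; the helper itself is illustrative::

    def read_chunked_body(request, blocksize=4096):
        # Hypothetical helper: a negative length selects the chunked-transfer
        # path of socket_read_iterator; blocks are yielded until the stream
        # ends or MAX_UPLOAD_SIZE is reached.
        parts = []
        for block in socket_read_iterator(request, length=-1, blocksize=blocksize):
            parts.append(block)
        return ''.join(parts)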
......@@ -138,7 +138,7 @@ class BaseBackend(object):
"""
return
def put_account(self, user, account, policy=None):
def put_account(self, user, account, policy={}):
"""Create a new account with the given name.
Raises:
......@@ -237,7 +237,7 @@ class BaseBackend(object):
"""
return
def put_container(self, user, account, container, policy=None):
def put_container(self, user, account, container, policy={}):
"""Create a new container with the given name.
Raises:
......
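Both backend signatures move from ``policy=None`` to ``policy={}``, which documents the expected type but shares a single dict object across calls; if a callee ever mutates it in place, the "default" leaks state between requests. A defensive variant, shown only as a sketch and not what this commit does, keeps ``None`` in the signature and normalizes inside the body::

    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""
        # Normalize here so every call gets a fresh dict and the shared
        # default can never be mutated by accident.
        if policy is None:
            policy = {}
        # ... rest of the method unchanged ...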
......@@ -243,7 +243,7 @@ class ModularBackend(BaseBackend):
self._put_policy(node, policy, replace)
@backend_method
def put_account(self, user, account, policy=None):
def put_account(self, user, account, policy={}):
"""Create a new account with the given name."""
logger.debug("put_account: %s %s", account, policy)
......@@ -353,7 +353,7 @@ class ModularBackend(BaseBackend):
self._put_policy(node, policy, replace)
@backend_method
def put_container(self, user, account, container, policy=None):
def put_container(self, user, account, container, policy={}):
"""Create a new container with the given name."""
logger.debug("put_container: %s %s %s", account, container, policy)
......@@ -541,8 +541,8 @@ class ModularBackend(BaseBackend):
# Check quota.
size_delta = size # Change with versioning.
if size_delta > 0:
account_quota = self._get_policy(account_node)['quota']
container_quota = self._get_policy(container_node)['quota']
account_quota = long(self._get_policy(account_node)['quota'])
container_quota = long(self._get_policy(container_node)['quota'])
if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
(container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
# This must be executed in a transaction, so the version is never created if it fails.
......
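The new ``long()`` casts are more than cosmetic: the policy values come back as strings, and in Python 2 any integer compares as less than any string, so a string-typed quota would never trip the limit. A minimal illustration of the difference, assuming a quota of 100 stored as ``'100'``::

    size_used, size_delta = 950, 100
    quota = '100'                                  # as returned by _get_policy
    print size_used + size_delta > quota           # False: in Python 2 ints sort before strings
    print size_used + size_delta > long(quota)     # True: the over-quota write is rejected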
......@@ -45,13 +45,16 @@ import datetime
ERROR_CODES = {304:'Not Modified',
400:'Bad Request',
401:'Unauthorized',
403:'Forbidden',
404:'Not Found',
409:'Conflict',
411:'Length Required',
412:'Precondition Failed',
413:'Request Entity Too Large',
416:'Range Not Satisfiable',
422:'Unprocessable Entity',
503:'Service Unavailable'}
503:'Service Unavailable',
}
class Fault(Exception):
def __init__(self, data='', status=None):
......
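The table now also carries 403 Forbidden and, relevant to the new quota checks, 413 Request Entity Too Large. Assuming ``Fault`` simply stores its arguments as ``data`` and ``status`` (its body is elided here), a handler would surface such an error roughly like this illustrative sketch::

    # Illustrative only: raise a quota violation and map it to its reason phrase.
    # Assumes Fault keeps .data and .status from its constructor arguments.
    try:
        raise Fault('container quota exceeded', 413)
    except Fault, fault:
        print '%d %s: %s' % (fault.status, ERROR_CODES.get(fault.status, 'Unknown'), fault.data)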
......@@ -34,7 +34,8 @@
# or implied, of GRNET S.A.
from sqlalchemy import create_engine
from sqlalchemy import Table, MetaData
from sqlalchemy import Table, Column, String, MetaData
from sqlalchemy.sql import select
from django.conf import settings
......@@ -51,4 +52,35 @@ class Migration(object):
self.backend = ModularBackend(*options)
def execute(self):
pass
\ No newline at end of file
pass
class Cache(object):
def __init__(self, db):
self.engine = create_engine(db)
metadata = MetaData(self.engine)
columns=[]
columns.append(Column('path', String(2048), primary_key=True))
columns.append(Column('hash', String(255)))
self.files = Table('files', metadata, *columns)
self.conn = self.engine.connect()
self.engine.echo = True
metadata.create_all(self.engine)
def put(self, path, hash):
# Insert or replace.
s = self.files.delete().where(self.files.c.path==path)
r = self.conn.execute(s)
r.close()
s = self.files.insert()
r = self.conn.execute(s, {'path': path, 'hash': hash})
r.close()
def get(self, path):
s = select([self.files.c.hash], self.files.c.path == path)
r = self.conn.execute(s)
l = r.fetchone()
r.close()
if not l:
return l
return l[0]
\ No newline at end of file
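``Cache`` gives the migration scripts a small persistent path-to-hash store with insert-or-replace semantics. A short usage sketch; the SQLite URL is only an example, any SQLAlchemy database URL works::

    cache = Cache('sqlite:///migration-cache.db')    # hypothetical database file
    cache.put('/data/files/42', 'd41d8cd98f00b204e9800998ecf8427e')
    print cache.get('/data/files/42')   # prints the stored hash
    print cache.get('/data/files/43')   # prints None for unknown paths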
......@@ -35,64 +35,47 @@
from binascii import hexlify
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, String, MetaData
from sqlalchemy import Table
from sqlalchemy.sql import select
from pithos import settings
from pithos.backends.modular import ModularBackend
from lib.hashmap import HashMap
from lib.migrate import Migration
from lib.migrate import Migration, Cache
import os
class DataMigration(Migration):
def __init__(self, pithosdb, db):
Migration.__init__(self, pithosdb)
# XXX Need more columns for primary key - last modified timestamp...
engine = create_engine(db)
metadata = MetaData(engine)
columns=[]
columns.append(Column('path', String(2048), primary_key=True))
columns.append(Column('hash', String(255)))
self.files = Table('files', metadata, *columns)
metadata.create_all(engine)
def cache_put(self, path, hash):
# Insert or replace.
s = self.files.delete().where(self.files.c.path==path)
r = self.conn.execute(s)
r.close()
s = self.files.insert()
r = self.conn.execute(s, {'path': path, 'hash': hash})
r.close()
self.cache = Cache(db)
def cache_get(self, path):
s = select([self.files.c.hash], self.files.c.path == path)
r = self.conn.execute(s)
l = r.fetchone()
r.close()
if not l:
return l
return l[0]
def execute(self):
blocksize = self.backend.block_size
blockhash = self.backend.hash_algorithm
def retrieve_files(self):
# Loop for all available files.
filebody = Table('filebody', self.metadata, autoload=True)
s = select([filebody.c.storedfilepath])
rp = self.conn.execute(s)
paths = rp.fetchall()
path = rp.fetchone()
while path:
yield path
path = rp.fetchone()
rp.close()
def execute(self):
blocksize = self.backend.block_size
blockhash = self.backend.hash_algorithm
for path in paths:
for (path,) in self.retrieve_files():
map = HashMap(blocksize, blockhash)
map.load(path)
try:
map.load(open(path))
except Exception, e:
print e
continue
hash = hexlify(map.hash())
if hash != self.cache_get(path):
if hash != self.cache.get(path):
missing = self.backend.blocker.block_ping(map) # XXX Backend hack...
status = '[>] ' + path
if missing:
......@@ -105,7 +88,7 @@ class DataMigration(Migration):
self.backend.put_block(block)
else:
status += ' - no blocks missing'
self.cache_put(path, hash)
self.cache.put(path, hash)
else:
status = '[-] ' + path
print status
......
......@@ -46,12 +46,12 @@ from django.conf import settings
from pithos.backends.modular import CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED
from pithos.backends.lib.sqlalchemy.node import Node
from pithos.backends.lib.sqlalchemy.dbwrapper import DBWrapper
from lib.transfer import upload
from lib.hashmap import HashMap, file_read_iterator
from lib.client import Fault
from lib.migrate import Migration
from lib.migrate import Migration, Cache
from calendar import timegm
import json
import os
......@@ -60,34 +60,11 @@ import hashlib
import mimetypes
class ObjectMigration(Migration):
def __init__(self, old_db):
def __init__(self, old_db, db, f):
Migration.__init__(self, old_db)
self.wrapper = ClientWrapper(self.backend)
params = {'wrapper': DBWrapper(self.backend.db)}
self.node = Node(**params)
def create_default_containers(self):
users = PithosUser.objects.all()
for u in users:
print '#', u.uniq
try:
self.wrapper.create_container('pithos', u.uniq)
self.wrapper.create_container('trash', u.uniq)
except NameError, e:
pass
self.cache = Cache(db)
def get_path(self, child_id):
folderTable = Table('folder', self.metadata, autoload=True)
s = select([folderTable.c.parent_id, folderTable.c.name])
s = s.where(folderTable.c.id == child_id)
rp = self.conn.execute(s)
parent_id, foldername = rp.fetchone()
if not parent_id:
return ''
else:
return '%s/%s' %(self.get_path(parent_id), foldername)
def create_object(self, username, container, object, filepath, mimetype):
def create_node(self, username, container, object, filepath, mimetype):
obj = ''
path = '/'.join(object.split('/')[:-1])
name = object.split('/')[-1]
......@@ -95,35 +72,78 @@ class ObjectMigration(Migration):
for f in path.split('/'):
obj = '%s/%s' %(obj, f) if obj else f
try:
self.wrapper.create_directory_marker(container, obj, username)
md5 = hashlib.md5()
meta = {'Content-Type':'application/directory',
'hash': md5.hexdigest().lower()}
self.backend.update_object_hashmap(username, username, container, obj, 0, [], meta)
except NameError, e:
pass
self.wrapper.set_account(username)
prefix = '%s/' %path if path else ''
print '#', filepath, container, prefix, name, mimetype
return upload(self.wrapper, filepath, container, prefix, name, mimetype)
parent_path = '%s/%s' %(username, container)
parent_node = self.backend.node.node_lookup(parent_path)
path = '%s/%s' %(parent_path, object)
nodeid = self.backend.node.node_create(parent_node, path)
return nodeid
def create_history(self, user, header_id, node_id, deleted=False):
filebody = Table('filebody', self.metadata, autoload=True)
gss_user = Table('gss_user', self.metadata, autoload=True)
j = filebody.join(gss_user, filebody.c.modifiedby_id == gss_user.c.id)
s = select([filebody.c.filesize, gss_user.c.username], from_obj=j)
s = s.where(filebody.c.header_id == header_id)
s = s.order_by(filebody.c.version)
rp = self.conn.execute(s)
versions = rp.fetchall()
print '#', len(versions)
rp.close()
def create_history(self, header_id, node_id, deleted=False):
i = 0
for size, modyfied_by in versions:
cluster = CLUSTER_HISTORY if i < len(versions) - 1 else CLUSTER_NORMAL
map = HashMap(self.backend.block_size, self.backend.hash_algorithm)
for t, rowcount in self.retrieve_node_versions(header_id):
size, modified_by, filepath, mimetype, modificationdate = t
cluster = CLUSTER_HISTORY if i < rowcount - 1 else CLUSTER_NORMAL
cluster = cluster if not deleted else CLUSTER_DELETED
args = (node_id, size, None, modyfied_by, cluster)
self.node.version_create(*args)
hash = self.cache.get(filepath)
if hash is None:
raise Exception("Missing hash")
args = (node_id, hash, size, None, modified_by, cluster)
serial = self.backend.node.version_create(*args)[0]
meta = {'hash':hash,
'content-type':mimetype}
self.backend.node.attribute_set(serial, ((k, v) for k, v in meta.iteritems()))
timestamp = timegm(modificationdate.timetuple())
microseconds = modificationdate.time().microsecond
f.write('update versions set mtime=\'%10d.%06d\' where serial=%s;\n' % (timestamp, microseconds, serial))
i += 1
def create_metadata(self, header_id, node_id):
for t in self.retrieve_metadata(header_id):
pass
def create_objects(self):
for username, headerid, folderid, filename, deleted, filepath, mimetype in self.retrieve_current_nodes():
path = self.retrieve_path(folderid)[1:]
container = 'pithos' if not deleted else 'trash'
#create container if it does not exist
try:
self.backend._lookup_container(username, container)
except NameError:
self.backend.put_container(username, username, container)
#create node
object = '%s/%s' %(path, filename)
nodeid = self.create_node(username, container, object, filepath, mimetype)
#create node history
self.create_history(headerid, nodeid, deleted)
self.create_metadata(headerid, nodeid)
#self.set_public()
#self.statistics()
#self.set_permissions()
def retrieve_path(self, child_id):
folderTable = Table('folder', self.metadata, autoload=True)
s = select([folderTable.c.parent_id, folderTable.c.name])
s = s.where(folderTable.c.id == child_id)
rp = self.conn.execute(s)
parent_id, foldername = rp.fetchone()
if not parent_id:
return ''
else:
return '%s/%s' %(self.retrieve_path(parent_id), foldername)
def retrieve_current_nodes(self):
fileheader = Table('fileheader', self.metadata, autoload=True)
filebody = Table('filebody', self.metadata, autoload=True)
folder = Table('folder', self.metadata, autoload=True)
......@@ -134,88 +154,50 @@ class ObjectMigration(Migration):
s = select([gss_user.c.username, fileheader.c.id, fileheader.c.folder_id,
fileheader.c.name, fileheader.c.deleted, filebody.c.storedfilepath,
filebody.c.mimetype], from_obj=j)
s = s.limit(1)
rp = self.conn.execute(s)
objects = rp.fetchall()
for username, headerid, folderid, filename, deleted, filepath, mimetype in objects:
path = self.get_path(folderid)[1:]
container = 'pithos' if not deleted else 'trash'
object = '%s/%s' %(path, filename)
#filepath = '/Users/butters/Downloads/torvalds-linux-0f86267'
vserial = self.create_object(username, container, object, filepath, mimetype)
nodeid = self.node.version_get_properties(vserial, keys=('node',))[0]
self.create_history(username, headerid, nodeid, deleted)
self.node.version_remove(vserial)
#self.set_metadata()
#self.set_public()
#self.statistics()
#self.set_permissions()
def handle_deleted(self):
pass
def upload_dir(self, dir, prefix, user, container):
for f in os.listdir(dir):
fullpath = '%s/%s' %(dir, f)
if os.path.isfile(fullpath):
type = mimetypes.guess_type(fullpath)[0]
name = '/'.join(fullpath.split(prefix)[1:])
print '@', user, container, name, fullpath, type
self.create_object(user, container, name, fullpath, type)
else: self.upload_dir(fullpath, prefix, user, container)
class ClientWrapper(object):
"""Wraps client methods used by transfer.upload()
to ModularBackend methods"""
def __init__(self, backend):
self.backend = backend
self.block_size = self.backend.block_size
self.block_hash = self.backend.hash_algorithm
def set_account(self, account):
self.account = account
def create_container(self, container, account=None, **meta):
self.backend.put_container(account, account, container, meta)
def create_directory_marker(self, container, object, account=None):
md5 = hashlib.md5()
meta = {'Content-Type':'application/directory',
'hash': md5.hexdigest().lower()}
self.backend.update_object_hashmap(account, account, container, object, 0, [], meta)
object = rp.fetchone()
while object:
yield object
object = rp.fetchone()
rp.close()
def create_object_by_hashmap(self, container, object, map, mimetype=None):
hashmap = HashMap(self.block_size, self.block_hash)
for h in map['hashes']:
hashmap.append(h)
meta = {'hash':hexlify(hashmap.hash())}
if mimetype:
meta['content-type'] = mimetype
size = map['bytes']
try:
args = [self.account, self.account, container, object, size, map['hashes'], meta]
return self.backend.update_object_hashmap(*args)
except IndexError, ie:
fault = Fault(ie.data, 409)
raise fault
def retrieve_node_versions(self, header_id):
filebody = Table('filebody', self.metadata, autoload=True)
gss_user = Table('gss_user', self.metadata, autoload=True)
j = filebody.join(gss_user, filebody.c.modifiedby_id == gss_user.c.id)
s = select([filebody.c.filesize, gss_user.c.username,
filebody.c.storedfilepath, filebody.c.mimetype,
filebody.c.modificationdate], from_obj=j)
s = s.where(filebody.c.header_id == header_id)
s = s.order_by(filebody.c.version)
rp = self.conn.execute(s)
version = rp.fetchone()
while version:
yield version, rp.rowcount
version = rp.fetchone()
rp.close()
def update_container_data(self, container, f):
#just put the blocks
for block in file_read_iterator(f, self.block_size):
self.backend.put_block(block)
def retrieve_metadata(self, header_id):
filetag = Table('filetag', self.metadata, autoload=True)
s = filetag.select(filetag.c.fileid == header_id)
rp = self.conn.execute(s)
tag = rp.fetchone()
while tag:
yield tag
tag = rp.fetchone()
rp.close()
def retrieve_container_metadata(self, container):
return {'x-container-block-size':self.block_size,
'x-container-block-hash':self.block_hash}
def handle_deleted(self):
pass
if __name__ == "__main__":
old_db = ''
db = ''
ot = ObjectMigration(old_db)
#ot.create_default_containers()
#ot.create_objects()
p = ''
ot.upload_dir(p, p, 'chstath', 'linux')
f = open('fixdates.sql', 'w')
ot = ObjectMigration(old_db, db, f)
ot.create_objects()
f.close()
\ No newline at end of file
......@@ -65,6 +65,6 @@ class UserMigration(Migration):
user.save(update_timestamps=False)
if __name__ == "__main__":
db = 'postgresql://gss:m0ust@rda@62.217.112.56/pithos'
db = 'postgresql://gss@localhost/pithos'
m = UserMigration(db)
m.execute()
\ No newline at end of file
......@@ -69,6 +69,7 @@ class BaseTestCase(unittest.TestCase):
def setUp(self):
self.client = Pithos_Client(get_server(), get_auth(), get_user(),
get_api())
self._clean_account()
self.invalid_client = Pithos_Client(get_server(), get_auth(), 'invalid',
get_api())
#self.headers = {
......@@ -127,6 +128,9 @@ class BaseTestCase(unittest.TestCase):
self.return_codes = (400, 401, 404, 503,)
def tearDown(self):
self._clean_account()
def _clean_account(self):
for c in self.client.list_containers():
while True:
#list objects returns at most 10000 objects
......@@ -817,6 +821,24 @@ class ContainerPut(BaseTestCase):
self.client.create_container(self.containers[0])
self.assertTrue(not self.client.create_container(self.containers[0]))
def test_quota(self):
self.client.create_container(self.containers[0])
policy = {'quota':100}
self.client.set_container_policies('c1', **policy)
meta = self.client.retrieve_container_metadata('c1')
self.assertTrue('x-container-policy-quota' in meta)
self.assertEqual(meta['x-container-policy-quota'], '100')
args = ['c1', 'o1']
kwargs = {'length':101}
self.assert_raises_fault(413, self.upload_random_data, *args, **kwargs)
#reset quota
policy = {'quota':0}
self.client.set_container_policies('c1', **policy)
class ContainerPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
......@@ -839,13 +861,13 @@ class ContainerDelete(BaseTestCase):
self.containers = ['c1', 'c2']
for c in self.containers: