Commit e85237ac authored by Sofia Papagiannaki

Migration Tools: Progress III

Refs #1171
parent a3e0302d
......@@ -86,6 +86,7 @@ class ModularBackend(BaseBackend):
__import__(mod)
self.mod = sys.modules[mod]
self.db = db
self.wrapper = self.mod.dbwrapper.DBWrapper(db)
params = {'blocksize': self.block_size,
......
......@@ -70,7 +70,8 @@ class HashMap(list):
h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
return h[0]
def load(self, f):
with open(f) as fp:
for block in file_read_iterator(fp, self.blocksize):
self.append(self._hash_block(block))
def load(self, fp):
self.size = 0
for block in file_read_iterator(fp, self.blocksize):
self.append(self._hash_block(block))
self.size += len(block)
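load() now takes an already-open file object instead of a path and records the total byte count in self.size, leaving the file's lifetime to the caller. A minimal sketch of the new calling convention (path, block size and hash name are illustrative):

from binascii import hexlify
from lib.hashmap import HashMap

hashes = HashMap(4 * 1024 * 1024, 'sha256')   # blocksize, blockhash
with open('/tmp/example.bin') as fp:          # the caller opens and closes the file
    hashes.load(fp)                           # appends one digest per block
print hashes.size                             # total bytes read
print [hexlify(x) for x in hashes]            # hex digest of each block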
......@@ -36,6 +36,8 @@
from sqlalchemy import create_engine
from sqlalchemy import Table, MetaData
from django.conf import settings
from pithos.backends.modular import ModularBackend
class Migration(object):
......@@ -44,6 +46,9 @@ class Migration(object):
self.metadata = MetaData(self.engine)
#self.engine.echo = True
self.conn = self.engine.connect()
options = getattr(settings, 'BACKEND', None)[1]
self.backend = ModularBackend(*options)
def execute(self):
pass
\ No newline at end of file
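Migration now instantiates the shared ModularBackend once, from Django settings. The [1] indexing implies settings.BACKEND is a pair whose second element holds the backend's positional arguments; a hypothetical entry consistent with that (module path and DB URL are assumptions, not the project's actual configuration):

# only BACKEND[1] is consumed, as the argument tuple for ModularBackend(*options)
BACKEND = ('pithos.backends.modular.ModularBackend',
           ('pithos.backends.lib.sqlalchemy', 'sqlite:///backend.db'))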
......@@ -40,7 +40,7 @@ from cStringIO import StringIO
from client import Fault
def upload(client, file, container, prefix, name=None):
def upload(client, file, container, prefix, name=None, mimetype=None):
meta = client.retrieve_container_metadata(container)
blocksize = int(meta['x-container-block-size'])
......@@ -48,18 +48,20 @@ def upload(client, file, container, prefix, name=None):
size = os.path.getsize(file)
hashes = HashMap(blocksize, blockhash)
hashes.load(file)
hashes.load(open(file))
map = {'bytes': size, 'hashes': [hexlify(x) for x in hashes]}
objectname = name if name else os.path.split(file)[-1]
object = prefix + objectname
kwargs = {'mimetype':mimetype} if mimetype else {}
v = None
try:
client.create_object_by_hashmap(container, object, map)
v = client.create_object_by_hashmap(container, object, map, **kwargs)
except Fault, fault:
if fault.status != 409:
raise
else:
return
return v
if type(fault.data) == types.StringType:
missing = fault.data.split('\n')
......@@ -76,7 +78,7 @@ def upload(client, file, container, prefix, name=None):
block = fp.read(blocksize)
client.update_container_data(container, StringIO(block))
client.create_object_by_hashmap(container, object, map)
return client.create_object_by_hashmap(container, object, map, **kwargs)
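upload() now threads an optional mimetype through to create_object_by_hashmap() and returns the resulting version id on both paths: immediately when every block is already on the server, or after uploading the missing blocks reported by the 409 fault. An illustrative call, assuming client is any object exposing the methods used above (for instance the ClientWrapper defined later in this commit):

from lib.transfer import upload

# 'client' must provide retrieve_container_metadata(),
# create_object_by_hashmap() and update_container_data()
v = upload(client, '/tmp/report.pdf', 'pithos', 'docs/',
           name='report.pdf', mimetype='application/pdf')
print v   # the version id returned by create_object_by_hashmap()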
def download(client, container, object, file):
......
......@@ -46,14 +46,17 @@ from lib.hashmap import HashMap
from lib.migrate import Migration
class DataMigration(Migration):
def __init__(self, db):
Migration.__init__(self, db)
def __init__(self, pithosdb, db):
Migration.__init__(self, pithosdb)
# XXX Need more columns for primary key - last modified timestamp...
engine = create_engine(db)
metadata = MetaData(engine)
columns=[]
columns.append(Column('path', String(2048), primary_key=True))
columns.append(Column('hash', String(255)))
self.files = Table('files', self.metadata, *columns)
self.metadata.create_all(self.engine)
self.files = Table('files', metadata, *columns)
metadata.create_all(engine)
def cache_put(self, path, hash):
# Insert or replace.
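The cache_put() body is elided from this hunk; since the cache table's MetaData is bound to its own engine, an insert-or-replace could look like the following sketch (an assumption, not the committed code):

def cache_put(self, path, hash):
    # SQLite's INSERT OR REPLACE via SQLAlchemy statement prefixing;
    # the bound MetaData lets the statement execute without self.conn
    s = self.files.insert().prefix_with('OR REPLACE')
    s.execute(path=path, hash=hash)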
......@@ -108,7 +111,8 @@ class DataMigration(Migration):
print status
if __name__ == "__main__":
pithosdb = ''
db = 'sqlite:///migrate.db'
dt = DataMigration(db)
dt = DataMigration(pithosdb, db)
dt.execute()
......@@ -39,14 +39,17 @@ from sqlalchemy.sql import select
from binascii import hexlify
from pithos.backends.lib.hashfiler import Blocker
from pithos.backends.lib.sqlalchemy import Node
from pithos.aai.models import PithosUser
from django.conf import settings
from pithos.backends.modular import ModularBackend
from pithos.backends.modular import CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED
from pithos.backends.lib.sqlalchemy.node import Node
from pithos.backends.lib.sqlalchemy.dbwrapper import DBWrapper
from lib.transfer import upload
from lib.hashmap import HashMap
from lib.hashmap import HashMap, file_read_iterator
from lib.client import Fault
from lib.migrate import Migration
......@@ -54,12 +57,15 @@ import json
import os
import sys
import hashlib
import mimetypes
class ObjectMigration(DataMigration):
def __init__(self, db, path, block_size, hash_algorithm):
DataMigration.__init__(self, db, path, block_size, hash_algorithm)
self.wrapper = ClientWrapper()
class ObjectMigration(Migration):
def __init__(self, old_db):
Migration.__init__(self, old_db)
self.wrapper = ClientWrapper(self.backend)
params = {'wrapper': DBWrapper(self.backend.db)}
self.node = Node(**params)
def create_default_containers(self):
users = PithosUser.objects.all()
for u in users:
......@@ -81,41 +87,88 @@ class ObjectMigration(DataMigration):
else:
return '%s/%s' %(self.get_path(parent_id), foldername)
def create_object(self, username, container, object, filepath, mimetype):
obj = ''
path = '/'.join(object.split('/')[:-1])
name = object.split('/')[-1]
#create directory markers
for f in path.split('/'):
obj = '%s/%s' %(obj, f) if obj else f
try:
self.wrapper.create_directory_marker('pithos', obj, username)
except NameError, e:
pass
self.wrapper.set_account(username)
prefix = '%s/' %path if path else ''
print '#', filepath, container, prefix, name, mimetype
return upload(self.wrapper, filepath, container, prefix, name, mimetype)
def create_history(self, user, header_id, node_id, deleted=False):
filebody = Table('filebody', self.metadata, autoload=True)
gss_user = Table('gss_user', self.metadata, autoload=True)
j = filebody.join(gss_user, filebody.c.modifiedby_id == gss_user.c.id)
s = select([filebody.c.filesize, gss_user.c.username], from_obj=j)
s = s.where(filebody.c.header_id == header_id)
s = s.order_by(filebody.c.version)
rp = self.conn.execute(s)
versions = rp.fetchall()
print '#', len(versions)
rp.close()
i = 0
for size, modified_by in versions:
cluster = CLUSTER_HISTORY if i < len(versions) - 1 else CLUSTER_NORMAL
cluster = cluster if not deleted else CLUSTER_DELETED
args = (node_id, size, None, modified_by, cluster)
self.node.version_create(*args)
i += 1
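create_history() replays each filebody row as a backend version: every version but the newest is filed in the history cluster, the newest stays normal, and all of them move to the deleted cluster when the file was trashed. For a file with three stored versions the mapping is:

# versions[0] -> CLUSTER_HISTORY
# versions[1] -> CLUSTER_HISTORY
# versions[2] -> CLUSTER_NORMAL   (all three -> CLUSTER_DELETED if deleted)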
def create_objects(self):
fileheader = Table('fileheader', self.metadata, autoload=True)
filebody = Table('filebody', self.metadata, autoload=True)
folder = Table('folder', self.metadata, autoload=True)
gss_user = Table('gss_user', self.metadata, autoload=True)
j = filebody.join(fileheader, filebody.c.header_id == fileheader.c.id)
j = filebody.join(fileheader, filebody.c.id == fileheader.c.currentbody_id)
j = j.join(folder, fileheader.c.folder_id == folder.c.id)
j = j.join(gss_user, fileheader.c.owner_id == gss_user.c.id)
s = select([gss_user.c.username, fileheader.c.folder_id, fileheader.c.name,
filebody.c.storedfilepath], from_obj=j)
s = select([gss_user.c.username, fileheader.c.id, fileheader.c.folder_id,
fileheader.c.name, fileheader.c.deleted, filebody.c.storedfilepath,
filebody.c.mimetype], from_obj=j)
rp = self.conn.execute(s)
objects = rp.fetchall()
for username, folderid, filename, filepath in objects:
for username, headerid, folderid, filename, deleted, filepath, mimetype in objects:
path = self.get_path(folderid)[1:]
obj = ''
#create directory markers
for f in path.split('/'):
obj = '%s/%s' %(obj, f) if obj else f
try:
self.wrapper.create_directory_marker('pithos', obj, username)
except NameError, e:
pass
self.wrapper.set_account(username)
print '#', username, path, filename
prefix = '%s/' %path if path else ''
upload(self.wrapper, filepath, 'pithos', prefix, filename)
container = 'pithos' if not deleted else 'trash'
object = '%s/%s' %(path, filename)
#filepath = '/Users/butters/Downloads/torvalds-linux-0f86267'
vserial = self.create_object(username, container, object, filepath, mimetype)
nodeid = self.node.version_get_properties(vserial, keys=('node',))[0]
self.create_history(username, headerid, nodeid, deleted)
self.node.version_remove(vserial)
#self.set_metadata()
#self.set_public()
#self.statistics()
#self.set_permissions()
def handle_deleted(self):
pass
def upload_dir(self, dir, prefix, user, container):
for f in os.listdir(dir):
fullpath = '%s/%s' %(dir, f)
if os.path.isfile(fullpath):
type = mimetypes.guess_type(fullpath)[0]
name = '/'.join(fullpath.split(prefix)[1:])
print '@', user, container, name, fullpath, type
self.create_object(user, container, name, fullpath, type)
else: self.upload_dir(fullpath, prefix, user, container)
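upload_dir() walks a local tree, guesses each file's content type with mimetypes, and derives the object name by stripping the given prefix from the full path. An illustrative call (the paths are assumptions; old_db as in __main__ below):

ot = ObjectMigration(old_db)
# /srv/import/linux/Makefile is stored as object 'linux/Makefile'
ot.upload_dir('/srv/import/linux', '/srv/import/', 'chstath', 'linux')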
class ClientWrapper(object):
"""Wraps client methods used by transfer.upload()
to ModularBackend methods"""
def __init__(self):
options = getattr(settings, 'BACKEND', None)[1]
self.backend = ModularBackend(*options)
def __init__(self, backend):
self.backend = backend
self.block_size = self.backend.block_size
self.block_hash = self.backend.hash_algorithm
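ClientWrapper no longer builds its own ModularBackend; it receives the instance the Migration already holds, so both share a single connection and configuration. A sketch of standalone construction, mirroring how Migration.__init__ reads the settings:

options = getattr(settings, 'BACKEND', None)[1]
backend = ModularBackend(*options)
wrapper = ClientWrapper(backend)
print wrapper.block_size, wrapper.block_hash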
......@@ -131,42 +184,38 @@ class ClientWrapper(object):
'hash': md5.hexdigest().lower()}
self.backend.update_object_hashmap(account, account, container, object, 0, [], meta)
def create_object_by_hashmap(self, container, object, map):
def create_object_by_hashmap(self, container, object, map, mimetype=None):
hashmap = HashMap(self.block_size, self.block_hash)
for hash in map['hashes']:
hashmap.append(hash)
for h in map['hashes']:
hashmap.append(h)
meta = {'hash':hexlify(hashmap.hash())}
if mimetype:
meta['content-type'] = mimetype
size = map['bytes']
try:
args = [self.account, self.account, container, object, size, map['hashes'], meta]
self.backend.update_object_hashmap(*args)
return self.backend.update_object_hashmap(*args)
except IndexError, ie:
fault = Fault(ie.data, 409)
raise fault
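create_object_by_hashmap() now stores an optional mimetype as content-type metadata and returns the version produced by update_object_hashmap(). The expected map layout matches what upload() builds; a sketch reusing the wrapper from the previous example (size and digests are illustrative):

map = {'bytes': 7340032,                                  # object size in bytes
       'hashes': ['3f786850e387...', 'f2ad6c76e0a9...']}  # hex digest per block
v = wrapper.create_object_by_hashmap('pithos', 'docs/report.pdf', map,
                                     mimetype='application/pdf')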
def create_object(self, container, object, f):
hashmap = HashMap(self.block_size, self.block_hash)
hashmap.load(f)
map = [hexlify(x) for x in hashmap]
meta = {'hash':hashmap.hash()}
size = hashmap.size
self.backend.update_object_hashmap(self.account, self.account, container, object, size, hashmap, meta)
def update_container_data(self, container, f):
#just put the blocks
for block in file_read_iterator(f, self.block_size):
self.backend.put_block(block)
def retrieve_container_metadata(self, container):
return {'x-container-block-size':self.block_size,
'x-container-block-hash':self.block_hash}
if __name__ == "__main__":
db = ''
old_db = ''
basepath = options = getattr(settings, 'PROJECT_PATH', None)
params = {'db':db,
'path':os.path.join(basepath, 'data/pithos/'),
'block_size':(4 * 1024 * 1024),
'hash_algorithm':'sha256'}
ot = ObjectMigration(old_db)
#ot.create_default_containers()
#ot.create_objects()
ot = ObjectMigration(**params)
ot.create_default_containers()
ot.create_objects()
p = ''
ot.upload_dir(p, p, 'chstath', 'linux')
\ No newline at end of file