Commit 217bc121 authored by Antony Chazapis

Container-level block upload documentation and integration with tools.

Fixes #1371
......@@ -25,9 +25,12 @@ Document Revisions
========================= ================================
Revision Description
========================= ================================
0.7 (Sept 28, 2011) Suggest upload/download methods using hashmaps.
0.7 (Oct 4, 2011) Suggest upload/download methods using hashmaps.
\ Propose syncing algorithm.
\ Support cross-account object copy and move.
\ Pass token as a request parameter when using ``POST`` via an HTML form.
\ Optionally use source account to update object from another object.
\ Use container ``POST`` to upload missing blocks of data.
0.6 (Sept 13, 2011) Reply with Merkle hash as the ETag when updating objects.
\ Include version id in object replace/change replies.
\ Change conflict (409) replies format to text.
......@@ -497,6 +500,9 @@ POST
==================== ================================
Request Header Name Value
==================== ================================
Content-Length The size of the supplied data (optional, to upload)
Content-Type The MIME content type of the supplied data (optional, to upload)
Transfer-Encoding Set to ``chunked`` to specify incremental uploading (if used, ``Content-Length`` is ignored)
X-Container-Policy-* Container behavior and limits
X-Container-Meta-* Optional user defined metadata
==================== ================================
......@@ -509,11 +515,13 @@ Request Parameter Name Value
update Do not replace metadata/policy (no value parameter)
====================== ============================================
No reply content/headers.
No reply content/headers, except when uploading data, where the reply consists of a list of hashes for the blocks received (in a simple text format, with one hash per line).
The operation will overwrite all user defined metadata, unless ``update`` is specified.
To change policy, include an ``X-Container-Policy-*`` header with the name in the key. If no ``X-Container-Policy-*`` header is present, no changes will be applied to policy. The ``update`` parameter also applies to policy - deleted values will revert to defaults. To delete/revert a specific policy directive, use ``update`` and an empty header value. See container ``PUT`` for a reference of policy directives.
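For example, a single policy directive can be changed while keeping everything else intact. A minimal sketch, assuming ``quota`` is the directive being set (host, account, container, token and the quota value are placeholders). ::

import httplib

# Hypothetical policy-only update; 'update' preserves the other
# metadata/policy values, as described above.
conn = httplib.HTTPSConnection('pithos.dev.grnet.gr')
conn.request('POST', '/v1/user/pithos?update', '',
             {'X-Auth-Token': '0000',
              'X-Container-Policy-Quota': '1073741824'})
print conn.getresponse().status    # expect 202 (Accepted)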
To upload blocks of data to the container, set ``Content-Type`` to ``application/octet-stream`` and ``Content-Length`` to a valid value (unless using ``chunked`` as the ``Transfer-Encoding``).
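A rough client-side sketch of such a block upload, reading back the one-hash-per-line reply described above (host, token and file name are placeholders). ::

import httplib

# Hypothetical upload of a single block of data to a container.
block = open('chunk.bin', 'rb').read()
conn = httplib.HTTPSConnection('pithos.dev.grnet.gr')
conn.request('POST', '/v1/user/pithos?update', block,
             {'X-Auth-Token': '0000',
              'Content-Type': 'application/octet-stream',
              'Content-Length': str(len(block))})
hashes = conn.getresponse().read().strip().split('\n')    # hashes of stored blocks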
================ ===============================
Return Code Description
================ ===============================
......@@ -825,6 +833,7 @@ Transfer-Encoding Set to ``chunked`` to specify incremental uploading (if us
Content-Encoding The encoding of the object (optional)
Content-Disposition The presentation style of the object (optional)
X-Source-Object Update with data from the object at path ``/<container>/<object>`` (optional, to update)
X-Source-Account The source account to update from
X-Source-Version The source version to update from (optional, to update)
X-Object-Bytes The updated object's final size (optional, when updating)
X-Object-Manifest Object parts prefix in ``<container>/<object>`` form (optional)
......@@ -880,15 +889,14 @@ Return Code Description
416 (Range Not Satisfiable) The supplied range is invalid
=========================== ==============================
The ``POST`` method can also be used for creating an object via a standard HTML form. If the request ``Content-Type`` is ``multipart/form-data``, none of the above headers will be processed. The form should have exactly two fields, as in the following example. ::
The ``POST`` method can also be used for creating an object via a standard HTML form. If the request ``Content-Type`` is ``multipart/form-data``, none of the above headers will be processed. The form should have an ``X-Object-Data`` field, as in the following example. The token is passed as a request parameter. ::
<form method="post" action="https://pithos.dev.grnet.gr/v1/user/folder/EXAMPLE.txt" enctype="multipart/form-data">
<input type="hidden" name="X-Auth-Token" value="0000">
<form method="post" action="https://pithos.dev.grnet.gr/v1/user/folder/EXAMPLE.txt?X-Auth-Token=0000" enctype="multipart/form-data">
<input type="file" name="X-Object-Data">
<input type="submit">
</form>
This will create/override the object with the given name, as if using ``PUT``. The ``Content-Type`` of the object will be set to the value of the corresponding header sent in the part of the request containing the data. Metadata, sharing and other object attributes can not be set this way.
This will create/override the object with the given name, as if using ``PUT``. The ``Content-Type`` of the object will be set to the value of the corresponding header sent in the part of the request containing the data (usually set automatically by the browser). Metadata, sharing and other object attributes cannot be set this way.
========================== ===============================
Reply Header Name Value
......@@ -1053,9 +1061,9 @@ In the case of an upload, only the missing blocks will be submitted to the serve
* Server responds with status ``409`` (Conflict):
* Server's response body contains the hashes of the blocks that do not exist on the server.
* For each one of the hash values in the server's response:
* For each hash value in the server's response (or all hashes together):
* Send a ``PUT`` request to the server with the corresponding data block. Individual blocks are uploaded to a file named ``.upload``.
* Send a ``POST`` request to the destination container with the corresponding data.
* Repeat hashmap ``PUT``. Fail if the server's response is not ``201``.
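A condensed sketch of the upload side of this algorithm, closely following the ``transfer`` module's ``upload`` helper as updated in this commit (error handling trimmed). ::

import os
from binascii import hexlify, unhexlify
from cStringIO import StringIO
from hashmap import HashMap
from client import Fault

def sync_upload(client, path, container, object):
    meta = client.retrieve_container_metadata(container)
    hashes = HashMap(int(meta['x-container-block-size']),
                     meta['x-container-block-hash'])
    hashes.load(path)
    map = {'bytes': os.path.getsize(path),
           'hashes': [hexlify(h) for h in hashes]}
    try:
        client.create_object_by_hashmap(container, object, map)
        return                                    # nothing was missing
    except Fault, fault:
        if fault.status != 409:
            raise
        missing = [h for h in fault.data.split('\n') if h]
    with open(path) as fp:                        # send only the missing blocks
        for h in missing:
            fp.seek(hashes.index(unhexlify(h)) * hashes.blocksize)
            client.update_container_data(container,
                                         StringIO(fp.read(hashes.blocksize)))
    client.create_object_by_hashmap(container, object, map)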
......
......@@ -51,7 +51,7 @@ class BadRequest(Fault):
class Unauthorized(Fault):
code = 401
class ResizeNotAllowed(Fault):
class Forbidden(Fault):
code = 403
class ItemNotFound(Fault):
......
......@@ -392,7 +392,23 @@ def container_update(request, v_account, v_container):
raise Unauthorized('Access denied')
except NameError:
raise ItemNotFound('Container does not exist')
return HttpResponse(status=202)
content_length = -1
if request.META.get('HTTP_TRANSFER_ENCODING') != 'chunked':
content_length = get_int_parameter(request.META.get('CONTENT_LENGTH', 0))
content_type = request.META.get('CONTENT_TYPE')
hashmap = []
if content_type and content_type == 'application/octet-stream' and content_length != 0:
for data in socket_read_iterator(request, content_length,
request.backend.block_size):
# TODO: Raise 408 (Request Timeout) if this takes too long.
# TODO: Raise 499 (Client Disconnect) if a length is defined and we stop before getting this much data.
hashmap.append(request.backend.put_block(data))
response = HttpResponse(status=202)
if hashmap:
response.content = '\n'.join(hashmap) + '\n'
return response
@api_method('DELETE')
def container_delete(request, v_account, v_container):
......@@ -699,6 +715,7 @@ def object_read(request, v_account, v_container, v_object):
response['Content-Length'] = len(data)
return response
request.serialization = 'text' # Unset.
return object_data_response(request, sizes, hashmaps, meta)
@api_method('PUT', format_allowed=True)
......@@ -1041,13 +1058,16 @@ def object_update(request, v_account, v_container, v_object):
elif offset > size:
raise RangeNotSatisfiable('Supplied offset is beyond object limits')
if src_object:
src_account = smart_unicode(request.META.get('HTTP_X_SOURCE_ACCOUNT'), strings_only=True)
if not src_account:
src_account = request.user
src_container, src_name = split_container_object_string(src_object)
src_container = smart_unicode(src_container, strings_only=True)
src_name = smart_unicode(src_name, strings_only=True)
src_version = request.META.get('HTTP_X_SOURCE_VERSION')
try:
src_size, src_hashmap = request.backend.get_object_hashmap(
request.user, v_account, src_container, src_name, src_version)
src_size, src_hashmap = request.backend.get_object_hashmap(request.user,
src_account, src_container, src_name, src_version)
except NotAllowedError:
raise Unauthorized('Access denied')
except NameError:
......
......@@ -303,7 +303,7 @@ def copy_or_move_object(request, src_account, src_container, src_name, dest_acco
except ValueError:
raise BadRequest('Invalid sharing header')
except AttributeError, e:
raise Conflict(json.dumps(e.data))
raise Conflict('\n'.join(e.data) + '\n')
if public is not None:
try:
request.backend.update_object_public(request.user, dest_account, dest_container, dest_name, public)
......
......@@ -711,6 +711,16 @@ class Pithos_Client(OOS_Client):
headers['x-container-policy-%s' % key] = val
return self.post(path, headers=headers)
def update_container_data(self, container, f=stdin):
"""adds blocks of data to the container"""
account = self.account
path = '/%s/%s' % (account, container)
params = {'update': None}
headers = {'content_type': 'application/octet-stream'}
data = f.read() if f else None
headers['content_length'] = len(data) if data else 0
return self.post(path, data, headers=headers, params=params)
def delete_container(self, container, until=None, account=None):
"""deletes a container or the container history until the date provided"""
account = account or self.account
......
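Hypothetical use of the new method, assuming an already authenticated ``Pithos_Client`` instance; the reply body lists one hash per line for the blocks the server stored.

from cStringIO import StringIO

# Push one block of local data into the 'pithos' container.
block = open('chunk.bin', 'rb').read()
client.update_container_data('pithos', StringIO(block))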
......@@ -42,11 +42,10 @@ def file_read_iterator(fp, size=1024):
class HashMap(list):
def __init__(self, f, blocksize, blockhash):
def __init__(self, blocksize, blockhash):
super(HashMap, self).__init__()
self.blocksize = blocksize
self.blockhash = blockhash
self.load(f)
def _hash_raw(self, v):
h = hashlib.new(self.blockhash)
......@@ -74,4 +73,4 @@ class HashMap(list):
def load(self, f):
with open(f) as fp:
for block in file_read_iterator(fp, self.blocksize):
self.append(self._hash_block(block))
\ No newline at end of file
self.append(self._hash_block(block))
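With the constructor change above, hashing a local file becomes a construct-then-load sequence. A small usage sketch; the block size and hash algorithm shown are the usual Pithos defaults, assumed here.

from binascii import hexlify
from hashmap import HashMap

map = HashMap(4194304, 'sha256')          # assumed container defaults
map.load('/path/to/file')
print [hexlify(h) for h in map]           # per-block hashes
print hexlify(map.hash())                 # top (Merkle) hash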
#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from sqlalchemy import create_engine
from sqlalchemy import Table, MetaData
from pithos.backends.modular import ModularBackend
class Migration(object):
def __init__(self, db):
self.engine = create_engine(db)
self.metadata = MetaData(self.engine)
#self.engine.echo = True
self.conn = self.engine.connect()
def execute(self):
pass
\ No newline at end of file
......@@ -31,39 +31,75 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import os
import types
from hashmap import HashMap
from binascii import hexlify, unhexlify
from cStringIO import StringIO
from lib.client import Fault
from client import Fault
import os
import sys
def smart_upload(client, file, blocksize, blockhash):
dest_container = 'pithos'
dest_object = os.path.split(file)[-1]
def upload(client, file, container, prefix, name=None):
meta = client.retrieve_container_metadata(container)
blocksize = int(meta['x-container-block-size'])
blockhash = meta['x-container-block-hash']
size = os.path.getsize(file)
hashes = HashMap(sys.argv[1], blocksize, blockhash)
hashes = HashMap(blocksize, blockhash)
hashes.load(file)
map = {'bytes': size, 'hashes': [hexlify(x) for x in hashes]}
objectname = name if name else os.path.split(file)[-1]
object = prefix + objectname
try:
client.create_object_by_hashmap(dest_container, dest_object, map)
client.create_object_by_hashmap(container, object, map)
except Fault, fault:
if fault.status != 409:
raise
else:
return
missing = fault.data.split('\n')
if type(fault.data) == types.StringType:
missing = fault.data.split('\n')
elif type(fault.data) == types.ListType:
missing = fault.data
if '' in missing:
del missing[missing.index(''):]
with open(file) as fp:
for hash in missing:
offset = hashes.index(unhexlify(hash)) * BLOCK_SIZE
offset = hashes.index(unhexlify(hash)) * blocksize
fp.seek(offset)
block = fp.read(BLOCK_SIZE)
client.create_object('pithos', '.upload', StringIO(block))
block = fp.read(blocksize)
client.update_container_data(container, StringIO(block))
client.create_object_by_hashmap(container, object, map)
def download(client, container, object, file):
meta = client.retrieve_container_metadata(container)
blocksize = int(meta['x-container-block-size'])
blockhash = meta['x-container-block-hash']
if os.path.isfile(file):
size = os.path.getsize(file)
hashes = HashMap(blocksize, blockhash)
hashes.load(file)
else:
size = 0
hashes = []
map = client.retrieve_object_hashmap(container, object)
client.create_object_by_hashmap(dest_container, dest_object, map)
\ No newline at end of file
with open(file, 'r+b' if os.path.isfile(file) else 'wb') as fp: # 'a' mode would ignore seeks and always append
for i, h in enumerate(map):
if i < len(hashes) and h == hashes[i]:
continue
start = i * blocksize
end = '' if i == len(map) - 1 else (i + 1) * blocksize - 1 # HTTP range ends are inclusive
data = client.retrieve_object(container, object, range='bytes=%s-%s' % (start, end))
fp.seek(start)
fp.write(data)
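A hypothetical round trip with the two helpers above; client construction and all paths are placeholders.

# Upload /tmp/data.bin as backups/data.bin, then fetch it back to a copy,
# transferring only the blocks missing on each side.
upload(client, '/tmp/data.bin', 'pithos', 'backups/')
download(client, 'pithos', 'backups/data.bin', '/tmp/data.copy')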
#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from binascii import hexlify
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, String, MetaData
from sqlalchemy.sql import select
from pithos import settings
from pithos.backends.modular import ModularBackend
from lib.hashmap import HashMap
from lib.migrate import Migration
class DataMigration(Migration):
def __init__(self, db):
Migration.__init__(self, db)
self.backend = ModularBackend(*getattr(settings, 'BACKEND', None)[1]) # assumed config; execute() below uses self.backend
# XXX Need more columns for primary key - last modified timestamp...
columns=[]
columns.append(Column('path', String(2048), primary_key=True))
columns.append(Column('hash', String(255)))
self.files = Table('files', self.metadata, *columns)
self.metadata.create_all(self.engine)
def cache_put(self, path, hash):
# Insert or replace.
s = self.files.delete().where(self.files.c.path==path)
r = self.conn.execute(s)
r.close()
s = self.files.insert()
r = self.conn.execute(s, {'path': path, 'hash': hash})
r.close()
def cache_get(self, path):
s = select([self.files.c.hash], self.files.c.path == path)
r = self.conn.execute(s)
l = r.fetchone()
r.close()
if not l:
return l
return l[0]
def execute(self):
blocksize = self.backend.block_size
blockhash = self.backend.hash_algorithm
# Loop for all available files.
filebody = Table('filebody', self.metadata, autoload=True)
s = select([filebody.c.storedfilepath])
rp = self.conn.execute(s)
paths = rp.fetchall()
rp.close()
for (path,) in paths: # each row holds a single path column
map = HashMap(blocksize, blockhash)
map.load(path)
hash = hexlify(map.hash())
if hash != self.cache_get(path):
missing = self.backend.blocker.block_ping(map) # XXX Backend hack...
status = '[>] ' + path
if missing:
status += ' - %d block(s) missing' % len(missing)
with open(path) as fp:
for h in missing:
offset = map.index(h) * blocksize
fp.seek(offset)
block = fp.read(blocksize)
self.backend.put_block(block)
else:
status += ' - no blocks missing'
self.cache_put(path, hash)
else:
status = '[-] ' + path
print status
if __name__ == "__main__":
db = 'sqlite:///migrate.db'
dt = DataMigration(db)
dt.execute()
......@@ -33,154 +33,140 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from sqlalchemy import create_engine
from sqlalchemy import Table, MetaData
from sqlalchemy import Table
from sqlalchemy.sql import select
from pithos.api.util import hashmap_hash, get_container_headers
from pithos.backends.lib.hashfiler import Blocker, Mapper
from binascii import hexlify
from pithos.backends.lib.hashfiler import Blocker
from pithos.aai.models import PithosUser
from django.conf import settings
from pithos.backends.modular import ModularBackend
from lib.transfer import upload
from lib.hashmap import HashMap
from lib.client import Fault
from lib.migrate import Migration
import json
import base64
import os
class Migration(object):
def __init__(self, db):
self.engine = create_engine(db)
self.metadata = MetaData(self.engine)
#self.engine.echo = True
self.conn = self.engine.connect()
def execute(self):
pass
class UserMigration(Migration):
def __init__(self, db):
Migration.__init__(self, db)
self.gss_users = Table('gss_user', self.metadata, autoload=True)
def execute(self):
s = self.gss_users.select()
users = self.conn.execute(s).fetchall()
l = []
for u in users:
user = PithosUser()
user.pk = u['id']
user.uniq = u['username']
user.realname = u['name']
user.is_admin = False
user.affiliation = u['homeorganization'] if u['homeorganization'] else ''
user.auth_token = base64.b64encode(u['authtoken'])
user.auth_token_created = u['creationdate']
user.auth_token_expires = u['authtokenexpirydate']
user.created = u['creationdate']
user.updated = u['modificationdate']
print '#', user
user.save(update_timestamps=False)
class DataMigration(Migration):
def __init__(self, db, path, block_size, hash_algorithm):
Migration.__init__(self, db)
params = {'blocksize': block_size,
'blockpath': os.path.join(path + '/blocks'),
'hashtype': hash_algorithm}
self.blocker = Blocker(**params)
params = {'mappath': os.path.join(path + '/maps'),
'namelen': self.blocker.hashlen}
self.mapper = Mapper(**params)
def execute(self):
filebody = Table('filebody', self.metadata, autoload=True)
s = select([filebody.c.id, filebody.c.storedfilepath])
rp = self.conn.execute(s)
while True:
t = rp.fetchone()
if not t:
break
id, path = t
print '#', id, path
hashlist = self.blocker.block_stor_file(open(path))[1]
self.mapper.map_stor(id, hashlist)
rp.close()
import sys
import hashlib
class ObjectMigration(DataMigration):
def __init__(self, db, path, block_size, hash_algorithm):
DataMigration.__init__(self, db, path, block_size, hash_algorithm)
options = getattr(settings, 'BACKEND', None)[1]
self.backend = ModularBackend(*options)
self.wrapper = ClientWrapper()
def create_default_containers(self):
users = PithosUser.objects.all()
for u in users:
print '#', u.uniq
try:
self.backend.put_container(u.uniq, u.uniq, 'pithos', {})
self.backend.put_container(u.uniq, u.uniq, 'trash', {})
self.wrapper.create_container('pithos', u.uniq)
self.wrapper.create_container('trash', u.uniq)
except NameError, e:
pass
def create_directory_markers(self, parent_id=None, path=None):
def get_path(self, child_id):
folderTable = Table('folder', self.metadata, autoload=True)
userTable = Table('gss_user', self.metadata, autoload=True)
s = select([folderTable.c.id, folderTable.c.name, userTable.c.username])
s = s.where(folderTable.c.parent_id == parent_id)
s = s.where(folderTable.c.owner_id == userTable.c.id)
s = select([folderTable.c.parent_id, folderTable.c.name])
s = s.where(folderTable.c.id == child_id)
rp = self.conn.execute(s)
while True:
t = rp.fetchone()
if not t:
path = None
break
id, name, uuniq = t[0], t[1], t[2]
#print id, name, uuniq
if parent_id:
obj = '%s/%s' %(path, name) if path else name
print '#', obj
self.backend.update_object_hashmap(uuniq, uuniq, 'pithos', obj, 0, [])
else:
obj = ''
self.create_directory_markers(id, path=obj)
rp.close()
path = None
parent_id, foldername = rp.fetchone()
if not parent_id:
return ''
else:
return '%s/%s' %(self.get_path(parent_id), foldername)
def execute(self):
def create_objects(self):
fileheader = Table('fileheader', self.metadata, autoload=True)
filebody = Table('filebody', self.metadata, autoload=True)
s = select([filebody.c.id])
folder = Table('folder', self.metadata, autoload=True)
gss_user = Table('gss_user', self.metadata, autoload=True)
j = filebody.join(fileheader, filebody.c.header_id == fileheader.c.id)
j = j.join(folder, fileheader.c.folder_id == folder.c.id)
j = j.join(gss_user, fileheader.c.owner_id == gss_user.c.id)
s = select([gss_user.c.username, fileheader.c.folder_id, fileheader.c.name,
filebody.c.storedfilepath], from_obj=j)
rp = self.conn.execute(s)
while True:
id = rp.fetchone()
if not id:
break
meta = {}
hashlist = self.mapper.map_retr(id)
#hashmap = d['hashes']
#size = int(d['bytes'])
#meta.update({'hash': hashmap_hash(request, hashmap)})
#version_id = backend.update_object_hashmap(request.user, v_account,
# v_container, v_object,
# size, hashmap)
rp.close()
objects = rp.fetchall()
for username, folderid, filename, filepath in objects:
path = self.get_path(folderid)[1:]
obj = ''
#create directory markers
for f in path.split('/'):
obj = '%s/%s' %(obj, f) if obj else f
try:
self.wrapper.create_directory_marker('pithos', obj, username)
except NameError, e:
pass
self.wrapper.set_account(username)
print '#', username, path, filename
prefix = '%s/' %path if path else ''
upload(self.wrapper, filepath, 'pithos', prefix, filename)
class ClientWrapper(object):
"""Wraps client methods used by transfer.upload()
to ModularBackend methods"""
def __init__(self):
options = getattr(settings, 'BACKEND', None)[1]
self.backend = ModularBackend(*options)
self.block_size = self.backend.block_size
self.block_hash = self.backend.hash_algorithm
def set_account(self, account):