Commit 83cf60a6 authored by Antony Chazapis

Check version when copying. Remove version option from move when using the store utility. Fix version listing.

Fixes #1235
parents df82b3cf 26f5b9ac
@@ -25,6 +25,7 @@ Document Revisions
========================= ================================
Revision Description
========================= ================================
0.7 (Sept 22, 2011) Suggest upload/download methods using hashmaps.
0.6 (Sept 13, 2011) Reply with Merkle hash as the ETag when updating objects.
\ Include version id in object replace/change replies.
\ Change conflict (409) replies format to text.
@@ -1029,6 +1030,31 @@ A special application menu, or a section in application preferences, should be d
Browsing past versions of objects should be available both at the object and the container level. At the object level, a list of past versions can be included in the screen showing details or more information on the object (metadata, permissions, etc.). At the container level, it is suggested that clients use a ``history`` element, which presents to the user a read-only, time-variable view of ``pithos`` contents. This can be accomplished via the ``until`` parameter in listings. Optionally, ``history`` may include ``trash``.
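As a rough sketch of such a history view (the host, account, container, and token below are hypothetical placeholders; ``until`` takes a UNIX timestamp), a client could request::

    # Hypothetical example: list a container's contents as of a past time.
    from httplib import HTTPConnection
    from urllib import urlencode

    conn = HTTPConnection('pithos.example.com')
    params = urlencode({'format': 'json', 'until': '1316000000'})
    conn.request('GET', '/v1/user/pithos?%s' % params, None,
                 {'X-Auth-Token': '0000'})
    resp = conn.getresponse()
    print resp.status, resp.read()  # the listing as of the given timestamp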
Uploading and downloading data
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
By using hashmaps to upload and download objects, the corresponding operations can complete much faster.

In the case of an upload, only the missing blocks will be submitted to the server:
* Calculate the hash value for each block of the object to be uploaded. Use the hash algorithm and block size of the destination container.
* Send a hashmap ``PUT`` request for the object.
* Server responds with status ``201`` (Created):
* Blocks are already on the server. The object has been created. Done.
* Server responds with status ``409`` (Conflict):
* Server's response body contains the hashes of the blocks that do not exist on the server.
* For each one of the hash values in the server's response:
* Send a ``PUT`` request to the server with the corresponding data block. Individual blocks are uploaded to a file named ``.upload``.
* Repeat hashmap ``PUT``. Fail if the server's response is not ``201``.
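A complete implementation of this upload flow is included in this commit, as the ``smart_upload`` function of the new upload tool shown in full below.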
Consulting hashmaps when downloading allows for resuming partially transferred objects. The client should retrieve the hashmap from the server and compare it with the hashmap computed from the respective local file. Any missing parts can then be downloaded with ``GET`` requests that include a ``Range`` header.
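For example, a minimal sketch of such a resume, assuming the object's hashmap can be fetched with ``GET <object>?format=json`` and reusing the ``HashMap`` class and ``BLOCK_SIZE`` constant from the upload tool included in this commit (host, token, and paths are placeholders)::

    import json
    from binascii import hexlify
    from httplib import HTTPConnection

    def resume_download(host, token, path, local_file):
        conn = HTTPConnection(host)
        # Fetch the object's hashmap: {'bytes': ..., 'hashes': [...]}.
        conn.request('GET', path + '?format=json', None,
                     {'X-Auth-Token': token})
        remote = json.loads(conn.getresponse().read())
        local = HashMap(local_file)  # hashes of the blocks we already have
        with open(local_file, 'r+b') as fp:
            for i, h in enumerate(remote['hashes']):
                if i < len(local) and hexlify(local[i]) == h:
                    continue  # block already present and verified
                start = i * BLOCK_SIZE
                end = min(start + BLOCK_SIZE, remote['bytes']) - 1
                # Download only the missing range of the object.
                conn.request('GET', path, None,
                             {'X-Auth-Token': token,
                              'Range': 'bytes=%d-%d' % (start, end)})
                fp.seek(start)
                fp.write(conn.getresponse().read())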
Recommended Practices and Examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -7,7 +7,8 @@
"auth_token": "0000",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -18,7 +19,8 @@
"auth_token": "0001",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -29,7 +31,8 @@
"auth_token": "0002",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -40,7 +43,8 @@
"auth_token": "0003",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -51,7 +55,8 @@
"auth_token": "0004",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -62,7 +67,8 @@
"auth_token": "0005",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -73,7 +79,8 @@
"auth_token": "0006",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -84,7 +91,8 @@
"auth_token": "0007",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -95,7 +103,8 @@
"auth_token": "0008",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
},
{
@@ -106,7 +115,8 @@
"auth_token": "0009",
"auth_token_created": "2011-04-07 09:17:14",
"auth_token_expires": "2015-04-07 09:17:14",
"created": "2011-02-06"
"created": "2011-02-06",
"updated": "2011-02-06"
}
}
]
@@ -43,10 +43,19 @@ class PithosUser(models.Model):
affiliation = models.CharField('Affiliation', max_length=255, default='')
quota = models.IntegerField('Storage Limit', default=settings.DEFAULT_QUOTA)
auth_token = models.CharField('Authentication Token', max_length=32, null=True)
-    auth_token_created = models.DateTimeField('Time of auth token creation', auto_now_add=True)
-    auth_token_expires = models.DateTimeField('Time of auth token expiration', auto_now_add=True)
-    created = models.DateTimeField('Time of creation', auto_now_add=True)
-    updated = models.DateTimeField('Time of last update', auto_now=True)
+    auth_token_created = models.DateTimeField('Time of auth token creation')
+    auth_token_expires = models.DateTimeField('Time of auth token expiration')
+    created = models.DateTimeField('Time of creation')
+    updated = models.DateTimeField('Time of last update')
+
+    def save(self, update_timestamps=True):
+        if update_timestamps:
+            if not self.id:
+                self.created = datetime.datetime.now()
+                self.auth_token_created = datetime.datetime.now()
+                self.auth_token_expires = datetime.datetime.now()
+            self.updated = datetime.datetime.now()
+        super(PithosUser, self).save()
class Meta:
verbose_name = u'Pithos User'
@@ -64,7 +64,7 @@ def create_auth_token(user):
md5.update(user.realname.encode('ascii', 'ignore'))
md5.update(asctime())
-    user.auth_token = md5.hexdigest()
+    user.auth_token = b64encode(md5.digest())
user.auth_token_created = datetime.now()
user.auth_token_expires = user.auth_token_created + \
timedelta(hours=settings.AUTH_TOKEN_DURATION)
@@ -32,6 +32,8 @@
# or implied, of GRNET S.A.
from sqlalchemy import create_engine
#from sqlalchemy.event import listen
from sqlalchemy.engine import Engine
from sqlalchemy.pool import NullPool
from sqlalchemy.interfaces import PoolListener
@@ -32,7 +32,7 @@
# or implied, of GRNET S.A.
from time import time
-from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
+from sqlalchemy import Table, Integer, BigInteger, Float, Column, String, MetaData, ForeignKey
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, null, select, bindparam
from sqlalchemy.ext.compiler import compiles
@@ -126,7 +126,7 @@ class Node(DBWorker):
onupdate='CASCADE'),
primary_key=True))
columns.append(Column('population', Integer, nullable=False, default=0))
-        columns.append(Column('size', Integer, nullable=False, default=0))
+        columns.append(Column('size', BigInteger, nullable=False, default=0))
columns.append(Column('mtime', Float))
columns.append(Column('cluster', Integer, nullable=False, default=0,
primary_key=True))
@@ -139,7 +139,7 @@
ForeignKey('nodes.node',
ondelete='CASCADE',
onupdate='CASCADE')))
-        columns.append(Column('size', Integer, nullable=False, default=0))
+        columns.append(Column('size', BigInteger, nullable=False, default=0))
columns.append(Column('source', Integer))
columns.append(Column('mtime', Float))
columns.append(Column('muser', String(255), nullable=False, default=''))
@@ -395,7 +395,13 @@ class ModularBackend(BaseBackend):
if version is None:
modified = props[self.MTIME]
else:
-            modified = self._get_version(node)[self.MTIME] # Overall last modification.
+            try:
+                modified = self._get_version(node)[self.MTIME] # Overall last modification.
+            except NameError: # Object may be deleted.
+                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
+                if del_props is None:
+                    raise NameError('Object does not exist')
+                modified = del_props[self.MTIME]
meta = dict(self.node.attribute_get(props[self.SERIAL]))
meta.update({'name': name, 'bytes': props[self.SIZE]})
@@ -508,6 +514,7 @@ class ModularBackend(BaseBackend):
self._can_read(user, account, src_container, src_name)
self._can_write(user, account, dest_container, dest_name)
src_path, src_node = self._lookup_object(account, src_container, src_name)
+        self._get_version(src_node, src_version)
if permissions is not None:
dest_path = '/'.join((account, container, name))
self._check_permissions(dest_path, permissions)
@@ -578,7 +585,8 @@
logger.debug("list_versions: %s %s %s", account, container, name)
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
-        return self.node.node_get_versions(node, ['serial', 'mtime'])
+        versions = self.node.node_get_versions(node)
+        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
@backend_method(autocommit=0)
def get_block(self, hash):
@@ -931,20 +931,18 @@ class Pithos_Client(OOS_Client):
headers = {}
headers['x_object_public'] = public
if version:
-            headers['x_object_version'] = version
+            headers['x_source_version'] = version
return OOS_Client.copy_object(self, src_container, src_object,
dst_container, dst_object, meta=meta,
account=account, content_type=content_type,
**headers)
def move_object(self, src_container, src_object, dst_container,
-                    dst_object, meta={}, public=False, version=None,
+                    dst_object, meta={}, public=False,
account=None, content_type=None):
"""moves an object"""
headers = {}
headers['x_object_public'] = public
-        if version:
-            headers['x_object_version'] = version
return OOS_Client.move_object(self, src_container, src_object,
dst_container, dst_object, meta=meta,
account=account, content_type=content_type,
#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from sqlalchemy import create_engine
from sqlalchemy import Table, MetaData
from sqlalchemy.sql import select
from pithos.api.util import hashmap_hash, get_container_headers
from pithos.backends.lib.hashfiler import Blocker, Mapper
from pithos.aai.models import PithosUser
from django.conf import settings
from pithos.backends.modular import ModularBackend
import json
import base64
import os
class Migration(object):
def __init__(self, db):
self.engine = create_engine(db)
self.metadata = MetaData(self.engine)
#self.engine.echo = True
self.conn = self.engine.connect()
def execute(self):
pass
class UserMigration(Migration):
def __init__(self, db):
Migration.__init__(self, db)
self.gss_users = Table('gss_user', self.metadata, autoload=True)
def execute(self):
s = self.gss_users.select()
users = self.conn.execute(s).fetchall()
l = []
for u in users:
user = PithosUser()
user.pk = u['id']
user.uniq = u['username']
user.realname = u['name']
user.is_admin = False
user.affiliation = u['homeorganization'] if u['homeorganization'] else ''
user.auth_token = base64.b64encode(u['authtoken'])
user.auth_token_created = u['creationdate']
user.auth_token_expires = u['authtokenexpirydate']
user.created = u['creationdate']
user.updated = u['modificationdate']
print '#', user
user.save(update_timestamps=False)
class DataMigration(Migration):
def __init__(self, db, path, block_size, hash_algorithm):
Migration.__init__(self, db)
params = {'blocksize': block_size,
'blockpath': os.path.join(path + '/blocks'),
'hashtype': hash_algorithm}
self.blocker = Blocker(**params)
params = {'mappath': os.path.join(path + '/maps'),
'namelen': self.blocker.hashlen}
self.mapper = Mapper(**params)
def execute(self):
filebody = Table('filebody', self.metadata, autoload=True)
s = select([filebody.c.id, filebody.c.storedfilepath])
rp = self.conn.execute(s)
while True:
t = rp.fetchone()
if not t:
break
id, path = t
print '#', id, path
hashlist = self.blocker.block_stor_file(open(path))[1]
self.mapper.map_stor(id, hashlist)
rp.close()
class ObjectMigration(DataMigration):
def __init__(self, db, path, block_size, hash_algorithm):
DataMigration.__init__(self, db, path, block_size, hash_algorithm)
options = getattr(settings, 'BACKEND', None)[1]
self.backend = ModularBackend(*options)
def create_default_containers(self):
users = PithosUser.objects.all()
for u in users:
try:
self.backend.put_container(u.uniq, u.uniq, 'pithos', {})
self.backend.put_container(u.uniq, u.uniq, 'trash', {})
except NameError, e:
pass
def create_directory_markers(self, parent_id=None, path=None):
folderTable = Table('folder', self.metadata, autoload=True)
userTable = Table('gss_user', self.metadata, autoload=True)
s = select([folderTable.c.id, folderTable.c.name, userTable.c.username])
s = s.where(folderTable.c.parent_id == parent_id)
s = s.where(folderTable.c.owner_id == userTable.c.id)
rp = self.conn.execute(s)
while True:
t = rp.fetchone()
if not t:
path = None
break
id, name, uuniq = t[0], t[1], t[2]
#print id, name, uuniq
if parent_id:
obj = '%s/%s' %(path, name) if path else name
print '#', obj
self.backend.update_object_hashmap(uuniq, uuniq, 'pithos', obj, 0, [])
else:
obj = ''
self.create_directory_markers(id, path=obj)
rp.close()
path = None
def execute(self):
filebody = Table('filebody', self.metadata, autoload=True)
s = select([filebody.c.id])
rp = self.conn.execute(s)
while True:
            row = rp.fetchone()
            if not row:
                break
            id = row[0]  # fetchone() returns a row tuple; unpack the id
            meta = {}
            hashlist = self.mapper.map_retr(id)
#hashmap = d['hashes']
#size = int(d['bytes'])
#meta.update({'hash': hashmap_hash(request, hashmap)})
#version_id = backend.update_object_hashmap(request.user, v_account,
# v_container, v_object,
# size, hashmap)
rp.close()
if __name__ == "__main__":
db = ''
t = UserMigration(db)
t.execute()
    basepath = getattr(settings, 'PROJECT_PATH', None)
params = {'db':db,
'path':os.path.join(basepath, 'data/pithos/'),
'block_size':(4 * 1024 * 1024),
'hash_algorithm':'sha256'}
dt = DataMigration(**params)
dt.execute()
ot = ObjectMigration(**params)
ot.create_default_containers()
ot.create_directory_markers()
\ No newline at end of file
#!/usr/bin/env python
import os
import hashlib
import sys
from binascii import hexlify, unhexlify
from cStringIO import StringIO
from lib.client import Pithos_Client, Fault
from lib.util import get_user, get_auth, get_server, get_api
# XXX Get these from container...
BLOCK_SIZE = 4 * 1024 * 1024
BLOCK_HASH = 'sha256'
def file_read_iterator(fp, size=1024):
while True:
data = fp.read(size)
if not data:
break
yield data
class HashMap(list):
def __init__(self, f):
super(HashMap, self).__init__()
self.load(f)
def _hash_raw(self, v):
h = hashlib.new(BLOCK_HASH)
h.update(v)
return h.digest()
def _hash_block(self, v):
return self._hash_raw(v.rstrip('\x00'))
def hash(self):
if len(self) == 0:
return self._hash_raw('')
if len(self) == 1:
return self.__getitem__(0)
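        # Merkle-style top hash: pad the hash list with all-zero entries
        # up to the next power of two, then hash adjacent pairs level by
        # level until a single root value remains.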
h = list(self)
s = 2
while s < len(h):
s = s * 2
h += [('\x00' * len(h[0]))] * (s - len(h))
while len(h) > 1:
h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
return h[0]
def load(self, f):
with open(f) as fp:
for block in file_read_iterator(fp, BLOCK_SIZE):
self.append(self._hash_block(block))
def smart_upload(client, file):
dest_container = 'pithos'
dest_object = os.path.split(file)[-1]
size = os.path.getsize(file)
    hashes = HashMap(file)  # hash the file passed in, not sys.argv directly
map = {'bytes': size, 'hashes': [hexlify(x) for x in hashes]}
try:
client.create_object_by_hashmap(dest_container, dest_object, map)
except Fault, fault:
if fault.status != 409:
raise
else:
return
missing = fault.data.split('\n')
if '' in missing:
del missing[missing.index(''):]
with open(file) as fp:
for hash in missing:
offset = hashes.index(unhexlify(hash)) * BLOCK_SIZE
fp.seek(offset)
block = fp.read(BLOCK_SIZE)
client.create_object('pithos', '.upload', StringIO(block))
client.create_object_by_hashmap(dest_container, dest_object, map)
if __name__ == '__main__':
if len(sys.argv) != 2 or not os.path.isfile(sys.argv[1]):
print 'syntax: %s <file>' % sys.argv[0]
sys.exit(1)
client = Pithos_Client(get_server(), get_auth(), get_user())
smart_upload(client, sys.argv[1])
@@ -551,9 +551,6 @@ class MoveObject(Command):
description = 'move an object to a different location'
def add_options(self, parser):
-        parser.add_option('--version', action='store',
-                          dest='version', default=None,
-                          help='move a specific object version')
parser.add_option('--public', action='store_true',
dest='public', default=False,
help='make object publicly accessible')
@@ -576,7 +573,7 @@
args = {'content_type':self.content_type} if self.content_type else {}
self.client.move_object(src_container, src_object, dst_container,
-                                dst_object, meta, self.public, self.version, **args)
+                                dst_object, meta, self.public, **args)
@cli_command('unset')
class UnsetObject(Command):