Commit 65a45524 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Add arguments/options to upload

manifest is now a separate method for uploading
old, naive upload is preserved with --unchunked option
parent a298f2ab
......@@ -227,8 +227,8 @@ class PithosClient(StorageClient):
return self.put(path, *args, success=success, **kwargs)
def container_post(self, update=True, format='json',
quota=None, versioning=None, metadata={}, content_type=None, content_length=None, transfer_encoding=None,
*args, **kwargs):
quota=None, versioning=None, metadata={}, content_type=None, content_length=None,
transfer_encoding=None, *args, **kwargs):
""" Full Pithos+ POST at container level
--- request params ---
@param update (bool): if True, Do not replace metadata/groups
......@@ -586,6 +586,29 @@ class PithosClient(StorageClient):
def purge_container(self):
self.container_delete(until=unicode(time()))
def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
    """Upload an object with a single PUT request (no chunking).

    NOTE: naive implementation — the whole payload is read into memory before
    the request is sent. Look in pithos for a nicer implementation.

    @param obj: remote object name
    @param f: file-like source; when withHashFile is True it must contain a
        JSON hashmap instead of raw data
    @param size: if given, only the first *size* bytes are uploaded
    @raises ClientError: when withHashFile is set but f is not valid JSON
    """
    self.assert_container()
    source = f
    if withHashFile:
        raw = source.read()
        try:
            import json
            # Round-trip through json to validate the hashmap file.
            raw = json.dumps(json.loads(raw))
        except ValueError:
            raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
        except SyntaxError:
            raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
        from StringIO import StringIO
        source = StringIO(raw)
    payload = source.read() if size is None else source.read(size)
    self.object_put(obj, data=payload, etag=etag, content_encoding=content_encoding,
        content_disposition=content_disposition, content_type=content_type,
        permitions=sharing, public=public, success=201)
def put_block_async(self, data, hash):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
......@@ -608,10 +631,19 @@ class PithosClient(StorageClient):
self.reset_headers()
assert r.json[0] == hash, 'Local hash does not match server'
def async_upload_object(self, object, f, size=None, hash_cb=None,
upload_cb=None):
"""Like upload_object object but it sends blocks of data asynchronously
using geven/greenlet
def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
        content_disposition=None, content_type=None, sharing=None, public=None):
    """Create a zero-length object that manifests previously uploaded parts.

    Issues a PUT with content_length=0 and a manifest header pointing at
    '<container>/<obj>', so the server concatenates the part objects.

    @param obj: remote object name (also used as the manifest prefix)
    @param content_type: defaults to 'application/octet-stream' when None

    Bug fix: the computed default content type was previously ignored —
    the raw (possibly None) content_type was passed to object_put.
    """
    self.assert_container()
    obj_content_type = 'application/octet-stream' if content_type is None else content_type
    self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
        content_disposition=content_disposition, content_type=obj_content_type,
        permitions=sharing, public=public, manifest='%s/%s'%(self.container,obj))
def upload_object(self, object, f, size=None, hash_cb=None, upload_cb=None, etag=None,
content_encoding=None, content_disposition=None, content_type=None, sharing=None,
public=None):
"""upload_object chunk by chunk. Different chunks are uploaded asynchronously
in a pseudo-parallel fashion (using greenlets)
"""
self.assert_container()
......@@ -642,9 +674,13 @@ class PithosClient(StorageClient):
assert offset == size
obj_content_type = 'application/octet-stream' if content_type is None else content_type
hashmap = dict(bytes=size, hashes=hashes)
r = self.object_put(object, format='json', hashmap=True,
content_type='application/octet-stream', json=hashmap, success=(201, 409))
r = self.object_put(object, format='json', hashmap=True, content_type=obj_content_type,
json=hashmap, etag=etag, content_encoding=content_encoding,
content_disposition=content_disposition, permitions=sharing, public=public,
success=(201, 409))
self.reset_headers()
if r.status_code == 201:
......@@ -661,7 +697,6 @@ class PithosClient(StorageClient):
offset, bytes = map[hash]
f.seek(offset)
data = f.read(bytes)
#self.put_block(data, hash)
r = self.put_block_async(data, hash)
flying.append(r)
for r in flying:
......@@ -673,72 +708,8 @@ class PithosClient(StorageClient):
flying = [r for r in flying if not r.ready()]
gevent.joinall(flying)
self.object_put(object, format='json', hashmap=True,
content_type='application/octet-stream', json=hashmap, success=201)
def upload_object(self, object, f, size=None, hash_cb=None,
        upload_cb=None):
    """Create an object by uploading only the missing blocks.

    First PUTs the full hashmap (success 201 means the server already has
    every block); on 409 the response lists the missing block hashes, which
    are then uploaded one by one, and a final hashmap PUT registers the
    complete object.

    hash_cb is a generator function taking the total number of blocks to
    be hashed as an argument. Its next() will be called every time a block
    is hashed.
    upload_cb is a generator function with the same properties that is
    called every time a block is uploaded.

    Fixes over the previous revision: each missing block was uploaded twice
    (a duplicated put_block call), and the trailing object_put referenced an
    undefined obj_content_type.
    """
    self.assert_container()
    meta = self.get_container_info(self.container)
    blocksize = int(meta['x-container-block-size'])
    blockhash = meta['x-container-block-hash']
    size = size if size is not None else os.fstat(f.fileno()).st_size
    nblocks = 1 + (size - 1) // blocksize
    hashes = []
    map = {}
    offset = 0
    if hash_cb:
        hash_gen = hash_cb(nblocks)
        hash_gen.next()
    for i in range(nblocks):
        block = f.read(min(blocksize, size - offset))
        bytes = len(block)
        hash = pithos_hash(block, blockhash)
        hashes.append(hash)
        map[hash] = (offset, bytes)
        offset += bytes
        if hash_cb:
            hash_gen.next()
    assert offset == size
    hashmap = dict(bytes=size, hashes=hashes)
    r = self.object_put(object, format='json', hashmap=True,
        content_type='application/octet-stream', json=hashmap, success=(201, 409))
    self.reset_headers()
    if r.status_code == 201:
        # Server already holds every block; object is complete.
        return
    missing = r.json
    if upload_cb:
        upload_gen = upload_cb(len(missing))
        upload_gen.next()
    for hash in missing:
        offset, bytes = map[hash]
        f.seek(offset)
        data = f.read(bytes)
        # Upload each missing block exactly once.
        self.put_block(data, hash)
        if upload_cb:
            upload_gen.next()
    # Register the now-complete hashmap as the object (update-only PUT).
    self.object_put(object, format='json', hashmap=True, update=True,
        content_type='application/octet-stream', json=hashmap, success=201)
def set_account_group(self, group, usernames):
    """Attach *usernames* to *group* on the account via an update-only POST."""
    members = {group: usernames}
    self.account_post(update=True, groups=members)
......
......@@ -39,6 +39,7 @@ from .pithos import PithosClient, ClientError
from .cli_utils import raiseCLIError
from kamaki.utils import print_dict, pretty_keys, print_list
from colors import bold
from sys import stdout
from progress.bar import IncrementalBar
......@@ -264,6 +265,7 @@ class store_create(_store_container_command):
"""Create a container or a directory object"""
def update_parser(self, parser):
super(self.__class__, self).update_parser(parser)
parser.add_argument('--versioning', action='store', dest='versioning', default=None,
help='set container versioning (auto/none)')
parser.add_argument('--quota', action='store', dest='quota', default=None,
......@@ -364,18 +366,113 @@ class store_overwrite(_store_container_command):
except ClientError as err:
raiseCLIError(err)
@command()
class store_manifest(_store_container_command):
    """Create a remote file with uploaded parts by manifestation"""

    def update_parser(self, parser):
        # Register the manifest-specific CLI options on top of the inherited ones.
        super(self.__class__, self).update_parser(parser)
        parser.add_argument('--etag', action='store', dest='etag', default=None,
            help='check written data')
        parser.add_argument('--content-encoding', action='store', dest='content_encoding',
            default=None, help='provide the object content encoding')
        parser.add_argument('--content-disposition', action='store', dest='content_disposition',
            default=None, help='provide the presentation style of the object')
        parser.add_argument('--content-type', action='store', dest='content_type', default=None,
            help='create object with specific content type')
        parser.add_argument('--sharing', action='store', dest='sharing', default=None,
            help='define sharing object policy ( "read=user1,grp1,user2,... write=user1,grp2,...')
        parser.add_argument('--public', action='store_true', dest='public', default=False,
            help='make object publicly accessible')

    def getsharing(self, orelse=None):
        """Parse --sharing into {'read': [...], 'write': [...]}.

        @param orelse: value to return when --sharing was not given; defaults
            to a fresh empty dict (was a shared mutable {} default — fixed).
        @raises CLIError: on a permission key other than read/write
        """
        permstr = getattr(self.args, 'sharing')
        if permstr is None:
            return {} if orelse is None else orelse
        perms = {}
        for p in permstr.split(' '):
            (key, val) = p.split('=')
            if key.lower() not in ('read', 'write'):
                # NOTE(review): CLIError must be importable here — only
                # raiseCLIError/ClientError are visibly imported; verify.
                raise CLIError(message='in --sharing: Invalid permition key', importance=1)
            # Collect unique users/groups per permission key.
            for item in val.split(','):
                if item not in perms.setdefault(key, []):
                    perms[key].append(item)
        return perms

    def main(self, container___path):
        super(self.__class__, self).main(container___path)
        try:
            self.client.create_object_by_manifestation(self.path,
                content_encoding=getattr(self.args, 'content_encoding'),
                content_disposition=getattr(self.args, 'content_disposition'),
                content_type=getattr(self.args, 'content_type'), sharing=self.getsharing(),
                public=getattr(self.args, 'public'))
        except ClientError as err:
            raiseCLIError(err)
@command()
class store_upload(_store_container_command):
    """Upload a file"""

    def update_parser(self, parser):
        # Register the upload-specific CLI options on top of the inherited ones.
        super(self.__class__, self).update_parser(parser)
        parser.add_argument('--use_hashes', action='store_true', dest='use_hashes', default=False,
            help='provide hashmap file instead of data')
        parser.add_argument('--unchunked', action='store_true', dest='unchunked', default=False,
            help='avoid chunked transfer mode')
        parser.add_argument('--etag', action='store', dest='etag', default=None,
            help='check written data')
        parser.add_argument('--content-encoding', action='store', dest='content_encoding',
            default=None, help='provide the object content encoding')
        parser.add_argument('--content-disposition', action='store', dest='content_disposition',
            default=None, help='provide the presentation style of the object')
        parser.add_argument('--content-type', action='store', dest='content_type', default=None,
            help='create object with specific content type')
        parser.add_argument('--sharing', action='store', dest='sharing', default=None,
            help='define sharing object policy ( "read=user1,grp1,user2,... write=user1,grp2,...')
        parser.add_argument('--public', action='store_true', dest='public', default=False,
            help='make object publicly accessible')

    def getsharing(self, orelse=None):
        """Parse --sharing into {'read': [...], 'write': [...]}.

        @param orelse: value to return when --sharing was not given; defaults
            to a fresh empty dict (was a shared mutable {} default — fixed).
        @raises CLIError: on a permission key other than read/write
        """
        permstr = getattr(self.args, 'sharing')
        if permstr is None:
            return {} if orelse is None else orelse
        perms = {}
        for p in permstr.split(' '):
            (key, val) = p.split('=')
            if key.lower() not in ('read', 'write'):
                # NOTE(review): CLIError must be importable here — only
                # raiseCLIError/ClientError are visibly imported; verify.
                raise CLIError(message='in --sharing: Invalid permition key', importance=1)
            # Collect unique users/groups per permission key.
            for item in val.split(','):
                if item not in perms.setdefault(key, []):
                    perms[key].append(item)
        return perms

    def main(self, local_path, container____path__):
        super(self.__class__, self).main(container____path__)
        try:
            # Default the remote name to the local file's basename.
            # NOTE(review): assumes 'from os.path import basename' at file top — verify.
            remote_path = basename(local_path) if self.path is None else self.path
            with open(local_path) as f:
                # Exactly one upload path runs: a leftover unconditional
                # async_upload_object call before this branch was removed,
                # as it uploaded the file a second time.
                if getattr(self.args, 'unchunked'):
                    self.client.upload_object_unchunked(remote_path, f,
                        etag=getattr(self.args, 'etag'), withHashFile=getattr(self.args, 'use_hashes'),
                        content_encoding=getattr(self.args, 'content_encoding'),
                        content_disposition=getattr(self.args, 'content_disposition'),
                        content_type=getattr(self.args, 'content_type'), sharing=self.getsharing(),
                        public=getattr(self.args, 'public'))
                else:
                    hash_cb = self.progress('Calculating block hashes')
                    upload_cb = self.progress('Uploading blocks')
                    self.client.upload_object(remote_path, f, hash_cb=hash_cb, upload_cb=upload_cb,
                        content_encoding=getattr(self.args, 'content_encoding'),
                        content_disposition=getattr(self.args, 'content_disposition'),
                        content_type=getattr(self.args, 'content_type'), sharing=self.getsharing(),
                        public=getattr(self.args, 'public'))
        except ClientError as err:
            raiseCLIError(err)
......@@ -415,6 +512,7 @@ class store_delete(_store_container_command):
"""Delete a container [or an object]"""
def update_parser(self, parser):
super(self.__class__, self).update_parser(parser)
parser.add_argument('--until', action='store', dest='until', default=None,
help='remove history until that date')
parser.add_argument('--format', action='store', dest='format', default='%d/%m/%Y %H:%M:%S',
......@@ -568,6 +666,7 @@ class store_meta(_store_container_command):
"""Get custom meta-content for account [, container [or object]]"""
def update_parser(self, parser):
super(self.__class__, self).update_parser(parser)
parser.add_argument('-l', action='store_true', dest='detail', default=False,
help='show detailed output')
parser.add_argument('--until', action='store', dest='until', default=None,
......
......@@ -104,8 +104,7 @@ class StorageClient(Client):
r = self.get(path, success = (200, 204))
return r.json
def upload_object(self, object, f, size=None, hash_cb=None,
upload_cb=None):
def upload_object(self, object, f, size=None):
# This is a naive implementation, it loads the whole file in memory
#Look in pithos for a nice implementation
self.assert_container()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment