Commit 36778d79 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Merge branch 'feature-updownload-fromto-string' into develop

parents 653e6193 dcfe7455
......@@ -32,4 +32,6 @@ Features:
- Add a -l option to upload, for listing uploaded objects details [#3730]
- Add option to cache container info for upload_object [#3707]
- Add enumeration to all listing commands, make it optional [#3739]
- Add a download_to_string method in pithos client [#3608]
- Add an upload_from_string method in pithos client [#3608]
......@@ -677,10 +677,27 @@ class Pithos(livetest.Generic):
self.client.download_object(trg_fname, dnl_f)
print('\tCheck if files match...')
for pos in (0, f_size / 2, f_size - 20):
for pos in (0, f_size / 2, f_size - 128):
src_f.seek(pos)
dnl_f.seek(pos)
self.assertEqual(src_f.read(10), dnl_f.read(10))
self.assertEqual(src_f.read(64), dnl_f.read(64))
print('\tDownload KiBs to string and check again...')
for pos in (0, f_size / 2, f_size - 256):
src_f.seek(pos)
tmp_s = self.client.download_to_string(
trg_fname,
range_str='%s-%s' % (pos, (pos + 128)))
self.assertEqual(tmp_s, src_f.read(len(tmp_s)))
print('\tUploading KiBs as strings...')
trg_fname = 'fromString_%s' % self.now
src_size = 2 * 1024
src_f.seek(0)
src_str = src_f.read(src_size)
self.client.upload_from_string(trg_fname, src_str)
print('\tDownload as string and check...')
tmp_s = self.client.download_to_string(trg_fname)
self.assertEqual(tmp_s, src_str)
"""Upload a boring file"""
trg_fname = 'boringfile_%s' % self.now
......
......@@ -36,6 +36,7 @@ from threading import enumerate as activethreads
from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO
from binascii import hexlify
......@@ -43,7 +44,6 @@ from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO
def _pithos_hash(block, blockhash):
......@@ -182,7 +182,7 @@ class PithosClient(PithosRestClient):
return r.headers
# upload_* auxiliary methods
def _put_block_async(self, data, hash, upload_gen=None):
def _put_block_async(self, data, hash):
event = SilentEvent(method=self._put_block, data=data, hash=hash)
event.start()
return event
......@@ -219,7 +219,7 @@ class PithosClient(PithosRestClient):
nblocks = 1 + (size - 1) // blocksize
return (blocksize, blockhash, size, nblocks)
def _create_or_get_missing_hashes(
def _create_object_or_get_missing_hashes(
self, obj, json,
size=None,
format='json',
......@@ -279,7 +279,7 @@ class PithosClient(PithosRestClient):
offset, bytes = hmap[hash]
fileobj.seek(offset)
data = fileobj.read(bytes)
r = self._put_block_async(data, hash, upload_gen)
r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
......@@ -360,7 +360,6 @@ class PithosClient(PithosRestClient):
"""
self._assert_container()
#init
block_info = (
blocksize, blockhash, size, nblocks) = self._get_file_block_info(
f, size, container_info_cache)
......@@ -376,7 +375,7 @@ class PithosClient(PithosRestClient):
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
missing, obj_headers = self._create_or_get_missing_hashes(
missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
......@@ -420,7 +419,145 @@ class PithosClient(PithosRestClient):
if missing:
raise ClientError(
'%s blocks failed to upload' % len(missing),
status=800)
details=['%s' % thread.exception for thread in missing])
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
raise
r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
json=hashmap,
permissions=sharing,
public=public,
success=201)
return r.headers
def upload_from_string(
self, obj, input_str,
hash_cb=None,
upload_cb=None,
etag=None,
if_etag_match=None,
if_not_exist=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
public=None,
container_info_cache=None):
"""Upload an object using multiple connections (threads)
:param obj: (str) remote object path
:param input_str: (str) upload content
:param hash_cb: optional progress.bar object for calculating hashes
:param upload_cb: optional progress.bar object for uploading
:param etag: (str)
:param if_etag_match: (str) Push that value to if-match header at file
creation
:param if_not_exist: (bool) If true, the file will be uploaded ONLY if
it does not exist remotely, otherwise the operation will fail.
Involves the case of an object with the same path is created while
the object is being uploaded.
:param content_encoding: (str)
:param content_disposition: (str)
:param content_type: (str)
:param sharing: {'read':[user and/or grp names],
'write':[usr and/or grp names]}
:param public: (bool)
:param container_info_cache: (dict) if given, avoid redundant calls to
server for container info (block size and hash information)
"""
self._assert_container()
blocksize, blockhash, size, nblocks = self._get_file_block_info(
fileobj=None, size=len(input_str), cache=container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
if not content_type:
content_type = 'application/octet-stream'
num_of_blocks, blockmod = size / blocksize, size % blocksize
num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod
hashes = []
hmap = {}
for blockid in range(num_of_blocks):
start = blockid * blocksize
block = input_str[start: (start + blocksize)]
hashes.append(_pithos_hash(block, blockhash))
hmap[hashes[blockid]] = (start, block)
hashmap = dict(bytes=size, hashes=hashes)
missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=sharing,
public=public)
if missing is None:
return obj_headers
num_of_missing = len(missing)
if upload_cb:
self.progress_bar_gen = upload_cb(num_of_blocks)
for i in range(num_of_blocks + 1 - num_of_missing):
self._cb_next()
tries = 7
old_failures = 0
try:
while tries and missing:
flying = []
failures = []
for hash in missing:
offset, block = hmap[hash]
bird = self._put_block_async(block, hash)
flying.append(bird)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
if thread.exception:
failures.append(thread.kwargs['hash'])
if thread.isAlive():
flying.append(thread)
else:
self._cb_next()
flying = unfinished
for thread in flying:
thread.join()
if thread.exception:
failures.append(thread.kwargs['hash'])
self._cb_next()
missing = failures
if missing and len(missing) == old_failures:
tries -= 1
old_failures = len(missing)
if missing:
raise ClientError(
'%s blocks failed to upload' % len(missing),
details=['%s' % thread.exception for thread in missing])
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
......@@ -494,7 +631,7 @@ class PithosClient(PithosRestClient):
- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
for i, (key, g) in enumerate(flying.items()):
for key, g in flying.items():
if g.isAlive():
continue
if g.exception:
......@@ -532,8 +669,8 @@ class PithosClient(PithosRestClient):
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
end = total_size - 1 if key + blocksize > total_size\
else key + blocksize - 1
end = total_size - 1 if (
key + blocksize > total_size) else key + blocksize - 1
start, end = _range_up(key, end, filerange)
if start == end:
self._cb_next()
......@@ -623,6 +760,80 @@ class PithosClient(PithosRestClient):
self._complete_cb()
def download_to_string(
self, obj,
download_cb=None,
version=None,
range_str=None,
if_match=None,
if_none_match=None,
if_modified_since=None,
if_unmodified_since=None):
"""Download an object to a string (multiple connections). This method
uses threads for http requests, but stores all content in memory.
:param obj: (str) remote object path
:param download_cb: optional progress.bar object for downloading
:param version: (str) file version
:param range_str: (str) from, to are file positions (int) in bytes
:param if_match: (str)
:param if_none_match: (str)
:param if_modified_since: (str) formated date
:param if_unmodified_since: (str) formated date
:returns: (str) the whole object contents
"""
restargs = dict(
version=version,
data_range=None if range_str is None else 'bytes=%s' % range_str,
if_match=if_match,
if_none_match=if_none_match,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
(
blocksize,
blockhash,
total_size,
hash_list,
remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
assert total_size >= 0
if download_cb:
self.progress_bar_gen = download_cb(len(hash_list))
self._cb_next()
num_of_blocks = len(remote_hashes)
ret = [''] * num_of_blocks
self._init_thread_limit()
flying = dict()
for blockid, blockhash in enumerate(remote_hashes):
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
(start, end) = _range_up(start, end, range_str)
if start < end:
self._watch_thread_limit(flying.values())
flying[blockid] = self._get_block_async(obj, **restargs)
for runid, thread in flying.items():
if (blockid + 1) == num_of_blocks:
thread.join()
elif thread.isAlive():
continue
if thread.exception:
raise thread.exception
ret[runid] = thread.value.content
self._cb_next()
flying.pop(runid)
return ''.join(ret)
#Command Progress Bar method
def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
......
......@@ -826,8 +826,133 @@ class PithosClient(TestCase):
r = self.client.upload_object(obj, tmpFile)
self.assert_dicts_are_equal(r, exp_headers)
self.assertEqual(GCI.mock_calls[-1], call())
[call1, call2] = OP.mock_calls
(args1, kwargs1) = call1[1:3]
(args2, kwargs2) = call2[1:3]
self.assertEqual(args1, (obj,))
expected1 = dict(
hashmap=True,
success=(201, 409),
format='json',
json=dict(
hashes=['s0m3h@5h'] * num_of_blocks,
bytes=num_of_blocks * 4 * 1024 * 1024),
content_encoding=None,
content_type='application/octet-stream',
content_disposition=None,
public=None,
permissions=None)
for k, v in expected1.items():
if k == 'json':
self.assertEqual(len(v['hashes']), len(kwargs1[k]['hashes']))
self.assertEqual(v['bytes'], kwargs1[k]['bytes'])
else:
self.assertEqual(v, kwargs1[k])
(args2, kwargs2) = call2[1:3]
self.assertEqual(args2, (obj,))
expected2 = dict(
json=dict(
hashes=['s0m3h@5h'] * num_of_blocks,
bytes=num_of_blocks * 4 * 1024 * 1024),
content_type='application/octet-stream',
hashmap=True,
success=201,
format='json')
for k, v in expected2.items():
if k == 'json':
self.assertEqual(len(v['hashes']), len(kwargs2[k]['hashes']))
self.assertEqual(v['bytes'], kwargs2[k]['bytes'])
else:
self.assertEqual(v, kwargs2[k])
mock_offset = 2
# With progress bars
try:
from progress.bar import ShadyBar
blck_bar = ShadyBar('Mock blck calc.')
upld_bar = ShadyBar('Mock uplds')
except ImportError:
blck_bar = None
upld_bar = None
if blck_bar and upld_bar:
def blck_gen(n):
for i in blck_bar.iter(range(n)):
yield
yield
def upld_gen(n):
for i in upld_bar.iter(range(n)):
yield
yield
tmpFile.seek(0)
r = self.client.upload_object(
obj, tmpFile,
hash_cb=blck_gen, upload_cb=upld_gen)
self.assert_dicts_are_equal(r, exp_headers)
for i, c in enumerate(OP.mock_calls[-mock_offset:]):
self.assertEqual(OP.mock_calls[i], c)
# With content-type
tmpFile.seek(0)
ctype = 'video/mpeg'
sharing = dict(read=['u1', 'g1', 'u2'], write=['u1'])
r = self.client.upload_object(obj, tmpFile,
content_type=ctype, sharing=sharing)
self.assert_dicts_are_equal(r, exp_headers)
self.assertEqual(OP.mock_calls[-1][2]['content_type'], ctype)
self.assert_dicts_are_equal(
OP.mock_calls[-2][2]['permissions'],
sharing)
# With other args
tmpFile.seek(0)
kwargs = dict(
etag='s0m3E74g',
if_etag_match='if etag match',
if_not_exist=True,
content_type=ctype,
content_disposition=ctype + 'd15p051710n',
public=True,
content_encoding='802.11',
container_info_cache={})
r = self.client.upload_object(obj, tmpFile, **kwargs)
self.assert_dicts_are_equal(r, exp_headers)
kwargs.pop('if_not_exist')
ematch = kwargs.pop('if_etag_match')
etag = kwargs.pop('etag')
self.assert_dicts_are_equal(
kwargs.pop('container_info_cache'),
{self.client.container: container_info})
for arg, val in kwargs.items():
self.assertEqual(OP.mock_calls[-2][2][arg], val)
self.assertEqual(OP.mock_calls[-1][2]['if_etag_match'], ematch)
self.assertEqual(OP.mock_calls[-1][2]['if_etag_not_match'], '*')
self.assertEqual(OP.mock_calls[-1][2]['etag'], etag)
@patch('%s.get_container_info' % pithos_pkg, return_value=container_info)
@patch('%s.container_post' % pithos_pkg, return_value=FR())
@patch('%s.object_put' % pithos_pkg, return_value=FR())
def test_upload_from_string(self, OP, CP, GCI):
num_of_blocks = 2
tmpFile = self._create_temp_file(num_of_blocks)
tmpFile.seek(0)
src_str = tmpFile.read()
exp_headers = dict(id='container id', name='container name')
FR.headers = dict(exp_headers)
r = self.client.upload_from_string(obj, src_str)
self.assert_dicts_are_equal(r, exp_headers)
self.assertEqual(GCI.mock_calls[-1], call())
[call1, call2] = OP.mock_calls
(args1, kwargs1) = call1[1:3]
(args2, kwargs2) = call2[1:3]
self.assertEqual(args1, (obj,))
......@@ -1101,6 +1226,37 @@ class PithosClient(TestCase):
expected['permissions'] = expected.pop('sharing')
self.assertEqual(put.mock_calls[-1], call(obj, **expected))
@patch('%s.get_object_hashmap' % pithos_pkg, return_value=object_hashmap)
@patch('%s.object_get' % pithos_pkg, return_value=FR())
def test_download_to_string(self, GET, GOH):
FR.content = 'some sample content'
num_of_blocks = len(object_hashmap['hashes'])
r = self.client.download_to_string(obj)
expected_content = FR.content * num_of_blocks
self.assertEqual(expected_content, r)
self.assertEqual(len(GET.mock_calls), num_of_blocks)
self.assertEqual(GET.mock_calls[-1][1], (obj,))
kwargs = dict(
version='version',
range_str='10-20',
if_match='if and only if',
if_none_match='if and only not',
if_modified_since='what if not?',
if_unmodified_since='this happens if not!')
expargs = dict(kwargs)
expargs.pop('range_str')
for k in expargs:
expargs[k] = None
GOH.assert_called_once_with(obj, **expargs)
r = self.client.download_to_string(obj, **kwargs)
expargs['data_range'] = 'bytes=%s' % kwargs['range_str']
for k, v in expargs.items():
self.assertEqual(
GET.mock_calls[-1][2][k],
v or kwargs.get(k))
@patch('%s.get_object_hashmap' % pithos_pkg, return_value=object_hashmap)
@patch('%s.object_get' % pithos_pkg, return_value=FR())
def test_download_object(self, GET, GOH):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment