Commit 9d502497 authored by Stavros Sachtouris
Browse files

Implement upload_from_string

Implemented with threads

Refs: #3608
parent 0fbc8a52
......@@ -690,6 +690,16 @@ class Pithos(livetest.Generic):
range_str='%s-%s' % (pos, (pos + 128)))
self.assertEqual(tmp_s, src_f.read(len(tmp_s)))
print('\tUploading KiBs as strings...')
trg_fname = 'fromString_%s' % self.now
src_size = 2 * 1024
src_f.seek(0)
src_str = src_f.read(src_size)
self.client.upload_from_string(trg_fname, src_str)
print('\tDownload as string and check...')
tmp_s = self.client.download_to_string(trg_fname)
self.assertEqual(tmp_s, src_str)
"""Upload a boring file"""
trg_fname = 'boringfile_%s' % self.now
src_f = self.create_boring_file(42)
......
......@@ -182,7 +182,7 @@ class PithosClient(PithosRestClient):
return r.headers
# upload_* auxiliary methods
def _put_block_async(self, data, hash, upload_gen=None):
def _put_block_async(self, data, hash):
    """Fire off a single block PUT on its own thread.

    :param data: (str) the block contents to upload
    :param hash: (str) the block hash identifying the data
    :returns: (SilentEvent) the already-started thread; callers join()
        it later and inspect .exception for failures
    """
    worker = SilentEvent(method=self._put_block, data=data, hash=hash)
    worker.start()
    return worker
......@@ -219,7 +219,7 @@ class PithosClient(PithosRestClient):
nblocks = 1 + (size - 1) // blocksize
return (blocksize, blockhash, size, nblocks)
def _create_or_get_missing_hashes(
def _create_object_or_get_missing_hashes(
self, obj, json,
size=None,
format='json',
......@@ -279,7 +279,7 @@ class PithosClient(PithosRestClient):
offset, bytes = hmap[hash]
fileobj.seek(offset)
data = fileobj.read(bytes)
r = self._put_block_async(data, hash, upload_gen)
r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
......@@ -360,7 +360,6 @@ class PithosClient(PithosRestClient):
"""
self._assert_container()
#init
block_info = (
blocksize, blockhash, size, nblocks) = self._get_file_block_info(
f, size, container_info_cache)
......@@ -376,7 +375,7 @@ class PithosClient(PithosRestClient):
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
missing, obj_headers = self._create_or_get_missing_hashes(
missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
......@@ -418,9 +417,135 @@ class PithosClient(PithosRestClient):
else:
break
if missing:
raise ClientError(
'%s blocks failed to upload' % len(missing),
status=800)
raise ClientError('%s blocks failed to upload' % len(missing))
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
raise
r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
json=hashmap,
permissions=sharing,
public=public,
success=201)
return r.headers
def upload_from_string(
self, obj, input_str,
hash_cb=None,
upload_cb=None,
etag=None,
if_etag_match=None,
if_not_exist=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
public=None,
container_info_cache=None):
"""Upload an object using multiple connections (threads)
:param obj: (str) remote object path
:param input_str: (str) upload content
:param hash_cb: optional progress.bar object for calculating hashes
:param upload_cb: optional progress.bar object for uploading
:param etag: (str)
:param if_etag_match: (str) Push that value to if-match header at file
creation
:param if_not_exist: (bool) If true, the file will be uploaded ONLY if
it does not exist remotely, otherwise the operation will fail.
Involves the case of an object with the same path is created while
the object is being uploaded.
:param content_encoding: (str)
:param content_disposition: (str)
:param content_type: (str)
:param sharing: {'read':[user and/or grp names],
'write':[usr and/or grp names]}
:param public: (bool)
:param container_info_cache: (dict) if given, avoid redundant calls to
server for container info (block size and hash information)
"""
self._assert_container()
blocksize, blockhash, size, nblocks = self._get_file_block_info(
fileobj=None, size=len(input_str), cache=container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
if not content_type:
content_type = 'application/octet-stream'
num_of_blocks, blockmod = size / blocksize, size % blocksize
num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod
hashes = {}
hmap = {}
for blockid in range(num_of_blocks):
start = blockid * blocksize
block = input_str[start: (start + blocksize)]
hashes[blockid] = _pithos_hash(block, blockhash)
hmap[hashes[blockid]] = (start, block)
hashmap = dict(bytes=size, hashes=hashes)
missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=sharing,
public=public)
if missing is None:
return obj_headers
num_of_missing = len(missing)
if upload_cb:
self.progress_bar_gen = upload_cb(num_of_blocks)
for i in range(num_of_blocks + 1 - num_of_missing):
self._cb_next()
try:
flying = []
for hash in missing:
offset, block = hmap[hash]
bird = self._put_block_async(block, hash)
flying.append(bird)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
if thread.exception:
raise thread.exception
if thread.isAlive():
flying.append(thread)
else:
self._cb_next()
flying = unfinished
for thread in flying:
thread.join()
if thread.exception:
raise thread.exception
self._cb_next()
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
......
Markdown is supported
Attach a file by drag & drop or click to upload.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment