Commit 0fbc8a52 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Optimize download_to_string by using threads

Refs: #3608
parent 49cc29b2
......@@ -494,7 +494,7 @@ class PithosClient(PithosRestClient):
- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
for i, (key, g) in enumerate(flying.items()):
for key, g in flying.items():
if g.isAlive():
continue
if g.exception:
......@@ -532,8 +532,8 @@ class PithosClient(PithosRestClient):
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
end = total_size - 1 if key + blocksize > total_size\
else key + blocksize - 1
end = total_size - 1 if (
key + blocksize > total_size) else key + blocksize - 1
start, end = _range_up(key, end, filerange)
if start == end:
self._cb_next()
......@@ -632,7 +632,8 @@ class PithosClient(PithosRestClient):
if_none_match=None,
if_modified_since=None,
if_unmodified_since=None):
"""Download an object to a string (multiple connections)
"""Download an object to a string (multiple connections). This method
uses threads for http requests, but stores all content in memory.
:param obj: (str) remote object path
......@@ -672,21 +673,29 @@ class PithosClient(PithosRestClient):
self.progress_bar_gen = download_cb(len(hash_list))
self._cb_next()
ret = ''
num_of_blocks = len(remote_hashes)
ret = [''] * num_of_blocks
self._init_thread_limit()
flying = dict()
for blockid, blockhash in enumerate(remote_hashes):
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
(start, end) = _range_up(start, end, range_str)
if start == end:
continue
restargs['data_range'] = 'bytes=%s-%s' % (start, end)
r = self.object_get(obj, success=(200, 206), **restargs)
ret += r.content
self._cb_next()
self._complete_cb()
return ret
if start < end:
self._watch_thread_limit(flying.values())
flying[blockid] = self._get_block_async(obj, **restargs)
for runid, thread in flying.items():
if (blockid + 1) == num_of_blocks:
thread.join()
elif thread.isAlive():
continue
if thread.exception:
raise thread.exception
ret[runid] = thread.value.content
self._cb_next()
flying.pop(runid)
return ''.join(ret)
#Command Progress Bar method
def _cb_next(self, step=1):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment