Commit 28cbc3c2 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Minimize requeests whn dnlding same block

If a file has multiple same blocks, download one of them and copy it to local
file locations
parent ca7f78c0
......@@ -442,32 +442,33 @@ class PithosClient(PithosRestClient):
h.update(block.strip('\x00'))
return hexlify(h.digest())
def _thread2file(self, flying, local_file, offset=0, **restargs):
def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
"""write the results of a greenleted rest call to a file
:param offset: the offset of the file up to blocksize
- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
finished = []
for i, (start, g) in enumerate(flying.items()):
if not g.isAlive():
if g.exception:
raise g.exception
block = g.value.content
local_file.seek(start - offset)
for i, (key, g) in enumerate(flying.items()):
if g.isAlive():
continue
if g.exception:
raise g.exception
block = g.value.content
for block_start in blockids[key]:
local_file.seek(block_start + offset)
local_file.write(block)
self._cb_next()
finished.append(flying.pop(start))
flying.pop(key)
blockids.pop(key)
local_file.flush()
return finished
def _dump_blocks_async(
self, obj, remote_hashes, blocksize, total_size, local_file,
blockhash=None, resume=False, filerange=None, **restargs):
file_size = fstat(local_file.fileno()).st_size if resume else 0
flying = {}
finished = []
flying = dict()
blockid_dict = dict()
offset = 0
if filerange is not None:
rstart = int(filerange.split('-')[0])
......@@ -475,31 +476,31 @@ class PithosClient(PithosRestClient):
self._init_thread_limit()
for block_hash, blockids in remote_hashes.items():
for blockid in blockids:
start = blocksize * blockid
if start < file_size and block_hash == self._hash_from_file(
local_file, start, blocksize, blockhash):
self._cb_next()
continue
blockids = [blk * blocksize for blk in blockids]
unsaved = [blk for blk in blockids if not (
blk < file_size and block_hash == self._hash_from_file(
local_file, blk, blocksize, blockhash))]
self._cb_next(len(blockids) - len(unsaved))
if unsaved:
key = unsaved[0]
self._watch_thread_limit(flying.values())
finished += self._thread2file(
flying,
local_file,
offset,
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
end = total_size - 1 if start + blocksize > total_size\
else start + blocksize - 1
(start, end) = _range_up(start, end, filerange)
end = total_size - 1 if key + blocksize > total_size\
else key + blocksize - 1
start, end = _range_up(key, end, filerange)
if start == end:
self._cb_next()
continue
restargs['async_headers'] = {
'Range': 'bytes=%s-%s' % (start, end)}
flying[start] = self._get_block_async(obj, **restargs)
flying[key] = self._get_block_async(obj, **restargs)
blockid_dict[key] = unsaved
for thread in flying.values():
thread.join()
finished += self._thread2file(flying, local_file, offset, **restargs)
self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
def download_object(
self, obj, dst,
......@@ -578,17 +579,17 @@ class PithosClient(PithosRestClient):
self._complete_cb()
#Command Progress Bar method
def _cb_next(self):
def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
try:
self.progress_bar_gen.next()
self.progress_bar_gen.next(step)
except:
pass
def _complete_cb(self):
while True:
try:
self.progress_bar_gen.next()
self.progress_bar_gen.next(step)
except:
break
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment