Commit 22fc09fb authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Support range at file (threaded) download

parent 642f1bbd
......@@ -278,14 +278,17 @@ class PithosClient(PithosRestAPI):
h.update(block.strip('\x00'))
return hexlify(h.digest())
def _greenlet2file(self, flying_greenlets, local_file, **restargs):
def _greenlet2file(self, flying_greenlets, local_file, offset = 0, **restargs):
"""write the results of a greenleted rest call to a file
@offset: the offset of the file up to blocksize - e.g. if the range is 10-100, all
blocks will be written to normal_position - 10"""
finished = []
for start, g in flying_greenlets.items():
if g.ready():
if g.exception:
raise g.exception
block = g.value.content
local_file.seek(start)
local_file.seek(start - offset)
local_file.write(block)
self._cb_next()
finished.append(flying_greenlets.pop(start))
......@@ -293,11 +296,15 @@ class PithosClient(PithosRestAPI):
return finished
def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
blockhash=None, resume=False, **restargs):
blockhash=None, resume=False, filerange = None, **restargs):
file_size = fstat(local_file.fileno()).st_size if resume else 0
flying_greenlets = {}
finished_greenlets = []
offset = 0
if filerange is not None:
rstart = int(filerange.split('-')[0])
offset = rstart if blocksize > rstart else rstart%blocksize
for block_hash, blockid in remote_hashes.items():
start = blocksize*blockid
if start < file_size and block_hash == self._hash_from_file(local_file,
......@@ -305,15 +312,21 @@ class PithosClient(PithosRestAPI):
self._cb_next()
continue
if len(flying_greenlets) >= self.POOL_SIZE:
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, **restargs)
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
**restargs)
end = total_size-1 if start+blocksize > total_size else start+blocksize-1
restargs['async_headers'] = dict(range='bytes=%s-%s'%(start, end))
(start, end) = _range_up(start, end, filerange)
if start == end:
self._cb_next()
continue
restargs['async_headers'] = dict(Range='bytes=%s-%s'%(start, end))
flying_greenlets[start] = self._get_block_async(obj, **restargs)
#check the greenlets
while len(flying_greenlets) > 0:
sleep(0.001)
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, **restargs)
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
**restargs)
gevent.joinall(finished_greenlets)
......@@ -344,8 +357,9 @@ class PithosClient(PithosRestAPI):
self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
else:
self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
resume, **restargs)
dst.truncate(total_size)
resume, range, **restargs)
if range is None:
dst.truncate(total_size)
self._complete_cb()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment