Commit fbfee225 authored by Stavros Sachtouris

Refactor pithos+ download

3 modes:
a. sequential
b. sequential with resume
c. asynchronous/parallel
bug: still can't multi-download correctly
parent fb0cd49a
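
The three modes map directly onto the reworked download_object() further down in this diff: a terminal or pipe destination gets plain sequential writes, --resume filters out blocks already on disk before downloading the rest sequentially, and regular files go through a gevent greenlet pool. A condensed sketch of that dispatch (not part of the commit; helper bodies elided, names as in the diff below):

def download_object(self, obj, dst, resume=False, **restargs):
    (blocksize, blockhash, total_size, hash_list,
        remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
    if dst.isatty():
        #(a) sequential: a pipe/terminal cannot seek, so stream blocks in order
        self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
    elif resume:
        #(b) sequential with resume: skip blocks whose hashes match the local file
        self._filter_out_downloaded_hashses(remote_hashes, hash_list, dst, blocksize, blockhash)
        self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
    else:
        #(c) asynchronous/parallel: fetch blocks concurrently with greenlets
        self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)
        dst.truncate(total_size)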
@@ -567,8 +567,8 @@ class store_download(_store_container_command):
super(self.__class__, self).update_parser(parser)
parser.add_argument('--no-progress-bar', action='store_true', dest='no_progress_bar',
default=False, help='Do not display progress bars')
parser.add_argument('--overide', action='store_true', dest='overide', default=False,
help='Force download to override an existing file')
parser.add_argument('--resume', action='store_true', dest='resume', default=False,
help='Enable download resume (slower)')
parser.add_argument('--range', action='store', dest='range', default=None,
help='Show range of data')
parser.add_argument('--if-match', action='store', dest='if_match', default=None,
@@ -591,10 +591,10 @@ class store_download(_store_container_command):
out = stdout
else:
try:
if getattr(self.args, 'overide'):
out = open(local_path, 'wb+')
else:
if getattr(self.args, 'resume'):
out = open(local_path, 'ab+')
else:
out = open(local_path, 'wb+')
except IOError as err:
raise CLIError(message='Cannot write to file %s - %s'%(local_path,unicode(err)),
importance=1)
@@ -605,7 +605,7 @@ class store_download(_store_container_command):
try:
self.client.download_object(self.path, out, download_cb,
range=getattr(self.args, 'range'), version=getattr(self.args,'object_version'),
if_match=getattr(self.args, 'if_match'), overide=getattr(self.args, 'overide'),
if_match=getattr(self.args, 'if_match'), resume=getattr(self.args, 'resume'),
if_none_match=getattr(self.args, 'if_none_match'),
if_modified_since=getattr(self.args, 'if_modified_since'),
if_unmodified_since=getattr(self.args, 'if_unmodified_since'))
......
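
The only behavioral change in the CLI above is how the destination file is opened. A minimal standalone sketch of that decision (RuntimeError stands in for kamaki's CLIError here):

def open_destination(local_path, resume=False):
    #--resume opens in append mode so bytes already on disk survive and can be
    #hash-checked against the remote hashmap; otherwise the file is truncated
    mode = 'ab+' if resume else 'wb+'
    try:
        return open(local_path, mode)
    except IOError as err:
        raise RuntimeError('Cannot write to file %s - %s' % (local_path, err))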
@@ -210,97 +210,51 @@ class PithosClient(PithosRestAPI):
self.object_put(obj, format='json', hashmap=True, content_type=content_type,
json=hashmap, success=201)
#download_* auxiliary methods
def _get_object_block_info(self,obj, **kwargs):
def _get_remote_blocks_info(self, obj, **restargs):
#retrieve object hashmap
hashmap = self.get_object_hashmap(obj, **kwargs)
hashmap = self.get_object_hashmap(obj, **restargs)
blocksize = int(hashmap['block_size'])
blockhash = hashmap['block_hash']
total_size = hashmap['bytes']
hmap = hashmap['hashes']
print('total_size:%s, blocksize:%s, x/y:%s, len:%s'%(total_size, blocksize,
total_size/blocksize + 1, len(hashmap['hashes'])))
#assert total_size/blocksize + 1 == len(hashmap['hashes'])
map_dict = {}
for h in hmap:
map_dict[h] = True
return (blocksize, blockhash, total_size, hmap, map_dict)
for i, h in enumerate(hashmap['hashes']):
map_dict[h] = i
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
def _get_range_limits(self, range):
try:
(custom_start, custom_end) = range.split('-')
(custom_start, custom_end) = (int(custom_start), int(custom_end))
except ValueError:
raise ClientError(message='Invalid range string', status=601)
if custom_start > custom_end or custom_start < 0:
raise ClientError(message='Negative range', status=601)
elif custom_start == custom_end:
return
elif custom_end > total_size:
raise ClientError(message='Range exceeds file size', status=601)
return (custom_start, custom_end)
def _get_downloaded_blocks(self, hmap, fileobj, blocksize, blockhash, map_dict,
overide=False, download_gen=None):
if fileobj.isatty() or not path.exists(fileobj.name):
return {}
h = HashMap(blocksize, blockhash)
with_progress_bar = False if download_gen is None else True
h.load(fileobj, with_progress_bar)
resumed = {}
for i, x in enumerate(h):
existing_hash = hexlify(x)
if existing_hash in map_dict:
#resume if some blocks have been downloaded
resumed[existing_hash] = i
if with_progress_bar:
try:
download_gen.next()
except:
pass
elif not overide:
raise ClientError(message='Local file is substantially different',
status=600)
return resumed
def _get_block_range(self, blockid, blocksize, total_size, custom_start, custom_end):
start = blockid*blocksize
if custom_start is not None:
if start < custom_start:
start = custom_start
elif start > custom_end:
return (None, None)
end = start + blocksize -1 if start+blocksize < total_size else total_size -1
if custom_end is not None and end > custom_end:
end = custom_end
return (start, end)
def _manage_downloading_greenlets(self, flying, objfile, broken_greenlets = [], sleeptime=0):
newflying = []
for v in flying:
h = v['handler']
if h.ready():
if h.exception:
h.release()
raise h.exception
try:
block = h.value.content
except AttributeError:
#catch greenlets that break due to an eventlib bug
print('- - - - - > Got a broken greenlet here')
broken_greenlets.append(v)
continue
objfile.seek(v['start'])
objfile.write(block)
objfile.flush()
def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, **restargs):
for blockid, blockhash in enumerate(remote_hashes):
if blockhash == None:
continue
start = blocksize*blockid
end = total_size-1 if start+blocksize > total_size else start+blocksize-1
restargs['data_range'] = 'bytes=%s-%s'%(start, end)
r = self.object_get(obj, success=(200, 206), **restargs)
self._cb_next()
dst.write(r.content)
dst.flush()
def _filter_out_downloaded_hashses(self, remote_hashes, hash_list, local_file, blocksize,
blockhash):
#load file hashmap
file_hashmap = HashMap(blocksize, blockhash)
file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
#filter out blocks that are already downloaded
for i, x in enumerate(file_hashmap):
local_hash = hexlify(x)
if local_hash in remote_hashes:
blockid = remote_hashes.pop(local_hash)
hash_list[blockid] = None
self._cb_next()
else:
#if there are unfinished greenlets, sleep for some time - be careful with that
sleep(sleeptime)
newflying.append(v)
return newflying
def _get_block(self, obj, **kwargs):
return self.object_get(obj, success=(200, 206), binary=True, **kwargs)
raise ClientError(message='Local file is substantially different', status=600)
def _get_block_async(self, obj, **kwargs):
def _get_block_async(self, obj, **restargs):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
_stderr = sys._stderr
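
The core of resume (mode b) is the hash filter introduced above: hash the local file with the remote hashmap's block size and algorithm, then drop every remote block that already matches. An approximate standalone sketch (kamaki's HashMap also handles block padding, and the mismatch error path is elided):

from hashlib import new as newhash

def local_block_hashes(local_file, blocksize, blockhash):
    #hash the local file block by block, like HashMap.load() does
    local_file.seek(0)
    hashes = []
    while True:
        block = local_file.read(blocksize)
        if not block:
            break
        hashes.append(newhash(blockhash, block).hexdigest())
    return hashes

def filter_out_downloaded(remote_hashes, hash_list, local_file, blocksize, blockhash):
    #remote_hashes maps block hash -> block id; a None in hash_list means 'skip'
    for local_hash in local_block_hashes(local_file, blocksize, blockhash):
        if local_hash in remote_hashes:
            hash_list[remote_hashes.pop(local_hash)] = None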
@@ -309,116 +263,113 @@ class PithosClient(PithosRestAPI):
gevent.Greenlet._report_error(self, exc_info)
finally:
sys.stderr = _stderr
POOL_SIZE =7
self.POOL_SIZE = 5
if self.async_pool is None:
self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
g = SilentGreenlet(self._get_block, obj, **kwargs)
self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
self.async_pool.start(g)
return g
def _async_download_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
download_gen=None, custom_start = None, custom_end=None, **restargs):
"""Attempt pseudo-multithreaded (with greenlets) download of blocks, or if that
is not possible retreat to sequensial block download
"""
flying = []
for i, h in enumerate(hmap):
if h in resumed:
continue
if download_gen:
try:
download_gen.next()
except StopIteration:
pass
(start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
if start is None:
continue
data_range = 'bytes=%s-%s'%(start, end)
handler = self._get_block_async(obj, data_range=data_range, **restargs)
flying.append({'handler':handler, 'start':start, 'data_range':data_range})
broken = []
flying = self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken)
#workaround for eventlib bug that breaks greenlets: replace them with new ones
for brgr in broken:
brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
**restargs)
flying.append(brgr)
#write the last results and exit
while len(flying) > 0:
broken = []
flying=self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken,
sleeptime=0.1)
#workaround for eventlib bug that breaks greenlets: replace them with new ones
for brgr in broken:
brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
**restargs)
flying.append(brgr)
objfile.truncate(total_size)
gevent.joinall(flying)
def _append_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
download_gen=None, custom_start=None, custom_end=None, **restargs):
for i, h in enumerate(hmap):
if h in resumed:
continue
if download_gen:
def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
finished = []
for start, g in flying_greenlets.items():
if g.ready():
if g.exception:
g.release()
raise g.exception
try:
download_gen.next()
except StopIteration:
pass
(start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
data_range = 'bytes=%s-%s'%(start, end)
r = self._get_block(obj, data_range=data_range, **restargs)
objfile.write(r.content)
objfile.flush()
def download_object(self, obj, objfile, download_cb=None, version=None, overide=False, range=None,
if_match=None, if_none_match=None, if_modified_since=None, if_unmodified_since=None):
"""overide is forcing the local file to become exactly as the remote, even if it is
substantialy different
"""
self.assert_container()
(blocksize, blockhash, total_size, hmap, map_dict) = self._get_object_block_info(obj,
version=version, if_match=if_match, if_none_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
if total_size <= 0:
return
(custom_start, custom_end) = (None, None) if range is None \
else self._get_range_limits(range)
#load progress bar
if download_cb is not None:
download_gen = download_cb(total_size/blocksize + 1)
download_gen.next()
resumed = self._get_downloaded_blocks(hmap, objfile, blocksize, blockhash, map_dict,
overide=overide, download_gen=download_gen)
restargs=dict(version=version, if_etag_match=if_match, if_etag_not_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
if objfile.isatty():
self._append_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
block = g.value.content
except AttributeError:
broken[start] = flying_greenlets.pop(start)
continue
local_file.seek(start)
local_file.write(block)
#local_file.flush()
self._cb_next()
finished.append(flying_greenlets.pop(start))
local_file.flush()
return finished
def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
#let them fly
flying_greenlets = {}
finished_greenlets = []
broken = {}
for block_hash, blockid in remote_hashes.items():
start = blocksize*blockid
end = total_size-1 if start+blocksize > total_size else start+blocksize-1
restargs['data_range'] = 'bytes=%s-%s'%(start, end)
#store info for relaunching greenlet if needed
flying_greenlets[start] = self._get_block_async(obj, **restargs)
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
**restargs)
#check the greenlets
while len(flying_greenlets) > 0:
sleep(0.1)
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
**restargs)
gevent.joinall(finished_greenlets)
def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
range=None, if_match=None, if_none_match=None, if_modified_since=None,
if_unmodified_since=None):
#init REST API args
restargs=dict(version=version,
data_range = None if range is None else 'bytes=%s'%range,
if_match=if_match,
if_none_match=if_none_match,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
#1. get remote object hash info
( blocksize,
blockhash,
total_size,
hash_list,
remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
assert total_size >= 0
if download_cb:
self.progress_bar_gen = download_cb(len(remote_hashes)+1)
self._cb_next()
if dst.isatty():
self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
elif resume:
self._filter_out_downloaded_hashses(remote_hashes, hash_list, dst, blocksize, blockhash)
self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
else:
self._async_download_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)
dst.truncate(total_size)
self._complete_cb()
#Command Progress Bar methods
def _cb_next(self):
if hasattr(self, 'progress_bar_gen'):
try:
self.progress_bar_gen.next()
except:
pass
def _complete_cb(self):
while True:
try:
self.progress_bar_gen.next()
except:
break
def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
if_modified_since=None, if_unmodified_since=None):
if_modified_since=None, if_unmodified_since=None, data_range=None):
try:
r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
if_unmodified_since=if_unmodified_since, data_range=data_range)
except ClientError as err:
if err.status == 304 or err.status == 412:
return {}
raise
......
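
As a reading aid, here is the parallel path (mode c) condensed into a standalone sketch. It assumes gevent is installed and reuses the object_get(obj, success=(200, 206), data_range=...) call shown in the diff; the broken-greenlet relaunch workaround is elided:

import gevent
import gevent.pool

def dump_blocks_async(client, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
    #one greenlet per missing block; remote_hashes maps block hash -> block id
    pool = gevent.pool.Pool(size=5)
    flying = {}
    for block_hash, blockid in remote_hashes.items():
        start = blocksize * blockid
        end = min(start + blocksize, total_size) - 1
        kwargs = dict(restargs, data_range='bytes=%s-%s' % (start, end))
        #Pool.spawn blocks while the pool is full, so at most 5 requests fly at once
        flying[start] = pool.spawn(client.object_get, obj, success=(200, 206), **kwargs)
    while flying:
        gevent.sleep(0.1)
        for start, g in list(flying.items()):
            if g.ready():
                if g.exception:
                    raise g.exception
                #blocks finish out of order, so seek to each block's own offset
                local_file.seek(start)
                local_file.write(g.value.content)
                del flying[start]
    local_file.flush()
    local_file.truncate(total_size)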