Commit 699d3bb1 authored by Stavros Sachtouris

Make object-download code readable

parent afd9d603
......@@ -39,7 +39,7 @@ import gevent.pool
from os import fstat, path
from hashlib import new as newhashlib
from time import time
from time import time, sleep
from datetime import datetime
import sys
......@@ -258,14 +258,107 @@ class PithosClient(PithosRestAPI):
status=600)
return resumed
def download_object(self, obj, f, download_cb=None, version=None, overide=False, range=None,
def _get_block_range(self, blockid, blocksize, total_size, custom_start, custom_end):
"""Return the (start, end) byte range of a block, clipped to an optional custom range"""
start = blockid * blocksize
if custom_start is not None:
if start < custom_start:
start = custom_start
elif start > custom_end:
#the whole block lies after the requested range
return (None, None)
end = start + blocksize - 1 if start + blocksize < total_size else total_size - 1
if custom_end is not None and end > custom_end:
end = custom_end
return (start, end)
def _manage_finished_downloading_greenlets(self, flying, objfile, sleeptime=0):
"""Write the results of finished greenlets to objfile and return the ones still running"""
newflying = []
for v in flying:
h = v['handler']
if h.ready():
if h.exception:
h.release()
raise h.exception
objfile.seek(v['start'])
objfile.write(h.value.content)
objfile.flush()
else:
#if there are unfinished greenlets, sleep for some time - be careful with that
sleep(sleeptime)
newflying.append(v)
return newflying
def _get_block(self, obj, **kwargs):
return self.object_get(obj, success=(200, 206), binary=True, **kwargs)
def _get_block_async(self, obj, **kwargs):
class SilentGreenlet(gevent.Greenlet):
"""Greenlet that silences error reports by redirecting them to a dummy buffer"""
def _report_error(self, exc_info):
#keep the real stderr so it can be restored afterwards
_stderr = sys.stderr
try:
sys.stderr = StringIO()
gevent.Greenlet._report_error(self, exc_info)
finally:
sys.stderr = _stderr
POOL_SIZE = 5
if self.async_pool is None:
self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
g = SilentGreenlet(self._get_block, obj, **kwargs)
self.async_pool.start(g)
return g
def _async_download_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
download_gen=None, custom_start=None, custom_end=None, **restargs):
"""Attempt a pseudo-multithreaded (greenlet-based) download of the missing blocks; if that
is not possible, fall back to sequential block downloads
"""
flying = []
for i, h in enumerate(hmap):
if h in resumed:
continue
if download_gen:
try:
download_gen.next()
except StopIteration:
pass
(start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
if start is None:
continue
data_range = 'bytes=%s-%s' % (start, end)
handler = self._get_block_async(obj, data_range=data_range, **restargs)
flying.append({'handler':handler, 'start':start, 'data_range':data_range})
flying = self._manage_finished_downloading_greenlets(flying, objfile)
#wait for the remaining greenlets to finish, write their results and exit
while len(flying) > 0:
flying = self._manage_finished_downloading_greenlets(flying, objfile, sleeptime=0.1)
objfile.truncate(total_size)
gevent.joinall(flying)
def _append_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
download_gen=None, custom_start=None, custom_end=None, **restargs):
"""Download the missing blocks sequentially and append each one to objfile"""
for i, h in enumerate(hmap):
if h in resumed:
continue
if download_gen:
try:
download_gen.next()
except StopIteration:
pass
(start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
data_range = 'bytes=%s-%s' % (start, end)
r = self._get_block(obj, data_range=data_range, **restargs)
objfile.write(r.content)
objfile.flush()
def download_object(self, obj, objfile, download_cb=None, version=None, overide=False, range=None,
if_match=None, if_none_match=None, if_modified_since=None, if_unmodified_since=None):
"""overide is forcing the local file to become exactly as the remote, even if it is
substantialy different
"""
self.assert_container()
islocalfile = False if f.isatty() else True
(blocksize, blockhash, total_size, hmap, map_dict) = self._get_object_block_info(obj,
version=version, if_match=if_match, if_none_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
......@@ -273,105 +366,26 @@ class PithosClient(PithosRestAPI):
if total_size <= 0:
return
if range is not None:
(custom_start, custom_end) = self._get_range_limits(range)
(custom_start, custom_end) = (None, None) if range is None \
else self._get_range_limits(range)
#load progress bar
if download_cb is not None:
download_gen = download_cb(total_size/blocksize + 1)
download_gen.next()
else:
download_gen = None
resumed = self._get_downloaded_blocks(hmap, f, blocksize, blockhash, map_dict,
overide=overide, download_gen=download_gen)
#download and save/print
flying = []
for i, h in enumerate(hmap):
if h in resumed:
continue
if download_cb is not None:
try:
download_gen.next()
except StopIteration:
pass
start = i*blocksize
if range is not None:
if start < custom_start:
start = custom_start
elif start > custom_end:
continue
end = start + blocksize -1 if start+blocksize < total_size else total_size -1
if range is not None and end > custom_end:
end = custom_end
data_range = 'bytes=%s-%s'%(start, end)
result_array = []
if islocalfile:
handler = self._get_block_async(obj, data_range=data_range, version=version,
if_etag_match=if_match, if_etag_not_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
flying.append({'handler':handler, 'start':start, 'data_range':data_range})
newflying = []
for v in flying:
h = v['handler']
if h.ready():
if h.exception:
h.release()
raise h.exception
f.seek(v['start'])
f.write(h.value.content)
f.flush()
#h.value.release()
else:
newflying.append(v)
flying = newflying
else:
r = self._get_block(obj, data_range=data_range, version=version,
if_etag_match=if_match, if_etag_not_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
f.write(r.content)
f.flush()
#write the last results and exit
if islocalfile:
from time import sleep
while len(flying) > 0:
result_array=[]
newflying = []
for v in flying:
h = v['handler']
if h.ready():
if h.exception:
h.release()
raise h.exception
f.seek(v['start'])
f.write(h.value.content)
f.flush()
#h.value.release()
else:
sleep(.2)
newflying.append(v)
flying = newflying
f.truncate(total_size)
gevent.joinall(flying)
resumed = self._get_downloaded_blocks(hmap, objfile, blocksize, blockhash, map_dict,
overide=overide, download_gen=download_gen)
restargs = dict(version=version, if_etag_match=if_match, if_etag_not_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
def _get_block(self, obj, **kwargs):
return self.object_get(obj, success=(200, 206), binary=True, **kwargs)
if objfile.isatty():
self._append_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
else:
self._async_download_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
def _get_block_async(self, obj, **kwargs):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
_stderr = sys._stderr
try:
sys.stderr = StringIO()
gevent.Greenlet._report_error(self, exc_info)
finally:
sys.stderr = _stderr
POOL_SIZE = 5
if self.async_pool is None:
self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
g = SilentGreenlet(self._get_block, obj, **kwargs)
self.async_pool.start(g)
return g
def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
if_modified_since=None, if_unmodified_since=None):
......
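For context, here is a minimal usage sketch of the reworked download path. It is not part of this commit; the client constructor arguments (base_url, token, account, container) and the object/file names are assumptions for illustration.

# Hypothetical usage sketch of PithosClient.download_object (Python 2, as in this codebase).
import sys
from kamaki.clients.pithos import PithosClient

# Constructor arguments are assumed here; adjust to your deployment.
client = PithosClient('https://pithos.example.com/v1', 'my-token', 'my-account', 'my-container')

# Writing to a regular (seekable) file: isatty() is False, so the missing blocks are
# fetched asynchronously with greenlets and written at their proper offsets.
with open('backup.tar.gz', 'wb+') as local_file:
    client.download_object('backup.tar.gz', local_file)

# Writing to a terminal: isatty() is True, so the blocks are fetched and written sequentially.
client.download_object('notes.txt', sys.stdout)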