Commit fb0cd49a authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Workarround an eventlist random bug in downloading

parent 699d3bb1
......@@ -252,7 +252,10 @@ class PithosClient(PithosRestAPI):
#resume if some blocks have been downloaded
resumed[existing_hash] = i
if with_progress_bar:
download_gen.next()
try:
download_gen.next()
except:
pass
elif not overide:
raise ClientError(message='Local file is substantialy different',
status=600)
......@@ -270,7 +273,7 @@ class PithosClient(PithosRestAPI):
end = custom_end
return (start, end)
def _manage_finished_downloading_greenlets(self, flying, objfile, sleeptime=0):
def _manage_downloading_greenlets(self, flying, objfile, broken_greenlets = [], sleeptime=0):
newflying = []
for v in flying:
h = v['handler']
......@@ -278,8 +281,15 @@ class PithosClient(PithosRestAPI):
if h.exception:
h.release()
raise h.exception
try:
block = h.value.content
except AttributeError:
#catch greenlets that break due to an eventlist bug
print('- - - - - > Got a borken greenlet here')
broken_greenlets.append(v)
continue
objfile.seek(v['start'])
objfile.write(h.value.content)
objfile.write(block)
objfile.flush()
else:
#if there are unfinished greenlets, sleep for some time - be carefull with that
......@@ -299,12 +309,13 @@ class PithosClient(PithosRestAPI):
gevent.Greenlet._report_error(self, exc_info)
finally:
sys.stderr = _stderr
POOL_SIZE =5
POOL_SIZE =7
if self.async_pool is None:
self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
g = SilentGreenlet(self._get_block, obj, **kwargs)
self.async_pool.start(g)
return g
def _async_download_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
download_gen=None, custom_start = None, custom_end=None, **restargs):
"""Attempt pseudo-multithreaded (with greenlets) download of blocks, or if that
......@@ -326,11 +337,24 @@ class PithosClient(PithosRestAPI):
data_range = 'bytes=%s-%s'%(start, end)
handler = self._get_block_async(obj, data_range=data_range, **restargs)
flying.append({'handler':handler, 'start':start, 'data_range':data_range})
flying = self._manage_finished_downloading_greenlets(flying, objfile)
broken = []
flying = self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken)
#workaround for eventlib bug that breaks greenlets: replace them with new ones
for brgr in broken:
brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
**restargs)
flying.append(brgr)
#write the last results and exit
while len(flying) > 0:
flying=self._manage_finished_downloading_greenlets(flying, objfile, sleeptime=0.1)
broken = []
flying=self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken,
sleeptime=0.1)
#workaround for eventlib bug that breaks greenlets: replace them with new ones
for brgr in broken:
brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
**restargs)
flying.append(brgr)
objfile.truncate(total_size)
gevent.joinall(flying)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment