Commit 7644c38e authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Handle thread failures in download

If some threads fail for some reason, download re-tries 2 more times,
provided there is some progress
parent 4fcc38a2
......@@ -920,7 +920,12 @@ class store_upload(_store_container_command):
err,
'No container %s in account %s'\
% (self.container, self.account),
details=self.generic_err_details)
details=[self.generic_err_details])
elif err.status == 800:
raiseCLIError(err, details=[
'Possible cause: temporary server failure',
'Try to re-upload the file',
'For more error details, try kamaki store upload -d'])
raise_connection_errors(err)
raiseCLIError(err, '"%s" not accessible' % container____path__)
except IOError as err:
......
......@@ -94,7 +94,11 @@ class SilentEvent(Thread):
try:
self._value = self.method(*(self.args), **(self.kwargs))
except Exception as e:
print('______\n%s\n_______' % e)
recvlog.debug('Thread %s got exception %s\n<%s %s' % (
self,
type(e),
e.status if isinstance(e, ClientError) else '',
e))
self._exception = e
......@@ -204,7 +208,7 @@ class Client(object):
for key, val in r.headers.items():
recvlog.info('%s: %s', key, val)
if r.content:
recvlog.debug(r.content)
recvlog.info(r.content[:256] + ' ...')
except (HTTPResponseError, HTTPConnectionError) as err:
from traceback import format_stack
......
......@@ -246,8 +246,8 @@ class PithosClient(PithosRestAPI):
if hash_cb:
hash_gen.next()
if offset != size:
print("Size is %i" % size)
print("Offset is %i" % offset)
#print("Size is %i" % size)
#print("Offset is %i" % offset)
assert offset == size, \
"Failed to calculate uploaded blocks: " \
"Offset and object size do not match"
......@@ -264,6 +264,7 @@ class PithosClient(PithosRestAPI):
self._init_thread_limit()
flying = []
failures = []
for hash in missing:
offset, bytes = hmap[hash]
fileobj.seek(offset)
......@@ -271,28 +272,22 @@ class PithosClient(PithosRestAPI):
r = self._put_block_async(data, hash, upload_gen)
flying.append(r)
unfinished = []
for i, thread in enumerate(flying):
for thread in flying:
unfinished = self._watch_thread_limit(unfinished)
if thread.isAlive() or thread.exception:
if thread.exception:
failures.append(thread)
elif thread.isAlive():
unfinished.append(thread)
#else:
#if upload_cb:
# upload_gen.next()
flying = unfinished
for thread in flying:
thread.join()
#upload_gen.next()
if thread.exception:
failures.append(thread)
failures = [r for r in flying if r.exception]
if len(failures):
details = ', '.join([' (%s).%s' % (i, r.exception)\
for i, r in enumerate(failures)])
raise ClientError(message="Block uploading failed",
status=505,
details=details)
return [failure.kwargs['hash'] for failure in failures]
def upload_object(self, obj, f,
size=None,
......@@ -355,8 +350,24 @@ class PithosClient(PithosRestAPI):
if missing is None:
return
retries = 3
try:
self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
while retries:
num_of_blocks = len(missing)
missing = self._upload_missing_blocks(
missing,
hmap,
f,
upload_cb=upload_cb)
if missing and num_of_blocks > len(missing):
num_of_blocks = len(missing)
retries -= 1
else:
break
if missing:
raise ClientError(
'%s blocks failed to upload' % len(missing),
status=800)
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment