Commit 6fa30b1b authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Functional test and debug for upload_from_string

Refs: #3608
parent 9d502497
......@@ -689,7 +689,6 @@ class Pithos(livetest.Generic):
trg_fname,
range_str='%s-%s' % (pos, (pos + 128)))
self.assertEqual(tmp_s, src_f.read(len(tmp_s)))
print('\tUploading KiBs as strings...')
trg_fname = 'fromString_%s' % self.now
src_size = 2 * 1024
......
......@@ -417,7 +417,9 @@ class PithosClient(PithosRestClient):
else:
break
if missing:
raise ClientError('%s blocks failed to upload' % len(missing))
raise ClientError(
'%s blocks failed to upload' % len(missing),
details=['%s' % thread.exception for thread in missing])
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
......@@ -438,7 +440,6 @@ class PithosClient(PithosRestClient):
success=201)
return r.headers
def upload_from_string(
self, obj, input_str,
hash_cb=None,
......@@ -497,12 +498,12 @@ class PithosClient(PithosRestClient):
num_of_blocks, blockmod = size / blocksize, size % blocksize
num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod
hashes = {}
hashes = []
hmap = {}
for blockid in range(num_of_blocks):
start = blockid * blocksize
block = input_str[start: (start + blocksize)]
hashes[blockid] = _pithos_hash(block, blockhash)
hashes.append(_pithos_hash(block, blockhash))
hmap[hashes[blockid]] = (start, block)
hashmap = dict(bytes=size, hashes=hashes)
......@@ -525,27 +526,38 @@ class PithosClient(PithosRestClient):
for i in range(num_of_blocks + 1 - num_of_missing):
self._cb_next()
tries = 7
old_failures = 0
try:
flying = []
for hash in missing:
offset, block = hmap[hash]
bird = self._put_block_async(block, hash)
flying.append(bird)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
while tries and missing:
flying = []
failures = []
for hash in missing:
offset, block = hmap[hash]
bird = self._put_block_async(block, hash)
flying.append(bird)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
if thread.exception:
failures.append(thread.kwargs['hash'])
if thread.isAlive():
flying.append(thread)
else:
self._cb_next()
flying = unfinished
for thread in flying:
thread.join()
if thread.exception:
raise thread.exception
if thread.isAlive():
flying.append(thread)
else:
self._cb_next()
flying = unfinished
for thread in flying:
thread.join()
if thread.exception:
raise thread.exception
self._cb_next()
failures.append(thread.kwargs['hash'])
self._cb_next()
missing = failures
if missing and len(missing) == old_failures:
tries -= 1
old_failures = len(missing)
if missing:
raise ClientError(
'%s blocks failed to upload' % len(missing),
details=['%s' % thread.exception for thread in missing])
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment