Commit e9abe82b authored by Stavros Sachtouris's avatar Stavros Sachtouris

Avoid to many threads in upload

parent a6ad7781
...@@ -239,6 +239,10 @@ class PithosClient(PithosRestAPI): ...@@ -239,6 +239,10 @@ class PithosClient(PithosRestAPI):
upload_gen = upload_cb(len(missing)) upload_gen = upload_cb(len(missing))
upload_gen.next() upload_gen.next()
thread_limit = 1
elapsed_old = 0.0
elapsed_new = 0.0
flying = [] flying = []
for hash in missing: for hash in missing:
offset, bytes = hmap[hash] offset, bytes = hmap[hash]
...@@ -248,8 +252,24 @@ class PithosClient(PithosRestAPI): ...@@ -248,8 +252,24 @@ class PithosClient(PithosRestAPI):
flying.append(r) flying.append(r)
unfinished = [] unfinished = []
for i, thread in enumerate(flying): for i, thread in enumerate(flying):
if i % self.POOL_SIZE == 0:
thread.join(0.1) if elapsed_old:
if elapsed_old >= elapsed_new\
and thread_limit < self.POOL_SIZE:
thread_limit += 1
elif elapsed_old < elapsed_new and thread_limit > 1:
thread_limit -= 1
if len(unfinished) >= thread_limit:
elapsed_old = elapsed_new
elapsed_new = 0.0
for unf in unfinished:
begin_time = time()
unf.join()
elapsed_new += time() - begin_time
elapsed_new = elapsed_new / len(unfinished)
unfinished = []
if thread.isAlive() or thread.exception: if thread.isAlive() or thread.exception:
unfinished.append(thread) unfinished.append(thread)
else: else:
...@@ -311,8 +331,6 @@ class PithosClient(PithosRestAPI): ...@@ -311,8 +331,6 @@ class PithosClient(PithosRestAPI):
if missing is None: if missing is None:
return return
if len(missing) > self.POOL_SIZE:
self.POOL_SIZE = len(missing) // 10
try: try:
self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb) self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
except KeyboardInterrupt: except KeyboardInterrupt:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment