Commit cad39033 authored by Stavros Sachtouris's avatar Stavros Sachtouris

Gradual thread inc/decrease in up/download

Thread inc/decrease is now generic for all clients that might need it
with the method _watch_thread_limit

Algorithm: set a small thread limit and when reached wait for active threads to
finish. Measure avg time. If avg time decreases, increase thread limit
If avg time increases, decrease thread limit.
parent e9533b0c
......@@ -133,7 +133,6 @@ class Shell(Cmd):
self._unregister_method('complete_%s' % subname)
self._unregister_method('help_%s' % subname)
@classmethod
def _backup(self):
return dict(self.__dict__)
......
......@@ -32,6 +32,7 @@
# or implied, of GRNET S.A.
from json import dumps, loads
from time import time
import logging
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
......@@ -68,6 +69,8 @@ class ClientError(Exception):
class Client(object):
POOL_SIZE = 5
def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
self.base_url = base_url
self.token = token
......@@ -77,6 +80,29 @@ class Client(object):
"%a, %d %b %Y %H:%M:%S GMT"]
self.http_client = http_client
def _init_thread_limit(self, limit=1):
self._thread_limit = limit
self._elapsed_old = 0.0
self._elapsed_new = 0.0
def _watch_thread_limit(self, threadlist):
if self._elapsed_old > self._elapsed_new\
and self._thread_limit < self.POOL_SIZE:
self._thread_limit += 1
elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
self._thread_limit -= 1
self._elapsed_old = self._elapsed_new
if len(threadlist) >= self._thread_limit:
self._elapsed_new = 0.0
for thread in threadlist:
begin_time = time()
thread.join()
self._elapsed_new += time() - begin_time
self._elapsed_new = self._elapsed_new / len(threadlist)
return []
return threadlist
def _raise_for_status(self, r):
status_msg = getattr(r, 'status', '')
try:
......
......@@ -98,7 +98,6 @@ class PithosClient(PithosRestAPI):
def __init__(self, base_url, token, account=None, container=None):
super(PithosClient, self).__init__(base_url, token, account, container)
self.POOL_SIZE = 5
def purge_container(self):
r = self.container_delete(until=unicode(time()))
......@@ -239,9 +238,7 @@ class PithosClient(PithosRestAPI):
upload_gen = upload_cb(len(missing))
upload_gen.next()
thread_limit = 1
elapsed_old = 0.0
elapsed_new = 0.0
self._init_thread_limit()
flying = []
for hash in missing:
......@@ -253,22 +250,7 @@ class PithosClient(PithosRestAPI):
unfinished = []
for i, thread in enumerate(flying):
if elapsed_old:
if elapsed_old >= elapsed_new\
and thread_limit < self.POOL_SIZE:
thread_limit += 1
elif elapsed_old < elapsed_new and thread_limit > 1:
thread_limit -= 1
if len(unfinished) >= thread_limit:
elapsed_old = elapsed_new
elapsed_new = 0.0
for unf in unfinished:
begin_time = time()
unf.join()
elapsed_new += time() - begin_time
elapsed_new = elapsed_new / len(unfinished)
unfinished = []
unfinished = self._watch_thread_limit(unfinished)
if thread.isAlive() or thread.exception:
unfinished.append(thread)
......@@ -410,8 +392,8 @@ class PithosClient(PithosRestAPI):
blocks will be written to normal_position - 10"""
finished = []
for i, (start, g) in enumerate(flying.items()):
if i % self.POOL_SIZE == 0:
g.join(0.1)
#if i % self.POOL_SIZE == 0:
# g.join(0.1)
if not g.isAlive():
if g.exception:
raise g.exception
......@@ -442,6 +424,7 @@ class PithosClient(PithosRestAPI):
rstart = int(filerange.split('-')[0])
offset = rstart if blocksize > rstart else rstart % blocksize
self._init_thread_limit()
for block_hash, blockid in remote_hashes.items():
start = blocksize * blockid
if start < file_size\
......@@ -452,6 +435,7 @@ class PithosClient(PithosRestAPI):
blockhash):
self._cb_next()
continue
self._watch_thread_limit(flying.values())
finished += self._thread2file(
flying,
local_file,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment