Commit f27ed9a0 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Sync progressbar to threads, adjust ctrl-c handler

In pithos download/upload ctrl-c is caught when multithreading,
and wait for active threads to finish before exiting
parent e02728f9
......@@ -98,16 +98,9 @@ class ProgressBarArgument(FlagArgument):
except NameError:
print('Waring: no progress bar functionality')
@property
def value(self):
return getattr(self, '_value', self.default)
@value.setter
def value(self, newvalue):
"""By default, it is on (True)"""
self._value = not newvalue
def get_generator(self, message, message_len=25):
if self.value:
return None
try:
bar = ProgressBar(message.ljust(message_len))
except NameError:
......@@ -538,7 +531,7 @@ class store_append(_store_container_command):
def __init__(self, arguments={}):
super(self.__class__, self).__init__(arguments)
self.arguments['progress_bar'] = ProgressBarArgument(\
'do not show progress bar', '--no-progress-bar')
'do not show progress bar', '--no-progress-bar', False)
def main(self, local_path, container___path):
super(self.__class__,
......@@ -577,7 +570,7 @@ class store_overwrite(_store_container_command):
def __init__(self, arguments={}):
super(self.__class__, self).__init__(arguments)
self.arguments['progress_bar'] = ProgressBarArgument(\
'do not show progress bar', '--no-progress-bar')
'do not show progress bar', '--no-progress-bar', False)
def main(self, local_path, container___path, start, end):
super(self.__class__,
......@@ -636,6 +629,7 @@ class store_manifest(_store_container_command):
class store_upload(_store_container_command):
"""Upload a file"""
def __init__(self, arguments={}):
super(self.__class__, self).__init__(arguments)
self.arguments['use_hashes'] = FlagArgument(\
......@@ -658,7 +652,7 @@ class store_upload(_store_container_command):
self.arguments['poolsize'] = IntArgument(\
'set pool size', '--with-pool-size')
self.arguments['progress_bar'] = ProgressBarArgument(\
'do not show progress bar', '--no-progress-bar')
'do not show progress bar', '--no-progress-bar', False)
def main(self, local_path, container____path__):
super(self.__class__, self).main(container____path__)
......@@ -752,7 +746,7 @@ class store_download(_store_container_command):
self.arguments['poolsize'] = IntArgument(\
'set pool size', '--with-pool-size')
self.arguments['progress_bar'] = ProgressBarArgument(\
'do not show progress bar', '--no-progress-bar')
'do not show progress bar', '--no-progress-bar', False)
def main(self, container___path, local_path):
super(self.__class__,
......@@ -790,6 +784,15 @@ class store_download(_store_container_command):
except ClientError as err:
raiseCLIError(err)
except KeyboardInterrupt:
from threading import enumerate as activethreads
stdout.write('\nFinishing active threads ')
for thread in activethreads():
stdout.flush()
try:
thread.join()
stdout.write('.')
except RuntimeError:
continue
print('\ndownload canceled by user')
if local_path is not None:
print('to resume, re-run with --resume')
......
......@@ -32,6 +32,7 @@
# or implied, of GRNET S.A.
from threading import Thread
from threading import enumerate as activethreads
from os import fstat
from hashlib import new as newhashlib
......@@ -97,7 +98,7 @@ class PithosClient(PithosRestAPI):
def __init__(self, base_url, token, account=None, container=None):
super(PithosClient, self).__init__(base_url, token, account, container)
self.async_pool = None
self.POOL_SIZE = 5
def purge_container(self):
r = self.container_delete(until=unicode(time()))
......@@ -246,18 +247,15 @@ class PithosClient(PithosRestAPI):
r = self.put_block_async(data, hash)
flying.append(r)
unfinished = []
for thread in flying:
for i, thread in enumerate(flying):
if i % self.POOL_SIZE == 0:
thread.join(0.1)
if thread.isAlive() or thread.exception:
unfinished.append(thread)
else:
if upload_cb:
upload_gen.next()
flying = unfinished
while upload_cb:
try:
upload_gen.next()
except StopIteration:
break
for thread in flying:
thread.join()
......@@ -270,6 +268,12 @@ class PithosClient(PithosRestAPI):
status=505,
details=details)
while upload_cb:
try:
upload_gen.next()
except StopIteration:
break
def upload_object(self, obj, f,
size=None,
hash_cb=None,
......@@ -307,9 +311,18 @@ class PithosClient(PithosRestAPI):
if missing is None:
return
self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
if len(missing) > self.POOL_SIZE:
self.POOL_SIZE = len(missing) // 10
try:
self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
except KeyboardInterrupt:
print('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
raise
r = self.object_put(obj,
r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
......@@ -317,8 +330,7 @@ class PithosClient(PithosRestAPI):
success=201)
r.release()
#download_* auxiliary methods
#ALl untested
# download_* auxiliary methods
def _get_remote_blocks_info(self, obj, **restargs):
#retrieve object hashmap
myrange = restargs.pop('data_range', None)
......@@ -379,7 +391,9 @@ class PithosClient(PithosRestAPI):
- e.g. if the range is 10-100, all
blocks will be written to normal_position - 10"""
finished = []
for start, g in flying.items():
for i, (start, g) in enumerate(flying.items()):
if i % self.POOL_SIZE == 0:
g.join(0.1)
if not g.isAlive():
if g.exception:
raise g.exception
......@@ -409,20 +423,22 @@ class PithosClient(PithosRestAPI):
if filerange is not None:
rstart = int(filerange.split('-')[0])
offset = rstart if blocksize > rstart else rstart % blocksize
for block_hash, blockid in remote_hashes.items():
start = blocksize * blockid
if start < file_size\
and block_hash == self._hash_from_file(local_file,
start,
blocksize,
blockhash):
and block_hash == self._hash_from_file(
local_file,
start,
blocksize,
blockhash):
self._cb_next()
continue
if len(flying) >= self.POOL_SIZE:
finished += self._thread2file(flying,
local_file,
offset,
**restargs)
finished += self._thread2file(
flying,
local_file,
offset,
**restargs)
end = total_size - 1 if start + blocksize > total_size\
else start + blocksize - 1
(start, end) = _range_up(start, end, filerange)
......@@ -434,6 +450,7 @@ class PithosClient(PithosRestAPI):
for thread in flying.values():
thread.join()
print('Any downloing yet?')
finished += self._thread2file(flying, local_file, offset, **restargs)
def download_object(self,
......@@ -462,7 +479,6 @@ class PithosClient(PithosRestAPI):
hash_list,
remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
assert total_size >= 0
self.POOL_SIZE = 5
if download_cb:
self.progress_bar_gen = download_cb(len(remote_hashes))
......@@ -477,6 +493,8 @@ class PithosClient(PithosRestAPI):
range,
**restargs)
else:
if len(remote_hashes) > self.POOL_SIZE:
self.POOL_SIZE = len(remote_hashes) // 10
self._dump_blocks_async(obj,
remote_hashes,
blocksize,
......
......@@ -2225,7 +2225,7 @@ class testPithos(unittest.TestCase):
import random
random.seed(self.now)
f = open(self.fname, 'w')
sys.stdout.write(' create random file %s of size %s' % (name, size))
sys.stdout.write(' create random file %s of size %s ' % (name, size))
for hobyte_id in range(size / 8):
sss = 'hobt%s' % random.randint(1000, 9999)
f.write(sss)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment