Commit 9fa0fe6f authored by Stavros Sachtouris's avatar Stavros Sachtouris Committed by Giorgos Korfiatis

Implement a stream download method, use it in cat

"kamaki file cat" can now output data as it is being downloaded
It is also safe to pipe its output
The new library method "stream_down" is implemented in
    "kamaki.clients.PithosClient" and it sequentially downloads
    parts of a remote object to a buffer, which is then written to
    the destination. The destination descriptor is prepared and
    provided by the caller (i.e., "kamaki file cat"). Each part of
    the object. Within each part, however, the individual blocks are
    downloaded asynchronously.
parent 9da30c76
...@@ -1195,7 +1195,9 @@ class file_cat(_PithosContainer): ...@@ -1195,7 +1195,9 @@ class file_cat(_PithosContainer):
if_unmodified_since=DateArgument( if_unmodified_since=DateArgument(
'show output unmodified since then', '--if-unmodified-since'), 'show output unmodified since then', '--if-unmodified-since'),
object_version=ValueArgument( object_version=ValueArgument(
'Get contents of the chosen version', '--object-version') 'Get contents of the chosen version', '--object-version'),
'Size of buffer in blocks (default: 4)', '--buffer-blocks')
) )
@errors.Generic.all @errors.Generic.all
...@@ -1203,14 +1205,16 @@ class file_cat(_PithosContainer): ...@@ -1203,14 +1205,16 @@ class file_cat(_PithosContainer):
@errors.Pithos.object_path @errors.Pithos.object_path
def _run(self): def _run(self):
try: try:
self.client.download_object( # self.client.download_object(
self.path, self._out, self.path, self._out,
range_str=self['range'], range_str=self['range'],
version=self['object_version'], version=self['object_version'],
if_match=self['if_match'], if_match=self['if_match'],
if_none_match=self['if_none_match'], if_none_match=self['if_none_match'],
if_modified_since=self['if_modified_since'], if_modified_since=self['if_modified_since'],
if_unmodified_since=self['if_unmodified_since']) if_unmodified_since=self['if_unmodified_since'],
except ClientError as ce: except ClientError as ce:
if ce.status in (404, ): if ce.status in (404, ):
self._container_exists() self._container_exists()
...@@ -1219,6 +1223,11 @@ class file_cat(_PithosContainer): ...@@ -1219,6 +1223,11 @@ class file_cat(_PithosContainer):
def main(self, path_or_url): def main(self, path_or_url):
super(self.__class__, self)._run(path_or_url) super(self.__class__, self)._run(path_or_url)
if self['buffer_blocks'] is not None and self['buffer_blocks'] < 1:
arg = self.arguments['buffer_blocks']
raise CLIInvalidArgument(
'Invalid value %s' % arg.value, importance=2, details=[
'%s must be a possitive integer' % arg.lvalue])
self._run() self._run()
...@@ -49,6 +49,12 @@ from kamaki.clients.utils import path4url, filter_in, readall ...@@ -49,6 +49,12 @@ from kamaki.clients.utils import path4url, filter_in, readall
LOG = getLogger(__name__) LOG = getLogger(__name__)
def _dump_buffer(buffer_, destination):
"""Append buffer to destination (file descriptor)"""
def _pithos_hash(block, blockhash): def _pithos_hash(block, blockhash):
h = newhashlib(blockhash) h = newhashlib(blockhash)
h.update(block.rstrip('\x00')) h.update(block.rstrip('\x00'))
...@@ -664,7 +670,8 @@ class PithosClient(PithosRestClient): ...@@ -664,7 +670,8 @@ class PithosClient(PithosRestClient):
def _get_remote_blocks_info(self, obj, **restargs): def _get_remote_blocks_info(self, obj, **restargs):
# retrieve object hashmap # retrieve object hashmap
myrange = restargs.pop('data_range', None) myrange = restargs.pop('data_range', None)
hashmap = self.get_object_hashmap(obj, **restargs) hashmap = restargs.pop('hashmap', None) or (
self.get_object_hashmap(obj, **restargs))
restargs['data_range'] = myrange restargs['data_range'] = myrange
blocksize = int(hashmap['block_size']) blocksize = int(hashmap['block_size'])
blockhash = hashmap['block_hash'] blockhash = hashmap['block_hash']
...@@ -725,6 +732,7 @@ class PithosClient(PithosRestClient): ...@@ -725,6 +732,7 @@ class PithosClient(PithosRestClient):
raise g.exception raise g.exception
block = g.value.content block = g.value.content
for block_start in blockids[key]: for block_start in blockids[key]:
# This should not be used in all cases + offset) + offset)
local_file.write(block) local_file.write(block)
self._cb_next() self._cb_next()
...@@ -849,6 +857,7 @@ class PithosClient(PithosRestClient): ...@@ -849,6 +857,7 @@ class PithosClient(PithosRestClient):
range_str, range_str,
**restargs) **restargs)
if not range_str: if not range_str:
# this should not be used in all cases
dst.truncate(total_size) dst.truncate(total_size)
self._complete_cb() self._complete_cb()
...@@ -862,6 +871,8 @@ class PithosClient(PithosRestClient): ...@@ -862,6 +871,8 @@ class PithosClient(PithosRestClient):
if_none_match=None, if_none_match=None,
if_modified_since=None, if_modified_since=None,
if_unmodified_since=None, if_unmodified_since=None,
headers=dict()): headers=dict()):
"""Download an object to a string (multiple connections). This method """Download an object to a string (multiple connections). This method
uses threads for http requests, but stores all content in memory. uses threads for http requests, but stores all content in memory.
...@@ -882,6 +893,13 @@ class PithosClient(PithosRestClient): ...@@ -882,6 +893,13 @@ class PithosClient(PithosRestClient):
:param if_unmodified_since: (str) formated date :param if_unmodified_since: (str) formated date
:param remote_block_info: (tuple) blocksize, blockhas, total_size and
:param hashmap: (dict) the remote object hashmap, if it is available
e.g., from another call. Used for minimizing HEAD container
:param headers: (dict) a placeholder dict to gather object headers :param headers: (dict) a placeholder dict to gather object headers
:returns: (str) the whole object contents :returns: (str) the whole object contents
...@@ -900,7 +918,8 @@ class PithosClient(PithosRestClient): ...@@ -900,7 +918,8 @@ class PithosClient(PithosRestClient):
blockhash, blockhash,
total_size, total_size,
hash_list, hash_list,
remote_hashes) = self._get_remote_blocks_info(obj, **restargs) remote_hashes) = self._get_remote_blocks_info(
obj, hashmap=hashmap, **restargs)
headers.update(restargs.pop('headers')) headers.update(restargs.pop('headers'))
assert total_size >= 0 assert total_size >= 0
...@@ -938,6 +957,54 @@ class PithosClient(PithosRestClient): ...@@ -938,6 +957,54 @@ class PithosClient(PithosRestClient):
for thread in activethreads(): for thread in activethreads():
thread.join() thread.join()
def stream_down(self, obj, dst, buffer_blocks=4, **kwargs):
Download obj to dst as a stream. Buffer-sized chunks are downloaded
sequentially, but the blocks of each chunk are downloaded
asynchronously, using the download_to_string method
:param obj: (str) the remote object
:param dst: a file descriptor allowing sequential writing
:param buffer_blocks: (int) the size of the buffer in blocks. If it is
1, all blocks will be downloaded sequentially
:param kwargs: (dict) keyword arguments for download_to_string method
buffer_blocks = 1 if buffer_blocks < 2 else buffer_blocks
hashmap = kwargs.get('hashmap', None)
range_str = kwargs.pop('range_str', None)
if hashmap is None:
# Clean kwargs if it contains hashmap=None
kwargs.pop('hashmap', None)
# Get new hashmap
hashmap = kwargs['hashmap'] = self.get_object_hashmap(
kwargs.get('version', None),
kwargs.get('if_match', None),
kwargs.get('if_none_match', None),
kwargs.get('if_modified_since', None),
kwargs.get('if_unmodified_since', None))
block_size, obj_size = int(hashmap['block_size']), hashmap['bytes']
buffer_size = buffer_blocks * block_size
event = None
def finish_event(e):
"""Blocking: stop until e is finished or raise error"""
if e is not None:
if e.isAlive():
if e.exception:
raise e.exception
for chunk_number in range(1 + obj_size // buffer_size):
start = chunk_number * buffer_size
end = start + buffer_size
end = (obj_size if (end > obj_size) else end) - 1
kwargs['range_str'] = _range_up(start, end, obj_size, range_str)
buffer_ = self.download_to_string(obj, **kwargs)
event = SilentEvent(_dump_buffer, buffer_, dst)
# Command Progress Bar method # Command Progress Bar method
def _cb_next(self, step=1): def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'): if hasattr(self, 'progress_bar_gen'):
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment