Commit 3f8091d6 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Merge branch 'feature-store' into develop

parents 2aca7f5e e5c76b1a
......@@ -35,12 +35,35 @@ import time
import datetime
from os import urandom
from tempfile import NamedTemporaryFile
from string import ascii_letters
from kamaki.clients import livetest, ClientError
from kamaki.clients.pithos import PithosClient
from kamaki.clients.astakos import AstakosClient
def chargen():
"""10 + 2 * 26 + 26 = 88"""
while True:
for CH in xrange(10):
yield '%s' % CH
for CH in ascii_letters:
yield CH
for CH in '~!@#$%^&*()_+`-=:";|<>?,./':
yield CH
def sample_block(f, block):
block_size = 4 * 1024 * 1024
f.seek(block * block_size)
ch = [f.read(1)]
f.seek(block_size / 2, 1)
ch.append(f.read(1))
f.seek((block + 1) * block_size - 1)
ch.append(f.read(1))
return ch
class Pithos(livetest.Generic):
files = []
......@@ -654,6 +677,20 @@ class Pithos(livetest.Generic):
dnl_f.seek(pos)
self.assertEqual(src_f.read(10), dnl_f.read(10))
"""Upload a boring file"""
trg_fname = 'boringfile_%s' % self.now
src_f = self.create_boring_file(42)
print('\tUploading boring file...')
self.client.upload_object(trg_fname, src_f)
print('\tDownloading boring file...')
self.files.append(NamedTemporaryFile())
dnl_f = self.files[-1]
self.client.download_object(trg_fname, dnl_f)
print('\tCheck if files match...')
for i in range(42):
self.assertEqual(sample_block(src_f, i), sample_block(dnl_f, i))
def test_object_put(self):
"""Test object_PUT"""
self._test_object_put()
......@@ -1247,3 +1284,18 @@ class Pithos(livetest.Generic):
bytelist)
f.seek(0)
return f
def create_boring_file(self, num_of_blocks):
"""Create a file with some blocks being the same"""
self.files.append(NamedTemporaryFile())
tmpFile = self.files[-1]
block_size = 4 * 1024 * 1024
print('\n\tCreate boring file of %s blocks' % num_of_blocks)
chars = chargen()
while num_of_blocks:
fslice = 3 if num_of_blocks > 3 else num_of_blocks
tmpFile.write(fslice * block_size * chars.next())
num_of_blocks -= fslice
print('\t\tDone')
tmpFile.seek(0)
return tmpFile
......@@ -408,7 +408,11 @@ class PithosClient(PithosRestClient):
#assert total_size/blocksize + 1 == len(hashmap['hashes'])
map_dict = {}
for i, h in enumerate(hashmap['hashes']):
map_dict[h] = i
# map_dict[h] = i CHAGE
if h in map_dict:
map_dict[h].append(i)
else:
map_dict[h] = [i]
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
def _dump_blocks_sync(
......@@ -438,62 +442,65 @@ class PithosClient(PithosRestClient):
h.update(block.strip('\x00'))
return hexlify(h.digest())
def _thread2file(self, flying, local_file, offset=0, **restargs):
def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
"""write the results of a greenleted rest call to a file
:param offset: the offset of the file up to blocksize
- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
finished = []
for i, (start, g) in enumerate(flying.items()):
if not g.isAlive():
if g.exception:
raise g.exception
block = g.value.content
local_file.seek(start - offset)
for i, (key, g) in enumerate(flying.items()):
if g.isAlive():
continue
if g.exception:
raise g.exception
block = g.value.content
for block_start in blockids[key]:
local_file.seek(block_start + offset)
local_file.write(block)
self._cb_next()
finished.append(flying.pop(start))
flying.pop(key)
blockids.pop(key)
local_file.flush()
return finished
def _dump_blocks_async(
self, obj, remote_hashes, blocksize, total_size, local_file,
blockhash=None, resume=False, filerange=None, **restargs):
file_size = fstat(local_file.fileno()).st_size if resume else 0
flying = {}
finished = []
flying = dict()
blockid_dict = dict()
offset = 0
if filerange is not None:
rstart = int(filerange.split('-')[0])
offset = rstart if blocksize > rstart else rstart % blocksize
self._init_thread_limit()
for block_hash, blockid in remote_hashes.items():
start = blocksize * blockid
if start < file_size and block_hash == self._hash_from_file(
local_file, start, blocksize, blockhash):
self._cb_next()
continue
self._watch_thread_limit(flying.values())
finished += self._thread2file(
flying,
local_file,
offset,
**restargs)
end = total_size - 1 if start + blocksize > total_size\
else start + blocksize - 1
(start, end) = _range_up(start, end, filerange)
if start == end:
self._cb_next()
continue
restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
flying[start] = self._get_block_async(obj, **restargs)
for block_hash, blockids in remote_hashes.items():
blockids = [blk * blocksize for blk in blockids]
unsaved = [blk for blk in blockids if not (
blk < file_size and block_hash == self._hash_from_file(
local_file, blk, blocksize, blockhash))]
self._cb_next(len(blockids) - len(unsaved))
if unsaved:
key = unsaved[0]
self._watch_thread_limit(flying.values())
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
end = total_size - 1 if key + blocksize > total_size\
else key + blocksize - 1
start, end = _range_up(key, end, filerange)
if start == end:
self._cb_next()
continue
restargs['async_headers'] = {
'Range': 'bytes=%s-%s' % (start, end)}
flying[key] = self._get_block_async(obj, **restargs)
blockid_dict[key] = unsaved
for thread in flying.values():
thread.join()
finished += self._thread2file(flying, local_file, offset, **restargs)
self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
def download_object(
self, obj, dst,
......@@ -543,7 +550,7 @@ class PithosClient(PithosRestClient):
assert total_size >= 0
if download_cb:
self.progress_bar_gen = download_cb(len(remote_hashes))
self.progress_bar_gen = download_cb(len(hash_list))
self._cb_next()
if dst.isatty():
......@@ -572,10 +579,11 @@ class PithosClient(PithosRestClient):
self._complete_cb()
#Command Progress Bar method
def _cb_next(self):
def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
try:
self.progress_bar_gen.next()
for i in xrange(step):
self.progress_bar_gen.next()
except:
pass
......
......@@ -1153,7 +1153,6 @@ class PithosClient(TestCase):
self.assertEqual(GET.mock_calls[-1][2][k], v)
# ALl options on no tty
def foo():
return True
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment