Commit 435008b6 authored by Stavros Sachtouris
Browse files

Asynchronous uploads with gevent-greenlet

Merge with koukis-fix
parent b08af3e6
......@@ -68,6 +68,10 @@ The order of commands is important, it will be preserved in the help output.
from __future__ import print_function
import gevent.monkey
#Monkey-patch everything for gevent early on
gevent.monkey.patch_all()
import inspect
import logging
import sys
......@@ -821,7 +825,7 @@ class store_upload(_store_container_command):
with open(local_path) as f:
hash_cb = self.progress('Calculating block hashes')
upload_cb = self.progress('Uploading blocks')
self.client.upload_object(remote_path, f, hash_cb=hash_cb, upload_cb=upload_cb)
self.client.async_upload_object(remote_path, f, hash_cb=hash_cb, upload_cb=upload_cb)
@command(api='storage')
class store_download(_store_container_command):
......
......@@ -85,7 +85,6 @@ class Client(object):
data = kwargs.pop('data', None)
self.headers.setdefault('X-Auth-Token', self.token)
#publish = kwargs.pop('publish', None)
if 'json' in kwargs:
data = json.dumps(kwargs.pop('json'))
......@@ -142,7 +141,8 @@ class Client(object):
def move(self, path, **kwargs):
return self.request('move', path, **kwargs)
#TODO These imports should go away:
# the base Client module should not need to know about the concrete client classes
from .compute import ComputeClient as compute
from .image import ImageClient as image
from .storage import StorageClient as storage
......
......@@ -31,8 +31,12 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import hashlib
import os
import gevent
import gevent.monkey
# Monkey-patch everything for gevent early on
gevent.monkey.patch_all()
import hashlib, os, gevent.pool
from time import time
......@@ -48,6 +52,11 @@ def pithos_hash(block, blockhash):
class PithosClient(StorageClient):
"""GRNet Pithos API client"""
def __init__(self, base_url, token, account=None, container=None):
    """Set up the client through the parent initializer.

    ``base_url``, ``token``, ``account`` and ``container`` are handed
    unchanged to the parent class ``__init__``.
    """
    super(PithosClient, self).__init__(
        base_url,
        token,
        account=account,
        container=container)
    # Greenlet pool for asynchronous block uploads; it stays None here and
    # is only created by put_block_async when it is first needed.
    self.async_pool = None
def account_head(self, until = None,
if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
""" Full Pithos+ HEAD at account level
......@@ -568,12 +577,96 @@ class PithosClient(StorageClient):
def purge_container(self):
self.container_delete(until=unicode(time()))
def put_block_async(self, data, hash):
    """Start uploading one block in a pooled greenlet and return the greenlet.

    The upload itself is performed by ``self.put_block(data, hash)``. The
    greenlet is started through ``self.async_pool``, which is created on
    first use with room for ``POOL_SIZE`` greenlets.

    :param data: the block contents, passed on to put_block
    :param hash: the locally computed block hash, passed on to put_block
    :returns: the started greenlet; async_upload_object inspects its
        ``ready()`` and ``exception`` to find out how the upload went
    """
    # Function-scope imports: the error hook below needs ``sys`` and a
    # ``StringIO`` class, and the module imports visible around this method
    # do not provide them.
    import sys
    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO

    class SilentGreenlet(gevent.Greenlet):
        """Greenlet whose error report is written to a throw-away buffer."""

        def _report_error(self, exc_info):
            # Was ``sys._stderr``, which is not an attribute of ``sys``;
            # the hook therefore failed exactly when an upload failed.
            saved_stderr = sys.stderr
            try:
                sys.stderr = StringIO()
                gevent.Greenlet._report_error(self, exc_info)
            finally:
                # Always restore the real stream, even if reporting raises
                sys.stderr = saved_stderr

    POOL_SIZE = 5
    if self.async_pool is None:
        self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
    g = SilentGreenlet(self.put_block, data, hash)
    self.async_pool.start(g)
    return g
def put_block(self, data, hash):
    """Upload one block of data to the current container and verify it.

    The block is sent with ``container_post`` as
    ``application/octet-stream``; the first element of the JSON response
    is compared against the locally computed hash.

    :param data: the block contents; ``len(data)`` is sent as the
        content length
    :param hash: the hash computed locally for ``data``
    :raises AssertionError: if the hash in the response differs from
        ``hash``
    """
    r = self.container_post(update=True,
        content_type='application/octet-stream',
        content_length=len(data), data=data, format='json')
    self.reset_headers()
    # Raised explicitly (instead of via an ``assert`` statement) so that
    # the integrity check is not stripped when Python runs with -O.
    if r.json[0] != hash:
        raise AssertionError('Local hash does not match server')
def async_upload_object(self, object, f, size=None, hash_cb=None,
        upload_cb=None):
    """Like upload_object, but send the missing blocks asynchronously
    using gevent greenlets.

    Steps: hash the file block by block, PUT the resulting hashmap, and if
    that PUT does not answer 201, upload every block listed in the
    response through ``put_block_async`` and PUT the hashmap again.

    :param object: remote object name, passed to ``object_put``
    :param f: open file object; ``read`` and ``seek`` are used, and
        ``fileno`` too when ``size`` is not given
    :param size: number of bytes to upload; defaults to the size reported
        by ``os.fstat`` for ``f``
    :param hash_cb: optional callable; called with the number of blocks,
        it must return a generator that is advanced once up front and
        once per hashed block
    :param upload_cb: optional callable; called with the number of missing
        blocks, it must return a generator that is advanced once up front
        and once per uploaded block
    :raises AssertionError: if the bytes read from ``f`` do not add up to
        ``size``
    :raises: whatever exception a failed block upload stored on its
        greenlet
    """
    self.assert_container()

    meta = self.get_container_info(self.container)
    blocksize = int(meta['x-container-block-size'])
    blockhash = meta['x-container-block-hash']

    size = size if size is not None else os.fstat(f.fileno()).st_size
    nblocks = 1 + (size - 1) // blocksize
    hashes = []
    block_map = {}  # block hash -> (offset in f, block length)

    offset = 0

    if hash_cb:
        hash_gen = hash_cb(nblocks)
        next(hash_gen)

    for _ in range(nblocks):
        block = f.read(min(blocksize, size - offset))
        length = len(block)
        block_hash = pithos_hash(block, blockhash)
        hashes.append(block_hash)
        block_map[block_hash] = (offset, length)
        offset += length
        if hash_cb:
            next(hash_gen)

    assert offset == size

    hashmap = dict(bytes=size, hashes=hashes)
    r = self.object_put(object, format='json', hashmap=True,
        content_type='application/octet-stream', json=hashmap,
        success=(201, 409))
    self.reset_headers()

    if r.status_code == 201:
        # Nothing is missing on the server side: the object is created
        return

    # Otherwise the response body is iterated as the hashes still missing
    missing = r.json

    if upload_cb:
        upload_gen = upload_cb(len(missing))
        next(upload_gen)

    flying = []
    for block_hash in missing:
        offset, length = block_map[block_hash]
        f.seek(offset)
        data = f.read(length)
        flying.append(self.put_block_async(data, block_hash))
        # Harvest the uploads that have finished so far: fail fast on the
        # first error, otherwise report progress.
        for g in flying:
            if g.ready():
                if g.exception:
                    raise g.exception
                if upload_cb:
                    next(upload_gen)
        flying = [g for g in flying if not g.ready()]

    gevent.joinall(flying)
    # The uploads that were still in flight must be checked as well;
    # previously their failures were silently dropped and their progress
    # was never reported.
    for g in flying:
        if g.exception:
            raise g.exception
        if upload_cb:
            next(upload_gen)

    self.object_put(object, format='json', hashmap=True,
        content_type='application/octet-stream', json=hashmap, success=201)
def upload_object(self, object, f, size=None, hash_cb=None,
upload_cb=None):
"""Create an object by uploading only the missing blocks
......@@ -613,9 +706,9 @@ class PithosClient(StorageClient):
assert offset == size
hashmap = dict(bytes=size, hashes=hashes)
content_type = 'application/octet-stream'
r = self.object_put(object, format='json', hashmap=True,
content_type=content_type, json=hashmap, success=(201, 409))
content_type='application/octet-stream', json=hashmap, success=(201, 409))
self.reset_headers()
if r.status_code == 201:
return
......@@ -631,11 +724,12 @@ class PithosClient(StorageClient):
f.seek(offset)
data = f.read(bytes)
self.put_block(data, hash)
r = self.put_block(data, hash)
if upload_cb:
upload_gen.next()
r = self.object_put(object, format='json', hashmap=True,
content_type=content_type, json=hashmap, success=201)
self.object_put(object, format='json', hashmap=True,
content_type='application/octet-stream', json=hashmap, success=201)
def set_account_group(self, group, usernames):
self.account_post(update=True, groups = {group:usernames})
......
......@@ -31,11 +31,16 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import gevent.monkey
#Monkey-patch everything for gevent early on
gevent.monkey.patch_all()
import unittest
import time, datetime, os, sys
from shutil import copyfile
from kamaki.clients import pithos, cyclades, ClientError
from kamaki.clients import ClientError
from kamaki.clients.pithos import PithosClient as pithos
class testPithos(unittest.TestCase):
"""Set up a Pithos+ thorough test"""
......@@ -103,7 +108,7 @@ class testPithos(unittest.TestCase):
pass
self.client.container=''
def atest_account_head(self):
def test_account_head(self):
"""Test account_HEAD"""
r = self.client.account_head()
self.assertEqual(r.status_code, 204)
......@@ -122,7 +127,7 @@ class testPithos(unittest.TestCase):
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
def atest_account_get(self):
def test_account_get(self):
"""Test account_GET"""
r = self.client.account_get()
self.assertEqual(r.status_code, 200)
......@@ -155,7 +160,7 @@ class testPithos(unittest.TestCase):
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
def atest_account_post(self):
def test_account_post(self):
"""Test account_POST"""
r = self.client.account_post()
self.assertEqual(r.status_code, 202)
......@@ -192,7 +197,7 @@ class testPithos(unittest.TestCase):
you don't have permissions to modify those at account level
"""
def atest_container_head(self):
def test_container_head(self):
"""Test container_HEAD"""
self.client.container = self.c1
......@@ -213,7 +218,7 @@ class testPithos(unittest.TestCase):
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
def atest_container_get(self):
def test_container_get(self):
"""Test container_GET"""
self.client.container = self.c1
......@@ -276,7 +281,7 @@ class testPithos(unittest.TestCase):
self.assertNotEqual(r1.status_code, r2.status_code)
self.client.reset_headers()
def atest_container_put(self):
def test_container_put(self):
"""Test container_PUT"""
self.client.container = self.c2
......@@ -328,13 +333,15 @@ class testPithos(unittest.TestCase):
self.client.del_container_meta(self.client.container)
def atest_container_post(self):
def test_container_post(self):
"""Test container_POST"""
self.client.container = self.c2
"""Simple post"""
r = self.client.container_post()
self.assertEqual(r.status_code, 202)
"""post meta"""
self.client.set_container_meta({'m1':'v1', 'm2':'v2'})
r = self.client.get_container_meta(self.client.container)
self.assertTrue(r.has_key('x-container-meta-m1'))
......@@ -343,6 +350,7 @@ class testPithos(unittest.TestCase):
self.assertEqual(r['x-container-meta-m2'], 'v2')
self.client.reset_headers()
"""post/2del meta"""
r = self.client.del_container_meta('m1')
r = self.client.set_container_meta({'m2':'v2a'})
r = self.client.get_container_meta(self.client.container)
......@@ -351,29 +359,28 @@ class testPithos(unittest.TestCase):
self.assertEqual(r['x-container-meta-m2'], 'v2a')
self.client.reset_headers()
"""check quota"""
r = self.client.get_container_quota(self.client.container)
cquota = r.values()[0]
newquota = 2*int(cquota)
self.client.reset_headers()
r = self.client.set_container_quota(newquota)
r = self.client.get_container_quota(self.client.container)
xquota = int(r.values()[0])
self.assertEqual(newquota, xquota)
self.client.reset_headers()
r = self.client.set_container_quota(cquota)
r = self.client.get_container_quota(self.client.container)
xquota = r.values()[0]
self.assertEqual(cquota, xquota)
self.client.reset_headers()
"""Check versioning"""
self.client.set_container_versioning('auto')
r = self.client.get_container_versioning(self.client.container)
nvers = r.values()[0]
self.assertEqual('auto', nvers)
self.client.reset_headers()
self.client.set_container_versioning('none')
r = self.client.get_container_versioning(self.client.container)
nvers = r.values()[0]
......@@ -383,21 +390,17 @@ class testPithos(unittest.TestCase):
"""put_block uses content_type and content_length to
post blocks of data 2 container. All that in upload_object"""
"""Change a file at fs"""
self.fname = 'f'+unicode(self.now)
copyfile('pirifi.237M', self.fname)
newf = open(self.fname, 'a')
newf.write('add:'+unicode(self.now)+'\n')
newf.close()
self.create_large_file(1024*1024*100, 'l100M.'+unicode(self.now))
"""Upload it at a directory in container"""
self.client.create_directory('dir')
self.client.reset_headers()
newf = open(self.fname, 'r')
self.client.upload_object('/dir/sample.file', newf)
self.client.async_upload_object('/dir/sample.file', newf)
self.client.reset_headers()
newf.close()
"""Check if file has been uploaded"""
r = self.client.get_object_info('/dir/sample.file')
self.assertTrue(int(r['content-length']) > 248209936)
self.assertTrue(int(r['content-length']) > 100000000)
"""What is transfer_encoding? What should be checked about it? """
#TODO
......@@ -413,7 +416,7 @@ class testPithos(unittest.TestCase):
r = self.client.del_container_meta('m2')
self.client.reset_headers()
def atest_container_delete(self):
def test_container_delete(self):
"""Test container_DELETE"""
"""Fail to delete a non-empty container"""
......@@ -433,7 +436,7 @@ class testPithos(unittest.TestCase):
self.assertEqual(r.status_code, 204)
self.client.reset_headers()
def atest_object_head(self):
def test_object_head(self):
"""Test object_HEAD"""
self.client.container = self.c2
obj = 'test'
......@@ -467,7 +470,7 @@ class testPithos(unittest.TestCase):
self.assertNotEqual(r1.status_code, r2.status_code)
self.client.reset_headers()
def atest_object_get(self):
def test_object_get(self):
"""Test object_GET"""
self.client.container = self.c1
obj = 'test'
......@@ -519,7 +522,7 @@ class testPithos(unittest.TestCase):
self.assertNotEqual(r1.status_code, r2.status_code)
self.client.reset_headers()
def atest_object_put(self):
def test_object_put(self):
"""test object_PUT"""
self.client.container = self.c2
......@@ -648,7 +651,7 @@ class testPithos(unittest.TestCase):
"""Some problems with transfer-encoding?"""
def atest_object_copy(self):
def test_object_copy(self):
"""test object_COPY"""
self.client.container=self.c2
obj = 'test2'
......@@ -734,7 +737,7 @@ class testPithos(unittest.TestCase):
self.assertTrue(r.has_key('x-object-public'))
self.client.reset_headers()
def atest_object_move(self):
def test_object_move(self):
"""Test object_MOVE"""
self.client.container= self.c2
obj = 'test2'
......@@ -816,7 +819,7 @@ class testPithos(unittest.TestCase):
self.assertTrue(r.has_key('x-object-public'))
self.client.reset_headers()
def atest_object_post(self):
def test_object_post(self):
"""Test object_POST"""
self.client.container=self.c2
obj = 'test2'
......@@ -949,7 +952,7 @@ class testPithos(unittest.TestCase):
"""We need to check transfer_encoding """
def atest_object_delete(self):
def test_object_delete(self):
"""Test object_DELETE"""
self.client.container=self.c2
obj = 'test2'
......@@ -972,31 +975,31 @@ class testPithos(unittest.TestCase):
r = self.client.object_get(obj, success=(200, 404))
self.assertEqual(r.status_code, 404)
def test_large_file_operations(self):
def atest_large_file_operations(self):
"""Test large file operations"""
self.client.container = self.c1
pass
"""Create a large (~6G) file at fs"""
self.fname = 'largefile'+unicode(self.now)
fsize = 1024*1024
def create_large_file(self, size, name):
"""Create a large file at fs"""
self.fname = name
import random
random.seed(self.now)
f = open(self.fname, 'w')
sys.stdout.write('\n\tcreating large file 0%')
for hobyte_id in range(fsize):
sss = 'hobt%s'%random.randint(0, 100)
f.write(sss*1024)
if 0 == hobyte_id%10485:
sys.stdout.write(' create random file %s of size %s'%(name, size)+' 0%')
for hobyte_id in range(size/8):
sss = 'hobt%s'%random.randint(1000, 9999)
f.write(sss)
if 0 == (hobyte_id*800)%size:
f.write('\n')
sys.stdout.write('\b\b')
prs = hobyte_id//10485
prs = (hobyte_id*800)//size
if prs > 10:
sys.stdout.write('\b')
sys.stdout.write('%s'%prs+'%')
sys.stdout.flush()
print('\n')
print('\b\b\b100%')
f.close()
""""""
class testCyclades(unittest.TestCase):
......@@ -1016,7 +1019,6 @@ if __name__ == '__main__':
suiteFew = unittest.TestSuite()
#kamaki/pithos.py
"""
suiteFew.addTest(testPithos('test_account_head'))
suiteFew.addTest(testPithos('test_account_get'))
suiteFew.addTest(testPithos('test_account_post'))
......@@ -1034,6 +1036,7 @@ if __name__ == '__main__':
suiteFew.addTest(testPithos('test_object_delete'))
"""
suiteFew.addTest(testPithos('test_large_file_operations'))
"""
#kamaki/cyclades.py
#suiteFew.addTest(testCyclades('atest_list_servers'))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment