Commit 5b263ba2 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Implement kamakicon, use it

kamakicon: an httplib/ObjectPool based connection class
parent b1713259
......@@ -34,8 +34,8 @@
import json
import logging
from .connection import HTTPConnectionError
from .connection.request import HTTPRequest
#from .connection.kamakicon import KamakiHTTPConnection
#from .connection.request import HTTPRequest
from .connection.kamakicon import KamakiHTTPConnection
sendlog = logging.getLogger('clients.send')
recvlog = logging.getLogger('clients.recv')
......@@ -50,7 +50,7 @@ class ClientError(Exception):
class Client(object):
def __init__(self, base_url, token, http_client=HTTPRequest()):
def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
self.base_url = base_url
self.token = token
self.headers = {}
......@@ -98,11 +98,11 @@ class Client(object):
#kwargs.setdefault('verify', False) # Disable certificate verification
self.http_client.url = self.base_url + path
r = self.http_client.perform_request(method=method, data=data, binary=binary)
r = self.http_client.perform_request(method=method, data=data)
#r = requests.request(method, url, headers=self.headers, data=data, **kwargs)
req = self.http_client
sendlog.info('%s %s', req.method, req.url)
sendlog.info('%s %s', method, req.url)
for key, val in req.headers.items():
sendlog.info('%s: %s', key, val)
sendlog.info('')
......
......@@ -31,31 +31,10 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from .pool import ObjectPool
POOL_SIZE=8
class HTTPResponsePool(ObjectPool):
def __init__(self, netloc, size=POOL_SIZE):
super(HTTPResponsePool, self).__init__(size=size)
self.netloc = netloc
def _pool_create(self):
resp = HTTPResponse()
resp._pool = self
return resp
def _pool_cleanup(self, resp):
resp._get_response()
return True
class HTTPResponse(object):
def __init__(self, request=None, prefetched=False):
self.request=request
if prefetched:
self = request.response
self.prefetched = prefetched
def _get_response(self):
......
......@@ -35,6 +35,7 @@ from urlparse import urlparse
from .pool.http import get_http_connection
from . import HTTPConnection, HTTPResponse, HTTPConnectionError
from json import loads
from time import sleep
from httplib import ResponseNotReady
......@@ -42,25 +43,30 @@ from httplib import ResponseNotReady
class KamakiHTTPResponse(HTTPResponse):
def _get_response(self):
print('KamakiHTTPResponse:should I get response?')
if self.prefetched:
print('\tKamakiHTTPResponse: no, I have already done that before')
return
print('\tKamakiHTTPResponse: yes, pls')
r = self.request.getresponse()
ready = False
while not ready:
try:
r = self.request.getresponse()
except ResponseNotReady:
sleep(0.2)
continue
break
self.prefetched = True
headers = {}
for k,v in r.getheaders():
headers.update({k:v})
self.headers = headers
self.content = r.read(r.length)
self.content = r.read()
self.status_code = r.status
self.status = r.reason
print('KamakiHTTPResponse: Niiiiice')
self.request.close()
@property
def text(self):
_get_response()
self._get_response()
return self._content
@text.setter
def test(self, v):
......@@ -68,8 +74,7 @@ class KamakiHTTPResponse(HTTPResponse):
@property
def json(self):
_get_response()
from json import loads
self._get_response()
try:
return loads(self._content)
except ValueError as err:
......@@ -78,65 +83,32 @@ class KamakiHTTPResponse(HTTPResponse):
def json(self, v):
pass
class KamakiHTTPConnection(HTTPConnection):
url = None
scheme = None
netloc = None
method = None
data = None
headers = None
def release(self):
if not self.prefetched:
self.request.close()
scheme_ports = {
'http': '80',
'https': '443',
}
def _load_connection_settings(self, url=None, scheme=None, params=None, headers=None, host=None,
port=None, method=None):
if params is not None:
self.params = params
if headers is not None:
self.headers = headers
if url is None:
url = self.url
if host is None or scheme is None:
p = urlparse(url)
netloc = p.netloc
if not netloc:
netloc = 'localhost'
scheme = p.scheme
if not scheme:
scheme = 'http'
param_str = ''
for i,(key, val) in enumerate(self.params.items()):
param_str = ('?' if i == 0 else '&') + unicode(key)
if val is not None:
param_str+= '='+unicode(val)
url = p.path + param_str
else:
host = host
port = port if port is not None else self.scheme_ports[scheme]
#NOTE: we force host:port as canonical form,
# lest we have a cache miss 'host' vs 'host:80'
netloc = "%s%s" % (host, port)
class KamakiHTTPConnection(HTTPConnection):
self.netloc = netloc
self.url = url #if url in (None, '') or url[0] != '/' else url[1:]
self.scheme = scheme
def _retrieve_connection_info(self):
""" return (scheme, netloc, url?with&params) """
url = self.url
for i,(key, val) in enumerate(self.params.items()):
param_str = ('?' if i == 0 else '&') + unicode(key)
if val is not None:
param_str+= '='+unicode(val)
url += param_str
if method is not None:
self.method = method
parsed = urlparse(self.url)
self.url = url
return (parsed.scheme, parsed.netloc)
def perform_request(self, url=None, params=None, headers=None, method=None, host=None,
port=None, data=None):
self._load_connection_settings(url=url, params=params, headers=headers, host=host,
port=port, method=method)
print('---> %s %s %s %s %s'%(self.method, self.scheme, self.netloc, self.url, self.headers))
conn = get_http_connection(netloc=self.netloc, scheme=self.scheme)
def perform_request(self, method=None, data=None):
(scheme, netloc) = self._retrieve_connection_info()
#get connection from pool
conn = get_http_connection(netloc=netloc, scheme=scheme)
try:
conn.request(self.method, self.url, headers=self.headers, body=data)
conn.request(method = method.upper(), url=self.url, headers=self.headers, body=data)
except:
conn.close()
raise
......
......@@ -115,7 +115,13 @@ def get_http_connection(netloc=None, scheme='http'):
def main():
#cpool = HTTPConnectionPool('https', 'pithos.okeanos.io/v1', size=8)
headers={'X-Auth-Token':'0TpoyAXqJSPxLdDuZHiLOA=='}
c = get_http_connection('pithos.okeanos.io', 'https')
c.request(method='get', url='https://pithos.okeanos.io/v1/saxtouri@admin.grnet.gr?format=json',
headers=headers)
r = c.getresponse()
print('HEADERS:'+unicode(r.getheaders()))
print('BODY:'+unicode(r.read()))
if __name__ == '__main__':
......
......@@ -32,7 +32,7 @@
# or implied, of GRNET S.A.
import requests
from . import HTTPConnection, HTTPResponse, HTTPConnectionError, HTTPResponsePool
from . import HTTPConnection, HTTPResponse, HTTPConnectionError
from .pool import ObjectPool
from urlparse import urlparse
......@@ -43,7 +43,10 @@ requests.Response.status = property(_status)
class HTTPRequestsResponse(HTTPResponse):
_get_content_only=False
def __init__(self, request=None, prefetched=False):
super(HTTPRequestsResponse, self).__init__(request=request, prefetched=prefetched)
if prefetched:
self = request.response
def _get_response(self):
if self.prefetched:
......@@ -54,8 +57,12 @@ class HTTPRequestsResponse(HTTPResponse):
self.status = r.status
self.status_code = r.status_code
self.content = r.content if hasattr(r, 'content') else None
self.text = None if self._get_content_only else r.text
self.json = None if self._get_content_only else r.json
from json import loads
try:
self.json = loads(r.content)#None if self._get_content_only else r.json
except ValueError:
self.json = None
self.text = r.content#None if self._get_content_only else r.text
self.exception = r.exception if hasattr(r, 'exception') else None
except requests.ConnectionError as err:
raise HTTPConnectionError('Connection error', status=651, details=err.message)
......@@ -74,7 +81,15 @@ class HTTPRequestsResponse(HTTPResponse):
if hasattr(self, '_pool'):
self._pool.pool_put(self)
class HTTPRequestsResponsePool(HTTPResponsePool):
POOL_SIZE=8
class HTTPRequestsResponsePool(ObjectPool):
def __init__(self, netloc, size=POOL_SIZE):
super(ObjectPool, self).__init__(size=size)
self.netloc = netloc
def _pool_cleanup(self, resp):
resp._get_response()
return True
@classmethod
def key(self, full_url):
......@@ -102,8 +117,7 @@ class HTTPRequest(HTTPConnection):
respool = self._pools[pool_key]
return respool.pool_get()
def perform_request(self, method=None, url=None, params=None, headers=None, data=None,
binary=False):
def perform_request(self, method=None, url=None, params=None, headers=None, data=None):
"""perform a request
Example: method='PUT' url='https://my.server:8080/path/to/service'
params={'update':None, 'format':'json'} headers={'X-Auth-Token':'s0m3t0k3n=='}
......@@ -130,6 +144,4 @@ class HTTPRequest(HTTPConnection):
self._response_object = requests.request(self.method, self.url, headers=self.headers, data=data,
verify=self.verify, prefetch = False)
res.request = self._response_object.request
if binary:
res._get_content_only = True
return res
......@@ -39,6 +39,7 @@ gevent.monkey.patch_all()
import hashlib, os, gevent.pool
from time import time
from datetime import datetime
from .storage import StorageClient, ClientError
from .utils import path4url, prefix_keys, filter_in, filter_out, list2str
......@@ -604,7 +605,8 @@ class PithosClient(StorageClient):
return self.delete(path, *args, success=success, **kwargs)
def purge_container(self):
self.container_delete(until=unicode(time()))
r = self.container_delete(until=unicode(time()))
r.release()
def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
content_encoding=None, content_disposition=None, content_type=None, sharing=None,
......@@ -625,9 +627,10 @@ class PithosClient(StorageClient):
from StringIO import StringIO
f = StringIO(data)
data = f.read(size) if size is not None else f.read()
self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
r = self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
content_disposition=content_disposition, content_type=content_type, permitions=sharing,
public=public, success=201)
r.release()
def put_block_async(self, data, hash):
class SilentGreenlet(gevent.Greenlet):
......@@ -655,9 +658,10 @@ class PithosClient(StorageClient):
content_disposition=None, content_type=None, sharing=None, public=None):
self.assert_container()
obj_content_type = 'application/octet-stream' if content_type is None else content_type
self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
r = self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
content_disposition=content_disposition, content_type=content_type, permitions=sharing,
public=public, manifest='%s/%s'%(self.container,obj))
r.release()
def upload_object(self, object, f, size=None, hash_cb=None, upload_cb=None, etag=None,
content_encoding=None, content_disposition=None, content_type=None, sharing=None,
......@@ -674,7 +678,7 @@ class PithosClient(StorageClient):
size = size if size is not None else os.fstat(f.fileno()).st_size
nblocks = 1 + (size - 1) // blocksize
hashes = []
map = {}
hmap = {}
offset = 0
......@@ -687,7 +691,7 @@ class PithosClient(StorageClient):
bytes = len(block)
hash = pithos_hash(block, blockhash)
hashes.append(hash)
map[hash] = (offset, bytes)
hmap[hash] = (offset, bytes)
offset += bytes
if hash_cb:
hash_gen.next()
......@@ -710,11 +714,11 @@ class PithosClient(StorageClient):
if upload_cb:
upload_gen = upload_cb(len(missing))
upload_gen.next()
r .release()
flying = []
r .release()
for hash in missing:
offset, bytes = map[hash]
offset, bytes = hmap[hash]
f.seek(offset)
data = f.read(bytes)
r = self.put_block_async(data, hash)
......@@ -729,8 +733,9 @@ class PithosClient(StorageClient):
flying = [r for r in flying if not r.ready()]
gevent.joinall(flying)
self.object_put(object, format='json', hashmap=True, content_type=obj_content_type,
r = self.object_put(object, format='json', hashmap=True, content_type=obj_content_type,
json=hashmap, success=201)
r.release()
def download_object(self, obj, f, download_cb=None, version=None, overide=False,
range=None, if_match=None, if_none_match=None, if_modified_since=None,
......@@ -776,8 +781,8 @@ class PithosClient(StorageClient):
download_gen.next()
#load local file existing hashmap
hash_dict = {}
if islocalfile:
hash_dict = {}
from os import path
if path.exists(f.name):
from binascii import hexlify
......@@ -797,6 +802,7 @@ class PithosClient(StorageClient):
status=600)
#download and save/print
flying = []
for i, h in enumerate(map):
#if not islocalfile and h in hash_dict:
if h in hash_dict:
......@@ -813,19 +819,75 @@ class PithosClient(StorageClient):
if range is not None and end > custom_end:
end = custom_end
data_range = 'bytes=%s-%s'%(start, end)
r = self.object_get(obj, data_range=data_range, success=(200, 206), version=version,
if_etag_match=if_match, if_etag_not_match=if_none_match, binary=True,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
result_array = []
if islocalfile:
f.seek(start)
f.write(r.content)
f.flush()
r.release()
#f.write(data.text.encode('utf-8'))
if overide and not islocalfile:
handler = self._get_block_async(obj, data_range=data_range, version=version,
if_etag_match=if_match, if_etag_not_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
flying.append({'handler':handler, 'start':start})
newflying = []
for v in flying:
h = v['handler']
if h.ready():
if h.exception:
h.release()
raise h.exception
f.seek(v['start'])
f.write(h.value.content)
f.flush()
h.value.release()
else:
newflying.append(v)
flying = newflying
else:
r = self._get_block(obj, data_range=data_range, version=version,
if_etag_match=if_match, if_etag_not_match=if_none_match,
if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
f.write(r.content)
f.flush()
r.release()
#write the last results and exit
if islocalfile:
from time import sleep
while len(flying) > 0:
result_array=[]
newflying = []
for v in flying:
h = v['handler']
if h.ready():
if h.exception:
h.release()
raise h.exception
f.seek(v['start'])
f.write(h.value.content)
f.flush()
h.value.release()
else:
sleep(.2)
newflying.append(v)
flying = newflying
f.truncate(total_size)
def _get_block(self, obj, **kwargs):
r = self.object_get(obj, success=(200, 206), binary=True, **kwargs)
return r
def _get_block_async(self, obj, **kwargs):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
_stderr = sys._stderr
try:
sys.stderr = StringIO()
gevent.Greenlet._report_error(self, exc_info)
finally:
sys.stderr = _stderr
POOL_SIZE = 5
if self.async_pool is None:
self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
g = SilentGreenlet(self._get_block, obj, **kwargs)
self.async_pool.start(g)
return g
def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
if_modified_since=None, if_unmodified_since=None):
......@@ -834,6 +896,7 @@ class PithosClient(StorageClient):
if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
except ClientError as err:
r.release()
if err.status == 304 or err.status == 412:
return {}
raise
......@@ -842,17 +905,21 @@ class PithosClient(StorageClient):
return result
def set_account_group(self, group, usernames):
self.account_post(update=True, groups = {group:usernames})
r = self.account_post(update=True, groups = {group:usernames})
r.release()
def del_account_group(self, group):
return self.account_post(update=True, groups={group:[]})
r = self.account_post(update=True, groups={group:[]})
r.release()
def get_account_info(self, until=None):
from datetime import datetime
r = self.account_head(until=until)
if r.status_code == 401:
r.release()
raise ClientError("No authorization")
return r.headers
reply = r.headers
r.release()
return reply
def get_account_quota(self):
return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
......@@ -868,28 +935,37 @@ class PithosClient(StorageClient):
def set_account_meta(self, metapairs):
assert(type(metapairs) is dict)
self.account_post(update=True, metadata=metapairs)
r = self.account_post(update=True, metadata=metapairs)
r.release()
def del_account_meta(self, metakey):
self.account_post(update=True, metadata={metakey:''})
r = self.account_post(update=True, metadata={metakey:''})
r.release()
def set_account_quota(self, quota):
self.account_post(update=True, quota=quota)
r = self.account_post(update=True, quota=quota)
r.release()
def set_account_versioning(self, versioning):
self.account_post(update=True, versioning = versioning)
r = self.account_post(update=True, versioning = versioning)
r.release()
def list_containers(self):
r = self.account_get()
return r.json
reply = r.json
r.release()
return reply
def del_container(self, until=None, delimiter=None):
self.assert_container()
r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
if r.status_code == 404:
r.release()
raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
elif r.status_code == 409:
r.release()
raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
r.release()
def get_container_versioning(self, container):
self.container = container
......@@ -901,6 +977,8 @@ class PithosClient(StorageClient):
def get_container_info(self, until = None):
r = self.container_head(until=until)
reply = r.headers
r.release()
return r.headers
def get_container_meta(self, until = None):
......@@ -911,37 +989,48 @@ class PithosClient(StorageClient):
def set_container_meta(self, metapairs):
assert(type(metapairs) is dict)
self.container_post(update=True, metadata=metapairs)
r = self.container_post(update=True, metadata=metapairs)
r.release()
def del_container_meta(self, metakey):
self.container_post(update=True, metadata={metakey:''})
r = self.container_post(update=True, metadata={metakey:''})
r.release()
def set_container_quota(self, quota):
self.container_post(update=True, quota=quota)
r = self.container_post(update=True, quota=quota)
r.release()
def set_container_versioning(self, versioning):
self.container_post(update=True, versioning=versioning)
r = self.container_post(update=True, versioning=versioning)
r.release()
def del_object(self, obj, until=None, delimiter=None):
self.assert_container()
self.object_delete(obj, until=until, delimiter=delimiter)
r = self.object_delete(obj, until=until, delimiter=delimiter)
r.release()
def set_object_meta(self, object, metapairs):
assert(type(metapairs) is dict)
self.object_post(object, update=True, metadata=metapairs)
r = self.object_post(object, update=True, metadata=metapairs)
r.release()
def del_object_meta(self, metakey, object):
self.object_post(object, update=True, metadata={metakey:''})
r = self.object_post(object, update=True, metadata={metakey:''})
r.release()
def publish_object(self, object):
self.object_post(object, update=True, public=True)
r = self.object_post(object, update=True, public=True)
r.release()