Commit c2b5da2f authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Redesign http connection module

Merge everything to kamaki.clients

Remove multiple hierarchies of connection classes (== abandon support for
    alternative connection methods, e.g. for requests from pypi)

New connection classes are
- RequestManager
- ResponseManager
Semantics:
    RequestManager constructor commits a request without performing it
      but it performs all sanity checks for url, path, method and params
    RequestManager.perform will perform the commited request, if called
    ResponseManager constructor needs a RequestManager instance
    ResponseManager.(various properties)
        request is pooled from a ContextManagerPool (objpool) when needed.
        Results are cached and used as long as the ResponseManager instance
        is alive

Remove KamakiConnection/ResponseErrors, use ClientError wrapers for everything

Remove the connection livetest

TODO: Adjust unittests
parent e0d3e091
......@@ -32,14 +32,17 @@
# or implied, of GRNET S.A.
from urllib2 import quote
from urlparse import urlparse
from threading import Thread
from json import dumps, loads
from time import time
from httplib import ResponseNotReady
from time import sleep
from random import random
from objpool.http import PooledHTTPConnection
from kamaki.clients.utils import logger
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
from kamaki.clients.connection.errors import KamakiConnectionError
from kamaki.clients.connection.errors import KamakiResponseError
LOG_TOKEN = False
DEBUG_LOG = logger.get_log_filename()
......@@ -60,6 +63,14 @@ datarecvlog = logger.get_logger('data.recv')
logger.add_file_logger('ClientError', __name__, filename=DEBUG_LOG)
clienterrorlog = logger.get_logger('ClientError')
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
def _encode(v):
if v and isinstance(v, unicode):
return quote(v.encode('utf-8'))
return v
class ClientError(Exception):
def __init__(self, message, status=0, details=None):
......@@ -97,6 +108,147 @@ class ClientError(Exception):
self.details = details if details else []
class RequestManager(object):
"""Handle http request information"""
def _connection_info(self, url, path, params={}):
""" Set self.url to scheme://netloc/?params
:param url: (str or unicode) The service url
:param path: (str or unicode) The service path (url/path)
:param params: (dict) Parameters to add to final url
:returns: (scheme, netloc)
"""
url = _encode(url) if url else 'http://127.0.0.1/'
url += '' if url.endswith('/') else '/'
if path:
url += _encode(path[1:] if path.startswith('/') else path)
for i, (key, val) in enumerate(params.items()):
val = _encode(val)
url += '%s%s' % ('&' if i else '?', key)
if val:
url += '=%s' % val
parsed = urlparse(url)
self.url = url
self.path = parsed.path or '/'
if parsed.query:
self.path += '?%s' % parsed.query
return (parsed.scheme, parsed.netloc)
def __init__(
self, method, url, path,
data=None, headers={}, params={}):
method = method.upper()
assert method in HTTP_METHODS, 'Invalid http method %s' % method
if headers:
assert isinstance(headers, dict)
self.headers = dict(headers)
self.method, self.data = method, data
self.scheme, self.netloc = self._connection_info(url, path, params)
def perform(self, conn):
"""
:param conn: (httplib connection object)
:returns: (HTTPResponse)
"""
# sendlog.debug(
# 'RequestManager.perform mthd(%s), url(%s), headrs(%s), bdlen(%s)',
# self.method, self.url, self.headers, self.data)
conn.request(
method=str(self.method.upper()),
url=str(self.path),
headers=self.headers,
body=self.data)
while True:
try:
return conn.getresponse()
except ResponseNotReady:
sleep(0.03 * random())
class ResponseManager(object):
"""Manage the http request and handle the response data, headers, etc."""
def __init__(self, request, poolsize=None):
"""
:param request: (RequestManager)
"""
self.request = request
self._request_performed = False
self.poolsize = poolsize
def _get_response(self):
if self._request_performed:
return
pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
try:
with PooledHTTPConnection(
self.request.netloc, self.request.scheme,
**pool_kw) as connection:
r = self.request.perform(connection)
# recvlog.debug('ResponseManager(%s):' % r)
self._request_performed = True
self._headers = dict()
for k, v in r.getheaders():
self.headers[k] = v
# recvlog.debug('\t%s: %s\t(%s)' % (k, v, r))
self._content = r.read()
self._status_code = r.status
self._status = r.reason
except Exception as err:
from kamaki.clients import recvlog
from traceback import format_stack
recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
raise ClientError(
'Failed while http-connecting to %s (%s)' % (
self.request.url,
err),
1000)
@property
def status_code(self):
self._get_response()
return self._status_code
@property
def status(self):
self._get_response()
return self._status
@property
def headers(self):
self._get_response()
return self._headers
@property
def content(self):
self._get_response()
return self._content
@property
def text(self):
"""
:returns: (str) content
"""
self._get_response()
return '%s' % self._content
@property
def json(self):
"""
:returns: (dict) squeezed from json-formated content
"""
self._get_response()
try:
return loads(self._content)
except ValueError as err:
ClientError('Response not formated in JSON - %s' % err)
class SilentEvent(Thread):
""" Thread-run method(*args, **kwargs)"""
def __init__(self, method, *args, **kwargs):
......@@ -127,15 +279,14 @@ class SilentEvent(Thread):
class Client(object):
def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
def __init__(self, base_url, token):
self.base_url = base_url
self.token = token
self.headers = {}
self.headers, self.params = dict(), dict()
self.DATE_FORMATS = [
'%a %b %d %H:%M:%S %Y',
'%A, %d-%b-%y %H:%M:%S GMT',
'%a, %d %b %Y %H:%M:%S GMT']
self.http_client = http_client
self.MAX_THREADS = 7
def _init_thread_limit(self, limit=1):
......@@ -179,18 +330,15 @@ class Client(object):
def set_header(self, name, value, iff=True):
"""Set a header 'name':'value'"""
if value is not None and iff:
self.http_client.set_header(name, value)
self.headers[name] = value
def set_param(self, name, value=None, iff=True):
if iff:
self.http_client.set_param(name, value)
self.params[name] = value
def request(
self,
method,
path,
async_headers={},
async_params={},
self, method, path,
async_headers=dict(), async_params=dict(),
**kwargs):
"""In threaded/asynchronous requests, headers and params are not safe
Therefore, the standard self.set_header/param system can be used only
......@@ -205,43 +353,33 @@ class Client(object):
assert method
assert isinstance(path, str) or isinstance(path, unicode)
try:
headers = dict(self.headers)
headers.update(async_headers)
params = dict(self.params)
params.update(async_params)
success = kwargs.pop('success', 200)
data = kwargs.pop('data', None)
self.http_client.headers.setdefault('X-Auth-Token', self.token)
headers.setdefault('X-Auth-Token', self.token)
if 'json' in kwargs:
data = dumps(kwargs.pop('json'))
self.http_client.headers.setdefault(
'Content-Type',
'application/json')
headers.setdefault('Content-Type', 'application/json')
if data:
self.http_client.headers.setdefault(
'Content-Length',
'%s' % len(data))
sendlog.info('perform a %s @ %s', method, self.base_url)
self.http_client.url = self.base_url
self.http_client.path = quote(path.encode('utf8'))
r = self.http_client.perform_request(
method,
data,
async_headers,
async_params)
req = self.http_client
sendlog.info('%s %s', method, req.url)
headers = dict(req.headers)
headers.update(async_headers)
for key, val in headers.items():
headers.setdefault('Content-Length', '%s' % len(data))
req = RequestManager(
method, self.base_url, path,
data=data, headers=headers, params=params)
sendlog.info('commit a %s @ %s\t[%s]', method, self.base_url, self)
sendlog.info('\tpath: %s\t[%s]', req.path, self)
for key, val in req.headers.items():
if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
continue
sendlog.info('\t%s: %s', key, val)
sendlog.info('')
sendlog.info('\t%s: %s [%s]', key, val, self)
if data:
datasendlog.info(data)
sendlog.info('END HTTP request commit\t[%s]', self)
r = ResponseManager(req)
recvlog.info('%d %s', r.status_code, r.status)
for key, val in r.headers.items():
if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
......@@ -249,26 +387,14 @@ class Client(object):
recvlog.info('%s: %s', key, val)
if r.content:
datarecvlog.info(r.content)
except (KamakiResponseError, KamakiConnectionError) as err:
from traceback import format_stack
recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
self.http_client.reset_headers()
self.http_client.reset_params()
errstr = '%s' % err
if not errstr:
errstr = ('%s' % type(err))[7:-2]
status = getattr(err, 'status', getattr(err, 'errno', 0))
raise ClientError('%s\n' % errstr, status=status)
finally:
self.http_client.reset_headers()
self.http_client.reset_params()
self.headers = dict()
self.params = dict()
if success is not None:
# Success can either be an int or a collection
success = (success,) if isinstance(success, int) else success
if r.status_code not in success:
r.release()
self._raise_for_status(r)
return r
......
......@@ -63,9 +63,6 @@ class FR(object):
status = None
status_code = 200
def release(self):
pass
astakos_pkg = 'kamaki.clients.astakos.AstakosClient'
......
......@@ -118,16 +118,14 @@ class ComputeClient(ComputeRestClient):
:param new_name: (str)
"""
req = {'server': {'name': new_name}}
r = self.servers_put(server_id, json_data=req)
r.release()
self.servers_put(server_id, json_data=req)
def delete_server(self, server_id):
"""Submit a deletion request for a server specified by id
:param server_id: integer (str or int)
"""
r = self.servers_delete(server_id)
r.release()
self.servers_delete(server_id)
def reboot_server(self, server_id, hard=False):
"""
......@@ -137,8 +135,7 @@ class ComputeClient(ComputeRestClient):
"""
boot_type = 'HARD' if hard else 'SOFT'
req = {'reboot': {'type': boot_type}}
r = self.servers_post(server_id, 'action', json_data=req)
r.release()
self.servers_post(server_id, 'action', json_data=req)
def get_server_metadata(self, server_id, key=''):
"""
......@@ -188,8 +185,7 @@ class ComputeClient(ComputeRestClient):
:param key: (str) the meta key
"""
r = self.servers_delete(server_id, 'meta/' + key)
r.release()
self.servers_delete(server_id, 'meta/' + key)
def list_flavors(self, detail=False):
"""
......@@ -238,8 +234,7 @@ class ComputeClient(ComputeRestClient):
"""
:param image_id: (str)
"""
r = self.images_delete(image_id)
r.release()
self.images_delete(image_id)
def get_image_metadata(self, image_id, key=''):
"""
......@@ -286,5 +281,4 @@ class ComputeClient(ComputeRestClient):
:param key: (str) metadatum key
"""
command = path4url('meta', key)
r = self.images_delete(image_id, command)
r.release()
self.images_delete(image_id, command)
......@@ -106,9 +106,6 @@ class FR(object):
status = None
status_code = 200
def release(self):
pass
class ComputeRestClient(TestCase):
......
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, self.list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, self.list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
class KamakiResponse(object):
"""An abstract HTTP Response object to handle a performed HTTPRequest.
Subclass implementation required
"""
def __init__(self, request, prefetched=False):
self.request = request
self.prefetched = prefetched
def _get_response(self):
"""Wait for http response as late as possible"""
if self.prefetched:
return
self = self.request.response
self.prefetched = True
def release(self):
"""Release the connection.
"""
raise NotImplementedError
@property
def prefetched(self):
"""flag to avoid downloading more than nessecary"""
return self._prefetched
@prefetched.setter
def prefetched(self, p):
self._prefetched = p
@property
def content(self):
""":returns: (binary) request response content (data)"""
self._get_response()
return self._content
@content.setter
def content(self, v):
self._content = v
@property
def text(self):
"""(str)"""
self._get_response()
return self._text
@text.setter
def text(self, v):
self._text = v
@property
def json(self):
"""(dict)"""
self._get_response()
return self._json
@json.setter
def json(self, v):
self._json = v
@property
def headers(self):
"""(dict)"""
self._get_response()
return self._headers
@headers.setter
def headers(self, v):
self._headers = v
@property
def status_code(self):
"""(int) optional"""
self._get_response()
return self._status_code
@status_code.setter
def status_code(self, v):
self._status_code = v
@property
def status(self):
"""(str) useful in server error responses"""
self._get_response()
return self._status
@status.setter
def status(self, v):
self._status = v
@property
def request(self):
"""(KamakiConnection) the source of this response object"""
return self._request
@request.setter
def request(self, v):
self._request = v
class KamakiConnection(object):
"""An abstract HTTP Connection mechanism. Subclass implementation required
"""
def __init__(
self,
method=None, url=None, params={}, headers={}, poolsize=8):
self.headers = headers
self.params = params
self.url = url
self.path = ''
self.method = method
self.poolsize = poolsize
@property
def poolsize(self):
return self._poolsize
@poolsize.setter
def poolsize(self, v):
assert isinstance(v, (int, long)) and v > 0
self._poolsize = v
def set_header(self, name, value):
assert name, 'KamakiConnection header key cannot be 0 or empty'
self.headers['%s' % name] = '%s' % value
def remove_header(self, name):
try:
self.headers.pop(name)
except KeyError:
pass
def replace_headers(self, new_headers):
self.headers = new_headers
def reset_headers(self):
self.replace_headers({})
def set_param(self, name, value=None):
assert name, 'KamakiConnection param key cannot be 0 or empty'
self.params[unicode(name)] = value
def remove_param(self, name):
try:
self.params.pop(name)
except KeyError:
pass
def replace_params(self, new_params):
self.params = new_params
def reset_params(self):
self.replace_params({})
def set_url(self, url):
self.url = url
def set_path(self, path):
self.path = path
def set_method(self, method):
self.method = method
def perform_request(
self,
method=None,
url=None,
async_headers={},
async_params={},
data=None):
raise NotImplementedError
# Copyright 2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, self.list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, self.list of conditions and the following
# disclaimer in the documentation and/or other materials