Commit 2f432a67 authored by Georgios D. Tsoukalas's avatar Georgios D. Tsoukalas
Browse files

pool context manager, http pool AssertionError fix

Refs #3492

- Introduce a generic PooledObject class to act both
  as a context manager and as a proxy for getting an
  object from a pool and putting it back.

- Implement a class PooledHTTPConnection(PooledObject)
  as a subclass of the one in (1)

- Eliminate httplib.HTTPConnection patching of close().
  Eliminate put/get_http_connection.
  The httplib.HTTPConnection object is no longer released
  back to the pool via a method on it.
  One must explicitly put it to the pool,
  or use the PooledObject context manager in (2) above.

- Update lib.astakos, lib.quotaholder.http,
  synnefo.api.delegate, pithos.api.delegate to use
  PooledHTTPConnection.

- Update tests
parent 8fe10b65
......@@ -37,7 +37,7 @@ from urlparse import urlparse
from urllib import unquote
from django.utils import simplejson as json
from synnefo.lib.pool.http import get_http_connection
from synnefo.lib.pool.http import PooledHTTPConnection
logger = logging.getLogger(__name__)
......@@ -78,8 +78,7 @@ def call(token, url, headers=None, body=None, method='GET'):
'application/octet-stream')
kwargs['headers'].setdefault('content-length', len(body) if body else 0)
conn = get_http_connection(p.netloc, p.scheme)
try:
with PooledHTTPConnection(p.netloc, p.scheme) as conn:
conn.request(method, p.path + '?' + p.query, **kwargs)
response = conn.getresponse()
headers = response.getheaders()
......@@ -87,8 +86,6 @@ def call(token, url, headers=None, body=None, method='GET'):
length = response.getheader('content-length', None)
data = response.read(length)
status = int(response.status)
finally:
conn.close()
if status < 200 or status >= 300:
raise Exception(data, status)
......
......@@ -196,12 +196,32 @@ class ObjectPool(object):
log.debug("PUT-AFTER: finished putting object %r back to pool %r",
obj, self)
def pool_create_free(self):
    """Return a brand new object that is NOT registered with the pool.

    Convenience entry point: it lets callers obtain an object built
    with the same configuration as the pooled ones, by delegating to
    the overridable hook ``_pool_create_free()``.
    """
    return self._pool_create_free()
def _pool_create_free(self):
"""Create a free new object that is not put into the pool.
This should be overriden by pool classes.
Otherwise, it just calls _pool_create().
"""
return self._pool_create()
def _pool_create(self):
"""Create a new object to be used with this pool.
Create a new object to be used with this pool,
should be overriden in subclasses.
Must be thread-safe.
"""
raise NotImplementedError
......@@ -226,3 +246,222 @@ class ObjectPool(object):
"""
raise NotImplementedError
class PooledObject(object):
    """Generic Object Pool context manager and pooled object proxy.

    The PooledObject instance acts as a context manager to
    be used in a with statement:

        with PooledObject(...) as obj:
            use(obj)

    The with block above is roughly equivalent to:

        pooled = PooledObject(...)
        try:
            obj = pooled.acquire()
            assert(obj is pooled.obj)
            use(obj)
        finally:
            pooled.release()

    After exiting the with block, or releasing,
    the code MUST not use the obj again in any way.

    Internal state of ``_pool``:
        None  -- nothing has been acquired yet
        False -- released, and further acquisition is disabled
        other -- the pool the current object was taken from
    """

    # NOTE: We need all definitions at class-level
    #       to avoid infinite __getattr__() recursion.
    #       This is also true for subclasses.

    # NOTE: Typically you will only need to override
    #       __init__() and get_pool

    # Initialization. Do not customize.
    _pool_settings = None
    _pool_get_settings = None
    _pool_kwargs = None
    _pool = None
    obj = None

    #####################################################
    ### Subclass attribute customization begins here. ###

    _pool_log_prefix = "POOL"
    _pool_class = ObjectPool

    # default keyword args to pass to pool initialization
    _pool_default_settings = (
        ('size', 25),
    )

    # keyword args to pass to pool_get
    _pool_default_get_settings = (
        ('blocking', True),
        #('timeout', None),
        ('create', True),
        ('verify', True),
    )

    # behavior settings
    _pool_attach_context = False
    _pool_disable_after_release = True
    _pool_ignore_double_release = False

    ###  Subclass attribute customization ends here.  ###
    #####################################################

    def __init__(self, pool_settings=None,
                 get_settings=None,
                 attach_context=None,
                 disable_after_release=None,
                 ignore_double_release=None,
                 **kwargs):
        """Initialize a PooledObject instance.

        Accept only keyword arguments.
        Some of them are filtered for this instance's configuration,
        and the rest are saved in ._pool_kwargs for later use.

        The filtered keywords are:

        pool_settings:  keyword args forwarded to pool instance
                        initialization in get_pool(), on top of the
                        class defaults.
                        If not given, the remaining keyword args are
                        forwarded instead.

        get_settings:   keyword args forwarded to the pool's .pool_get()
                        on top of the class defaults.

        attach_context: boolean overriding the class default.
                        If True, after getting an object from the pool,
                        attach self onto it before returning it,
                        so that the context manager caller can have
                        access to the manager object within the
                        with: block.

        disable_after_release:
                        boolean overriding the class default.
                        If True, the PooledObject will not allow a second
                        acquisition after the first release. For example,
                        the second with will raise an AssertionError:

                            manager = PooledObject()
                            with manager as c:
                                pass
                            with manager as c:
                                pass

        ignore_double_release:
                        boolean overriding the class default.
                        If True, the PooledObject will allow consecutive
                        calls to release the underlying pooled object.
                        Only the first one has an effect.
                        If False, an AssertionError is raised.
        """
        self._pool_kwargs = kwargs
        self._pool = None
        self.obj = None

        # Per-call pool_get() arguments: class defaults, then overrides.
        _get_settings = dict(self._pool_default_get_settings)
        if get_settings is not None:
            _get_settings.update(get_settings)
        self._pool_get_settings = _get_settings

        # Behavior flags: None means "keep the class default".
        if attach_context is not None:
            self._pool_attach_context = attach_context
        # These two overrides are documented above and must be honored;
        # without the assignments the arguments would be silently ignored.
        if disable_after_release is not None:
            self._pool_disable_after_release = disable_after_release
        if ignore_double_release is not None:
            self._pool_ignore_double_release = ignore_double_release

        # Pool construction arguments: class defaults, then overrides.
        if pool_settings is None:
            pool_settings = kwargs
        _pool_settings = dict(self._pool_default_settings)
        _pool_settings.update(pool_settings)
        self._pool_settings = _pool_settings

    def get_pool(self):
        """Return a suitable pool object to work with.

        Called within .acquire(), it is meant to be
        overridden by subclasses, to create a new pool,
        or retrieve an existing one, based on the PooledObject
        initialization keywords stored in self._pool_kwargs.

        The default implementation instantiates ._pool_class with
        the settings collected in ._pool_settings.
        """
        pool = self._pool_class(**self._pool_settings)
        return pool

    ### Maybe overriding get_pool() and __init__() above is enough ###

    def __repr__(self):
        return ("<object %s of class %s: "
                "proxy for object (%r) in pool (%r)>" % (
                    id(self), self.__class__.__name__,
                    self.obj, self._pool))

    __str__ = __repr__

    ## Proxy the real object. Disabled until needed.
    ##
    ##def __getattr__(self, name):
    ##    return getattr(self.obj, name)

    ##def __setattr__(self, name, value):
    ##    if hasattr(self, name):
    ##        _setattr = super(PooledObject, self).__setattr__
    ##        _setattr(name, value)
    ##    else:
    ##        setattr(self.obj, name, value)

    ##def __delattr__(self, name):
    ##    _delattr = super(PooledObject, self).__delattr__
    ##    if hasattr(self, name):
    ##        _delattr(self, name)
    ##    else:
    ##        delattr(self.obj, name)

    def __enter__(self):
        """Acquire an object from the pool and bind it to the with target."""
        return self.acquire()

    def __exit__(self, exc_type, exc_value, trace):
        """Release the object; release() returns None, so exceptions
        raised inside the with block propagate to the caller."""
        return self.release()

    def acquire(self):
        """Get an object from the pool and remember it in self.obj.

        Raises AssertionError if this context has already been released
        (and re-acquisition is disabled), or if it currently holds an
        object (double acquire).
        """
        log.debug("%s Acquiring (context: %r)", self._pool_log_prefix, self)
        pool = self._pool
        if pool is False:
            # release() marks the context as permanently used-up
            m = "%r: has been released. No further pool access allowed." % (
                self,)
            raise AssertionError(m)
        if pool is not None:
            m = "Double acquire in %r" % self
            raise AssertionError(m)

        pool = self.get_pool()
        self._pool = pool

        obj = pool.pool_get(**self._pool_get_settings)
        if self._pool_attach_context:
            obj._pool_context = self
        self.obj = obj
        log.debug("%s Acquired %r", self._pool_log_prefix, obj)
        return obj

    def release(self):
        """Put the held object back into the pool it came from.

        Raises AssertionError if nothing was ever acquired, or on a
        repeated release unless double releases are being ignored.
        """
        log.debug("%s Releasing (context: %r)", self._pool_log_prefix, self)
        pool = self._pool
        if pool is None:
            m = "%r: no pool" % (self,)
            raise AssertionError(m)

        obj = self.obj
        if obj is None:
            if self._pool_ignore_double_release:
                return
            m = "%r: no object. Double release?" % (self,)
            raise AssertionError(m)

        pool.pool_put(obj)
        self.obj = None
        if self._pool_disable_after_release:
            # False (as opposed to None) forbids any further acquire()
            self._pool = False
......@@ -31,7 +31,7 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from synnefo.lib.pool import ObjectPool
from synnefo.lib.pool import ObjectPool, PooledObject
from select import select
from httplib import (
......@@ -40,33 +40,22 @@ from httplib import (
ResponseNotReady,
)
from new import instancemethod
from threading import Lock
import logging
log = logging.getLogger(__name__)
_pools = {}
pool_size = 8
_pools_mutex = Lock()
default_pool_size = 100
USAGE_LIMIT = 1000
def init_http_pooling(size):
global pool_size
pool_size = size
def put_http_connection(conn):
pool = conn._pool
log.debug("HTTP-PUT-BEFORE: putting connection %r back to pool %r",
conn, pool)
if pool is None:
log.debug("HTTP-PUT: connection %r does not have a pool", conn)
return
conn._pool = None
pool.pool_put(conn)
global default_pool_size
default_pool_size = size
class HTTPConnectionPool(ObjectPool):
......@@ -93,23 +82,13 @@ class HTTPConnectionPool(ObjectPool):
def _pool_create(self):
log.debug("CREATE-HTTP-BEFORE from pool %r", self)
conn = self.connection_class(self.netloc)
conn._use_counter = USAGE_LIMIT
conn._pool = self
conn._real_close = conn.close
conn.close = instancemethod(put_http_connection, conn, type(conn))
conn._pool_use_counter = USAGE_LIMIT
return conn
def _pool_verify(self, conn):
log.debug("VERIFY-HTTP")
# _pool verify is called at every pool_get().
# Make sure this connection obj is associated with the proper pool.
# The association is broken by put_http_connection(), to prevent
# a connection object from being returned to the pool twice,
# on duplicate invocations of conn.close().
if conn is None:
return False
if not conn._pool:
conn._pool = self
sock = conn.sock
if sock is None:
return True
......@@ -120,10 +99,11 @@ class HTTPConnectionPool(ObjectPool):
def _pool_cleanup(self, conn):
log.debug("CLEANUP-HTTP")
# every connection can be used a finite number of times
conn._use_counter -= 1
conn._pool_use_counter -= 1
# see httplib source for connection states documentation
if conn._use_counter > 0 and conn._HTTPConnection__state == 'Idle':
if (conn._pool_use_counter > 0 and
conn._HTTPConnection__state == 'Idle'):
try:
conn.getresponse()
except ResponseNotReady:
......@@ -131,23 +111,39 @@ class HTTPConnectionPool(ObjectPool):
return False
log.debug("CLEANUP-HTTP: Closing connection. Will not reuse.")
conn._real_close()
conn.close()
return True
def get_http_connection(netloc=None, scheme='http', pool_size=pool_size):
log.debug("HTTP-GET: Getting HTTP connection")
if netloc is None:
m = "netloc cannot be None"
raise ValueError(m)
# does the pool need to be created?
# ensure distinct pools are created for every (scheme, netloc) combination
key = (scheme, netloc)
if key not in _pools:
log.debug("HTTP-GET: Creating pool for key %s", key)
pool = HTTPConnectionPool(scheme, netloc, size=pool_size)
_pools[key] = pool
obj = _pools[key].pool_get()
log.debug("HTTP-GET: Returning object %r", obj)
return obj
class PooledHTTPConnection(PooledObject):
    """Context manager handing out HTTP connections from a pool.

    Unless an explicit pool is given, connections come from a
    module-wide registry holding one HTTPConnectionPool per
    (scheme, netloc) combination.
    """

    _pool_log_prefix = "HTTP"
    _pool_class = HTTPConnectionPool

    def __init__(self, netloc, scheme='http', pool=None, **kw):
        """Remember the connection target and optional explicit pool.

        netloc: network location the connections are made to
        scheme: URL scheme, part of the pool registry key
        pool:   pool to use instead of the global registry, if not None
        kw:     forwarded to PooledObject.__init__(); a 'size' keyword
                is used as the size of a newly created registry pool
        """
        kw['netloc'] = netloc
        kw['scheme'] = scheme
        kw['pool'] = pool
        super(PooledHTTPConnection, self).__init__(**kw)

    def get_pool(self):
        """Return the explicit pool, or the registry pool for the target.

        A missing registry pool is created under _pools_mutex, so
        concurrent callers share one pool per (scheme, netloc) key.
        """
        kwargs = self._pool_kwargs
        # Read without popping: get_pool() may run once per acquire(),
        # and removing the key would make later calls silently switch
        # from the explicitly given pool to the global registry.
        pool = kwargs.get('pool')
        if pool is not None:
            return pool

        # pool was not given, find one from the global registry
        scheme = kwargs['scheme']
        netloc = kwargs['netloc']
        size = kwargs.get('size', default_pool_size)
        # ensure distinct pools for every (scheme, netloc) combination
        key = (scheme, netloc)
        with _pools_mutex:
            if key not in _pools:
                log.debug("HTTP-GET: Creating pool for key %s", key)
                pool = HTTPConnectionPool(scheme, netloc, size=size)
                _pools[key] = pool
            else:
                pool = _pools[key]

        return pool
......@@ -57,8 +57,10 @@ import time
import threading
from collections import defaultdict
from socket import socket, AF_INET, SOCK_STREAM, IPPROTO_TCP, SHUT_RDWR
from synnefo.lib.pool import ObjectPool, PoolLimitError, PoolVerificationError
from synnefo.lib.pool.http import get_http_connection
from synnefo.lib.pool.http import PooledHTTPConnection, HTTPConnectionPool
from synnefo.lib.pool.http import _pools as _http_pools
# Use backported unittest functionality if Python < 2.7
......@@ -291,25 +293,115 @@ class ThreadSafetyTestCase(unittest.TestCase):
class TestHTTPConnectionTestCase(unittest.TestCase):
def test_double_close(self):
conn = get_http_connection("127.0.0.1", "http")
self.assertEqual(conn._pool, _http_pools[("http", "127.0.0.1")])
conn.close()
self.assertIsNone(conn._pool)
# This call does nothing, because conn._pool is already None
conn.close()
self.assertIsNone(conn._pool)
def setUp(self):
#netloc = "127.0.0.1:9999"
#scheme='http'
#self.pool = HTTPConnectionPool(
# netloc=netloc,
# scheme=scheme,
# pool_size=1)
#key = (scheme, netloc)
#_http_pools[key] = pool
_http_pools.clear()
self.host = "127.0.0.1"
self.port = 9999
self.netloc = "%s:%s" % (self.host, self.port)
self.scheme = "http"
self.key = (self.scheme, self.netloc)
sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
sock.bind((self.host, self.port))
sock.listen(1)
self.sock = sock
def tearDown(self):
sock = self.sock
sock.shutdown(SHUT_RDWR)
sock.close()
def test_double_release(self):
pooled = PooledHTTPConnection(self.netloc, self.scheme)
pooled.acquire()
pool = pooled._pool
self.assertTrue(pooled._pool is _http_pools[(self.scheme, self.netloc)])
pooled.release()
poolsize = len(pool._set)
if PooledHTTPConnection._pool_disable_after_release:
self.assertTrue(pooled._pool is False)
if not PooledHTTPConnection._pool_ignore_double_release:
with self.assertRaises(AssertionError):
pooled.release()
else:
pooled.release()
self.assertEqual(poolsize, len(pool._set))
def test_distinct_pools_per_scheme(self):
conn = get_http_connection("127.0.0.1", "http")
pool = conn._pool
self.assertTrue(pool is _http_pools[("http", "127.0.0.1")])
conn.close()
conn2 = get_http_connection("127.0.0.1", "https")
self.assertTrue(conn is not conn2)
self.assertNotEqual(pool, conn2._pool)
self.assertTrue(conn2._pool is _http_pools[("https", "127.0.0.1")])
conn2.close()
with PooledHTTPConnection("127.0.0.1", "http",
attach_context=True) as conn:
pool = conn._pool_context._pool
self.assertTrue(pool is _http_pools[("http", "127.0.0.1")])
with PooledHTTPConnection("127.0.0.1", "https",
attach_context=True) as conn2:
pool2 = conn2._pool_context._pool
self.assertTrue(conn is not conn2)
self.assertNotEqual(pool, pool2)
self.assertTrue(pool2 is _http_pools[("https", "127.0.0.1")])
def test_clean_connection(self):
pool = None
pooled = PooledHTTPConnection(self.netloc, self.scheme)
conn = pooled.acquire()
pool = pooled._pool
self.assertTrue(pool is not None)
pooled.release()
self.assertTrue(pooled._pool is False)
poolset = pool._set
self.assertEqual(len(poolset), 1)
pooled_conn = list(poolset)[0]
self.assertTrue(pooled_conn is conn)
def test_dirty_connection(self):
pooled = PooledHTTPConnection(self.netloc, self.scheme)
conn = pooled.acquire()
pool = pooled._pool
conn.request("GET", "/")
serversock, addr = self.sock.accept()
serversock.send("HTTP/1.1 200 OK\n"
"Content-Length: 6\n"
"\n"
"HELLO\n")
time.sleep(0.3)
# We would read this message like this
#resp = conn.getresponse()
# but we won't so the connection is dirty
pooled.release()
poolset = pool._set
self.assertEqual(len(poolset), 0)
def test_context_manager_exception_safety(self):
class TestError(Exception):
pass
for i in xrange(10):
pool = None
try:
with PooledHTTPConnection(
self.netloc, self.scheme,
size=1, attach_context=True) as conn:
pool = conn._pool_context._pool
raise TestError()
except TestError:
self.assertTrue(pool is not None)
self.assertEqual(pool._semaphore._Semaphore__value, 1)
if __name__ == '__main__':
unittest.main()
......@@ -32,7 +32,7 @@
# or implied, of GRNET S.A.
from synnefo.lib.commissioning import Callpoint, CallError
from synnefo.lib.pool.http import get_http_connection
from synnefo.lib.pool.http import PooledHTTPConnection
from .api import QuotaholderAPI
from json import loads as json_loads, dumps as json_dumps
......@@ -70,17 +70,14 @@ class QuotaholderClient(Callpoint):
logger.debug("%s %s\n%s\n<<<\n", method, path, json_data[:128])
headers = {'X-Auth-Token': self._token}
conn = get_http_connection(scheme=self._scheme, netloc=self._netloc,
pool_size=self._poolsize)
try:
with PooledHTTPConnection(scheme=self._scheme,
netloc=self._netloc,
size=self._poolsize) as conn:
conn.request(method, path, body=json_data, headers=headers)
resp = conn.getresponse()
finally:
conn.close()
body = resp.read()
logger.debug(">>>\nStatus: %s", resp.status)
body = resp.read()
logger.debug("\n%s\n<<<\n", body[:128] if body else None)
status = int(resp.status)
......
......@@ -42,7 +42,7 @@ from django.conf import settings
USER_CATALOG_URL = getattr(settings, 'CYCLADES_USER_CATALOG_URL', None)
USER_FEEDBACK_URL = getattr(settings, 'CYCLADES_USER_FEEDBACK_URL', None)
from synnefo.lib.pool.http import get_http_connection
from synnefo.lib.pool.http import PooledHTTPConnection
logger = logging.getLogger(__name__)