Commit 208556af authored by Georgios D. Tsoukalas's avatar Georgios D. Tsoukalas
Browse files

Fix+move HTTP quotaholder client in synnefo.lib

Allow per-service configuration of the (http) quotaholder client.
Kamaki is no longer needed in service (or ganeti) nodes,
because the client has been moved to snf-common.

Also fix the default quotaholder settings for pithos backend to be disabled
by default, and don't initialize quotaholder client when not needed.
This fixes crashes of non-user-facing pithos backend uses such as
pithcat from snf-image.

Refs #3421
parent efd8630d
......@@ -43,10 +43,11 @@ from astakos.im.settings import (
QUOTAHOLDER_URL, QUOTAHOLDER_TOKEN, LOGGING_LEVEL)
if QUOTAHOLDER_URL:
from kamaki.clients.quotaholder import QuotaholderClient
from kamaki.clients.quotaholder import QH_PRACTICALLY_INFINITE
from synnefo.lib.quotaholder import (
QuotaholderClient, QH_PRACTICALLY_INFINITE)
from synnefo.util.number import strbigdec
from astakos.im.settings import QUOTAHOLDER_POOLSIZE
ENTITY_KEY = '1'
......@@ -66,7 +67,8 @@ def get_client():
return _client
if not QUOTAHOLDER_URL:
return
_client = QuotaholderClient(QUOTAHOLDER_URL, token=QUOTAHOLDER_TOKEN)
_client = QuotaholderClient(QUOTAHOLDER_URL, token=QUOTAHOLDER_TOKEN,
poolsize=QUOTAHOLDER_POOLSIZE)
return _client
......@@ -507,7 +509,8 @@ def timeline_charge(entity, resource, after, before, details, charge_type):
m = 'charge type %s not supported' % charge_type
raise ValueError(m)
quotaholder = QuotaholderClient(QUOTAHOLDER_URL, token=QUOTAHOLDER_TOKEN)
quotaholder = QuotaholderClient(QUOTAHOLDER_URL, token=QUOTAHOLDER_TOKEN,
poolsize=QUOTAHOLDER_POOLSIZE)
timeline = quotaholder.get_timeline(
context={},
after=after,
......
......@@ -159,6 +159,7 @@ PROJECT_MEMBERSHIP_LEAVE_REQUEST_SUBJECT = getattr(
# Set the quota holder component URI
QUOTAHOLDER_URL = getattr(settings, 'ASTAKOS_QUOTAHOLDER_URL', '')
QUOTAHOLDER_TOKEN = getattr(settings, 'ASTAKOS_QUOTAHOLDER_TOKEN', '')
QUOTAHOLDER_POOLSIZE = getattr(settings, 'ASTAKOS_QUOTAHOLDER_POOLSIZE', 50)
# Set the cloud service properties
SERVICES = getattr(settings, 'ASTAKOS_SERVICES', {
......
......@@ -111,6 +111,12 @@
# Set the quotaholder component URI and token
#ASTAKOS_QUOTAHOLDER_URL = ''
#ASTAKOS_QUOTAHOLDER_TOKEN = ''
#
# Tune the size of the quotaholder http client connection pool.
# This limits the number of concurrent requests to quotaholder.
# If quotaholder is installed in the same system as astakos,
# This must be at most half the synnefo_poolsize for the django database
#ASTAKOS_QUOTAHOLDER_POOLSIZE = 50
# Setup quotaholder URL and token when snf-quotaholder-app is installed
# in the same server as astakos (recommended)
......
from .api import *
from .http import QuotaholderClient
QH_PRACTICALLY_INFINITE = 10 ** 32
......@@ -32,10 +32,11 @@
# or implied, of GRNET S.A.
from .quotaholder import QuotaholderAPI, QH_PRACTICALLY_INFINITE
from .exception import ( InvalidKeyError, NoEntityError,
NoQuantityError, NoCapacityError,
ExportLimitError, ImportLimitError,
CorruptedError, InvalidDataError,
DuplicateError)
from .exception import (CallError,
InvalidKeyError, NoEntityError,
NoQuantityError, NoCapacityError,
ExportLimitError, ImportLimitError,
CorruptedError, InvalidDataError,
DuplicateError)
API_Spec = QuotaholderAPI
# Copyright 2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from synnefo.lib.commissioning import Callpoint, CallError
from synnefo.lib.pool.http import get_http_connection
from .api import QuotaholderAPI
from json import loads as json_loads, dumps as json_dumps
import logging
from urlparse import urlparse
logger = logging.getLogger(__name__)
class QuotaholderClient(Callpoint):
api_spec = QuotaholderAPI()
def __init__(self, base_url, token='', poolsize=1000):
super(QuotaholderClient, self).__init__()
self._url = base_url
parsed = urlparse(base_url)
self._netloc = parsed.netloc
self._scheme = parsed.scheme
basepath = parsed.path
if not basepath.endswith('/'):
basepath += '/'
self._basepath = basepath
self._token = token
self._poolsize = poolsize
def do_make_call(self, api_call, data):
gettable = ['list', 'get', 'read']
method = ('GET' if any(api_call.startswith(x) for x in gettable)
else 'POST')
path = self._basepath + api_call
json_data = json_dumps(data)
logger.debug("%s %s\n%s\n<<<\n", method, path, json_data[:128])
headers = {'X-Auth-Token': self._token}
conn = get_http_connection(scheme=self._scheme, netloc=self._netloc,
pool_size=self._poolsize)
try:
conn.request(method, path, body=json_data, headers=headers)
resp = conn.getresponse()
finally:
conn.close()
logger.debug(">>>\nStatus: %s", resp.status)
body = resp.read()
logger.debug("\n%s\n<<<\n", body[:128] if body else None)
status = int(resp.status)
if status == 200:
return json_loads(body)
else:
try:
error = json_loads(body)
except ValueError:
exc = CallError(body, call_error='ValueError')
else:
exc = CallError.from_dict(error)
raise exc
......@@ -12,4 +12,9 @@
#
# CYCLADES_QUOTAHOLDER_URL = "https://accounts.example.synnefo.org/quotaholder/v"
#
# CYCLADES_QUOTAHOLDER_TOKEN = ""
# # CYCLADES_QUOTAHOLDER_TOKEN = ""
# # Tune the size of the http pool for the quotaholder client.
# # It limits the maximum number of quota-querying requests
# # that Cyclades can serve. Extra requests will be blocked
# # until another has completed.
# CYCLADES_QUOTAHOLDER_POOLSIZE = 200
......@@ -13,3 +13,9 @@ CYCLADES_USE_QUOTAHOLDER = False
CYCLADES_QUOTAHOLDER_URL = "http://127.0.0.1:8008/api/quotaholder/v"
CYCLADES_QUOTAHOLDER_TOKEN = ""
# Tune the size of the http pool for the quotaholder client.
# It limits the maximum number of quota-querying requests
# that Cyclades can serve. Extra requests will be blocked
# until another has completed.
CYCLADES_QUOTAHOLDER_POOLSIZE = 200
......@@ -65,7 +65,8 @@ import logging
from synnefo.settings import (CYCLADES_USE_QUOTAHOLDER,
CYCLADES_QUOTAHOLDER_URL,
CYCLADES_QUOTAHOLDER_TOKEN)
CYCLADES_QUOTAHOLDER_TOKEN,
CYCLADES_QUOTAHOLDER_POOLSIZE)
logger = logging.getLogger(__name__)
......@@ -117,12 +118,14 @@ class NotAllowedError(BackendException):
from pithos.backends.util import PithosBackendPool
POOL_SIZE = 8
_pithos_backend_pool = \
PithosBackendPool(POOL_SIZE,
quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
db_connection=settings.BACKEND_DB_CONNECTION,
block_path=settings.BACKEND_BLOCK_PATH)
PithosBackendPool(
POOL_SIZE,
quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
quotaholder_client_poolsize=CYCLADES_QUOTAHOLDER_POOLSIZE,
db_connection=settings.BACKEND_DB_CONNECTION,
block_path=settings.BACKEND_BLOCK_PATH)
def get_pithos_backend():
......
......@@ -37,15 +37,15 @@ from synnefo.settings import CYCLADES_USE_QUOTAHOLDER
if CYCLADES_USE_QUOTAHOLDER:
from synnefo.settings import (CYCLADES_QUOTAHOLDER_URL,
CYCLADES_QUOTAHOLDER_TOKEN)
from kamaki.clients.quotaholder import QuotaholderClient
CYCLADES_QUOTAHOLDER_TOKEN,
CYCLADES_QUOTAHOLDER_POOLSIZE)
from synnefo.lib.quotaholder import QuotaholderClient
else:
from synnefo.settings import (VMS_USER_QUOTA, MAX_VMS_PER_USER,
NETWORKS_USER_QUOTA, MAX_NETWORKS_PER_USER)
from kamaki.clients.quotaholder.api import (NoCapacityError, NoQuantityError,
NoEntityError)
from kamaki.clients.commissioning import CallError
from synnefo.lib.quotaholder.api import (NoCapacityError, NoQuantityError,
NoEntityError, CallError)
import logging
log = logging.getLogger(__name__)
......@@ -110,7 +110,8 @@ def get_quota_holder():
"""Context manager for using a QuotaHolder."""
if CYCLADES_USE_QUOTAHOLDER:
quotaholder = QuotaholderClient(CYCLADES_QUOTAHOLDER_URL,
token=CYCLADES_QUOTAHOLDER_TOKEN)
token=CYCLADES_QUOTAHOLDER_TOKEN,
poolsize=CYCLADES_QUOTAHOLDER_POOLSIZE)
else:
quotaholder = DummyQuotaholderClient()
......
......@@ -48,3 +48,10 @@
#PITHOS_USE_QUOTAHOLDER = False
#PITHOS_QUOTAHOLDER_URL = ''
#PITHOS_QUOTAHOLDER_TOKEN = ''
#
# Tune the size of the http pool for the quotaholder client.
# It limits the maximum number of quota changing requests
# that pithos can serve. Extra requests will be blocked
# until another has completed.
#
#PITHOS_QUOTAHOLDER_POOLSIZE = 200
......@@ -64,3 +64,4 @@ USER_LOGIN_URL = getattr(settings, 'PITHOS_USER_LOGIN_URL',
USE_QUOTAHOLDER = getattr(settings, 'PITHOS_USE_QUOTAHOLDER', False)
QUOTAHOLDER_URL = getattr(settings, 'PITHOS_QUOTAHOLDER_URL', '')
QUOTAHOLDER_TOKEN = getattr(settings, 'PITHOS_QUOTAHOLDER_TOKEN', '')
QUOTAHOLDER_POOLSIZE = getattr(settings, 'PITHOS_QUOTAHOLDER_POOLSIZE', 200)
......@@ -62,6 +62,7 @@ from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS,
BACKEND_QUEUE_EXCHANGE, USE_QUOTAHOLDER,
QUOTAHOLDER_URL, QUOTAHOLDER_TOKEN,
QUOTAHOLDER_POOLSIZE,
BACKEND_QUOTA, BACKEND_VERSIONING,
BACKEND_FREE_VERSIONING,
AUTHENTICATION_URL, AUTHENTICATION_USERS,
......@@ -967,20 +968,22 @@ else:
}
_pithos_backend_pool = PithosBackendPool(size=POOL_SIZE,
db_module=BACKEND_DB_MODULE,
db_connection=BACKEND_DB_CONNECTION,
block_module=BACKEND_BLOCK_MODULE,
block_path=BACKEND_BLOCK_PATH,
block_umask=BACKEND_BLOCK_UMASK,
queue_module=BACKEND_QUEUE_MODULE,
queue_hosts=BACKEND_QUEUE_HOSTS,
queue_exchange=BACKEND_QUEUE_EXCHANGE,
quotaholder_enabled=USE_QUOTAHOLDER,
quotaholder_url=QUOTAHOLDER_URL,
quotaholder_token=QUOTAHOLDER_TOKEN,
free_versioning=BACKEND_FREE_VERSIONING,
block_params=BLOCK_PARAMS)
_pithos_backend_pool = PithosBackendPool(
size=POOL_SIZE,
db_module=BACKEND_DB_MODULE,
db_connection=BACKEND_DB_CONNECTION,
block_module=BACKEND_BLOCK_MODULE,
block_path=BACKEND_BLOCK_PATH,
block_umask=BACKEND_BLOCK_UMASK,
queue_module=BACKEND_QUEUE_MODULE,
queue_hosts=BACKEND_QUEUE_HOSTS,
queue_exchange=BACKEND_QUEUE_EXCHANGE,
quotaholder_enabled=USE_QUOTAHOLDER,
quotaholder_url=QUOTAHOLDER_URL,
quotaholder_token=QUOTAHOLDER_TOKEN,
quotaholder_client_poolsize=QUOTAHOLDER_POOLSIZE,
free_versioning=BACKEND_FREE_VERSIONING,
block_params=BLOCK_PARAMS)
def get_backend():
backend = _pithos_backend_pool.pool_get()
......
......@@ -39,7 +39,7 @@ import logging
import hashlib
import binascii
from kamaki.clients.quotaholder import QuotaholderClient
from synnefo.lib.quotaholder import QuotaholderClient
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
......@@ -145,8 +145,9 @@ class ModularBackend(BaseBackend):
def __init__(self, db_module=None, db_connection=None,
block_module=None, block_path=None, block_umask=None,
queue_module=None, queue_hosts=None, queue_exchange=None,
quotaholder_enabled=True,
quotaholder_enabled=False,
quotaholder_url=None, quotaholder_token=None,
quotaholder_client_poolsize=None,
free_versioning=True, block_params=None):
db_module = db_module or DEFAULT_DB_MODULE
db_connection = db_connection or DEFAULT_DB_CONNECTION
......@@ -209,9 +210,14 @@ class ModularBackend(BaseBackend):
self.queue = NoQueue()
self.quotaholder_enabled = quotaholder_enabled
self.quotaholder_url = quotaholder_url
self.quotaholder_token = quotaholder_token
self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
if quotaholder_enabled:
self.quotaholder_url = quotaholder_url
self.quotaholder_token = quotaholder_token
self.quotaholder = QuotaholderClient(
quotaholder_url,
token=quotaholder_token,
poolsize=quotaholder_client_poolsize)
self.serials = []
self.messages = []
......
......@@ -47,6 +47,7 @@ class PithosBackendPool(ObjectPool):
queue_exchange=None, free_versioning=True,
quotaholder_enabled=True,
quotaholder_url=None, quotaholder_token=None,
quotaholder_client_poolsize=None,
block_params=None):
super(PithosBackendPool, self).__init__(size=size)
self.db_module = db_module
......@@ -61,22 +62,25 @@ class PithosBackendPool(ObjectPool):
self.quotaholder_enabled = quotaholder_enabled
self.quotaholder_url = quotaholder_url
self.quotaholder_token = quotaholder_token
self.quotaholder_client_poolsize = quotaholder_client_poolsize
self.free_versioning = free_versioning
def _pool_create(self):
backend = connect_backend(db_module=self.db_module,
db_connection=self.db_connection,
block_module=self.block_module,
block_path=self.block_path,
block_umask=self.block_umask,
queue_module=self.queue_module,
block_params=self.block_params,
queue_hosts=self.queue_hosts,
queue_exchange=self.queue_exchange,
quotaholder_enabled=self.quotaholder_enabled,
quotaholder_url=self.quotaholder_url,
quotaholder_token=self.quotaholder_token,
free_versioning=self.free_versioning)
backend = connect_backend(
db_module=self.db_module,
db_connection=self.db_connection,
block_module=self.block_module,
block_path=self.block_path,
block_umask=self.block_umask,
queue_module=self.queue_module,
block_params=self.block_params,
queue_hosts=self.queue_hosts,
queue_exchange=self.queue_exchange,
quotaholder_enabled=self.quotaholder_enabled,
quotaholder_url=self.quotaholder_url,
quotaholder_token=self.quotaholder_token,
quotaholder_client_poolsize=self.quotaholder_client_poolsize,
free_versioning=self.free_versioning)
backend._real_close = backend.close
backend.close = instancemethod(_pooled_backend_close, backend,
......
......@@ -50,7 +50,7 @@ except ImportError:
raise Exception("The unittest2 package is required for Python < 2.7")
import unittest
from kamaki.clients.quotaholder import QuotaholderClient
from synnefo.lib.quotaholder import QuotaholderClient
from synnefo.lib.quotaholder.api import (InvalidKeyError, NoEntityError,
NoQuantityError, NoCapacityError,
ExportLimitError, ImportLimitError)
......
......@@ -36,8 +36,8 @@ from config import run_test_case
from config import rand_string
from config import printf
from kamaki.clients.commissioning import CallError
from kamaki.clients.quotaholder.api import (
from synnefo.lib.commissioning import CallError
from synnefo.lib.quotaholder import (
QH_PRACTICALLY_INFINITE,
InvalidDataError,
InvalidKeyError, NoEntityError,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment