Commit 75e1804b authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

wip Update astakos.im to use local quotaholder

Import directly quotaholder.callpoint in endpoint/qh.py
instead of an HTTP client.
Remove code related to entities, such as registering users and services.
Remove sync_projects logic.
parent 29985fb5
......@@ -42,15 +42,11 @@ from django.utils.translation import ugettext as _
from astakos.im.settings import (
QUOTAHOLDER_URL, QUOTAHOLDER_TOKEN, LOGGING_LEVEL)
if QUOTAHOLDER_URL:
from synnefo.lib.quotaholder import (
QuotaholderClient, QH_PRACTICALLY_INFINITE)
from astakos.quotaholder.callpoint import QuotaholderDjangoDBCallpoint
from synnefo.util.number import strbigdec
from astakos.im.settings import QUOTAHOLDER_POOLSIZE
ENTITY_KEY = '1'
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
logger = logging.getLogger(__name__)
......@@ -65,10 +61,7 @@ def get_client():
global _client
if _client:
return _client
if not QUOTAHOLDER_URL:
return
_client = QuotaholderClient(QUOTAHOLDER_URL, token=QUOTAHOLDER_TOKEN,
poolsize=QUOTAHOLDER_POOLSIZE)
_client = QuotaholderDjangoDBCallpoint()
return _client
......@@ -78,22 +71,11 @@ def set_quota(payload):
return
if payload == []:
return []
result = c.set_quota(context={}, clientkey=clientkey, set_quota=payload)
result = c.set_quota(context={}, set_quota=payload)
logger.debug('set_quota: %s rejected: %s' % (payload, result))
return result
def get_entity(payload):
c = get_client()
if not c:
return
if payload == []:
return []
result = c.get_entity(context={}, get_entity=payload)
logger.debug('get_entity: %s reply: %s' % (payload, result))
return result
def get_holding(payload):
c = get_client()
if not c:
......@@ -110,7 +92,7 @@ def qh_get_holdings(users, resources):
append = payload.append
for user in users:
for resource in resources:
append((user.uuid, resource, ENTITY_KEY),)
append((user.uuid, resource),)
result = get_holding(payload)
return result
......@@ -149,11 +131,11 @@ def qh_get_quota(users, resources):
append = payload.append
for user in users:
for resource in resources:
append((user.uuid, resource, ENTITY_KEY),)
append((user.uuid, resource),)
if payload == []:
return []
result = c.get_quota(context={}, clientkey=clientkey, get_quota=payload)
result = c.get_quota(context={}, get_quota=payload)
logger.debug('get_quota: %s rejected: %s' % (payload, result))
return result
......@@ -168,34 +150,14 @@ def qh_get_quotas(users, resources):
return quotas_per_user_from_get(result)
def create_entity(payload):
c = get_client()
if not c:
return
if payload == []:
return []
result = c.create_entity(
context={}, clientkey=clientkey, create_entity=payload)
logger.debug('create_entity: %s rejected: %s' % (payload, result))
return result
SetQuotaPayload = namedtuple('SetQuotaPayload', ('holder',
'resource',
'key',
'quantity',
'capacity',
'import_limit',
'export_limit',
'flags'))
GetQuotaPayload = namedtuple('GetQuotaPayload', ('holder',
'resource',
'key'))
CreateEntityPayload = namedtuple('CreateEntityPayload', ('entity',
'owner',
'key',
'ownerkey'))
QuotaLimits = namedtuple('QuotaLimits', ('holder',
'resource',
'capacity',
......@@ -230,18 +192,6 @@ def add_quota_values(q1, q2):
export_limit = q1.export_limit + q2.export_limit)
def register_users(users):
if not users:
return
payload = list(CreateEntityPayload(
entity=u.uuid,
owner='system',
key=ENTITY_KEY,
ownerkey='') for u in users)
return create_entity(payload)
def register_quotas(quotas):
if not quotas:
return
......@@ -253,7 +203,6 @@ def register_quotas(quotas):
append(SetQuotaPayload(
holder=uuid,
resource=resource,
key=ENTITY_KEY,
quantity=q.quantity,
capacity=q.capacity,
import_limit=q.import_limit,
......@@ -273,7 +222,6 @@ def send_quotas(userquotas):
append(SetQuotaPayload(
holder=holder,
resource=resource,
key=ENTITY_KEY,
quantity=q.quantity,
capacity=q.capacity,
import_limit=q.import_limit,
......@@ -282,39 +230,11 @@ def send_quotas(userquotas):
return set_quota(payload)
def register_services(services):
def payload(services):
return list(CreateEntityPayload(
entity=service,
owner='system',
key=ENTITY_KEY,
ownerkey='')
for service in set(services))
if not services:
return
existing = create_entity(payload(services))
if 0 < len(existing) < len(services):
nonexisting = [s for i, s in enumerate(services)
if i not in existing]
r = create_entity(payload(nonexisting))
if r:
failed = [s for i, s in enumerate(nonexisting)
if i in r]
m = "Failed to register services: %s" % (failed,)
raise RuntimeError(m)
def register_resources(resources):
try:
QH_PRACTICALLY_INFINITE
except NameError:
return
payload = list(SetQuotaPayload(
holder=resource.service,
resource=resource,
key=ENTITY_KEY,
quantity=QH_PRACTICALLY_INFINITE,
capacity=QH_PRACTICALLY_INFINITE,
import_limit=QH_PRACTICALLY_INFINITE,
......@@ -323,25 +243,7 @@ def register_resources(resources):
return set_quota(payload)
def qh_check_users(users):
payload = [(u.uuid, ENTITY_KEY) for u in users]
result = get_entity(payload)
uuids = [entity for (entity, owner) in result]
existing = []
nonexisting = []
for u in users:
if u.uuid in uuids:
existing.append(u)
else:
nonexisting.append(u)
return (existing, nonexisting)
def qh_add_quota(serial, sub_list, add_list):
if not QUOTAHOLDER_URL:
return ()
def qh_add_quota(sub_list, add_list):
context = {}
c = get_client()
......@@ -351,47 +253,22 @@ def qh_add_quota(serial, sub_list, add_list):
add_append = add_quota.append
for ql in sub_list:
args = (ql.holder, ql.resource, ENTITY_KEY,
args = (ql.holder, ql.resource,
0, ql.capacity, ql.import_limit, ql.export_limit)
sub_append(args)
for ql in add_list:
args = (ql.holder, ql.resource, ENTITY_KEY,
args = (ql.holder, ql.resource,
0, ql.capacity, ql.import_limit, ql.export_limit)
add_append(args)
result = c.add_quota(context=context,
clientkey=clientkey,
serial=serial,
sub_quota=sub_quota,
add_quota=add_quota)
return result
def qh_query_serials(serials):
if not QUOTAHOLDER_URL:
return ()
context = {}
c = get_client()
result = c.query_serials(context=context,
clientkey=clientkey,
serials=serials)
return result
def qh_ack_serials(serials):
if not QUOTAHOLDER_URL:
return ()
context = {}
c = get_client()
result = c.ack_serials(context=context,
clientkey=clientkey,
serials=serials)
return
from datetime import datetime
strptime = datetime.strptime
......@@ -500,7 +377,6 @@ def traffic_units(timeline, after, before, details=0):
def timeline_charge(entity, resource, after, before, details, charge_type):
key = '1'
if charge_type == 'charge_usage':
charge_units = usage_units
elif charge_type == 'charge_traffic':
......@@ -515,6 +391,6 @@ def timeline_charge(entity, resource, after, before, details, charge_type):
context={},
after=after,
before=before,
get_timeline=[[entity, resource, key]])
get_timeline=[[entity, resource]])
cu = charge_units(timeline, after, before, details=details)
return cu
......@@ -79,7 +79,7 @@ from astakos.im.project_notif import (
application_deny_notify,
project_termination_notify, project_suspension_notify)
from astakos.im.endpoints.qh import (
register_users, register_quotas, qh_get_quota)
register_quotas, qh_get_quota)
import astakos.im.messages as astakos_messages
......@@ -297,7 +297,7 @@ def activate(
if not user.activation_sent:
user.activation_sent = datetime.now()
user.save()
register_user_with_quotas(user)
register_user_quotas(user)
send_helpdesk_notification(user, helpdesk_email_template_name)
send_greeting(user, email_template_name)
......@@ -375,11 +375,9 @@ class SendNotificationError(SendMailError):
super(SendNotificationError, self).__init__()
def register_user_with_quotas(user):
rejected = register_users([user])
if not rejected:
quotas = users_quotas([user])
register_quotas(quotas)
def register_user_quotas(user):
quotas = users_quotas([user])
register_quotas(quotas)
def get_quota(users):
......
# Copyright 2012-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.db import transaction
from django.db import connection
from time import sleep
import logging
logger = logging.getLogger(__name__)
def with_lock(retries=3, retry_wait=1.0):
def wrap(func):
def inner(*args, **kwargs):
transaction.commit()
_retries = retries
cursor = connection.cursor()
locked = True
try:
while 1:
cursor.execute("SELECT pg_try_advisory_lock(1)")
r = cursor.fetchone()
if r is None:
m = "Impossible"
raise AssertionError(m)
locked = r[0]
if locked:
break
_retries -= 1
if _retries <= 0:
return False
sleep(retry_wait)
return func(*args, **kwargs)
finally:
if locked:
try:
transaction.commit()
except Exception as e:
logger.exception(e)
transaction.rollback()
cursor.execute("SELECT pg_advisory_unlock(1)")
cursor.fetchall()
return inner
return wrap
......@@ -33,11 +33,11 @@
from optparse import make_option
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from astakos.im.models import sync_all_users, sync_users, AstakosUser
from astakos.im.functions import get_user_by_uuid
from astakos.im.management.commands._common import is_uuid, is_email
from synnefo.lib.db.transaction import commit_on_success_strict
import logging
logger = logging.getLogger(__name__)
......@@ -80,17 +80,17 @@ class Command(BaseCommand):
else:
log = self.run(sync)
ex, nonex, qh_l, qh_c, astakos_i, diff_q, info = log
qh_l, qh_c, astakos_i, diff_q, info = log
if list_only:
self.list_quotas(qh_l, qh_c, astakos_i, info)
else:
if verify:
self.print_verify(nonex, qh_l, diff_q)
self.print_verify(qh_l, diff_q)
if sync:
self.print_sync(diff_q)
@transaction.commit_on_success
@commit_on_success_strict()
def run_sync_user(self, user_ident, sync):
if is_uuid(user_ident):
try:
......@@ -116,7 +116,7 @@ class Command(BaseCommand):
logger.exception(e)
raise CommandError("Failed to compute quotas.")
@transaction.commit_on_success
@commit_on_success_strict()
def run(self, sync):
try:
self.stderr.write("Calculating all quotas...\n")
......@@ -165,16 +165,9 @@ class Command(BaseCommand):
self.stdout.write("%s (%s)\n" % (holder, user.username))
def print_verify(self,
nonexisting,
qh_limits,
diff_quotas):
if nonexisting:
self.stdout.write("Users not registered in quotaholder:\n")
for user in nonexisting:
self.stdout.write("%s\n" % (user))
self.stdout.write("\n")
for holder, local in diff_quotas.iteritems():
registered = qh_limits.pop(holder, None)
user = get_user_by_uuid(holder)
......
......@@ -72,17 +72,15 @@ from astakos.im.settings import (
PROJECT_MEMBER_JOIN_POLICIES, PROJECT_MEMBER_LEAVE_POLICIES, PROJECT_ADMINS)
from astakos.im import settings as astakos_settings
from astakos.im.endpoints.qh import (
register_users, send_quotas, qh_check_users, qh_get_quotas,
register_services, register_resources, qh_add_quota, QuotaLimits,
qh_query_serials, qh_ack_serials,
send_quotas, qh_get_quotas,
register_resources, qh_add_quota, QuotaLimits,
QuotaValues, add_quota_values)
from astakos.im import auth_providers as auth
import astakos.im.messages as astakos_messages
from astakos.im.lock import with_lock
from synnefo.lib.db.managers import ForUpdateManager
from synnefo.lib.quotaholder.api import QH_PRACTICALLY_INFINITE
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
from synnefo.lib.db.intdecimalfield import intDecimalField
from synnefo.util.text import uenc, udec
......@@ -255,7 +253,6 @@ def load_service_resources():
import traceback; traceback.print_exc()
continue
register_services(ss)
register_resources(rs)
def _quota_values(capacity):
......@@ -2400,242 +2397,22 @@ class ProjectMembership(models.Model):
return (sub_list, add_list)
def is_fully_applied(self, project=None):
if project is None:
project = self.project
if project.is_deactivated():
return self.application is None
else:
return self.application_id == project.application_id
def set_sync(self):
if not self.is_pending:
m = _("%s: attempt to sync a non pending membership") % (self,)
raise AssertionError(m)
state = self.state
if state in self.ACTUALLY_ACCEPTED:
pending_application = self.pending_application
self.application = pending_application
self.is_active = (self.application is not None)
self.pending_application = None
self.pending_serial = None
# project.application may have changed in the meantime,
# in which case we stay PENDING;
# we are safe to check due to select_for_update
self.is_pending = not self.is_fully_applied()
self.save()
elif state == self.REMOVED:
self.delete()
else:
m = _("%s: attempt to sync in state '%s'") % (self, state)
raise AssertionError(m)
def reset_sync(self):
if not self.is_pending:
m = _("%s: attempt to reset a non pending membership") % (self,)
raise AssertionError(m)
state = self.state
if state in [self.ACCEPTED, self.LEAVE_REQUESTED, self.REMOVED]:
self.pending_application = None
self.pending_serial = None
self.save()
else:
m = _("%s: attempt to reset sync in state '%s'") % (self, state)
raise AssertionError(m)
class Serial(models.Model):
serial = models.AutoField(primary_key=True)
def new_serial():
s = Serial.objects.create()
serial = s.serial
s.delete()
return serial
class SyncError(Exception):
pass
def reset_serials(serials):
objs = ProjectMembership.objects
q = objs.filter(pending_serial__in=serials).select_for_update()
memberships = list(q)
if memberships:
for membership in memberships:
membership.reset_sync()
transaction.commit()
def sync_finish_serials(serials_to_ack=None):
if serials_to_ack is None:
serials_to_ack = qh_query_serials([])
serials_to_ack = set(serials_to_ack)
objs = ProjectMembership.objects
q = objs.filter(pending_serial__isnull=False).select_for_update()
memberships = list(q)
if memberships:
for membership in memberships:
serial = membership.pending_serial
if serial in serials_to_ack:
membership.set_sync()
else:
membership.reset_sync()
transaction.commit()
qh_ack_serials(list(serials_to_ack))
return len(memberships)
def _pre_sync_projects(projects):
for project in projects:
objects = project.projectmembership_set
memberships = objects.actually_accepted().select_for_update()
for membership in memberships:
if not membership.is_fully_applied(project):
membership.is_pending = True
membership.save()
def pre_sync_projects(sync=True):
objs = Project.objects
modified = list(objs.modified_projects().select_for_update())
reactivating = list(objs.reactivating_projects().select_for_update())
deactivating = list(objs.deactivating_projects().select_for_update())
if sync:
_pre_sync_projects(modified)
_pre_sync_projects(reactivating)
_pre_sync_projects(deactivating)
# transaction.commit()
return (modified, reactivating, deactivating)
def set_sync_projects(exclude=None):
ACTUALLY_ACCEPTED = ProjectMembership.ACTUALLY_ACCEPTED
objects = ProjectMembership.objects
sub_quota, add_quota = [], []
serial = new_serial()
pending = objects.filter(is_pending=True).select_for_update()
for membership in pending:
if membership.pending_application:
m = "%s: impossible: pending_application is not None (%s)" % (