Commit 24f3ea1f authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Advances in syncing quotaholder with astakos

Factor out trigger_sync() as lock_sync() to be used for other
sync processes in addition to sync_projects().
Implement sync_users(), which registers users and their quotas.
Use both sync processes in astakos-qh-sync management command.
parent 85577d75
......@@ -231,7 +231,7 @@ class DjangoBackend(BaseBackend):
@safe
def get_resource_usage(self, user_id):
user = self._lookup_user(user_id)
data = get_quota(user)
data = get_quota([user])
if not data:
return ()
resources = []
......
......@@ -73,18 +73,49 @@ def set_quota(payload):
logger.info('set_quota: %s rejected: %s' % (payload, result))
return result
def qh_get_quota(user, resources):
def get_entity(payload):
c = get_client()
if not c:
return
result = c.get_entity(context={}, get_entity=payload)
logger.info('get_entity: %s reply: %s' % (payload, result))
return result
def get_holding(payload):
c = get_client()
if not c:
return
result = c.get_holding(context={}, get_holding=payload)
logger.info('get_holding: %s reply: %s' % (payload, result))
return result
def qh_get_holdings(users, resources):
payload = []
append = payload.append
for user in users:
for resource in resources:
append((user.uuid, resource, ENTITY_KEY),)
result = get_holding(payload)
return result
def qh_get_quota(users, resources):
c = get_client()
if not c:
return
payload = []
append = payload.append
for r in resources:
append((user.uuid, r, ENTITY_KEY),)
for user in users:
for resource in resources:
append((user.uuid, resource, ENTITY_KEY),)
result = c.get_quota(context={}, clientkey=clientkey, get_quota=payload)
logger.info('get_quota: %s rejected: %s' % (payload, result))
return result
def qh_get_quota_limits(users, resources):
result = qh_get_quota(users, resources)
return result
def create_entity(payload):
c = get_client()
if not c:
......@@ -129,15 +160,18 @@ def add_quota_values(q1, q2):
import_limit = q1.import_limit + q2.import_limit,
export_limit = q1.export_limit + q2.export_limit)
def qh_register_user(user):
return register_users([user])
def qh_register_user_with_quotas(user):
return register_users_with_quotas([user])
def register_users(users):
rejected = create_users(users)
def register_users_with_quotas(users):
rejected = register_users(users)
if not rejected:
register_quotas(users)
def create_users(users):
def register_users(users):
if not users:
return
payload = list(CreateEntityPayload(
entity=u.uuid,
owner='system',
......@@ -146,6 +180,9 @@ def create_users(users):
return create_entity(payload)
def register_quotas(users):
if not users:
return
payload = []
append = payload.append
for u in users:
......@@ -186,6 +223,20 @@ def register_resources(resources):
flags=0) for resource in resources)
return set_quota(payload)
def qh_check_users(users):
payload = [(u.uuid, ENTITY_KEY) for u in users]
result = get_entity(payload)
uuids = [entity for (entity, owner) in result]
existing = []
nonexisting = []
for u in users:
if u.uuid in uuids:
existing.append(u)
else:
nonexisting.append(u)
return (existing, nonexisting)
def qh_add_quota(serial, sub_list, add_list):
if not QUOTAHOLDER_URL:
return ()
......
......@@ -66,14 +66,14 @@ from astakos.im.settings import (
from astakos.im.notifications import build_notification, NotificationError
from astakos.im.models import (
AstakosUser, ProjectMembership, ProjectApplication, Project,
trigger_sync, PendingMembershipError, get_resource_names)
sync_projects, PendingMembershipError, get_resource_names)
from astakos.im.models import submit_application as models_submit_application
from astakos.im.project_notif import (
membership_change_notify,
application_submit_notify, application_approve_notify,
application_deny_notify,
project_termination_notify, project_suspension_notify)
from astakos.im.endpoints.qh import qh_register_user, qh_get_quota
from astakos.im.endpoints.qh import qh_register_user_with_quotas, qh_get_quota
import astakos.im.messages as astakos_messages
......@@ -299,7 +299,7 @@ def activate(
if not user.activation_sent:
user.activation_sent = datetime.now()
user.save()
qh_register_user(user)
qh_register_user_with_quotas(user)
send_helpdesk_notification(user, helpdesk_email_template_name)
send_greeting(user, email_template_name)
......@@ -377,9 +377,9 @@ class SendNotificationError(SendMailError):
super(SendNotificationError, self).__init__()
def get_quota(user):
def get_quota(users):
resources = get_resource_names()
return qh_get_quota(user, resources)
return qh_get_quota(users, resources)
### PROJECT VIEWS ###
......@@ -507,7 +507,7 @@ def do_accept_membership(project_id, user, request_user=None):
membership = get_membership_for_update(project, user)
membership.accept()
trigger_sync()
sync_projects()
membership_change_notify(project, membership.person, 'accepted')
......@@ -560,7 +560,7 @@ def do_remove_membership(project_id, user, request_user=None):
membership = get_membership_for_update(project, user)
membership.remove()
trigger_sync()
sync_projects()
membership_change_notify(project, membership.person, 'removed')
......@@ -576,7 +576,7 @@ def do_enroll_member(project_id, user, request_user=None):
membership = create_membership(project_id, user)
membership.accept()
trigger_sync()
sync_projects()
# TODO send proper notification
return membership
......@@ -606,7 +606,7 @@ def do_leave_project(project_id, user_id):
leave_policy = project.application.member_leave_policy
if leave_policy == AUTO_ACCEPT_POLICY:
membership.remove()
trigger_sync()
sync_projects()
else:
membership.leave_request_date = datetime.now()
membership.save()
......@@ -638,7 +638,7 @@ def do_join_project(project_id, user_id):
if (join_policy == AUTO_ACCEPT_POLICY and
not project.violates_members_limit(adding=1)):
membership.accept()
trigger_sync()
sync_projects()
return membership
def submit_application(kw, request_user=None):
......@@ -698,7 +698,7 @@ def approve_application(app):
raise PermissionDenied()
application.approve()
trigger_sync()
sync_projects()
application_approve_notify(application)
......@@ -716,7 +716,7 @@ def terminate(project_id):
checkAlive(project)
project.terminate()
trigger_sync()
sync_projects()
project_termination_notify(project)
......@@ -725,7 +725,7 @@ def suspend(project_id):
checkAlive(project)
project.suspend()
trigger_sync()
sync_projects()
project_suspension_notify(project)
......@@ -737,4 +737,4 @@ def resume(project_id):
raise PermissionDenied(m)
project.resume()
trigger_sync()
sync_projects()
......@@ -31,30 +31,45 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.core.management.base import NoArgsCommand, CommandError
from optparse import make_option
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from astakos.im.models import AstakosUser, Resource
from astakos.im.endpoints.qh import register_users, register_resources
from astakos.im.models import sync_all_users, sync_projects
import logging
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Inspect quotaholder status and sync"
class Command(NoArgsCommand):
help = "Send user information and resource quota in the Quotaholder"
option_list = BaseCommand.option_list + (
make_option('--users',
action='store_true',
dest='users',
default=False,
help="Check if users and their quotas are in sync with quotaholder"),
make_option('--projects',
action='store_true',
dest='projects',
default=False,
help="Check if projects are in sync with quotaholder"),
make_option('--execute',
action='store_true',
dest='execute',
default=False,
help="Perform the actual operation"),
)
@transaction.commit_on_success
def handle(self, *args, **options):
execute = options['execute']
def handle_noargs(self, **options):
try:
resources = list(Resource.objects.all())
print("Registering resources")
register_resources(resources)
print("Registering users")
users = list(AstakosUser.objects.verified().all())
if users:
register_users(users)
else:
print(" -> No verified users found.")
if options['users']:
log = sync_all_users(execute=execute)
elif options['projects']:
log = sync_projects(execute=execute)
except BaseException, e:
logger.exception(e)
raise CommandError("Syncing failed.")
......@@ -70,6 +70,7 @@ from astakos.im.settings import (
SITENAME, SERVICES, MODERATION_ENABLED, RESOURCES_PRESENTATION_DATA)
from astakos.im import settings as astakos_settings
from astakos.im.endpoints.qh import (
register_users, register_quotas, qh_check_users, qh_get_quota_limits,
register_services, register_resources, qh_add_quota, QuotaLimits,
qh_query_serials, qh_ack_serials,
QuotaValues, add_quota_values)
......@@ -2010,7 +2011,7 @@ def sync_finish_serials(serials_to_ack=None):
qh_ack_serials(list(serials_to_ack))
return len(memberships)
def pre_sync():
def pre_sync_projects():
ACCEPTED = ProjectMembership.ACCEPTED
PROJECT_DEACTIVATED = ProjectMembership.PROJECT_DEACTIVATED
psfu = Project.objects.select_for_update()
......@@ -2044,7 +2045,7 @@ def pre_sync():
membership.state = PROJECT_DEACTIVATED
membership.save()
def do_sync():
def do_sync_projects():
ACCEPTED = ProjectMembership.ACCEPTED
objects = ProjectMembership.objects.select_for_update()
......@@ -2085,7 +2086,7 @@ def do_sync():
return serial
def post_sync():
def post_sync_projects():
ACCEPTED = ProjectMembership.ACCEPTED
PROJECT_DEACTIVATED = ProjectMembership.PROJECT_DEACTIVATED
psfu = Project.objects.select_for_update()
......@@ -2120,14 +2121,48 @@ def post_sync():
transaction.commit()
def sync_projects():
def _sync_projects(execute):
sync_finish_serials()
pre_sync()
serial = do_sync()
if not execute:
# Do some reporting and
return
pre_sync_projects()
serial = do_sync_projects()
sync_finish_serials([serial])
post_sync()
post_sync_projects()
def sync_projects(execute=True, retries=3, retry_wait=1.0):
return lock_sync(_sync_projects,
args=[execute],
retries=retries,
retry_wait=retry_wait)
def _sync_users(users, execute):
sync_finish_serials()
existing, nonexisting = qh_check_users(users)
resources = get_resource_names()
quotas = qh_get_quota_limits(existing, resources)
def trigger_sync(retries=3, retry_wait=1.0):
if execute:
r = register_users(nonexisting)
r = register_quotas(users)
# TODO: some proper reporting
return (existing, nonexisting, quotas)
def sync_users(users, execute=True, retries=3, retry_wait=1.0):
return lock_sync(_sync_users,
args=[users, execute],
retries=retries,
retry_wait=retry_wait)
def sync_all_users(execute=True, retries=3, retry_wait=1.0):
users = AstakosUser.objects.all()
return sync_users(users, execute, retries=retries, retry_wait=retry_wait)
def lock_sync(func, args=[], kwargs={}, retries=3, retry_wait=1.0):
transaction.commit()
cursor = connection.cursor()
......@@ -2148,8 +2183,7 @@ def trigger_sync(retries=3, retry_wait=1.0):
return False
sleep(retry_wait)
sync_projects()
return True
return func(*args, **kwargs)
finally:
if locked:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment