Commit e3b982f4 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

astakos: Lock chain for projects, user for syncing

Lock project's chain for every operation affecting either a project or
an application. Subsequently, lock user when needing to sync or to issue
commission to update pending_app quota.

In views, replace locking qh_add_pending_app with check_pending_app_quota,
which will only query quota, so that to respect the above mentioned order.

Order by id when locking multiple users.
parent 0ad5d74b
......@@ -44,6 +44,7 @@ from django.contrib.auth import (
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import PermissionDenied
from django.db import IntegrityError
from django.db.models import Q
from django.http import Http404
from synnefo_branding.utils import render_to_string
......@@ -64,9 +65,11 @@ from astakos.im.settings import (
from astakos.im.notifications import build_notification, NotificationError
from astakos.im.models import (
AstakosUser, Invitation, ProjectMembership, ProjectApplication, Project,
UserSetting, new_chain)
from astakos.im.quotas import (qh_sync_user,
register_pending_apps, qh_sync_project)
UserSetting, Chain, new_chain)
from astakos.im.quotas import (qh_sync_user, get_pending_app_quota,
register_pending_apps, qh_sync_project,
qh_sync_locked_users, get_users_for_update,
members_to_sync)
from astakos.im.project_notif import (
membership_change_notify, membership_enroll_notify,
membership_request_notify, membership_leave_request_notify,
......@@ -304,17 +307,22 @@ def get_project_by_name(name):
raise IOError(m)
def get_project_for_update(project_id):
def get_chain_for_update(chain_id):
try:
return Project.objects.get_for_update(id=project_id)
except Project.DoesNotExist:
m = _(astakos_messages.UNKNOWN_PROJECT_ID) % project_id
return Chain.objects.get_for_update(chain=chain_id)
except Chain.DoesNotExist:
m = _(astakos_messages.UNKNOWN_PROJECT_ID) % chain_id
raise IOError(m)
def get_application_for_update(application_id):
def get_chain_of_application_for_update(app_id):
app = get_application(app_id)
return Chain.objects.get_for_update(chain=app.chain_id)
def get_application(application_id):
try:
return ProjectApplication.objects.get_for_update(id=application_id)
return ProjectApplication.objects.get(id=application_id)
except ProjectApplication.DoesNotExist:
m = _(astakos_messages.UNKNOWN_PROJECT_APPLICATION_ID) % application_id
raise IOError(m)
......@@ -336,21 +344,19 @@ def get_user_by_uuid(uuid):
raise IOError(m)
def get_membership_for_update(project_id, user_id):
def get_membership(project_id, user_id):
try:
objs = ProjectMembership.objects
return objs.get_for_update(project__id=project_id,
person__id=user_id)
objs = ProjectMembership.objects.select_related('project', 'person')
return objs.get(project__id=project_id, person__id=user_id)
except ProjectMembership.DoesNotExist:
m = _(astakos_messages.NOT_MEMBERSHIP_REQUEST)
raise IOError(m)
def get_membership_for_update_by_id(project_id, memb_id):
def get_membership_by_id(project_id, memb_id):
try:
objs = ProjectMembership.objects
return objs.get_for_update(project__id=project_id,
id=memb_id)
objs = ProjectMembership.objects.select_related('project', 'person')
return objs.get(project__id=project_id, id=memb_id)
except ProjectMembership.DoesNotExist:
m = _(astakos_messages.NOT_MEMBERSHIP_REQUEST)
raise IOError(m)
......@@ -396,14 +402,15 @@ def accept_membership_checks(project, request_user):
def accept_membership(project_id, memb_id, request_user=None):
project = get_project_for_update(project_id)
accept_membership_checks(project, request_user)
get_chain_for_update(project_id)
membership = get_membership_for_update_by_id(project_id, memb_id)
membership = get_membership_by_id(project_id, memb_id)
if not membership.can_accept():
m = _(astakos_messages.NOT_MEMBERSHIP_REQUEST)
raise PermissionDenied(m)
project = membership.project
accept_membership_checks(project, request_user)
user = membership.person
membership.accept()
qh_sync_user(user)
......@@ -420,13 +427,15 @@ def reject_membership_checks(project, request_user):
def reject_membership(project_id, memb_id, request_user=None):
project = get_project_for_update(project_id)
reject_membership_checks(project, request_user)
membership = get_membership_for_update_by_id(project_id, memb_id)
get_chain_for_update(project_id)
membership = get_membership_by_id(project_id, memb_id)
if not membership.can_reject():
m = _(astakos_messages.NOT_MEMBERSHIP_REQUEST)
raise PermissionDenied(m)
project = membership.project
reject_membership_checks(project, request_user)
user = membership.person
membership.reject()
logger.info("Request of user %s for %s has been rejected." %
......@@ -441,13 +450,15 @@ def cancel_membership_checks(project):
def cancel_membership(project_id, request_user):
project = get_project_for_update(project_id)
cancel_membership_checks(project)
membership = get_membership_for_update(project_id, request_user.id)
get_chain_for_update(project_id)
membership = get_membership(project_id, request_user.id)
if not membership.can_cancel():
m = _(astakos_messages.NOT_MEMBERSHIP_REQUEST)
raise PermissionDenied(m)
project = membership.project
cancel_membership_checks(project)
membership.cancel()
logger.info("Request of user %s for %s has been cancelled." %
(membership.person.log_display, project))
......@@ -464,13 +475,15 @@ def remove_membership_checks(project, request_user=None):
def remove_membership(project_id, memb_id, request_user=None):
project = get_project_for_update(project_id)
remove_membership_checks(project, request_user)
membership = get_membership_for_update_by_id(project_id, memb_id)
get_chain_for_update(project_id)
membership = get_membership_by_id(project_id, memb_id)
if not membership.can_remove():
m = _(astakos_messages.NOT_ACCEPTED_MEMBERSHIP)
raise PermissionDenied(m)
project = membership.project
remove_membership_checks(project, request_user)
user = membership.person
membership.remove()
qh_sync_user(user)
......@@ -482,7 +495,8 @@ def remove_membership(project_id, memb_id, request_user=None):
def enroll_member(project_id, user, request_user=None):
project = get_project_for_update(project_id)
get_chain_for_update(project_id)
project = get_project_by_id(project_id)
accept_membership_checks(project, request_user)
membership, created = ProjectMembership.objects.get_or_create(
......@@ -524,25 +538,28 @@ def can_leave_request(project, user):
def leave_project(project_id, request_user):
project = get_project_for_update(project_id)
leave_project_checks(project)
membership = get_membership_for_update(project_id, request_user.id)
get_chain_for_update(project_id)
membership = get_membership(project_id, request_user.id)
if not membership.can_leave():
m = _(astakos_messages.NOT_ACCEPTED_MEMBERSHIP)
raise PermissionDenied(m)
project = membership.project
leave_project_checks(project)
auto_accepted = False
leave_policy = project.application.member_leave_policy
if leave_policy == AUTO_ACCEPT_POLICY:
membership.remove()
qh_sync_user(request_user)
logger.info("User %s has left %s." %
(membership.person.log_display, project))
(request_user.log_display, project))
auto_accepted = True
else:
membership.leave_request()
logger.info("User %s requested to leave %s." %
(membership.person.log_display, project))
(request_user.log_display, project))
membership_leave_request_notify(project, membership.person)
return auto_accepted
......@@ -567,7 +584,8 @@ def can_join_request(project, user):
def join_project(project_id, request_user):
project = get_project_for_update(project_id)
get_chain_for_update(project_id)
project = get_project_by_id(project_id)
join_project_checks(project)
membership, created = ProjectMembership.objects.get_or_create(
......@@ -585,12 +603,12 @@ def join_project(project_id, request_user):
membership.accept()
qh_sync_user(request_user)
logger.info("User %s joined %s." %
(membership.person.log_display, project))
(request_user.log_display, project))
auto_accepted = True
else:
membership_request_notify(project, membership.person)
logger.info("User %s requested to join %s." %
(membership.person.log_display, project))
(request_user.log_display, project))
return auto_accepted
......@@ -610,8 +628,8 @@ def submit_application(owner=None,
precursor = None
if precursor_id is not None:
objs = ProjectApplication.objects
precursor = objs.get_for_update(id=precursor_id)
get_chain_of_application_for_update(precursor_id)
precursor = ProjectApplication.objects.get(id=precursor_id)
if (request_user and
(not precursor.owner == request_user and
......@@ -646,8 +664,7 @@ def submit_application(owner=None,
chain = precursor.chain
application.chain = chain
objs = ProjectApplication.objects
q = objs.filter(chain=chain, state=ProjectApplication.PENDING)
pending = q.select_for_update()
pending = objs.filter(chain=chain, state=ProjectApplication.PENDING)
for app in pending:
app.state = ProjectApplication.REPLACED
app.save()
......@@ -662,7 +679,8 @@ def submit_application(owner=None,
def cancel_application(application_id, request_user=None, reason=""):
application = get_application_for_update(application_id)
get_chain_of_application_for_update(application_id)
application = get_application(application_id)
checkAllowed(application, request_user)
if not application.can_cancel():
......@@ -677,7 +695,8 @@ def cancel_application(application_id, request_user=None, reason=""):
def dismiss_application(application_id, request_user=None, reason=""):
application = get_application_for_update(application_id)
get_chain_of_application_for_update(application_id)
application = get_application(application_id)
checkAllowed(application, request_user)
if not application.can_dismiss():
......@@ -690,7 +709,8 @@ def dismiss_application(application_id, request_user=None, reason=""):
def deny_application(application_id, request_user=None, reason=""):
application = get_application_for_update(application_id)
get_chain_of_application_for_update(application_id)
application = get_application(application_id)
checkAllowed(application, request_user, admin_only=True)
......@@ -707,14 +727,30 @@ def deny_application(application_id, request_user=None, reason=""):
application_deny_notify(application)
def approve_application(app_id, request_user=None, reason=""):
def check_conflicting_projects(application):
try:
project = get_project_by_id(application.chain)
except IOError:
project = None
new_project_name = application.name
try:
objects = ProjectApplication.objects
application = objects.get_for_update(id=app_id)
except ProjectApplication.DoesNotExist:
m = _(astakos_messages.UNKNOWN_PROJECT_APPLICATION_ID % (app_id,))
raise PermissionDenied(m)
q = Q(name=new_project_name) & ~Q(state=Project.TERMINATED)
conflicting_project = Project.objects.get(q)
if (conflicting_project != project):
m = (_("cannot approve: project with name '%s' "
"already exists (id: %s)") % (
new_project_name, conflicting_project.id))
raise PermissionDenied(m) # invalid argument
except Project.DoesNotExist:
pass
return project
def approve_application(app_id, request_user=None, reason=""):
get_chain_of_application_for_update(app_id)
application = get_application(app_id)
checkAllowed(application, request_user, admin_only=True)
......@@ -723,9 +759,19 @@ def approve_application(app_id, request_user=None, reason=""):
(application.id, application.state_display()))
raise PermissionDenied(m)
qh_release_pending_app(application.owner)
project = application.approve(reason)
qh_sync_project(project)
project = check_conflicting_projects(application)
# Pre-lock members and owner together in order to impose an ordering
# on locking users
members = members_to_sync(project) if project is not None else []
uids_to_sync = [member.id for member in members]
owner = application.owner
uids_to_sync.append(owner.id)
get_users_for_update(uids_to_sync)
qh_release_pending_app(owner, locked=True)
application.approve(reason)
qh_sync_locked_users(members)
logger.info("%s has been approved." % (application.log_display))
application_approve_notify(application)
......@@ -741,7 +787,8 @@ def check_expiration(execute=False):
def terminate(project_id, request_user=None):
project = get_project_for_update(project_id)
get_chain_for_update(project_id)
project = get_project_by_id(project_id)
checkAllowed(project, request_user, admin_only=True)
checkAlive(project)
......@@ -753,7 +800,8 @@ def terminate(project_id, request_user=None):
def suspend(project_id, request_user=None):
project = get_project_for_update(project_id)
get_chain_for_update(project_id)
project = get_project_by_id(project_id)
checkAllowed(project, request_user, admin_only=True)
checkAlive(project)
......@@ -765,7 +813,8 @@ def suspend(project_id, request_user=None):
def resume(project_id, request_user=None):
project = get_project_for_update(project_id)
get_chain_for_update(project_id)
project = get_project_by_id(project_id)
checkAllowed(project, request_user, admin_only=True)
if not project.is_suspended:
......@@ -836,7 +885,7 @@ def count_pending_app(users):
return usage
def qh_add_pending_app(user, precursor=None, force=False, dry_run=False):
def get_pending_app_diff(user, precursor):
if precursor is None:
diff = 1
else:
......@@ -845,9 +894,26 @@ def qh_add_pending_app(user, precursor=None, force=False, dry_run=False):
q = objs.filter(chain=chain, state=ProjectApplication.PENDING)
count = q.count()
diff = 1 - count
return diff
def qh_add_pending_app(user, precursor=None, force=False):
user = AstakosUser.forupdate.get_for_update(id=user.id)
diff = get_pending_app_diff(user, precursor)
return register_pending_apps(user, diff, force)
return register_pending_apps(user, diff, force, dry_run)
def check_pending_app_quota(user, precursor=None):
diff = get_pending_app_diff(user, precursor)
quota = get_pending_app_quota(user)
limit = quota['limit']
usage = quota['usage']
if usage + diff > limit:
return False, limit
return True, None
def qh_release_pending_app(user):
def qh_release_pending_app(user, locked=False):
if not locked:
user = AstakosUser.forupdate.get_for_update(id=user.id)
register_pending_apps(user, -1)
......@@ -36,7 +36,7 @@ from django.core.management.base import CommandError
from astakos.im.models import AstakosUser
from astakos.im.quotas import (
qh_sync_users, list_user_quotas, add_base_quota)
qh_sync_users_diffs, list_user_quotas, add_base_quota)
from astakos.im.functions import get_user_by_uuid
from astakos.im.management.commands._common import is_uuid, is_email
from snf_django.lib.db.transaction import commit_on_success_strict
......@@ -117,7 +117,7 @@ class Command(SynnefoCommand):
output_format)
elif verify or sync:
qh_limits, diff_q = qh_sync_users(users, sync=sync, diff_only=True)
qh_limits, diff_q = qh_sync_users_diffs(users, sync=sync)
if verify:
self.print_verify(qh_limits, diff_q)
if sync:
......
......@@ -1473,13 +1473,6 @@ class ProjectApplication(models.Model):
return "application %s (%s) for project %s" % (
self.id, self.name, self.chain)
def get_project(self):
try:
project = Project.objects.get(id=self.chain, state=Project.APPROVED)
return Project
except Project.DoesNotExist, e:
return None
def state_display(self):
return self.APPLICATION_STATE_DISPLAY.get(self.state, _('Unknown'))
......@@ -1579,14 +1572,6 @@ class ProjectApplication(models.Model):
def project_exists(self):
return self.get_project() is not None
def _get_project_for_update(self):
try:
objects = Project.objects
project = objects.get_for_update(id=self.chain)
return project
except Project.DoesNotExist:
return None
def can_cancel(self):
return self.state == self.PENDING
......@@ -1629,41 +1614,25 @@ class ProjectApplication(models.Model):
return self.state == self.PENDING
def approve(self, reason):
new_project_name = self.name
if not self.can_approve():
m = _("cannot approve: project '%s' in state '%s'") % (
new_project_name, self.state)
self.name, self.state)
raise AssertionError(m) # invalid argument
now = datetime.now()
project = self._get_project_for_update()
try:
q = Q(name=new_project_name) & ~Q(state=Project.TERMINATED)
conflicting_project = Project.objects.get(q)
if (conflicting_project != project):
m = (_("cannot approve: project with name '%s' "
"already exists (id: %s)") % (
new_project_name, conflicting_project.id))
raise PermissionDenied(m) # invalid argument
except Project.DoesNotExist:
pass
self.state = self.APPROVED
self.response_date = now
self.response = reason
self.save()
new_project = False
project = self.get_project()
if project is None:
new_project = True
project = Project(id=self.chain)
project.name = new_project_name
project.name = self.name
project.application = self
project.last_approval_date = now
project.save()
self.state = self.APPROVED
self.response_date = now
self.response = reason
self.save()
return project
@property
......
......@@ -128,26 +128,30 @@ def get_default_quota():
SYSTEM = 'system'
PENDING_APP_RESOURCE = 'astakos.pending_app'
def register_pending_apps(user, quantity, force=False, dry_run=False):
provision = (user.uuid, SYSTEM, 'astakos.pending_app'), quantity
name = "DRYRUN" if dry_run else ""
def register_pending_apps(user, quantity, force=False):
provision = (user.uuid, SYSTEM, PENDING_APP_RESOURCE), quantity
try:
s = qh.issue_commission(clientkey='astakos',
force=force,
name=name,
provisions=[provision])
except NoCapacityError as e:
limit = e.data['limit']
return False, limit
accept = not dry_run
qh.resolve_pending_commission('astakos', s, accept)
qh.resolve_pending_commission('astakos', s)
return True, None
def get_pending_app_quota(user):
quota = get_user_quotas(user)
return quota[SYSTEM][PENDING_APP_RESOURCE]
def add_base_quota(user, resource, capacity):
resource = Resource.objects.get(name=resource)
user = get_user_for_update(user.id)
obj, created = AstakosUserQuota.objects.get_or_create(
user=user, resource=resource, defaults={
'capacity': capacity,
......@@ -156,13 +160,14 @@ def add_base_quota(user, resource, capacity):
if not created:
obj.capacity = capacity
obj.save()
qh_sync_user(user)
qh_sync_locked_user(user)
def remove_base_quota(user, resource):
user = get_user_for_update(user.id)
AstakosUserQuota.objects.filter(
user=user, resource__name=resource).delete()
qh_sync_user(user)
qh_sync_locked_user(user)
def initial_quotas(users):
......@@ -248,51 +253,69 @@ def list_user_quotas(users):
# Syncing to quotaholder
def qh_sync_users(users, sync=True, diff_only=False):
def get_users_for_update(user_ids):
uids = sorted(user_ids)
objs = AstakosUser.forupdate
return list(objs.filter(id__in=uids).order_by('id').select_for_update())
def get_user_for_update(user_id):
return get_users_for_update([user_id])[0]
def qh_sync_locked_users(users):
astakos_quotas = astakos_users_quotas(users)
_set_user_quota(astakos_quotas)
def qh_sync_users(users):
uids = [user.id for user in users]
users = get_users_for_update(uids)
qh_sync_locked_users(users)
def qh_sync_users_diffs(users, sync=True):
uids = [user.id for user in users]
if sync:
users = AstakosUser.forupdate.filter(id__in=uids).select_for_update()
users = get_users_for_update(uids)
astakos_quotas = astakos_users_quotas(users)
qh_limits = get_users_quota_limits(users)
diff_quotas = {}
for holder, local in astakos_quotas.iteritems():
registered = qh_limits.get(holder, None)
if local != registered:
diff_quotas[holder] = dict(local)
if sync:
_set_user_quota(diff_quotas)
return qh_limits, diff_quotas
if diff_only:
qh_limits = get_users_quota_limits(users)
diff_quotas = {}
for holder, local in astakos_quotas.iteritems():
registered = qh_limits.get(holder, None)
if local != registered:
diff_quotas[holder] = dict(local)
if sync:
_set_user_quota(diff_quotas)
return qh_limits, diff_quotas
else:
if sync:
_set_user_quota(astakos_quotas)
return None
def qh_sync_locked_user(user):
qh_sync_locked_users([user])