Commit 7fd21948 authored by Olga Brani's avatar Olga Brani
Browse files

Merge branch 'latest-quota' of https://code.grnet.gr/git/synnefo into tmp

Conflicts:
	snf-astakos-app/astakos/im/models.py
	snf-astakos-app/astakos/im/target/twitter.py
parents 02ce19b3 ace3a71a
......@@ -118,24 +118,33 @@ QuotaLimits = namedtuple('QuotaLimits', ('holder',
'import_limit',
'export_limit'))
def qh_add_quota(serial, quotalimits_list):
def qh_add_quota(serial, sub_list, add_list):
if not QUOTAHOLDER_URL:
return ()
context = {}
c = get_client()
data = []
append = data.append
for ql in quotalimits_list:
sub_quota = []
sub_append = sub_quota.append
add_quota = []
add_append = add_quota.append
for ql in sub_quota:
args = (ql.holder, ql.resource, ENTITY_KEY,
0, ql.capacity, ql.import_limit, ql.export_limit)
append(args)
sub_append(args)
for ql in add_quota:
args = (ql.holder, ql.resource, ENTITY_KEY,
0, ql.capacity, ql.import_limit, ql.export_limit)
add_append(args)
result = c.add_quota(context=context,
clientkey=clientkey,
serial=serial,
add_quota=data)
sub_quota=sub_quota,
add_quota=add_quota)
return result
......
......@@ -1501,9 +1501,6 @@ class ProjectApplication(models.Model):
self.state = APPROVED
self.save()
transaction.commit()
trigger_sync()
class ProjectResourceGrant(models.Model):
......@@ -1682,41 +1679,6 @@ class Project(models.Model):
# logger.error(e.messages)
class ExclusiveOrRaise(object):
"""Context Manager to exclusively execute a critical code section.
The exclusion must be global.
(IPC semaphores will not protect across OS,
DB locks will if it's the same DB)
"""
class Busy(Exception):
pass
def __init__(self, locked=False):
init = 0 if locked else 1
from multiprocessing import Semaphore
self._sema = Semaphore(init)
def enter(self):
acquired = self._sema.acquire(False)
if not acquired:
raise self.Busy()
def leave(self):
self._sema.release()
def __enter__(self):
self.enter()
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.leave()
exclusive_or_raise = ExclusiveOrRaise(locked=False)
class ProjectMembership(models.Model):
person = models.ForeignKey(AstakosUser)
......@@ -1783,7 +1745,6 @@ class ProjectMembership(models.Model):
self._set_history_item(reason='ACCEPT', date=now)
self.state = self.PENDING
self.save()
trigger_sync()
def remove(self):
if state != self.ACCEPTED:
......@@ -1793,7 +1754,6 @@ class ProjectMembership(models.Model):
self._set_history_item(reason='REMOVE')
self.state = self.REMOVING
self.save()
trigger_sync()
def reject(self):
if state != self.REQUESTED:
......@@ -1805,59 +1765,41 @@ class ProjectMembership(models.Model):
self._set_history_item(reason='REJECT')
self.delete()
def get_diff_quotas(self, limits_list=None, remove=False):
if limits_list is None:
limits_list = []
def get_diff_quotas(self, sub_list=None, add_list=None, remove=False):
if sub_list is None:
sub_list = []
if add_list is None:
add_list = []
append = limits_list.append
sub_append = sub_list.append
add_append = add_list.append
holder = self.person.username
key = "1"
tmp_grants = {}
synced_application = self.application
if synced_application is not None:
# first, inverse all current limits, and index them by resource name
cur_grants = synced_application.resource_grants.all()
f = -1
for grant in cur_grants:
name = grant.resource.name
tmp_grants[name] = QuotaLimits(
holder = holder,
resource = name,
capacity = f * grant.member_capacity,
import_limit = f * grant.member_import_limit,
export_limit = f * grant.member_export_limit)
sub_append(QuotaLimits(
holder = holder,
resource = grant.resource.name,
capacity = grant.member_capacity,
import_limit = grant.member_import_limit,
export_limit = grant.member_export_limit))
if not remove:
# second, add each new limit to its inverted current
new_grants = self.pending_application.projectresourcegrant_set.all()
for new_grant in new_grants:
name = new_grant.resource.name
cur_grant = tmp_grants.pop(name, None)
if cur_grant is None:
# if limits on a new resource, set 0 current values
capacity = 0
import_limit = 0
export_limit = 0
else:
capacity = cur_grant.capacity
import_limit = cur_grant.import_limit
export_limit = cur_grant.export_limit
capacity += new_grant.member_capacity
import_limit += new_grant.member_import_limit
export_limit += new_grant.member_export_limit
append(QuotaLimits(holder = holder,
key = key,
resource = name,
capacity = capacity,
import_limit = import_limit,
export_limit = export_limit))
add_append(QuotaLimits(
holder = holder,
resource = new_grant.resource.name,
capacity = new_grant.capacity,
import_limit = new_grant.import_limit,
export_limit = new_grant.export_limit))
# third, append all the inverted current limits for removed resources
limits_list.extend(tmp_grants.itervalues())
return limits_list
return (sub_list, add_list)
def set_sync(self):
state = self.state
......@@ -1914,7 +1856,7 @@ def sync_projects():
REMOVING = ProjectMembership.REMOVING
objects = ProjectMembership.objects.select_for_update()
quotas = []
sub_quota, add_quota = [], []
serial = new_serial()
......@@ -1932,7 +1874,7 @@ def sync_projects():
membership.pending_application = membership.project.application
membership.pending_serial = serial
membership.get_diff_quotas(quotas)
membership.get_diff_quotas(sub_quota, add_quota)
membership.save()
removing = objects.filter(state=REMOVING)
......@@ -1948,7 +1890,7 @@ def sync_projects():
raise AssertionError(m)
membership.pending_serial = serial
membership.get_diff_quotas(quotas, remove=True)
membership.get_diff_quotas(sub_quota, add_quota, remove=True)
membership.save()
transaction.commit()
......@@ -1957,7 +1899,7 @@ def sync_projects():
# which has been scheduled to sync with the old project.application
# Need to check in ProjectMembership.set_sync()
qh_add_quota(serial, quotas)
qh_add_quota(serial, sub_quota, add_quota)
sync_finish_serials()
......@@ -1980,6 +1922,7 @@ def trigger_sync(retries=3, retry_wait=1.0):
return False
sleep(retry_wait)
transaction.commit()
sync_projects()
return True
......@@ -2136,3 +2079,4 @@ def renew_token(sender, instance, **kwargs):
instance.renew_token()
pre_save.connect(renew_token, sender=AstakosUser)
pre_save.connect(renew_token, sender=Service)
......@@ -46,7 +46,6 @@ from django.shortcuts import get_object_or_404
from urlparse import urlunsplit, urlsplit
from astakos.im.util import prepare_response, get_context
from astakos.im.views import requires_anonymous, render_response, \
requires_auth_provider, required_auth_methods_assigned
from astakos.im.settings import ENABLE_LOCAL_ACCOUNT_MIGRATION, BASEURL
......
This diff is collapsed.
......@@ -239,6 +239,9 @@ class QuotaholderAPI(Specificator):
context = Context,
clientkey = ClientKey,
serial = Serial,
sub_quota = ListOf( Entity, Resource, Key,
QuantityDelta, CapacityDelta,
ImportLimitDelta, ExportLimitDelta ),
add_quota = ListOf( Entity, Resource, Key,
QuantityDelta, CapacityDelta,
ImportLimitDelta, ExportLimitDelta )
......
......@@ -506,64 +506,72 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
raise ReturnButFail(rejected)
return rejected
def add_quota(self, context={}, clientkey=None, serial=None, add_quota=()):
def add_quota(self, context={}, clientkey=None, serial=None,
sub_quota=(), add_quota=()):
rejected = []
append = rejected.append
all_pairs = [(q[0], q[1]) for q in add_quota]
if serial is not None:
if clientkey is None:
all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
raise ReturnButFail(all_pairs)
try:
cs = CallSerial.objects.get(serial=serial, clientkey=clientkey)
all_pairs = [(q[0], q[1]) for q in sub_quota + add_quota]
raise ReturnButFail(all_pairs)
except CallSerial.DoesNotExist:
pass
for ( entity, resource, key,
quantity, capacity,
import_limit, export_limit ) in add_quota:
try:
e = Entity.objects.get(entity=entity, key=key)
except Entity.DoesNotExist:
append((entity, resource))
continue
for removing, source in [(True, sub_quota), (False, add_quota)]:
for ( entity, resource, key,
quantity, capacity,
import_limit, export_limit ) in source:
try:
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
p = h.policy
except Holding.DoesNotExist:
h = Holding(entity=e, resource=resource, flags=0)
p = None
try:
e = Entity.objects.get(entity=entity, key=key)
except Entity.DoesNotExist:
append((entity, resource))
continue
policy = newname('policy_')
newp = Policy(policy=policy)
newp.quantity = _add(p.quantity if p else 0, quantity)
newp.capacity = _add(p.capacity if p else 0, capacity)
newp.import_limit = _add(p.import_limit if p else 0,
import_limit)
newp.export_limit = _add(p.export_limit if p else 0,
export_limit)
new_values = [newp.capacity,
newp.import_limit, newp.export_limit]
if any(map(_isneg, new_values)):
append((entity, resource))
continue
try:
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
p = h.policy
except Holding.DoesNotExist:
if removing:
append((entity, resource))
continue
h = Holding(entity=e, resource=resource, flags=0)
p = None
policy = newname('policy_')
newp = Policy(policy=policy)
newp.quantity = _add(p.quantity if p else 0, quantity,
invert=removing)
newp.capacity = _add(p.capacity if p else 0, capacity,
invert=removing)
newp.import_limit = _add(p.import_limit if p else 0,
import_limit, invert=removing)
newp.export_limit = _add(p.export_limit if p else 0,
export_limit, invert=removing)
new_values = [newp.capacity,
newp.import_limit, newp.export_limit]
if any(map(_isneg, new_values)):
append((entity, resource))
continue
h.policy = newp
h.policy = newp
# the order is intentionally reversed so that it
# would break if we are not within a transaction.
# Has helped before.
h.save()
newp.save()
# the order is intentionally reversed so that it
# would break if we are not within a transaction.
# Has helped before.
h.save()
newp.save()
if p is not None and p.holding_set.count() == 0:
p.delete()
if p is not None and p.holding_set.count() == 0:
p.delete()
if rejected:
raise ReturnButFail(rejected)
......@@ -964,10 +972,12 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
return timeline
def _add(x, y):
def _add(x, y, invert=False):
if invert and y is None:
return 0
if x is None or y is None:
return None
return x + y
return x + y if not invert else x - y
def _update(dest, source, attr, delta):
dest_attr = getattr(dest, attr)
......
......@@ -230,19 +230,21 @@ class QHAPITest(QHTestCase):
resource1 = self.rand_resource()
r = self.qh.set_quota(
set_quota=[(e0, resource0, k0) + (5, 5, 5, 5) + (0,),
set_quota=[(e0, resource0, k0) + (5, None, 5, 6) + (0,),
(e1, resource0, k1) + (5, 5, 5, 5) + (0,)])
self.assertEqual(r, [])
r = self.qh.add_quota(clientkey=self.client,
serial=1,
add_quota=[(e0, resource0, k0, 0, (-2), None, 0),
sub_quota=[(e0, resource0, k0, 0, None, 1, 1)],
add_quota=[(e0, resource0, k0, 0, 3, None, 0),
# new holding
(e0, resource1, k0, 0, None, 5, 5)])
self.assertEqual(r, [])
r = self.qh.get_quota(get_quota=[(e0, resource0, k0),
(e0, resource1, k0)])
self.assertEqual(r, [(e0, resource0, 5, 5 - 2, None, 5)
self.assertEqual(r, [(e0, resource0, 5, 3, None, 5)
+ DEFAULT_HOLDING + (0,),
(e0, resource1, 0, None, 5, 5)
+ DEFAULT_HOLDING + (0,)])
......@@ -250,9 +252,9 @@ class QHAPITest(QHTestCase):
# repeated serial
r = self.qh.add_quota(clientkey=self.client,
serial=1,
add_quota=[(e0, resource0, k0, 0, 2, None, 0),
(e0, resource1, k0, 0, None, (-5), 0)])
self.assertEqual(r, [(e0, resource0), (e0, resource1)])
sub_quota=[(e0, resource1, k0, 0, None, (-5), 0)],
add_quota=[(e0, resource0, k0, 0, 2, None, 0)])
self.assertEqual(r, [(e0, resource1), (e0, resource0)])
r = self.qh.query_serials(clientkey=self.client, serials=[1, 2])
self.assertEqual(r, [1])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment