Commit 92bc7295 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

Redesign quota holding

A Holding now tracks the current usage (amount imported)
and the amount available to export (stock).

Counters come in pairs (imported_min, imported_max and
stock_min, stock_max respectively) to allow for a two-step
commission.

For example, issuing a commission to import some quantity
increases imported_max. When accepting the commission,
imported_min increases by the same quantity; at the same time
the stock counters increase, too, since the quantity imported
becomes available for re-export.

This commissioning logic is factored out for clarity in file commission.py.

Remove also quantity (initial supply) from Policy.
This can be simulated by setting the holding counters to the desired value.
Since capacity now represents the total capacity, care must be taken
that it be no less than the initial supply set as suggested.
parent cc61bd29
......@@ -34,9 +34,12 @@
from astakos.quotaholder.exception import (
QuotaholderError,
CorruptedError, InvalidDataError,
NoQuantityError, NoCapacityError,
NoStockError, NoCapacityError,
DuplicateError)
from astakos.quotaholder.commission import (
Import, Export, Reclaim, Release, Operations)
from astakos.quotaholder.utils.newname import newname
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
......@@ -61,23 +64,21 @@ class QuotaholderDjangoDBCallpoint(object):
except Policy.DoesNotExist:
continue
append((policy, p.quantity, p.capacity))
append((policy, p.capacity))
return limits
def set_limits(self, context=None, set_limits=[]):
for (policy, quantity, capacity) in set_limits:
for (policy, capacity) in set_limits:
try:
policy = db_get_policy(policy=policy, for_update=True)
except Policy.DoesNotExist:
Policy.objects.create(policy=policy,
quantity=quantity,
capacity=capacity,
)
else:
policy.quantity = quantity
policy.capacity = capacity
policy.save()
......@@ -94,8 +95,8 @@ class QuotaholderDjangoDBCallpoint(object):
continue
append((h.holder, h.resource, h.policy.policy,
h.imported, h.exported,
h.returned, h.released, h.flags))
h.imported_min, h.imported_max,
h.stock_min, h.stock_max, h.flags))
return holdings
......@@ -126,7 +127,7 @@ class QuotaholderDjangoDBCallpoint(object):
def _init_holding(self,
holder, resource, policy,
imported, exported, returned, released,
imported_min, imported_max, stock_min, stock_max,
flags):
try:
h = db_get_holding(holder=holder, resource=resource,
......@@ -136,14 +137,10 @@ class QuotaholderDjangoDBCallpoint(object):
h.policy = policy
h.flags = flags
h.imported = imported
h.importing = imported
h.exported = exported
h.exporting = exported
h.returned = returned
h.returning = returned
h.released = released
h.releasing = released
h.imported_min = imported_min
h.imported_max = imported_max
h.stock_min = stock_min
h.stock_max = stock_max
h.save()
def init_holding(self, context=None, init_holding=[]):
......@@ -152,7 +149,7 @@ class QuotaholderDjangoDBCallpoint(object):
for idx, sfh in enumerate(init_holding):
(holder, resource, policy,
imported, exported, returned, released,
imported_min, imported_max, stock_min, stock_max,
flags) = sfh
try:
......@@ -162,8 +159,8 @@ class QuotaholderDjangoDBCallpoint(object):
continue
self._init_holding(holder, resource, p,
imported, exported,
returned, released,
imported_min, imported_max,
stock_min, stock_max,
flags)
if rejected:
raise QuotaholderError(rejected)
......@@ -175,19 +172,15 @@ class QuotaholderDjangoDBCallpoint(object):
for idx, tpl in enumerate(reset_holding):
(holder, resource,
imported, exported, returned, released) = tpl
imported_min, imported_max, stock_min, stock_max) = tpl
try:
h = db_get_holding(holder=holder, resource=resource,
for_update=True)
h.imported = imported
h.importing = imported
h.exported = exported
h.exporting = exported
h.returned = returned
h.returning = returned
h.released = released
h.releasing = released
h.imported_min = imported_min
h.imported_max = imported_max
h.stock_min = stock_min
h.stock_max = stock_max
h.save()
except Holding.DoesNotExist:
append(idx)
......@@ -207,11 +200,6 @@ class QuotaholderDjangoDBCallpoint(object):
return as_target + as_source
def _actual_quantity(self, holding):
hp = holding.policy
return hp.quantity + (holding.imported + holding.returned -
holding.exported - holding.released)
def release_holding(self, context=None, release_holding=[]):
rejected = []
append = rejected.append
......@@ -228,8 +216,7 @@ class QuotaholderDjangoDBCallpoint(object):
append(idx)
continue
q = self._actual_quantity(h)
if q > 0:
if h.imported_max > 0:
append(idx)
continue
......@@ -257,7 +244,7 @@ class QuotaholderDjangoDBCallpoint(object):
continue
append([(holder, h.resource,
h.imported, h.exported, h.returned, h.released)
h.imported_min, h.imported_max, h.stock_min, h.stock_max)
for h in holdings])
return holdings_list, rejected
......@@ -280,9 +267,9 @@ class QuotaholderDjangoDBCallpoint(object):
p = h.policy
append((h.holder, h.resource, p.quantity, p.capacity,
h.imported, h.exported,
h.returned, h.released,
append((h.holder, h.resource, p.capacity,
h.imported_min, h.imported_max,
h.stock_min, h.stock_max,
h.flags))
return quotas
......@@ -293,7 +280,7 @@ class QuotaholderDjangoDBCallpoint(object):
q_holdings = Q()
holders = []
for (holder, resource, _, _, _) in set_quota:
for (holder, resource, _, _) in set_quota:
holders.append(holder)
hs = Holding.objects.filter(holder__in=holders).select_for_update()
......@@ -304,12 +291,11 @@ class QuotaholderDjangoDBCallpoint(object):
old_policies = []
for (holder, resource,
quantity, capacity,
capacity,
flags) in set_quota:
policy = newname('policy_')
newp = Policy(policy=policy,
quantity=quantity,
capacity=capacity,
)
......@@ -345,7 +331,7 @@ class QuotaholderDjangoDBCallpoint(object):
sources = sub_quota + add_quota
q_holdings = Q()
holders = []
for (holder, resource, _, _) in sources:
for (holder, resource, _) in sources:
holders.append(holder)
hs = Holding.objects.filter(holder__in=holders).select_for_update()
......@@ -360,7 +346,7 @@ class QuotaholderDjangoDBCallpoint(object):
for removing, source in [(True, sub_quota), (False, add_quota)]:
for (holder, resource,
quantity, capacity,
capacity,
) in source:
try:
......@@ -381,8 +367,6 @@ class QuotaholderDjangoDBCallpoint(object):
policy = newname('policy_')
newp = Policy(policy=policy)
newp.quantity = _add(p.quantity if p else 0, quantity,
invert=removing)
newp.capacity = _add(p.capacity if p else 0, capacity,
invert=removing)
......@@ -413,135 +397,74 @@ class QuotaholderDjangoDBCallpoint(object):
clientkey=None,
target=None,
name=None,
provisions=[]):
provisions=()):
create = Commission.objects.create
commission = create(holder=target, clientkey=clientkey, name=name)
serial = commission.serial
checked = []
for holder, resource, quantity in provisions:
operations = Operations()
if holder == target:
m = "Cannot issue commission from an holder to itself (%s)" % (
holder,)
raise InvalidDataError(m)
ent_res = holder, resource
if ent_res in checked:
m = "Duplicate provision for %s.%s" % ent_res
raise DuplicateError(m)
checked.append(ent_res)
try:
checked = []
for holder, resource, quantity in provisions:
release = 0
if quantity < 0:
release = 1
if holder == target:
m = ("Cannot issue commission from a holder "
"to itself (%s)" % (holder,))
raise InvalidDataError(m)
# Source limits checks
try:
h = db_get_holding(holder=holder, resource=resource,
for_update=True)
except Holding.DoesNotExist:
m = ("There is no quantity "
"to allocate from in %s.%s" % (holder, resource))
raise NoQuantityError(m,
source=holder, target=target,
resource=resource, requested=quantity,
current=0, limit=0)
hp = h.policy
if not release:
limit = hp.quantity + h.imported - h.releasing
unavailable = h.exporting - h.returned
available = limit - unavailable
if quantity > available:
m = ("There is not enough quantity "
"to allocate from in %s.%s" % (holder, resource))
raise NoQuantityError(m,
source=holder,
target=target,
resource=resource,
requested=quantity,
current=unavailable,
limit=limit)
else:
current = (+ h.importing + h.returning
- h.exported - h.returned)
limit = hp.capacity
if current - quantity > limit:
m = ("There is not enough capacity "
"to release to in %s.%s" % (holder, resource))
raise NoQuantityError(m,
source=holder,
target=target,
resource=resource,
requested=quantity,
current=current,
limit=limit)
ent_res = holder, resource
if ent_res in checked:
m = "Duplicate provision for %s.%s" % ent_res
raise DuplicateError(m)
checked.append(ent_res)
# Target limits checks
try:
th = db_get_holding(holder=target, resource=resource,
for_update=True)
except Holding.DoesNotExist:
m = ("There is no capacity "
"to allocate into in %s.%s" % (target, resource))
raise NoCapacityError(m,
source=holder,
target=target,
resource=resource,
requested=quantity,
current=0,
limit=0)
tp = th.policy
if not release:
limit = tp.quantity + tp.capacity
current = (+ th.importing + th.returning + tp.quantity
- th.exported - th.released)
if current + quantity > limit:
m = ("There is not enough capacity "
# Source
try:
h = db_get_holding(holder=holder, resource=resource,
for_update=True)
except Holding.DoesNotExist:
m = ("%s has no stock of %s." % (holder, resource))
raise NoStockError(m,
holder=holder,
resource=resource,
requested=quantity,
current=0,
limit=0)
# Target
try:
th = db_get_holding(holder=target, resource=resource,
for_update=True)
except Holding.DoesNotExist:
m = ("There is no capacity "
"to allocate into in %s.%s" % (target, resource))
raise NoCapacityError(m,
source=holder,
target=target,
holder=holder,
resource=resource,
requested=quantity,
current=current,
limit=limit)
else:
limit = tp.quantity + th.imported - th.releasing
unavailable = th.exporting - th.returned
available = limit - unavailable
current=0,
limit=0)
if available + quantity < 0:
m = ("There is not enough quantity "
"to release from in %s.%s" % (target, resource))
raise NoCapacityError(m,
source=holder,
target=target,
resource=resource,
requested=quantity,
current=unavailable,
limit=limit)
Provision.objects.create(serial=commission,
holder=holder,
resource=resource,
quantity=quantity)
if release:
h.returning -= quantity
th.releasing -= quantity
else:
h.exporting += quantity
th.importing += quantity
if quantity >= 0:
operations.prepare(Export, h, quantity)
operations.prepare(Import, th, quantity)
h.save()
th.save()
else: # release
abs_quantity = -quantity
operations.prepare(Reclaim, h, abs_quantity)
operations.prepare(Release, th, abs_quantity)
Provision.objects.create(serial=commission,
holder=holder,
resource=resource,
quantity=quantity)
except QuotaholderError:
operations.revert()
raise
return serial
......@@ -560,18 +483,16 @@ class QuotaholderDjangoDBCallpoint(object):
'source': s_holder,
'target': t_holder,
'resource': provision.resource,
'source_quantity': s_policy.quantity,
'source_capacity': s_policy.capacity,
'source_imported': s_holding.imported,
'source_exported': s_holding.exported,
'source_returned': s_holding.returned,
'source_released': s_holding.released,
'target_quantity': t_policy.quantity,
'source_imported_min': s_holding.imported_min,
'source_imported_max': s_holding.imported_max,
'source_stock_min': s_holding.stock_min,
'source_stock_max': s_holding.stock_max,
'target_capacity': t_policy.capacity,
'target_imported': t_holding.imported,
'target_exported': t_holding.exported,
'target_returned': t_holding.returned,
'target_released': t_holding.released,
'target_imported_min': t_holding.imported_min,
'target_imported_max': t_holding.imported_max,
'target_stock_min': t_holding.stock_min,
'target_stock_max': t_holding.stock_max,
'delta_quantity': provision.quantity,
'issue_time': commission.issue_time,
'log_time': log_time,
......@@ -594,6 +515,8 @@ class QuotaholderDjangoDBCallpoint(object):
t = c.holder
operations = Operations()
provisions = db_filter_provision(serial=serial, for_update=True)
for pv in provisions:
try:
......@@ -606,21 +529,18 @@ class QuotaholderDjangoDBCallpoint(object):
raise CorruptedError(m)
quantity = pv.quantity
release = 0
if quantity < 0:
release = 1
if release:
h.returned -= quantity
th.released -= quantity
else:
h.exported += quantity
th.imported += quantity
if quantity >= 0:
operations.finalize(Export, h, quantity)
operations.finalize(Import, th, quantity)
else: # release
abs_quantity = -quantity
operations.finalize(Reclaim, h, abs_quantity)
operations.finalize(Release, th, abs_quantity)
reason = 'ACCEPT:' + reason[-121:]
self._log_provision(c, h, th, pv, log_time, reason)
h.save()
th.save()
pv.delete()
c.delete()
......@@ -640,6 +560,8 @@ class QuotaholderDjangoDBCallpoint(object):
t = c.holder
operations = Operations()
provisions = db_filter_provision(serial=serial, for_update=True)
for pv in provisions:
try:
......@@ -652,21 +574,18 @@ class QuotaholderDjangoDBCallpoint(object):
raise CorruptedError(m)
quantity = pv.quantity
release = 0
if quantity < 0:
release = 1
if release:
h.returning += quantity
th.releasing += quantity
else:
h.exporting -= quantity
th.importing -= quantity
if quantity >= 0:
operations.undo(Export, h, quantity)
operations.undo(Import, th, quantity)
else: # release
abs_quantity = -quantity
operations.undo(Reclaim, h, abs_quantity)
operations.undo(Release, th, abs_quantity)
reason = 'REJECT:' + reason[-121:]
self._log_provision(c, h, th, pv, log_time, reason)
h.save()
th.save()
pv.delete()
c.delete()
......
# Copyright 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from astakos.quotaholder.exception import (
NoCapacityError, NoStockError,
NonImportedError, NoStockReleaseError, NonExportedError)
class Operation(object):
@staticmethod
def assertions(holding):
assert(0 <= holding.imported_min)
assert(holding.imported_min <= holding.imported_max)
assert(0 <= holding.stock_min)
assert(holding.stock_min <= holding.stock_max)
@classmethod
def _prepare(cls, holding, quantity, check=True):
raise NotImplementedError
@classmethod
def prepare(cls, holding, quantity, check=True):
cls.assertions(holding)
cls._prepare(holding, quantity, check=True)
@classmethod
def _finalize(cls, holding, quantity):
raise NotImplementedError
@classmethod
def finalize(cls, holding, quantity):
cls.assertions(holding)
cls._finalize(holding, quantity)
@classmethod
def undo(cls, holding, quantity):
cls.prepare(holding, -quantity, check=False)
@classmethod
def revert(cls, holding, quantity):
# Assertions do not hold when reverting
cls._prepare(holding, -quantity, check=False)
class Import(Operation):
@classmethod
def _prepare(cls, holding, quantity, check=True):
imported_max = holding.imported_max
new_imported_max = imported_max + quantity
capacity = holding.policy.capacity
if check and new_imported_max > capacity:
holder = holding.holder
resource = holding.resource
m = ("%s has not enough capacity of %s." % (holder, resource))
raise NoCapacityError(m,
holder=holder,
resource=resource,
requested=quantity,
current=imported_max,
limit=capacity)
holding.imported_max = new_imported_max
holding.save()