Commit 0b0bd82c authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

quotaholder: Unify accept and reject commission code

Incorporate accept/reject functions in resolve_pending_commissions,
in order to enforce a total ordering on locking holdings.
Add a wrapper (resolve_pending_commission) that operates on a single
serial.
parent d1e81107
......@@ -142,21 +142,18 @@ def _issue_commission(clientkey, provisions, force, accept):
return serial
def failed_to_cloudfault(failed):
serial, reason = failed
if reason == 'NOTFOUND':
def notFoundCF(serial):
body = {"code": 404,
"message": "serial %s does not exist" % serial,
}
cloudfault = {"itemNotFound": body}
elif reason == 'CONFLICT':
return {"itemNotFound": body}
def conflictingCF(serial):
body = {"code": 400,
"message": "cannot both accept and reject serial %s" % serial,
}
cloudfault = {"badRequest": body}
else:
raise InternalServerError('Unexpected error')
return (serial, cloudfault)
return {"badRequest": body}
@csrf_exempt
......@@ -174,8 +171,10 @@ def resolve_pending_commissions(request):
result = qh.resolve_pending_commissions(clientkey=client_key,
accept_set=accept,
reject_set=reject)
accepted, rejected, failed = result
cloudfaults = [failed_to_cloudfault(f) for f in failed]
accepted, rejected, notFound, conflicting = result
notFound = [(serial, notFoundCF(serial)) for serial in notFound]
conflicting = [(serial, conflictingCF(serial)) for serial in conflicting]
cloudfaults = notFound + conflicting
data = {'accepted': accepted,
'rejected': rejected,
'failed': cloudfaults
......@@ -217,13 +216,9 @@ def serial_action(request, serial):
if accept == reject:
raise BadRequest('Specify either accept or reject action.')
if accept:
result = qh.accept_commission(clientkey=client_key,
serial=serial)
else:
result = qh.reject_commission(clientkey=client_key,
serial=serial)
result = qh.resolve_pending_commission(clientkey=client_key,
serial=serial,
accept=accept)
response = HttpResponse()
if not result:
response.status_code = 404
......
......@@ -31,8 +31,6 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from functools import partial
from astakos.quotaholder.exception import (
QuotaholderError,
NoCommissionError,
......@@ -41,16 +39,14 @@ from astakos.quotaholder.exception import (
DuplicateError)
from astakos.quotaholder.commission import (
Import, Release, Operations)
Import, Release, Operations, finalize, undo)
from astakos.quotaholder.utils.newname import newname
from astakos.quotaholder.api import QH_PRACTICALLY_INFINITE
from .models import (Holding,
Commission, Provision, ProvisionLog,
now,
db_get_holding,
db_get_commission, db_filter_provision)
now)
class QuotaholderDjangoDBCallpoint(object):
......@@ -184,7 +180,9 @@ class QuotaholderDjangoDBCallpoint(object):
holdings[key] = th
Provision.objects.create(serial=commission,
holding=th,
holder=th.holder,
source=th.source,
resource=th.resource,
quantity=quantity)
except QuotaholderError:
......@@ -194,9 +192,7 @@ class QuotaholderDjangoDBCallpoint(object):
return serial
def _log_provision(self,
commission, provision, log_time, reason):
holding = provision.holding
commission, provision, holding, log_time, reason):
kwargs = {
'serial': commission.serial,
......@@ -215,77 +211,89 @@ class QuotaholderDjangoDBCallpoint(object):
ProvisionLog.objects.create(**kwargs)
def accept_commission(self,
context=None, clientkey=None,
serial=None, reason=''):
log_time = now()
try:
c = db_get_commission(clientkey=clientkey, serial=serial,
for_update=True)
except Commission.DoesNotExist:
return False
def _get_commissions_for_update(self, clientkey, serials):
cs = Commission.objects.filter(
clientkey=clientkey, serial__in=serials).select_for_update()
operations = Operations()
provisions = db_filter_provision(serial=serial, for_update=True)
for pv in provisions:
try:
th = db_get_holding(id=pv.holding_id,
for_update=True)
except Holding.DoesNotExist:
m = "Corrupted provision"
raise CorruptedError(m)
commissions = {}
for c in cs:
commissions[c.serial] = c
return commissions
quantity = pv.quantity
def _partition_by(self, f, l):
d = {}
for x in l:
group = f(x)
group_l = d.get(group, [])
group_l.append(x)
d[group] = group_l
return d
if quantity >= 0:
operations.finalize(Import, th, quantity)
else: # release
abs_quantity = -quantity
operations.finalize(Release, th, abs_quantity)
def resolve_pending_commissions(self,
context=None, clientkey=None,
accept_set=[], reject_set=[],
reason=''):
actions = dict.fromkeys(accept_set, True)
conflicting = set()
for serial in reject_set:
if actions.get(serial) is True:
actions.pop(serial)
conflicting.add(serial)
else:
actions[serial] = False
reason = 'ACCEPT:' + reason[-121:]
self._log_provision(c, pv, log_time, reason)
pv.delete()
c.delete()
return True
conflicting = list(conflicting)
serials = actions.keys()
commissions = self._get_commissions_for_update(clientkey, serials)
ps = Provision.objects.filter(serial__in=serials).select_for_update()
holding_keys = sorted(p.holding_key() for p in ps)
holdings = self._get_holdings_for_update(holding_keys)
provisions = self._partition_by(lambda p: p.serial_id, ps)
def reject_commission(self,
context=None, clientkey=None,
serial=None, reason=''):
log_time = now()
try:
c = db_get_commission(clientkey=clientkey, serial=serial,
for_update=True)
except Commission.DoesNotExist:
return False
accepted, rejected, notFound = [], [], []
for serial, accept in actions.iteritems():
commission = commissions.get(serial)
if commission is None:
notFound.append(serial)
continue
operations = Operations()
accepted.append(serial) if accept else rejected.append(serial)
provisions = db_filter_provision(serial=serial, for_update=True)
for pv in provisions:
try:
th = db_get_holding(id=pv.holding_id,
for_update=True)
except Holding.DoesNotExist:
m = "Corrupted provision"
raise CorruptedError(m)
ps = provisions.get(serial)
assert ps is not None
for pv in ps:
key = pv.holding_key()
h = holdings.get(key)
if h is None:
raise CorruptedError("Corrupted provision")
quantity = pv.quantity
action = finalize if accept else undo
if quantity >= 0:
operations.undo(Import, th, quantity)
action(Import, h, quantity)
else: # release
abs_quantity = -quantity
operations.undo(Release, th, abs_quantity)
action(Release, h, -quantity)
reason = 'REJECT:' + reason[-121:]
self._log_provision(c, pv, log_time, reason)
prefix = 'ACCEPT:' if accept else 'REJECT:'
comm_reason = prefix + reason[-121:]
self._log_provision(commission, pv, h, log_time, comm_reason)
pv.delete()
c.delete()
return True
commission.delete()
return accepted, rejected, notFound, conflicting
def resolve_pending_commission(self, clientkey, serial, accept=True):
if accept:
ok, notOk, notF, confl = self.resolve_pending_commissions(
clientkey=clientkey, accept_set=[serial])
else:
notOk, ok, notF, confl = self.resolve_pending_commissions(
clientkey=clientkey, reject_set=[serial])
assert notOk == confl == []
assert ok + notF == [serial]
return bool(ok)
def get_pending_commissions(self, context=None, clientkey=None):
pending = Commission.objects.filter(clientkey=clientkey)
......@@ -310,34 +318,5 @@ class QuotaholderDjangoDBCallpoint(object):
}
return response
def _resolve(self, include, exclude, operation):
done = []
failed = []
for serial in include:
if serial in exclude:
failed.append((serial, 'CONFLICT'))
else:
response = operation(serial=serial)
if response:
done.append(serial)
else:
failed.append((serial, 'NOTFOUND'))
return done, failed
def resolve_pending_commissions(self,
context=None, clientkey=None,
accept_set=[], reject_set=[]):
accept_set = set(accept_set)
reject_set = set(reject_set)
accept = partial(self.accept_commission, clientkey=clientkey)
reject = partial(self.reject_commission, clientkey=clientkey)
accepted, failed_ac = self._resolve(accept_set, reject_set, accept)
rejected, failed_re = self._resolve(reject_set, accept_set, reject)
failed = list(set(failed_ac + failed_re))
return accepted, rejected, failed
API_Callpoint = QuotaholderDjangoDBCallpoint
......@@ -137,14 +137,14 @@ class Operations(object):
operation.prepare(holding, quantity, check)
self.operations.append((operation, holding, quantity))
def finalize(self, operation, holding, quantity):
operation.finalize(holding, quantity)
self.operations.append((operation, holding, quantity))
def undo(self, operation, holding, quantity):
operation.undo(holding, quantity)
self.operations.append((operation, holding, quantity))
def revert(self):
for (operation, holding, quantity) in self.operations:
operation.revert(holding, quantity)
def finalize(operation, holding, quantity):
operation.finalize(holding, quantity)
def undo(operation, holding, quantity):
operation.undo(holding, quantity)
......@@ -77,19 +77,24 @@ class Provision(Model):
serial = ForeignKey( Commission,
to_field='serial',
related_name='provisions' )
holding = ForeignKey(Holding,
related_name='provisions')
holder = CharField(max_length=4096, db_index=True)
source = CharField(max_length=4096, null=True)
resource = CharField(max_length=4096, null=False)
quantity = intDecimalField()
objects = ForUpdateManager()
def todict(self):
return {'holder': self.holding.holder,
'source': self.holding.source,
'resource': self.holding.resource,
return {'holder': self.holder,
'source': self.source,
'resource': self.resource,
'quantity': self.quantity,
}
def holding_key(self):
return (self.holder, self.source, self.resource)
class ProvisionLog(Model):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment