Skip to content
Snippets Groups Projects
Commit db464c6f authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

astakos: Use bulk_create to speed up commissions

Update holdings and create provisions and log in bulk when issuing or
resolving commissions to speed up the operation,
parent 99b0f832
No related branches found
No related tags found
No related merge requests found
......@@ -136,11 +136,13 @@ def issue_commission(clientkey, provisions, name="", force=False):
provisions = _merge_same_keys(provisions)
keys = [key for (key, value) in provisions]
holdings = _get_holdings_for_update(keys)
changed_holdings = {}
try:
for key, quantity in provisions:
# Target
try:
th = holdings[key]
changed_holdings[th.id] = th
except KeyError:
m = ("There is no such holding %s" % unicode(key))
provision = _mkProvision(key, quantity)
......@@ -156,20 +158,22 @@ def issue_commission(clientkey, provisions, name="", force=False):
holdings[key] = th
provisions_to_create.append((key, quantity))
except QuotaholderError:
operations.revert()
raise
Holding.objects.filter(id__in=changed_holdings.keys()).delete()
Holding.objects.bulk_create(changed_holdings.values())
commission = Commission.objects.create(clientkey=clientkey,
name=name,
issue_datetime=datetime.now())
ps = []
for (holder, source, resource), quantity in provisions_to_create:
Provision.objects.create(serial=commission,
holder=holder,
source=source,
resource=resource,
quantity=quantity)
ps.append(Provision(serial=commission,
holder=holder,
source=source,
resource=resource,
quantity=quantity))
Provision.objects.bulk_create(ps)
return commission.serial
......@@ -191,7 +195,7 @@ def _log_provision(commission, provision, holding, log_datetime, reason):
'reason': reason,
}
ProvisionLog.objects.create(**kwargs)
return ProvisionLog(**kwargs)
def _get_commissions_for_update(clientkey, serials):
......@@ -252,12 +256,17 @@ def resolve_pending_commissions(clientkey, accept_set=None, reject_set=None,
accepted.append(serial) if accept else rejected.append(serial)
ps = provisions.get(serial, [])
changed_holdings = {}
provision_ids = []
plog = []
for pv in ps:
key = pv.holding_key()
h = holdings.get(key)
changed_holdings[h.id] = h
if h is None:
raise CorruptedError("Corrupted provision")
provision_ids.append(pv.id)
quantity = pv.quantity
action = finalize if accept else undo
if quantity >= 0:
......@@ -267,8 +276,12 @@ def resolve_pending_commissions(clientkey, accept_set=None, reject_set=None,
prefix = 'ACCEPT:' if accept else 'REJECT:'
comm_reason = prefix + reason[-121:]
_log_provision(commission, pv, h, log_datetime, comm_reason)
pv.delete()
plog.append(
_log_provision(commission, pv, h, log_datetime, comm_reason))
Holding.objects.filter(id__in=changed_holdings.keys()).delete()
Holding.objects.bulk_create(changed_holdings.values())
Provision.objects.filter(id__in=provision_ids).delete()
ProvisionLog.objects.bulk_create(plog)
commission.delete()
return accepted, rejected, notFound, conflicting
......
......@@ -77,12 +77,10 @@ class Import(Operation):
usage=usage_max)
holding.usage_max = new_usage_max
holding.save()
@classmethod
def _finalize(cls, holding, quantity):
holding.usage_min += quantity
holding.save()
class Release(Operation):
......@@ -104,12 +102,10 @@ class Release(Operation):
usage=usage_min)
holding.usage_min = new_usage_min
holding.save()
@classmethod
def _finalize(cls, holding, quantity):
holding.usage_max -= quantity
holding.save()
class Operations(object):
......
......@@ -83,6 +83,12 @@ class QuotaholderTest(TestCase):
with assertRaises(NoCommissionError):
qh.get_commission(self.client, s1+1)
r = qh.get_quota()
self.assertEqual(r,
{(holder, source, resource1): (limit1, 0, limit1/2),
(holder, source, resource2): (limit2, 0, limit2),
})
# commission exceptions
with assertRaises(NoCapacityError) as cm:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment