Commit 502ac2d1 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis
Browse files

quotaholder: Impose ordering on locking holdings

Prefetch all holdings required for update with a specified order
(by entity/resource pair).

Refs #3560
parent c2c795fe
......@@ -210,6 +210,37 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
return holdings
def get_holdings_for_update(self, holding_keys):
holding_keys = sorted(holding_keys)
holdings = {}
for (entity, resource) in holding_keys:
try:
h = Holding.objects.get_for_update(
entity=entity, resource=resource)
holdings[(entity, resource)] = h
except Holding.DoesNotExist:
pass
return holdings
def keys_from_list(self, param_list):
return set((p[0], p[1]) for p in param_list)
def holdings_for_update(self, param_list):
keys = self.keys_from_list(param_list)
return self.get_holdings_for_update(keys)
def commission_holdings_for_update(self, provisions, target):
source_keys = self.keys_from_list(provisions)
target_keys = set((target, resource)
for (_, resource, _) in provisions)
all_keys = source_keys.union(target_keys)
return self.get_holdings_for_update(all_keys)
def provision_holdings_for_update(self, provisions, target):
provisions_list = [(p.entity_id, p.resource, p.quantity)
for p in provisions]
return self.commission_holdings_for_update(provisions_list, target)
def _set_holding(self, entity, resource, policy, flags):
try:
h = db_get_holding(entity=entity, resource=resource,
......@@ -315,6 +346,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
rejected = []
append = rejected.append
holdings = self.holdings_for_update(reset_holding)
for idx, tpl in enumerate(reset_holding):
(entity, resource, key,
imported, exported, returned, released) = tpl
......@@ -325,8 +358,7 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
continue
try:
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
h = holdings[(entity, resource)]
h.imported = imported
h.importing = imported
h.exported = exported
......@@ -336,7 +368,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
h.released = released
h.releasing = released
h.save()
except Holding.DoesNotExist:
holdings[(entity, resource)] = h
except KeyError:
append(idx)
continue
......@@ -474,21 +507,16 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
return quotas
def entities_from_input(self, param_list):
names = [p[0] for p in param_list]
return Entity.objects.in_bulk(names)
def set_quota(self, context=None, set_quota=()):
rejected = []
append = rejected.append
q_holdings = Q()
entities = []
for (entity, resource, key, _, _, _, _, _) in set_quota:
entities.append(entity)
hs = Holding.objects.filter(entity__in=entities).select_for_update()
holdings = {}
for h in hs:
holdings[(h.entity_id, h.resource)] = h
entities = Entity.objects.in_bulk(entities)
holdings = self.holdings_for_update(set_quota)
entities = self.entities_from_input(set_quota)
old_policies = []
......@@ -549,19 +577,10 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
pass
sources = sub_quota + add_quota
q_holdings = Q()
entities = []
for (entity, resource, key, _, _, _, _) in sources:
entities.append(entity)
hs = Holding.objects.filter(entity__in=entities).select_for_update()
holdings = {}
for h in hs:
holdings[(h.entity_id, h.resource)] = h
entities = Entity.objects.in_bulk(entities)
holdings = self.holdings_for_update(sources)
entities = self.entities_from_input(sources)
pids = [h.policy_id for h in hs]
pids = [h.policy_id for h in holdings.itervalues()]
policies = Policy.objects.in_bulk(pids)
old_policies = []
......@@ -632,7 +651,7 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
if clientkey is None:
return
for serial in serials:
for serial in sorted(serials):
try:
c = db_get_callserial(clientkey=clientkey,
serial=serial,
......@@ -684,6 +703,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
commission = create(entity_id=target, clientkey=clientkey, name=name)
serial = commission.serial
holdings = self.commission_holdings_for_update(provisions, target)
checked = []
for entity, resource, quantity in provisions:
......@@ -710,9 +731,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
# Source limits checks
try:
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
except Holding.DoesNotExist:
h = holdings[(entity, resource)]
except KeyError:
m = ("There is no quantity "
"to allocate from in %s.%s" % (entity, resource))
raise NoQuantityError(m,
......@@ -766,9 +786,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
# Target limits checks
try:
th = db_get_holding(entity=target, resource=resource,
for_update=True)
except Holding.DoesNotExist:
th = holdings[(target, resource)]
except KeyError:
m = ("There is no capacity "
"to allocate into in %s.%s" % (target, resource))
raise NoCapacityError(m,
......@@ -837,6 +856,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
h.save()
th.save()
holdings[(entity, resource)] = h
holdings[(target, resource)] = th
return serial
......@@ -894,13 +915,12 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
t = c.entity
provisions = db_filter_provision(serial=serial, for_update=True)
holdings = self.provision_holdings_for_update(provisions, t)
for pv in provisions:
try:
h = db_get_holding(entity=pv.entity.entity,
resource=pv.resource, for_update=True)
th = db_get_holding(entity=t, resource=pv.resource,
for_update=True)
except Holding.DoesNotExist:
h = holdings[(pv.entity_id, pv.resource)]
th = holdings[(t, pv.resource)]
except KeyError:
m = "Corrupted provision"
raise CorruptedError(m)
......@@ -920,6 +940,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
self._log_provision(c, h, th, pv, log_time, reason)
h.save()
th.save()
holdings[(pv.entity_id, pv.resource)] = h
holdings[(t, pv.resource)] = th
pv.delete()
c.delete()
......@@ -940,12 +962,11 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
t = c.entity
provisions = db_filter_provision(serial=serial, for_update=True)
holdings = self.provision_holdings_for_update(provisions, t)
for pv in provisions:
try:
h = db_get_holding(entity=pv.entity.entity,
resource=pv.resource, for_update=True)
th = db_get_holding(entity=t, resource=pv.resource,
for_update=True)
h = holdings[(pv.entity_id, pv.resource)]
th = holdings[(t, pv.resource)]
except Holding.DoesNotExist:
m = "Corrupted provision"
raise CorruptedError(m)
......@@ -966,6 +987,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
self._log_provision(c, h, th, pv, log_time, reason)
h.save()
th.save()
holdings[(pv.entity_id, pv.resource)] = h
holdings[(t, pv.resource)] = th
pv.delete()
c.delete()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment