Commit 5888e742 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis

select_for_update for quotaholder

parent 8d753271
......@@ -45,7 +45,9 @@ from synnefo.lib.commissioning.utils.newname import newname
from django.db.models import Q
from django.db import transaction, IntegrityError
from .models import (Holder, Entity, Policy, Holding,
Commission, Provision, ProvisionLog, now)
Commission, Provision, ProvisionLog, now,
db_get_entity, db_get_holding, db_get_policy,
db_get_commission, db_filter_provision)
class QuotaholderDjangoDBCallpoint(Callpoint):
......@@ -107,7 +109,7 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
for entity, key, newkey in set_entity_key:
try:
e = Entity.objects.get(entity=entity, key=key)
e = db_get_entity(entity=entity, key=key, for_update=True)
except Entity.DoesNotExist:
append(entity)
continue
......@@ -163,7 +165,7 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
import_limit, export_limit ) in set_limits:
try:
policy = Policy.objects.get(policy=policy)
policy = db_get_policy(policy=policy, for_update=True)
except Policy.DoesNotExist:
Policy.objects.create( policy=policy,
quantity=quantity,
......@@ -200,7 +202,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
def _set_holding(self, entity, resource, policy, flags):
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
h.policy = p
h.flags = flags
h.save()
......@@ -231,7 +234,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
continue
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
h.policy = p
h.flags = flags
h.save()
......@@ -245,7 +249,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
imported, exported, returned, released,
flags):
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
except Holding.DoesNotExist:
h = Holding(entity=entity, resource=resource)
......@@ -305,7 +310,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
continue
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
h.imported=imported
h.importing=imported
h.exported=exported
......@@ -341,7 +347,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
def _increase_resource(self, entity, resource, amount):
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
except Holding.DoesNotExist:
h = Holding(entity=entity, resource=resource)
p = Policy.objects.create(policy=self._new_policy_name(),
......@@ -356,7 +363,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
for idx, (entity, resource, key) in enumerate(release_holding):
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
except Holding.DoesNotExist:
append(idx)
continue
......@@ -462,7 +470,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
)
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
p = h.policy
h.policy = newp
h.flags = flags
......@@ -528,7 +537,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
release = 1
try:
h = Holding.objects.get(entity=entity, resource=resource)
h = db_get_holding(entity=entity, resource=resource,
for_update=True)
except Holding.DoesNotExist:
m = ("There is not enough quantity "
"to allocate from in %s.%s" % (entity, resource))
......@@ -551,7 +561,8 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
raise NoQuantityError(m)
try:
th = Holding.objects.get(entity=target, resource=resource)
th = db_get_holding(entity=target, resource=resource,
for_update=True)
except Holding.DoesNotExist:
m = ("There is not enough capacity "
"to allocate into in %s.%s" % (target, resource))
......@@ -630,18 +641,20 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
for serial in serials:
try:
c = Commission.objects.get(clientkey=clientkey, serial=serial)
c = db_get_commission(clientkey=clientkey, serial=serial,
for_update=True)
except Commission.DoesNotExist:
return
t = c.entity
provisions = Provision.objects.filter(serial=serial)
provisions = db_filter_provision(serial=serial, for_update=True)
for pv in provisions:
try:
h = Holding.objects.get(entity=pv.entity.entity,
resource=pv.resource )
th = Holding.objects.get(entity=t, resource=pv.resource)
h = db_get_holding(entity=pv.entity.entity,
resource=pv.resource, for_update=True)
th = db_get_holding(entity=t, resource=pv.resource,
for_update=True)
except Holding.DoesNotExist:
m = "Corrupted provision"
raise CorruptedError(m)
......@@ -673,18 +686,20 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
for serial in serials:
try:
c = Commission.objects.get(clientkey=clientkey, serial=serial)
c = db_get_commission(clientkey=clientkey, serial=serial,
for_update=True)
except Commission.DoesNotExist:
return
t = c.entity
provisions = Provision.objects.filter(serial=serial)
provisions = db_filter_provision(serial=serial, for_update=True)
for pv in provisions:
try:
h = Holding.objects.get(entity=pv.entity.entity,
resource=pv.resource)
th = Holding.objects.get(entity=t, resource=pv.resource)
h = db_get_holding(entity=pv.entity.entity,
resource=pv.resource, for_update=True)
th = db_get_holding(entity=t, resource=pv.resource,
for_update=True)
except Holding.DoesNotExist:
m = "Corrupted provision"
raise CorruptedError(m)
......@@ -740,7 +755,7 @@ class QuotaholderDjangoDBCallpoint(Callpoint):
append = rejected.append
for entity, key in release_entity:
try:
e = Entity.objects.get(entity=entity, key=key)
e = db_get_entity(entity=entity, key=key, for_update=True)
except Entity.DoesNotExist:
append(entity)
continue
......
# Copyright 2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.
from django.db import connections
from django.db.models import Manager
from django.db.models.query import QuerySet
class ForUpdateManager(Manager):
""" Model manager implementing SELECT .. FOR UPDATE statement
This manager implements select_for_update() method in order to use
row-level locking in the database and guarantee exclusive access, since
this method is only implemented in Django>=1.4.
Non-blocking reads are not implemented, and each query including a row
that is locked by another transaction will block until the lock is
released. Also care must be taken in order to avoid deadlocks or retry
transactions that abort due to deadlocks.
Example:
networks = Network.objects.select_for_update().filter(public=True)
"""
def __init__(self, *args, **kwargs):
super(ForUpdateManager, self).__init__(*args, **kwargs)
self._select_for_update = False
def filter(self, *args, **kwargs):
query = self.get_query_set().filter(*args, **kwargs)
if self._select_for_update:
self._select_for_update = False
return for_update(query)
else:
return query
def get(self, *args, **kwargs):
if not self._select_for_update:
return self.get_query_set().get(*args, **kwargs)
query = self.filter(*args, **kwargs)
query = list(query)
num = len(query)
if num == 1:
return query[0]
if not num:
raise self.model.DoesNotExist(
"%s matching query does not exist. "
"Lookup parameters were %s" %
(self.model._meta.object_name, kwargs))
raise self.model.MultipleObjectsReturned(
"get() returned more than one %s -- it returned %s! "
"Lookup parameters were %s" %
(self.model._meta.object_name, num, kwargs))
def select_for_update(self, *args, **kwargs):
self._select_for_update = True
return self
def for_update(query):
""" Rewrite query using SELECT .. FOR UPDATE.
"""
if 'sqlite' in connections[query.db].settings_dict['ENGINE'].lower():
# SQLite does not support FOR UPDATE
return query
sql, params = query.query.get_compiler(query.db).as_sql()
return query.model._default_manager.raw(sql.rstrip() + ' FOR UPDATE',
params)
class ProtectedDeleteManager(ForUpdateManager):
""" Manager for protecting Backend deletion.
Call Backend delete() method in order to prevent deletion
of Backends that host non-deleted VirtualMachines.
"""
def get_query_set(self):
return BackendQuerySet(self.model, using=self._db)
class BackendQuerySet(QuerySet):
def delete(self):
for backend in self._clone():
backend.delete()
......@@ -37,7 +37,7 @@ from synnefo.lib.commissioning import CorruptedError
from django.db.models import (Model, BigIntegerField, CharField,
ForeignKey, AutoField)
from django.db import transaction
from .managers import ForUpdateManager
class Holder(Model):
......@@ -45,6 +45,7 @@ class Holder(Model):
intval = BigIntegerField()
strval = CharField(max_length=4096)
objects = ForUpdateManager()
class Entity(Model):
......@@ -53,6 +54,7 @@ class Entity(Model):
related_name='entities')
key = CharField(max_length=4096, null=False)
objects = ForUpdateManager()
class Policy(Model):
......@@ -62,6 +64,7 @@ class Policy(Model):
import_limit = BigIntegerField(null=True, default=None)
export_limit = BigIntegerField(null=True, default=None)
objects = ForUpdateManager()
class Holding(Model):
......@@ -80,6 +83,8 @@ class Holding(Model):
released = BigIntegerField(null=False, default=0)
releasing = BigIntegerField(null=False, default=0)
objects = ForUpdateManager()
class Meta:
unique_together = (('entity', 'resource'),)
......@@ -98,6 +103,7 @@ class Commission(Model):
clientkey = CharField(max_length=4096, null=False)
issue_time = CharField(max_length=24, default=now)
objects = ForUpdateManager()
class Provision(Model):
......@@ -109,6 +115,7 @@ class Provision(Model):
resource = CharField(max_length=4096, null=False)
quantity = BigIntegerField(null=False)
objects = ForUpdateManager()
class ProvisionLog(Model):
......@@ -138,6 +145,7 @@ class ProvisionLog(Model):
delta_quantity = BigIntegerField(null=False)
reason = CharField(max_length=4096)
objects = ForUpdateManager()
def source_allocated_through(self):
return self.source_imported - self.source_released
......@@ -179,3 +187,37 @@ class ProvisionLog(Model):
def target_outbound(self):
return self.target_outbound_through() + self.target_exported
def _access(*args, **kwargs):
method = args[0]
model = args[1]
args = args[2:]
o = model.objects
try:
if kwargs['for_update']:
del kwargs['for_update']
o = o.select_for_update()
except KeyError:
pass
f = getattr(o, method)
return f(*args, **kwargs)
def _get(*args, **kwargs):
return _access('get', *args, **kwargs)
def _filter(*args, **kwargs):
return _access('filter', *args, **kwargs)
def db_get_holding(*args, **kwargs):
return _get(Holding, *args, **kwargs)
def db_get_entity(*args, **kwargs):
return _get(Entity, *args, **kwargs)
def db_get_policy(*args, **kwargs):
return _get(Policy, *args, **kwargs)
def db_get_commission(*args, **kwargs):
return _get(Commission, *args, **kwargs)
def db_filter_provision(*args, **kwargs):
return _filter(Provision, *args, **kwargs)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment