Commit 6d8e24e8 authored by Giorgos Korfiatis's avatar Giorgos Korfiatis Committed by Georgios D. Tsoukalas
Browse files

Make ForUpdateManager thread-safe

The manager held a global flag `_for_update' on a single manager object,
which could be read or set by multiple threads.

select_for_update() is now provided by a subclass of QuerySet.
It must be used as the last modifier of a query set,
e.g. Project.objects.filter(state=1).select_for_update(),
and cannot be combined with operators provided by other subclasses,
such as values_list().

Since get() returns an element rather than a query set,
the manager also provides a specific function get_for_update().
parent d7936ff8
......@@ -440,15 +440,14 @@ def get_project_by_name(name):
def get_project_for_update(project_id):
try:
return Project.objects.select_for_update().get(id=project_id)
return Project.objects.get_for_update(id=project_id)
except Project.DoesNotExist:
raise IOError(
_(astakos_messages.UNKNOWN_PROJECT_ID) % project_id)
def get_application_for_update(application_id):
try:
objects = ProjectApplication.objects.select_for_update()
return objects.get(id=application_id)
return ProjectApplication.objects.get_for_update(id=application_id)
except ProjectApplication.DoesNotExist:
m = _(astakos_messages.UNKNOWN_PROJECT_APPLICATION_ID) % application_id
raise IOError(m)
......@@ -488,8 +487,8 @@ def get_membership_for_update(project, user):
if isinstance(user, (int, long)):
user = get_user_by_id(user)
try:
sfu = ProjectMembership.objects.select_for_update()
m = sfu.get(project=project, person=user)
objs = ProjectMembership.objects
m = objs.get_for_update(project=project, person=user)
if m.is_pending:
raise PendingMembershipError()
return m
......@@ -684,8 +683,8 @@ def submit_application(kw, request_user=None):
precursor = None
precursor_id = kw.get('precursor_application', None)
if precursor_id is not None:
sfu = ProjectApplication.objects.select_for_update()
precursor = sfu.get(id=precursor_id)
objs = ProjectApplication.objects
precursor = objs.get_for_update(id=precursor_id)
kw['precursor_application'] = precursor
if (request_user and
......@@ -702,8 +701,9 @@ def submit_application(kw, request_user=None):
else:
chain = precursor.chain
application.chain = chain
sfu = ProjectApplication.objects.select_for_update()
pending = sfu.filter(chain=chain, state=ProjectApplication.PENDING)
objs = ProjectApplication.objects
q = objs.filter(chain=chain, state=ProjectApplication.PENDING)
pending = q.select_for_update()
for app in pending:
app.state = ProjectApplication.REPLACED
app.save()
......@@ -749,8 +749,8 @@ def deny_application(application_id):
def approve_application(app_id):
try:
objects = ProjectApplication.objects.select_for_update()
application = objects.get(id=app_id)
objects = ProjectApplication.objects
application = objects.get_for_update(id=app_id)
except ProjectApplication.DoesNotExist:
m = _(astakos_messages.UNKNOWN_PROJECT_APPLICATION_ID % (app_id,))
raise PermissionDenied(m)
......
# Copyright 2012 GRNET S.A. All rights reserved.
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
......@@ -45,27 +45,15 @@ class ForUpdateManager(Manager):
transactions that abort due to deadlocks.
Example:
networks = Network.objects.select_for_update().filter(public=True)
networks = Network.objects.filter(public=True).select_for_update()
"""
def __init__(self, *args, **kwargs):
super(ForUpdateManager, self).__init__(*args, **kwargs)
self._select_for_update = False
def filter(self, *args, **kwargs):
query = self.get_query_set().filter(*args, **kwargs)
if self._select_for_update:
self._select_for_update = False
return for_update(query)
else:
return query
def get(self, *args, **kwargs):
if not self._select_for_update:
return self.get_query_set().get(*args, **kwargs)
def get_query_set(self):
return ForUpdateQuerySet(self.model, using=self._db)
query = self.filter(*args, **kwargs)
def get_for_update(self, *args, **kwargs):
query = for_update(self.filter(*args, **kwargs))
query = list(query)
num = len(query)
if num == 1:
......@@ -80,9 +68,11 @@ class ForUpdateManager(Manager):
"Lookup parameters were %s" %
(self.model._meta.object_name, num, kwargs))
def select_for_update(self, *args, **kwargs):
self._select_for_update = True
return self
class ForUpdateQuerySet(QuerySet):
def select_for_update(self):
return for_update(self)
def for_update(query):
......@@ -95,21 +85,3 @@ def for_update(query):
sql, params = query.query.get_compiler(query.db).as_sql()
return query.model._default_manager.raw(sql.rstrip() + ' FOR UPDATE',
params)
class ProtectedDeleteManager(ForUpdateManager):
""" Manager for protecting Backend deletion.
Call Backend delete() method in order to prevent deletion
of Backends that host non-deleted VirtualMachines.
"""
def get_query_set(self):
return BackendQuerySet(self.model, using=self._db)
class BackendQuerySet(QuerySet):
def delete(self):
for backend in self._clone():
backend.delete()
......@@ -1656,8 +1656,8 @@ class ProjectApplication(models.Model):
def _get_project_for_update(self):
try:
objects = Project.objects.select_for_update()
project = objects.get(id=self.chain)
objects = Project.objects
project = objects.get_for_update(id=self.chain)
return project
except Project.DoesNotExist:
return None
......@@ -2459,8 +2459,9 @@ class SyncError(Exception):
pass
def reset_serials(serials):
sfu = ProjectMembership.objects.select_for_update()
memberships = list(sfu.filter(pending_serial__in=serials))
objs = ProjectMembership.objects
q = objs.filter(pending_serial__in=serials).select_for_update()
memberships = list(q)
if memberships:
for membership in memberships:
......@@ -2473,8 +2474,9 @@ def sync_finish_serials(serials_to_ack=None):
serials_to_ack = qh_query_serials([])
serials_to_ack = set(serials_to_ack)
sfu = ProjectMembership.objects.select_for_update()
memberships = list(sfu.filter(pending_serial__isnull=False))
objs = ProjectMembership.objects
q = objs.filter(pending_serial__isnull=False).select_for_update()
memberships = list(q)
if memberships:
for membership in memberships:
......@@ -2493,24 +2495,25 @@ def pre_sync_projects(sync=True):
ACCEPTED = ProjectMembership.ACCEPTED
LEAVE_REQUESTED = ProjectMembership.LEAVE_REQUESTED
PROJECT_DEACTIVATED = ProjectMembership.PROJECT_DEACTIVATED
psfu = Project.objects.select_for_update()
objs = Project.objects
modified = list(psfu.modified_projects())
modified = list(objs.modified_projects().select_for_update())
if sync:
for project in modified:
objects = project.projectmembership_set.select_for_update()
objects = project.projectmembership_set
memberships = objects.actually_accepted()
memberships = objects.actually_accepted().select_for_update()
for membership in memberships:
membership.is_pending = True
membership.save()
reactivating = list(psfu.reactivating_projects())
reactivating = list(objs.reactivating_projects().select_for_update())
if sync:
for project in reactivating:
objects = project.projectmembership_set.select_for_update()
objects = project.projectmembership_set
memberships = objects.filter(state=PROJECT_DEACTIVATED)
q = objects.filter(state=PROJECT_DEACTIVATED)
memberships = q.select_for_update()
for membership in memberships:
membership.is_pending = True
if membership.leave_request_date is None:
......@@ -2519,31 +2522,32 @@ def pre_sync_projects(sync=True):
membership.state = LEAVE_REQUESTED
membership.save()
deactivating = list(psfu.deactivating_projects())
deactivating = list(objs.deactivating_projects().select_for_update())
if sync:
for project in deactivating:
objects = project.projectmembership_set.select_for_update()
objects = project.projectmembership_set
# Note: we keep a user-level deactivation
# (e.g. USER_SUSPENDED) intact
memberships = objects.actually_accepted()
memberships = objects.actually_accepted().select_for_update()
for membership in memberships:
membership.is_pending = True
membership.state = PROJECT_DEACTIVATED
membership.save()
# transaction.commit()
return (modified, reactivating, deactivating)
def set_sync_projects(exclude=None):
ACTUALLY_ACCEPTED = ProjectMembership.ACTUALLY_ACCEPTED
objects = ProjectMembership.objects.select_for_update()
objects = ProjectMembership.objects
sub_quota, add_quota = [], []
serial = new_serial()
pending = objects.filter(is_pending=True)
pending = objects.filter(is_pending=True).select_for_update()
for membership in pending:
if membership.pending_application:
......@@ -2596,33 +2600,31 @@ def do_sync_projects():
def post_sync_projects():
PROJECT_DEACTIVATED = ProjectMembership.PROJECT_DEACTIVATED
Q_ACTUALLY_ACCEPTED = ProjectMembership.Q_ACTUALLY_ACCEPTED
psfu = Project.objects.select_for_update()
objs = Project.objects
modified = psfu.modified_projects()
modified = objs.modified_projects().select_for_update()
for project in modified:
objects = project.projectmembership_set.select_for_update()
memberships = list(objects.filter(Q_ACTUALLY_ACCEPTED &
Q(is_pending=True)))
objects = project.projectmembership_set
q = objects.filter(Q_ACTUALLY_ACCEPTED & Q(is_pending=True))
memberships = list(q.select_for_update())
if not memberships:
project.is_modified = False
project.save()
reactivating = psfu.reactivating_projects()
reactivating = objs.reactivating_projects().select_for_update()
for project in reactivating:
objects = project.projectmembership_set.select_for_update()
memberships = list(objects.filter(Q(state=PROJECT_DEACTIVATED) |
Q(is_pending=True)))
objects = project.projectmembership_set
q = objects.filter(Q(state=PROJECT_DEACTIVATED) | Q(is_pending=True))
memberships = list(q.select_for_update())
if not memberships:
project.reactivate()
project.save()
deactivating = psfu.deactivating_projects()
deactivating = objs.deactivating_projects().select_for_update()
for project in deactivating:
objects = project.projectmembership_set.select_for_update()
memberships = list(objects.filter(Q_ACTUALLY_ACCEPTED |
Q(is_pending=True)))
objects = project.projectmembership_set
q = objects.filter(Q_ACTUALLY_ACCEPTED | Q(is_pending=True))
memberships = list(q.select_for_update())
if not memberships:
project.deactivate()
project.save()
......
# Copyright 2012 GRNET S.A. All rights reserved.
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
......@@ -45,27 +45,15 @@ class ForUpdateManager(Manager):
transactions that abort due to deadlocks.
Example:
networks = Network.objects.select_for_update().filter(public=True)
networks = Network.objects.filter(public=True).select_for_update()
"""
def __init__(self, *args, **kwargs):
super(ForUpdateManager, self).__init__(*args, **kwargs)
self._select_for_update = False
def filter(self, *args, **kwargs):
query = self.get_query_set().filter(*args, **kwargs)
if self._select_for_update:
self._select_for_update = False
return for_update(query)
else:
return query
def get(self, *args, **kwargs):
if not self._select_for_update:
return self.get_query_set().get(*args, **kwargs)
def get_query_set(self):
return ForUpdateQuerySet(self.model, using=self._db)
query = self.filter(*args, **kwargs)
def get_for_update(self, *args, **kwargs):
query = for_update(self.filter(*args, **kwargs))
query = list(query)
num = len(query)
if num == 1:
......@@ -80,9 +68,11 @@ class ForUpdateManager(Manager):
"Lookup parameters were %s" %
(self.model._meta.object_name, num, kwargs))
def select_for_update(self, *args, **kwargs):
self._select_for_update = True
return self
class ForUpdateQuerySet(QuerySet):
def select_for_update(self):
return for_update(self)
def for_update(query):
......@@ -95,21 +85,3 @@ def for_update(query):
sql, params = query.query.get_compiler(query.db).as_sql()
return query.model._default_manager.raw(sql.rstrip() + ' FOR UPDATE',
params)
class ProtectedDeleteManager(ForUpdateManager):
""" Manager for protecting Backend deletion.
Call Backend delete() method in order to prevent deletion
of Backends that host non-deleted VirtualMachines.
"""
def get_query_set(self):
return BackendQuerySet(self.model, using=self._db)
class BackendQuerySet(QuerySet):
def delete(self):
for backend in self._clone():
backend.delete()
# Copyright 2012 GRNET S.A. All rights reserved.
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
......@@ -199,25 +199,26 @@ class CallSerial(Model):
unique_together = (('serial', 'clientkey'),)
def _access(*args, **kwargs):
method = args[0]
model = args[1]
args = args[2:]
def _get(*args, **kwargs):
model = args[0]
args = args[1:]
o = model.objects
try:
if kwargs['for_update']:
del kwargs['for_update']
o = o.select_for_update()
except KeyError:
pass
f = getattr(o, method)
for_update = kwargs.pop('for_update', False)
f = o.get_for_update if for_update else o.get
return f(*args, **kwargs)
def _get(*args, **kwargs):
return _access('get', *args, **kwargs)
def _filter(*args, **kwargs):
return _access('filter', *args, **kwargs)
model = args[0]
args = args[1:]
o = model.objects
for_update = kwargs.pop('for_update', False)
q = o.filter(*args, **kwargs)
q = q.select_for_update() if for_update else q
return q
def db_get_holding(*args, **kwargs):
return _get(Holding, *args, **kwargs)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment