Commit 3a6c7968 authored by Kostas Papadimitriou's avatar Kostas Papadimitriou
Browse files

astakos: Shibboleth EPPN migration functionality

Prior to this commit astakos used the mod_shib2 EPPN header value as the
unique identifier for associating shibboleth idp users to astakos user entries.

This commit alters this behaviour and from now on astakos resloves unique
identifier from the REMOTE_USER header. REMOTE_USER is a header mod_shib2 sets
containing a value of the available shibboleth IdP metadata. The metadata
key (persistent-id or eppn in most common scenarios) used can be configured
from within shibboleth2.xml config file.

<ApplicationDefaults id="default" .... .... REMOTE_USER="persistent-id"...>

An additional setting ``ASTAKOS_SHIBBOLETH_MIGRATE_EPPN`` is added in order
to facilitate migration of existing EPPN entries to persistent-id/targeted-id
(or whichever metadata the REMOTE_USER maps to). When set to ``True``, after
each shibboleth login astakos will try to migrate the existing EPPN entry
by following the below mentioned steps:

* If no REMOTE_USER header exists or is empty, redirect to an error view.
  Otherwise continue to the next step.
* Resolve EPPN header and check if an account is currently associated with this
  EPPN.
* If user exists, retrieve user's shibboleth entry (AstakosUserAuthProvider
  instance) and replace stored identifier (EPPN) with the identifier contained
  in REMOTE_USER header.
* Continue to login or signup process using REMOTE_USER value as the unique
  user identifier that associates astakos user to the shibboleth account.
parent 292af98e
......@@ -36,7 +36,7 @@ import json
from synnefo.lib.ordereddict import OrderedDict
from django.core.urlresolvers import reverse
from django.core.urlresolvers import reverse, NoReverseMatch
from django.utils.translation import ugettext as _
from django.contrib.auth.models import Group
from django import template
......@@ -556,7 +556,7 @@ class AuthProvider(object):
class LocalAuthProvider(AuthProvider):
module = 'local'
login_view = 'password_change'
login_view = 'login'
remote_authenticate = False
username_key = 'user_email'
......@@ -576,9 +576,16 @@ class LocalAuthProvider(AuthProvider):
@property
def urls(self):
urls = super(LocalAuthProvider, self).urls
urls['change_password'] = reverse('password_change')
password_change_url = None
try:
password_change_url = reverse('password_change')
except NoReverseMatch:
pass
urls['change_password'] = password_change_url
if self.user:
urls['add'] = reverse('password_change')
urls['add'] = password_change_url
if self._instance:
urls.update({
'remove': reverse('remove_auth_provider',
......@@ -605,6 +612,7 @@ class ShibbolethAuthProvider(AuthProvider):
messages = {
'title': _('Academic'),
'method_details': '{account_prompt}: {provider_info_eppn}',
'login_description': _('If you are a student, professor or researcher'
' you can login using your academic account.'),
'add_prompt': _('Allows you to login using your Academic '
......
......@@ -125,7 +125,7 @@ SHIBBOLETH_INACTIVE_ACC = (
'yet activated. '
'If that is your account, you need to activate it before being able to '
'associate it with this shibboleth account.')
SHIBBOLETH_MISSING_EPPN = (
SHIBBOLETH_MISSING_USER_ID = (
'Your request is missing a unique ' +
'token. This means your academic ' +
'institution does not yet allow its users to log ' +
......
......@@ -143,6 +143,10 @@ USAGE_UPDATE_INTERVAL = getattr(settings, 'ASTAKOS_USAGE_UPDATE_INTERVAL',
ENABLE_LOCAL_ACCOUNT_MIGRATION = getattr(
settings, 'ASTAKOS_ENABLE_LOCAL_ACCOUNT_MIGRATION', True)
# Migrate eppn identifiers to remote id
SHIBBOLETH_MIGRATE_EPPN = getattr(settings, 'ASTAKOS_SHIBBOLETH_MIGRATE_EPPN',
False)
# Strict shibboleth usage
SHIBBOLETH_REQUIRE_NAME_INFO = getattr(settings,
'ASTAKOS_SHIBBOLETH_REQUIRE_NAME_INFO',
......
......@@ -61,11 +61,11 @@ class ShibbolethTests(TestCase):
# shibboleth views validation
# eepn required
r = client.get(ui_url('login/shibboleth?'), follow=True)
self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
self.assertContains(r, messages.SHIBBOLETH_MISSING_USER_ID % {
'domain': astakos_settings.BASE_HOST,
'contact_email': settings.CONTACT_EMAIL
})
client.set_tokens(eppn="kpapeppn")
client.set_tokens(remote_user="kpapeppn", eppn="kpapeppn")
astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
# shibboleth user info required
......@@ -74,7 +74,7 @@ class ShibbolethTests(TestCase):
astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
# shibboleth logged us in
client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
cn="Kostas Papadimitriou",
ep_affiliation="Test Affiliation")
r = client.get(ui_url('login/shibboleth?'), follow=True)
......@@ -138,7 +138,7 @@ class ShibbolethTests(TestCase):
self.assertTrue('headers' in provider.info)
# login (not activated yet)
client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
cn="Kostas Papadimitriou")
r = client.get(ui_url("login/shibboleth?"), follow=True)
self.assertContains(r, 'is pending moderation')
......@@ -177,7 +177,7 @@ class ShibbolethTests(TestCase):
client = ShibbolethClient()
# shibboleth logged us in, notice that we use different email
client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
cn="Kostas Papadimitriou", )
r = client.get(ui_url("login/shibboleth?"), follow=True)
......@@ -219,7 +219,7 @@ class ShibbolethTests(TestCase):
'username': 'kpap@synnefo.org'}
r = client.post(ui_url('local'), post_data, follow=True)
self.assertTrue(r.context['request'].user.is_authenticated())
client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
cn="Kostas Papadimitriou", )
r = client.get(ui_url("login/shibboleth?"), follow=True)
self.assertContains(r, "enabled for this account")
......@@ -232,7 +232,7 @@ class ShibbolethTests(TestCase):
client.logout()
# look Ma, i can login with both my shibboleth and local account
client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
cn="Kostas Papadimitriou")
r = client.get(ui_url("login/shibboleth?"), follow=True)
self.assertTrue(r.context['request'].user.is_authenticated())
......@@ -256,7 +256,7 @@ class ShibbolethTests(TestCase):
self.assertEqual(r.status_code, 200)
# cannot add the same eppn
client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
client.set_tokens(mail="secondary@shibboleth.gr", remote_user="kpapeppn",
cn="Kostas Papadimitriou", )
r = client.get(ui_url("login/shibboleth?"), follow=True)
self.assertRedirects(r, ui_url('landing'))
......@@ -264,7 +264,7 @@ class ShibbolethTests(TestCase):
self.assertEquals(existing_user.auth_providers.count(), 2)
# only one allowed by default
client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
client.set_tokens(mail="secondary@shibboleth.gr", remote_user="kpapeppn2",
cn="Kostas Papadimitriou", ep_affiliation="affil2")
prov = auth_providers.get_provider('shibboleth')
r = client.get(ui_url("login/shibboleth?"), follow=True)
......@@ -276,7 +276,7 @@ class ShibbolethTests(TestCase):
client.reset_tokens()
# cannot login with another eppn
client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppninvalid",
cn="Kostas Papadimitriou")
r = client.get(ui_url("login/shibboleth?"), follow=True)
self.assertFalse(r.context['request'].user.is_authenticated())
......@@ -289,7 +289,7 @@ class ShibbolethTests(TestCase):
remove_local_url = user.get_auth_provider('local').get_remove_url
remove_shibbo_url = user.get_auth_provider('shibboleth',
'kpapeppn').get_remove_url
client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
cn="Kostas Papadimtriou")
r = client.get(ui_url("login/shibboleth?"), follow=True)
client.reset_tokens()
......@@ -347,12 +347,12 @@ class ShibbolethTests(TestCase):
user2 = get_local_user('another@synnefo.org')
user2.add_auth_provider('shibboleth', identifier='existingeppn')
# login
client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
cn="Kostas Papadimitriou")
r = client.get(ui_url("login/shibboleth?"), follow=True)
# try to assign existing shibboleth identifier of another user
client.set_tokens(mail="kpap_second@shibboleth.gr",
eppn="existingeppn", cn="Kostas Papadimitriou")
remote_user="existingeppn", cn="Kostas Papadimitriou")
r = client.get(ui_url("login/shibboleth?"), follow=True)
self.assertContains(r, "is already in use")
......@@ -371,6 +371,9 @@ class TestLocal(TestCase):
@im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
def test_login_ratelimit(self):
from django.core.cache import cache
[cache.delete(key) for key in cache._cache.keys()]
credentials = {'username': 'γιού τι έφ', 'password': 'password'}
r = self.client.post(ui_url('local'), credentials, follow=True)
fields = r.context['login_form'].fields.keyOrder
......@@ -720,11 +723,46 @@ class UserActionsTests(TestCase):
Group.objects.all().delete()
TEST_TARGETED_ID1 = \
"https://idp.synnefo.org/idp/shibboleth!ZWxhIHJlIGVsYSByZSBlbGEgcmU="
TEST_TARGETED_ID2 = \
"https://idp.synnefo.org/idp/shibboleth!ZGUgc2UgeGFsYXNlLi4uLi4uLg=="
TEST_TARGETED_ID3 = \
"https://idp.synnefo.org/idp/shibboleth!"
class TestAuthProviderViews(TestCase):
def tearDown(self):
AstakosUser.objects.all().delete()
@im_settings(IM_MODULES=['shibboleth'], MODERATION_ENABLED=False,
SHIBBOLETH_MIGRATE_EPPN=True)
def migrate_to_remote_id(self):
eppn_user = get_local_user("eppn@synnefo.org")
tid_user = get_local_user("tid@synnefo.org")
eppn_user.add_auth_provider('shibboleth', 'EPPN')
tid_user.add_auth_provider('shibboleth', TEST_TARGETED_ID1)
get_user = lambda r: r.context['request'].user
client = ShibbolethClient()
client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID2)
r = client.get(ui_url('login/shibboleth?'), follow=True)
self.assertTrue(get_user(r).is_authenticated())
self.assertEqual(eppn_user.get_auth_provider('shibboleth').identifier,
TEST_TARGETED_ID2)
client = ShibbolethClient()
client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID1)
r = client.get(ui_url('login/shibboleth?'), follow=True)
self.assertTrue(get_user(r).is_authenticated())
self.assertEqual(tid_user.get_auth_provider('shibboleth').identifier,
TEST_TARGETED_ID1)
@shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
AUTOMODERATE_POLICY=True)
@im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
......@@ -755,7 +793,7 @@ class TestAuthProviderViews(TestCase):
# new academic user
self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
cl_newuser.set_tokens(eppn="newusereppn", mail="newuser@synnefo.org",
cl_newuser.set_tokens(remote_user="newusereppn", mail="newuser@synnefo.org",
surname="Lastname")
r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
initial = r.context['signup_form'].initial
......@@ -848,10 +886,10 @@ class TestAuthProviderViews(TestCase):
self.assertEqual(r.status_code, 200)
r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
self.assertContains(r, 'Your request is missing a unique token')
cl_olduser.set_tokens(eppn="newusereppn")
cl_olduser.set_tokens(remote_user="newusereppn")
r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
self.assertContains(r, 'already in use')
cl_olduser.set_tokens(eppn="oldusereppn")
cl_olduser.set_tokens(remote_user="oldusereppn")
r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
self.assertContains(r, 'Academic login enabled for this account')
......
......@@ -38,7 +38,8 @@ from django.views.decorators.http import require_http_methods
from django.http import HttpResponseRedirect
from astakos.im.util import login_url
from astakos.im.models import AstakosUser
from astakos.im.models import AstakosUser, AstakosUserAuthProvider, \
PendingThirdPartyUser
from astakos.im import settings
from astakos.im.views.target import get_pending_key, \
handle_third_party_signup, handle_third_party_login, \
......@@ -51,6 +52,38 @@ import logging
logger = logging.getLogger(__name__)
def migrate_eppn_to_remote_id(eppn, remote_id):
"""
Retrieve active and pending accounts that are associated with shibboleth
using EPPN as the third party unique identifier update them by storing
REMOTE_USER value instead.
"""
if eppn == remote_id:
return
try:
provider = AstakosUserAuthProvider.objects.get(module='shibboleth',
identifier=eppn)
msg = "Migrating user %r eppn (%s -> %s)"
logger.info(msg, provider.user.log_display, eppn, remote_id)
provider.identifier = remote_id
provider.save()
except AstakosUserAuthProvider.DoesNotExist:
pass
pending_users = \
PendingThirdPartyUser.objects.filter(third_party_identifier=eppn,
provider='shibboleth')
for pending in pending_users:
msg = "Migrating pending user %s eppn (%s -> %s)"
logger.info(msg, pending.email, eppn, remote_id)
pending.third_party_identifier = remote_id
pending.save()
return remote_id
class Tokens:
# these are mapped by the Shibboleth SP software
SHIB_EPPN = "HTTP_EPPN" # eduPersonPrincipalName
......@@ -89,15 +122,18 @@ def login(request,
logger.info("shibboleth request: %r" % shibboleth_headers)
try:
eppn = tokens.get(Tokens.SHIB_EPPN)
eppn = tokens.get(Tokens.SHIB_EPPN, None)
user_id = tokens.get(Tokens.SHIB_REMOTE_USER)
fullname, first_name, last_name, email = None, None, None, None
if global_settings.DEBUG and not eppn:
user_id = getattr(global_settings, 'SHIBBOLETH_TEST_REMOTE_USER',
None)
eppn = getattr(global_settings, 'SHIBBOLETH_TEST_EPPN', None)
fullname = getattr(global_settings, 'SHIBBOLETH_TEST_FULLNAME',
None)
if not eppn:
raise KeyError(_(astakos_messages.SHIBBOLETH_MISSING_EPPN) % {
if not user_id:
raise KeyError(_(astakos_messages.SHIBBOLETH_MISSING_USER_ID) % {
'domain': settings.BASE_HOST,
'contact_email': settings.CONTACT_EMAIL
})
......@@ -125,16 +161,17 @@ def login(request,
messages.error(request, e.message)
return HttpResponseRedirect(login_url(request))
if settings.SHIBBOLETH_MIGRATE_EPPN:
migrate_eppn_to_remote_id(eppn, user_id)
affiliation = tokens.get(Tokens.SHIB_EP_AFFILIATION, 'Shibboleth')
email = tokens.get(Tokens.SHIB_MAIL, '')
eppn_info = tokens.get(Tokens.SHIB_EPPN)
provider_info = {'eppn': eppn_info, 'email': email, 'name': fullname,
'headers': shibboleth_headers}
userid = eppn
provider_info = {'eppn': eppn, 'email': email, 'name': fullname,
'headers': shibboleth_headers, 'user_id': user_id}
try:
return handle_third_party_login(request, 'shibboleth',
eppn, provider_info,
user_id, provider_info,
affiliation, third_party_key)
except AstakosUser.DoesNotExist, e:
third_party_key = get_pending_key(request)
......@@ -142,7 +179,7 @@ def login(request,
'first_name': first_name,
'last_name': last_name,
'email': email}
return handle_third_party_signup(request, userid, 'shibboleth',
return handle_third_party_signup(request, user_id, 'shibboleth',
third_party_key,
provider_info,
user_info,
......
......@@ -132,3 +132,7 @@
# A way to extend the resources presentation metadata
# ASTAKOS_RESOURCES_META = {}
# Migrate existing shibboleth user entries which where previously associated
# with EPPN instead of the provided value of REMOTE_ID mod_shib2 header.
# ASTAKOS_SHIBBOLETH_MIGRATE_EPPN = False
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment