Commit 00c13f59 authored by Sofia Papagiannaki

astakos: Minor improvements in validate token api call

Add tests for validate access token endpoint
parent c2fd7455
......@@ -40,6 +40,7 @@ from django.core.cache import cache
from astakos.im import settings
from astakos.im.models import Service, AstakosUser
from astakos.oa2.backends.base import OA2Error
from astakos.oa2.backends.djangobackend import DjangoBackend
from .util import json_response, xml_response, validate_user,\
get_content_length
......@@ -146,7 +147,7 @@ def validate_token(request, token_id):
oa2_backend = DjangoBackend()
try:
token = oa2_backend.consume_token(token_id)
except Exception, e:
except OA2Error, e:
raise faults.ItemNotFound(e.message)
belongsTo = request.GET.get('belongsTo')
......
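Review bot: The 404 raised in validate_token passes `e.message` from the backend straight into the fault, so the response text tells the caller whether a token is unknown or merely expired (consume_token raises "Token has expired"). A fixed message, for example `raise faults.ItemNotFound('Token not found')`, with the backend detail logged server-side, would keep the two cases indistinguishable to clients. With the handler narrowed to OA2Error, any other exception from consume_token now surfaces as a server error; the invalid-token test added in this commit expects 404, so that path appears covered, but it is worth confirming that get_token wraps lookup failures in OA2Error. validate_token's body starts and ends outside the hunk.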
......@@ -33,6 +33,8 @@
from astakos.im.tests.common import *
from astakos.im.settings import astakos_services, BASE_HOST
from astakos.oa2.backends import DjangoBackend
from synnefo.lib.services import get_service_path
from synnefo.lib import join_urls
......@@ -43,6 +45,7 @@ from datetime import date
#from xml.dom import minidom
import json
import time
ROOT = "/%s/%s/%s/" % (
astakos_settings.BASE_PATH, astakos_settings.ACCOUNTS_PREFIX, 'v1.0')
......@@ -446,6 +449,14 @@ class TokensApiTest(TestCase):
e3.data.create(key='versionId', value='v2.0')
e3.data.create(key='publicURL', value='http://localhost:8000/s3/v2.0')
oa2_backend = DjangoBackend()
self.token = oa2_backend.token_model.create(
code='12345',
expires_at=datetime.now() + timedelta(seconds=5),
user=self.user1,
client=oa2_backend.client_model.create(type='public'),
redirect_uri='https://server.com/handle_code')
def test_authenticate(self):
client = Client()
url = reverse('astakos.api.tokens.authenticate')
......@@ -570,7 +581,7 @@ class TokensApiTest(TestCase):
r = client.post(url, post_data, content_type='application/json')
self.assertEqual(r.status_code, 200)
# Check successful json response
# Check successful json response: user credential auth
post_data = """{"auth":{"passwordCredentials":{"username":"%s",
"password":"%s"},
"tenantName":"%s"}}""" % (
......@@ -594,6 +605,29 @@ class TokensApiTest(TestCase):
self.assertEqual(user, self.user1.uuid)
self.assertEqual(len(service_catalog), 3)
# Check successful json response: token auth
post_data = """{"auth":{"token":{"id":"%s"},
"tenantName":"%s"}}""" % (
self.user1.auth_token, self.user1.uuid)
r = client.post(url, post_data, content_type='application/json')
self.assertEqual(r.status_code, 200)
self.assertTrue(r['Content-Type'].startswith('application/json'))
try:
body = json.loads(r.content)
except Exception, e:
self.fail(e)
try:
token = body['access']['token']['id']
user = body['access']['user']['id']
service_catalog = body['access']['serviceCatalog']
except KeyError:
self.fail('Invalid response')
self.assertEqual(token, self.user1.auth_token)
self.assertEqual(user, self.user1.uuid)
self.assertEqual(len(service_catalog), 3)
# Check successful xml response
headers = {'HTTP_ACCEPT': 'application/xml'}
post_data = """{"auth":{"passwordCredentials":{"username":"%s",
......@@ -609,6 +643,13 @@ class TokensApiTest(TestCase):
# except Exception, e:
# self.fail(e)
# oath access token authorization
post_data = """{"auth":{"token":{"id":"%s"},
"tenantName":"%s"}}""" % (
self.token.code, self.user1.uuid)
r = client.post(url, post_data, content_type='application/json')
self.assertEqual(r.status_code, 401)
class UserCatalogsTest(TestCase):
def test_get_uuid_displayname_catalogs(self):
......@@ -707,3 +748,56 @@ class WrongPathAPITest(TestCase):
json.loads(response.content)
except ValueError:
self.assertTrue(False)
class ValidateAccessToken(TestCase):
def setUp(self):
self.oa2_backend = DjangoBackend()
self.user = AstakosUser.objects.create(username="user@synnefo.org")
self.token = self.oa2_backend.token_model.create(
code='12345',
expires_at=datetime.now() + timedelta(seconds=5),
user=self.user,
client=self.oa2_backend.client_model.create(type='public'),
redirect_uri='https://server.com/handle_code')
def test_validate_token(self):
# invalid token
url = reverse('astakos.api.tokens.validate_token',
kwargs={'token_id': 'invalid'})
r = self.client.get(url)
self.assertEqual(r.status_code, 404)
# valid token
url = reverse('astakos.api.tokens.validate_token',
kwargs={'token_id': self.token.code})
r = self.client.head(url)
self.assertEqual(r.status_code, 400)
r = self.client.put(url)
self.assertEqual(r.status_code, 400)
r = self.client.post(url)
self.assertEqual(r.status_code, 400)
r = self.client.get(url)
self.assertEqual(r.status_code, 200)
self.assertTrue(r['Content-Type'].startswith('application/json'))
try:
body = json.loads(r.content)
user = body['access']['user']['id']
self.assertEqual(user, self.user.uuid)
except Exception:
self.fail('Unexpected response content')
# expired token
sleep_time = (self.token.expires_at - datetime.now()).total_seconds()
time.sleep(sleep_time)
r = self.client.get(url)
self.assertEqual(r.status_code, 404)
# assert expired token has been deleted
self.assertEqual(self.oa2_backend.token_model.count(), 0)
# user authentication token
url = reverse('astakos.api.tokens.validate_token',
kwargs={'token_id': self.user.auth_token})
self.assertEqual(r.status_code, 404)
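Review bot: The final case in ValidateAccessToken.test_validate_token ("user authentication token") builds `url` from `self.user.auth_token` but never sends a request, so the closing `assertEqual(r.status_code, 404)` re-checks the response of the earlier expired-token GET. The test therefore does not verify that a user's auth token is rejected by the validate endpoint; add `r = self.client.get(url)` before that assertion. Separately, the `try`/`except Exception` around the body check in the valid-token case also catches the AssertionError from `assertEqual`, replacing a useful mismatch message with 'Unexpected response content'. The expiry case relies on `time.sleep` for up to five seconds of wall-clock time, which slows the suite and can be flaky; creating a second token whose `expires_at` is already in the past would test the same path without waiting.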
......@@ -6,8 +6,6 @@ import json
from base64 import b64encode, b64decode
from hashlib import sha512
from time import time, mktime
import logging
logger = logging.getLogger(__name__)
......@@ -413,8 +411,7 @@ class SimpleBackend(object):
def consume_token(self, token):
token_instance = self.get_token(token)
expires_at = mktime(token_instance.expires_at.timetuple())
if time() > expires_at:
if datetime.datetime.now() > token_instance.expires_at:
self.delete_token(token_instance) # delete expired token
raise OA2Error("Token has expired")
# TODO: delete token?
......
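Review bot: The new expiry check `if datetime.datetime.now() > token_instance.expires_at:` compares against naive local time, so it is only correct while `expires_at` is stored naive and on the same local clock. A timezone-aware value would raise TypeError here, and since validate_token now catches only OA2Error, that would surface as a server error rather than a 404. The import hunk for this file does not show `import datetime`, so confirm the module is in scope under that name. I would add a comment above the check such as `# expires_at is a naive local datetime; both sides must use the same clock`, or move both the writer and this reader to UTC. The remaining `# TODO: delete token?` also leaves open whether a consumed token stays valid until expiry, which deserves an explicit decision. consume_token continues past the end of the hunk.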