tests.py 8.32 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
34

35
from django import http
36
from django.test import TransactionTestCase
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
37
38
39
from django.conf import settings
from django.test.client import Client
from django.core.urlresolvers import clear_url_caches
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
40
from django.utils import simplejson as json
41
from django.conf import settings
42
from django.core.urlresolvers import reverse
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
43

44
from synnefo.userdata.models import *
45

Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
46
47
48
49

class AaiClient(Client):

    def request(self, **request):
50
        request['HTTP_X_AUTH_TOKEN'] = '0000'
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
51
52
        return super(AaiClient, self).request(**request)

53
class TestRestViews(TransactionTestCase):
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
54
55
56
57

    fixtures = ['users']

    def setUp(self):
58
59
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 10

Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
60
61
62
        def get_user_mock(request, *Args, **kwargs):
            if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
                request.user_uniq = 'test'
63
64
65
                request.user = {'id': 'id',
                                'username': 'username',
                                'uuid': 'test'}
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
66
67

        # mock the astakos authentication function
68
        from snf_django.lib import astakos
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
69
70
        astakos.get_user = get_user_mock

71
        settings.SKIP_SSH_VALIDATION = True
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
72
        self.client = AaiClient()
73
        self.user = 'test'
74
        self.keys_url = reverse('ui_keys_collection')
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
75
76

    def test_keys_collection_get(self):
77
        resp = self.client.get(self.keys_url)
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
78
79
80
81
82
        self.assertEqual(resp.content, "[]")

        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
                content="content1")

83
84
85
86
87
88
        resp = self.client.get(self.keys_url)
        resp_list = json.loads(resp.content);
        exp_list = [{"content": "content1", "id": 1,
                    "uri": self.keys_url + "/1", "name": "key pair 1",
                    "fingerprint": "unknown fingerprint"}]
        self.assertEqual(resp_list, exp_list)
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
89
90
91
92

        PublicKeyPair.objects.create(user=self.user, name="key pair 2",
                content="content2")

93
94
95
96
97
98
99
100
101
102
103
        resp = self.client.get(self.keys_url)
        resp_list = json.loads(resp.content)
        exp_list = [{"content": "content1", "id": 1,
                     "uri": self.keys_url + "/1", "name": "key pair 1",
                     "fingerprint": "unknown fingerprint"},
                    {"content": "content2", "id": 2,
                     "uri": self.keys_url + "/2",
                     "name": "key pair 2",
                     "fingerprint": "unknown fingerprint"}]

        self.assertEqual(resp_list, exp_list)
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
104
105

    def test_keys_resourse_get(self):
106
        resp = self.client.get(self.keys_url + "/1")
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
107
108
109
110
111
        self.assertEqual(resp.status_code, 404)

        # create a public key
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
                content="content1")
112
113
114
115
116
117
        resp = self.client.get(self.keys_url + "/1")
        resp_dict = json.loads(resp.content);
        exp_dict = {"content": "content1", "id": 1,
                    "uri": self.keys_url + "/1", "name": "key pair 1",
                    "fingerprint": "unknown fingerprint"}
        self.assertEqual(resp_dict, exp_dict)
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
118
119

        # update
120
121
122
123
        resp = self.client.put(self.keys_url + "/1",
                               json.dumps({'name':'key pair 1 new name'}),
                               content_type='application/json')

Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
124
125
126
127
        pk = PublicKeyPair.objects.get(pk=1)
        self.assertEqual(pk.name, "key pair 1 new name")

        # delete
128
        resp = self.client.delete(self.keys_url + "/1")
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
129
130
        self.assertEqual(PublicKeyPair.objects.count(), 0)

131
        resp = self.client.get(self.keys_url + "/1")
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
132
133
        self.assertEqual(resp.status_code, 404)

134
        resp = self.client.get(self.keys_url)
Kostas Papadimitriou's avatar
Kostas Papadimitriou committed
135
        self.assertEqual(resp.content, "[]")
136
137

        # test rest create
138
139
140
141
        resp = self.client.post(self.keys_url,
                                json.dumps({'name':'key pair 2',
                                            'content':"""key 2 content"""}),
                                content_type='application/json')
142
        self.assertEqual(PublicKeyPair.objects.count(), 1)
143
        pk = PublicKeyPair.objects.get()
144
145
146
        self.assertEqual(pk.name, "key pair 2")
        self.assertEqual(pk.content, "key 2 content")

147
148
149
150
    def test_generate_views(self):
        import base64

        # just test that
151
        resp = self.client.post(self.keys_url + "/generate")
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
        self.assertNotEqual(resp, "")

        data = json.loads(resp.content)
        self.assertEqual(data.has_key('private'), True)
        self.assertEqual(data.has_key('private'), True)

        # public key is base64 encoded
        base64.b64decode(data['public'].replace("ssh-rsa ",""))

        # remove header/footer
        private = "".join(data['private'].split("\n")[1:-1])

        # private key is base64 encoded
        base64.b64decode(private)

167
168
169
        new_key = PublicKeyPair()
        new_key.content = data['public']
        new_key.name = "new key"
170
        new_key.user = 'test'
171
172
173
        new_key.full_clean()
        new_key.save()

174
175
176
177
178
179
180
181
182
183
    def test_generate_limit(self):
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 1
        resp = self.client.post(self.keys_url,
                                json.dumps({'name':'key1',
                                            'content':"""key 1 content"""}),
                                content_type='application/json')
        genpath = self.keys_url + "/generate"
        r = self.client.post(genpath)
        assert isinstance(r, http.HttpResponseServerError)

184
    def test_invalid_data(self):
185
186
187
        resp = self.client.post(self.keys_url,
                                json.dumps({'content':"""key 2 content"""}),
                                content_type='application/json')
188
189
190
191
192

        self.assertEqual(resp.status_code, 500)
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
                                       """{"name": ["This field cannot be blank."]}}""")

193
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 2
194
195

        # test ssh limit
196
197
198
199
200
201
202
203
204
205
206
207
        resp = self.client.post(self.keys_url,
                                json.dumps({'name':'key1',
                                            'content':"""key 1 content"""}),
                                content_type='application/json')
        resp = self.client.post(self.keys_url,
                                json.dumps({'name':'key1',
                                            'content':"""key 1 content"""}),
                                content_type='application/json')
        resp = self.client.post(self.keys_url,
                                json.dumps({'name':'key1',
                                            'content':"""key 1 content"""}),
                                content_type='application/json')
208
209
210
        self.assertEqual(resp.status_code, 500)
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
                                       """{"__all__": ["SSH keys limit exceeded."]}}""")
211