Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ 0d9523c3

History | View | Annotate | Download (18.1 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import urllib
35
import urlparse
36
import base64
37
import datetime
38

    
39
from collections import namedtuple
40

    
41
from django.test import TransactionTestCase as TestCase
42
from django.test import Client as TestClient
43

    
44
from django.core.urlresolvers import reverse
45
from django.utils import simplejson as json
46

    
47
from astakos.oa2.models import Client, AuthorizationCode, Token
48
from astakos.im.tests import common
49

    
50

    
51
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params',
52
                                     'url'])
53

    
54

    
55
def parsed_url_wrapper(func):
56
    def wrapper(self, url, *args, **kwargs):
57
        url = self.parse_url(url)
58
        return func(self, url, *args, **kwargs)
59
    return wrapper
60

    
61

    
62
class URLAssertionsMixin(object):
63

    
64
    def get_redirect_url(self, request):
65
        return self.parse_url(request['Location'])
66

    
67
    def parse_url(self, url):
68
        if isinstance(url, ParsedURL):
69
            return url
70
        result = urlparse.urlparse(url)
71
        parsed = {
72
            'url': url,
73
            'host': result.netloc,
74
            'scheme': result.scheme,
75
            'path': result.path,
76
        }
77
        parsed['params'] = urlparse.parse_qs(result.query)
78
        return ParsedURL(**parsed)
79

    
80
    @parsed_url_wrapper
81
    def assertParamEqual(self, url, key, value):
82
        self.assertParam(url, key)
83
        self.assertEqual(url.params[key][0], value)
84

    
85
    @parsed_url_wrapper
86
    def assertNoParam(self, url, key):
87
        self.assertFalse(key in url.params,
88
                         "Url '%s' does contain '%s' parameter" % (url.url,
89
                                                                   key))
90

    
91
    @parsed_url_wrapper
92
    def assertParam(self, url, key):
93
        self.assertTrue(key in url.params,
94
                        "Url '%s' does not contain '%s' parameter" % (url.url,
95
                                                                      key))
96

    
97
    @parsed_url_wrapper
98
    def assertHost(self, url, host):
99
        self.assertEqual(url.host, host)
100

    
101
    @parsed_url_wrapper
102
    def assertPath(self, url, path):
103
        self.assertEqual(url.path, path)
104

    
105
    @parsed_url_wrapper
106
    def assertSecure(self, url, key):
107
        self.assertEqual(url.scheme, "https")
108

    
109

    
110
class OA2Client(TestClient):
111
    """
112
    An OAuth2 agnostic test client.
113
    """
114
    def __init__(self, baseurl, *args, **kwargs):
115
        self.oa2_url = baseurl
116
        self.token_url = self.oa2_url + 'token/'
117
        self.auth_url = self.oa2_url + 'auth/'
118
        self.credentials = kwargs.pop('credentials', ())
119

    
120
        kwargs['wsgi.url_scheme'] = 'https'
121
        return super(OA2Client, self).__init__(*args, **kwargs)
122

    
123
    def request(self, *args, **kwargs):
124
        #print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
125
            #kwargs.get('HTTP_AUTHORIZATION', None)
126
        return super(OA2Client, self).request(*args, **kwargs)
127

    
128
    def get_url(self, token_or_auth, **params):
129
        return token_or_auth + '?' + urllib.urlencode(params)
130

    
131
    def grant(self, clientid, *args, **kwargs):
132
        """
133
        Do an authorization grant request.
134
        """
135
        params = {
136
            'grant_type': 'authorization_code',
137
            'client_id': clientid
138
        }
139
        urlparams = kwargs.pop('urlparams', {})
140
        params.update(urlparams)
141
        self.set_auth_headers(kwargs)
142
        return self.get(self.get_url(self.token_url, **params), *args,
143
                        **kwargs)
144

    
145
    def authorize_code(self, clientid, *args, **kwargs):
146
        """
147
        Do an authorization code request.
148
        """
149
        params = {
150
            'response_type': 'code',
151
            'client_id': clientid
152
        }
153
        urlparams = kwargs.pop('urlparams', {})
154
        urlparams.update(kwargs.pop('extraparams', {}))
155
        params.update(urlparams)
156
        self.set_auth_headers(kwargs)
157
        if 'reject' in params:
158
            return self.post(self.get_url(self.auth_url), data=params,
159
                             **kwargs)
160
        return self.get(self.get_url(self.auth_url, **params), *args, **kwargs)
161

    
162
    def access_token(self, code,
163
                     content_type='application/x-www-form-urlencoded',
164
                     **kwargs):
165
        """
166
        Do an get token request.
167
        """
168
        params = {
169
            'grant_type': 'authorization_code',
170
            'code': code
171
        }
172
        params.update(kwargs)
173
        self.set_auth_headers(kwargs)
174
        return self.post(self.token_url, data=urllib.urlencode(params),
175
                         content_type=content_type, **kwargs)
176

    
177
    def set_auth_headers(self, params):
178
        if not self.credentials:
179
            return
180
        credentials = base64.encodestring('%s:%s' % self.credentials).strip()
181
        params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials
182
        return params
183

    
184
    def set_credentials(self, user=None, pwd=None):
185
        self.credentials = (user, pwd)
186
        if not user and not pwd:
187
            self.credentials = ()
188

    
189

    
190
class TestOA2(TestCase, URLAssertionsMixin):
191

    
192
    def assertCount(self, model, count):
193
        self.assertEqual(model.objects.count(), count)
194

    
195
    def assert_access_token_response(self, r, expected):
196
        self.assertEqual(r.status_code, 200)
197
        try:
198
            data = json.loads(r.content)
199
        except:
200
            self.fail("Unexpected response content")
201

    
202
        self.assertTrue('access_token' in data)
203
        access_token = data['access_token']
204
        self.assertTrue('token_type' in data)
205
        token_type = data['token_type']
206
        self.assertTrue('expires_in' in data)
207
        expires_in = data['expires_in']
208

    
209
        try:
210
            token = Token.objects.get(code=access_token)
211
            self.assertEqual(token.expires_at,
212
                             token.created_at +
213
                             datetime.timedelta(seconds=expires_in))
214
            self.assertEqual(token.token_type, token_type)
215
            self.assertEqual(token.grant_type, 'authorization_code')
216
            #self.assertEqual(token.user, expected.get('user'))
217
            self.assertEqual(token.redirect_uri, expected.get('redirect_uri'))
218
            self.assertEqual(token.scope, expected.get('scope'))
219
            self.assertEqual(token.state, expected.get('state'))
220
        except Token.DoesNotExist:
221
            self.fail("Invalid access_token")
222

    
223
    def setUp(self):
224
        baseurl = reverse('oauth2_authenticate').replace('/auth', '/')
225
        self.client = OA2Client(baseurl)
226
        client1 = Client.objects.create(identifier="client1", secret="secret")
227
        self.client1_redirect_uri = "https://server.com/handle_code"
228
        client1.redirecturl_set.create(url=self.client1_redirect_uri)
229

    
230
        client2 = Client.objects.create(identifier="client2", type='public')
231
        self.client2_redirect_uri = "https://server2.com/handle_code"
232
        client2.redirecturl_set.create(url=self.client2_redirect_uri)
233

    
234
        client3 = Client.objects.create(identifier="client3", secret='secret',
235
                                        is_trusted=True)
236
        self.client3_redirect_uri = "https://server3.com/handle_code"
237
        client3.redirecturl_set.create(url=self.client3_redirect_uri)
238

    
239
        common.get_local_user("user@synnefo.org", password="password")
240

    
241
    def test_code_authorization(self):
242
        # missing response_type
243
        r = self.client.get(self.client.get_url(self.client.auth_url))
244
        self.assertEqual(r.status_code, 400)
245
        self.assertCount(AuthorizationCode, 0)
246

    
247
        # invalid response_type
248
        r = self.client.get(self.client.get_url(self.client.auth_url,
249
                                                response_type='invalid'))
250
        self.assertEqual(r.status_code, 400)
251
        self.assertCount(AuthorizationCode, 0)
252

    
253
        # unsupported response_type
254
        r = self.client.get(self.client.get_url(self.client.auth_url,
255
                                                response_type='token'))
256
        self.assertEqual(r.status_code, 400)
257
        self.assertCount(AuthorizationCode, 0)
258

    
259
        # missing client_id
260
        r = self.client.get(self.client.get_url(self.client.auth_url,
261
                                                response_type='code'))
262
        self.assertEqual(r.status_code, 400)
263
        self.assertCount(AuthorizationCode, 0)
264

    
265
        # fake client
266
        r = self.client.authorize_code('client-fake')
267
        self.assertEqual(r.status_code, 400)
268
        self.assertCount(AuthorizationCode, 0)
269

    
270
        # mixed up credentials/client_id's
271
        self.client.set_credentials('client1', 'secret')
272
        r = self.client.authorize_code('client2')
273
        self.assertEqual(r.status_code, 400)
274
        self.assertCount(AuthorizationCode, 0)
275

    
276
        # invalid credentials
277
        self.client.set_credentials('client2', '')
278
        r = self.client.authorize_code('client2')
279
        self.assertEqual(r.status_code, 400)
280
        self.assertCount(AuthorizationCode, 0)
281

    
282
        # invalid redirect_uri: not absolute URI
283
        self.client.set_credentials()
284
        params = {'redirect_uri':
285
                  urlparse.urlparse(self.client1_redirect_uri).path}
286
        r = self.client.authorize_code('client1', urlparams=params)
287
        self.assertEqual(r.status_code, 400)
288
        self.assertCount(AuthorizationCode, 0)
289

    
290
        # mismatching redirect uri
291
        self.client.set_credentials()
292
        params = {'redirect_uri': self.client1_redirect_uri[1:]}
293
        r = self.client.authorize_code('client1', urlparams=params)
294
        self.assertEqual(r.status_code, 400)
295
        self.assertCount(AuthorizationCode, 0)
296

    
297
        # valid request: untrusted client
298
        params = {'redirect_uri': self.client1_redirect_uri,
299
                  'scope': self.client1_redirect_uri,
300
                  'extra_param': '123'}
301
        self.client.set_credentials('client1', 'secret')
302
        r = self.client.authorize_code('client1', urlparams=params)
303
        self.assertEqual(r.status_code, 302)
304
        self.assertTrue('Location' in r)
305
        self.assertHost(r['Location'], "testserver:80")
306
        self.assertPath(r['Location'], reverse('login'))
307

    
308
        self.client.set_credentials('client1', 'secret')
309
        self.client.login(username="user@synnefo.org", password="password")
310
        r = self.client.authorize_code('client1', urlparams=params)
311
        self.assertEqual(r.status_code, 200)
312

    
313
        r = self.client.authorize_code('client1', urlparams=params,
314
                                       extraparams={'reject': 0})
315
        self.assertCount(AuthorizationCode, 1)
316

    
317
        # redirect is valid
318
        redirect1 = self.get_redirect_url(r)
319
        self.assertParam(redirect1, "code")
320
        self.assertNoParam(redirect1, "extra_param")
321
        self.assertHost(redirect1, "server.com")
322
        self.assertPath(redirect1, "/handle_code")
323

    
324
        params['state'] = 'csrfstate'
325
        params['scope'] = 'resource1'
326
        r = self.client.authorize_code('client1', urlparams=params)
327
        redirect2 = self.get_redirect_url(r)
328
        self.assertParamEqual(redirect2, "state", 'csrfstate')
329
        self.assertCount(AuthorizationCode, 2)
330

    
331
        code1 = AuthorizationCode.objects.get(code=redirect1.params['code'][0])
332
        #self.assertEqual(code1.state, '')
333
        self.assertEqual(code1.state, None)
334
        self.assertEqual(code1.redirect_uri, self.client1_redirect_uri)
335

    
336
        code2 = AuthorizationCode.objects.get(code=redirect2.params['code'][0])
337
        self.assertEqual(code2.state, 'csrfstate')
338
        self.assertEqual(code2.redirect_uri, self.client1_redirect_uri)
339

    
340
        # valid request: trusted client
341
        params = {'redirect_uri': self.client3_redirect_uri,
342
                  'scope': self.client3_redirect_uri,
343
                  'extra_param': '123'}
344
        self.client.set_credentials('client3', 'secret')
345
        r = self.client.authorize_code('client3', urlparams=params)
346
        self.assertEqual(r.status_code, 302)
347
        self.assertCount(AuthorizationCode, 3)
348

    
349
        # redirect is valid
350
        redirect3 = self.get_redirect_url(r)
351
        self.assertParam(redirect1, "code")
352
        self.assertNoParam(redirect3, "state")
353
        self.assertNoParam(redirect3, "extra_param")
354
        self.assertHost(redirect3, "server3.com")
355
        self.assertPath(redirect3, "/handle_code")
356

    
357
        code3 = AuthorizationCode.objects.get(code=redirect3.params['code'][0])
358
        self.assertEqual(code3.state, None)
359
        self.assertEqual(code3.redirect_uri, self.client3_redirect_uri)
360

    
361
        # valid request: trusted client
362
        params['state'] = 'csrfstate'
363
        self.client.set_credentials('client3', 'secret')
364
        r = self.client.authorize_code('client3', urlparams=params)
365
        self.assertEqual(r.status_code, 302)
366
        self.assertCount(AuthorizationCode, 4)
367

    
368
        # redirect is valid
369
        redirect4 = self.get_redirect_url(r)
370
        self.assertParam(redirect4, "code")
371
        self.assertParamEqual(redirect4, "state", 'csrfstate')
372
        self.assertNoParam(redirect4, "extra_param")
373
        self.assertHost(redirect4, "server3.com")
374
        self.assertPath(redirect4, "/handle_code")
375

    
376
        code4 = AuthorizationCode.objects.get(code=redirect4.params['code'][0])
377
        self.assertEqual(code4.state, 'csrfstate')
378
        self.assertEqual(code4.redirect_uri, self.client3_redirect_uri)
379

    
380
    def test_get_token(self):
381
        # invalid method
382
        r = self.client.get(self.client.token_url)
383
        self.assertEqual(r.status_code, 405)
384
        self.assertTrue('Allow' in r)
385
        self.assertEqual(r['Allow'], 'POST')
386

    
387
        # invalid content type
388
        r = self.client.post(self.client.token_url)
389
        self.assertEqual(r.status_code, 400)
390

    
391
        # missing grant type
392
        r = self.client.post(self.client.token_url,
393
                             content_type='application/x-www-form-urlencoded')
394
        self.assertEqual(r.status_code, 400)
395

    
396
        # unsupported grant type: client_credentials
397
        r = self.client.post(self.client.token_url,
398
                             data='grant_type=client_credentials',
399
                             content_type='application/x-www-form-urlencoded')
400
        self.assertEqual(r.status_code, 400)
401

    
402
        # unsupported grant type: token
403
        r = self.client.post(self.client.token_url,
404
                             data='grant_type=token',
405
                             content_type='application/x-www-form-urlencoded')
406
        self.assertEqual(r.status_code, 400)
407

    
408
        # invalid grant type
409
        r = self.client.post(self.client.token_url,
410
                             data='grant_type=invalid',
411
                             content_type='application/x-www-form-urlencoded')
412
        self.assertEqual(r.status_code, 400)
413

    
414
        # generate authorization code: without redirect_uri
415
        self.client.login(username="user@synnefo.org", password="password")
416
        r = self.client.authorize_code('client3')
417
        self.assertCount(AuthorizationCode, 1)
418
        redirect = self.get_redirect_url(r)
419
        code_instance = AuthorizationCode.objects.get(
420
            code=redirect.params['code'][0])
421

    
422
        # no client_id & no client authorization
423
        r = self.client.access_token(code_instance.code)
424
        self.assertEqual(r.status_code, 400)
425

    
426
        # invalid client_id
427
        r = self.client.access_token(code_instance.code, client_id='client2')
428
        self.assertEqual(r.status_code, 400)
429

    
430
        # inexistent client_id
431
        r = self.client.access_token(code_instance.code, client_id='client42')
432
        self.assertEqual(r.status_code, 400)
433

    
434
        # no client authorization
435
        r = self.client.access_token(code_instance.code, client_id='client3')
436
        self.assertEqual(r.status_code, 400)
437

    
438
        # mixed up credentials/client_id's
439
        self.client.set_credentials('client1', 'secret')
440
        r = self.client.access_token(code_instance.code, client_id='client3')
441
        self.assertEqual(r.status_code, 400)
442

    
443
        # mixed up credentials/client_id's
444
        self.client.set_credentials('client3', 'secret')
445
        r = self.client.access_token(code_instance.code, client_id='client1')
446
        self.assertEqual(r.status_code, 400)
447

    
448
        # mismatching client
449
        self.client.set_credentials('client1', 'secret')
450
        r = self.client.access_token(code_instance.code, client_id='client1')
451
        self.assertEqual(r.status_code, 400)
452

    
453
        # invalid code
454
        self.client.set_credentials('client3', 'secret')
455
        r = self.client.access_token('invalid')
456
        self.assertEqual(r.status_code, 400)
457

    
458
        # valid request
459
        self.client.set_credentials('client3', 'secret')
460
        r = self.client.access_token(code_instance.code)
461
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
462
        self.assertCount(Token, 1)
463
        expected = {'redirect_uri': self.client3_redirect_uri,
464
                    'scope': self.client3_redirect_uri,
465
                    'state': None}
466
        self.assert_access_token_response(r, expected)