Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ 68122bae

History | View | Annotate | Download (20.9 kB)

1
# -*- coding: utf8 -*-
2
# Copyright 2013 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urllib
36
import urlparse
37
import base64
38
import datetime
39

    
40
from collections import namedtuple
41

    
42
from django.test import TransactionTestCase as TestCase
43
from django.test import Client as TestClient
44

    
45
from django.core.urlresolvers import reverse
46
from django.utils import simplejson as json
47

    
48
from astakos.oa2 import settings
49
from astakos.oa2.models import Client, AuthorizationCode, Token
50
from astakos.im.tests import common
51

    
52

    
53
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params',
54
                                     'url'])
55

    
56

    
57
def parsed_url_wrapper(func):
58
    def wrapper(self, url, *args, **kwargs):
59
        url = self.parse_url(url)
60
        return func(self, url, *args, **kwargs)
61
    return wrapper
62

    
63

    
64
class URLAssertionsMixin(object):
65

    
66
    def get_redirect_url(self, request):
67
        return self.parse_url(request['Location'])
68

    
69
    def parse_url(self, url):
70
        if isinstance(url, ParsedURL):
71
            return url
72
        result = urlparse.urlparse(url)
73
        parsed = {
74
            'url': url,
75
            'host': result.netloc,
76
            'scheme': result.scheme,
77
            'path': result.path,
78
        }
79
        parsed['params'] = urlparse.parse_qs(result.query)
80
        return ParsedURL(**parsed)
81

    
82
    @parsed_url_wrapper
83
    def assertParamEqual(self, url, key, value):
84
        self.assertParam(url, key)
85
        self.assertEqual(url.params[key][0], value)
86

    
87
    @parsed_url_wrapper
88
    def assertNoParam(self, url, key):
89
        self.assertFalse(key in url.params,
90
                         "Url '%s' does contain '%s' parameter" % (url.url,
91
                                                                   key))
92

    
93
    @parsed_url_wrapper
94
    def assertParam(self, url, key):
95
        self.assertTrue(key in url.params,
96
                        "Url '%s' does not contain '%s' parameter" % (url.url,
97
                                                                      key))
98

    
99
    @parsed_url_wrapper
100
    def assertHost(self, url, host):
101
        self.assertEqual(url.host, host)
102

    
103
    @parsed_url_wrapper
104
    def assertPath(self, url, path):
105
        self.assertEqual(url.path, path)
106

    
107
    @parsed_url_wrapper
108
    def assertSecure(self, url, key):
109
        self.assertEqual(url.scheme, "https")
110

    
111

    
112
class OA2Client(TestClient):
113
    """
114
    An OAuth2 agnostic test client.
115
    """
116
    def __init__(self, baseurl, *args, **kwargs):
117
        self.oa2_url = baseurl
118
        self.token_url = self.oa2_url + 'token/'
119
        self.auth_url = self.oa2_url + 'auth/'
120
        self.credentials = kwargs.pop('credentials', ())
121

    
122
        kwargs['wsgi.url_scheme'] = 'https'
123
        return super(OA2Client, self).__init__(*args, **kwargs)
124

    
125
    def request(self, *args, **kwargs):
126
        #print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
127
            #kwargs.get('HTTP_AUTHORIZATION', None)
128
        return super(OA2Client, self).request(*args, **kwargs)
129

    
130
    def get_url(self, token_or_auth, **params):
131
        return token_or_auth + '?' + urllib.urlencode(params)
132

    
133
    def grant(self, clientid, *args, **kwargs):
134
        """
135
        Do an authorization grant request.
136
        """
137
        params = {
138
            'grant_type': 'authorization_code',
139
            'client_id': clientid
140
        }
141
        urlparams = kwargs.pop('urlparams', {})
142
        params.update(urlparams)
143
        self.set_auth_headers(kwargs)
144
        return self.get(self.get_url(self.token_url, **params), *args,
145
                        **kwargs)
146

    
147
    def authorize_code(self, clientid, *args, **kwargs):
148
        """
149
        Do an authorization code request.
150
        """
151
        params = {
152
            'response_type': 'code',
153
            'client_id': clientid
154
        }
155
        urlparams = kwargs.pop('urlparams', {})
156
        urlparams.update(kwargs.pop('extraparams', {}))
157
        params.update(urlparams)
158
        self.set_auth_headers(kwargs)
159
        if 'reject' in params:
160
            return self.post(self.get_url(self.auth_url), data=params,
161
                             **kwargs)
162
        return self.get(self.get_url(self.auth_url, **params), *args, **kwargs)
163

    
164
    def access_token(self, code,
165
                     content_type='application/x-www-form-urlencoded',
166
                     **kwargs):
167
        """
168
        Do an get token request.
169
        """
170
        params = {
171
            'grant_type': 'authorization_code',
172
            'code': code
173
        }
174
        params.update(kwargs)
175
        self.set_auth_headers(kwargs)
176
        return self.post(self.token_url, data=urllib.urlencode(params),
177
                         content_type=content_type, **kwargs)
178

    
179
    def set_auth_headers(self, params):
180
        if not self.credentials:
181
            return
182
        credentials = base64.encodestring('%s:%s' % self.credentials).strip()
183
        params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials
184
        return params
185

    
186
    def set_credentials(self, user=None, pwd=None):
187
        self.credentials = (user, pwd)
188
        if not user and not pwd:
189
            self.credentials = ()
190

    
191

    
192
class TestOA2(TestCase, URLAssertionsMixin):
193

    
194
    def assertCount(self, model, count):
195
        self.assertEqual(model.objects.count(), count)
196

    
197
    def assert_access_token_response(self, r, expected):
198
        self.assertEqual(r.status_code, 200)
199
        try:
200
            data = json.loads(r.content)
201
        except:
202
            self.fail("Unexpected response content")
203

    
204
        self.assertTrue('access_token' in data)
205
        access_token = data['access_token']
206
        self.assertTrue('token_type' in data)
207
        token_type = data['token_type']
208
        self.assertTrue('expires_in' in data)
209
        expires_in = data['expires_in']
210

    
211
        try:
212
            token = Token.objects.get(code=access_token)
213
            self.assertEqual(token.expires_at,
214
                             token.created_at +
215
                             datetime.timedelta(seconds=expires_in))
216
            self.assertEqual(token.token_type, token_type)
217
            self.assertEqual(token.grant_type, 'authorization_code')
218
            #self.assertEqual(token.user, expected.get('user'))
219
            self.assertEqual(token.redirect_uri, expected.get('redirect_uri'))
220
            self.assertEqual(token.scope, expected.get('scope'))
221
            self.assertEqual(token.state, expected.get('state'))
222
        except Token.DoesNotExist:
223
            self.fail("Invalid access_token")
224

    
225
    def setUp(self):
226
        baseurl = reverse('oauth2_authenticate').replace('/auth', '/')
227
        self.client = OA2Client(baseurl)
228
        client1 = Client.objects.create(identifier="client1", secret="secret")
229
        self.client1_redirect_uri = "https://server.com/handle_code"
230
        client1.redirecturl_set.create(url=self.client1_redirect_uri)
231

    
232
        client2 = Client.objects.create(identifier="client2", type='public')
233
        self.client2_redirect_uri = "https://server2.com/handle_code"
234
        client2.redirecturl_set.create(url=self.client2_redirect_uri)
235

    
236
        client3 = Client.objects.create(identifier="client3", secret='secret',
237
                                        is_trusted=True)
238
        self.client3_redirect_uri = "https://server3.com/handle_code"
239
        client3.redirecturl_set.create(url=self.client3_redirect_uri)
240

    
241
        common.get_local_user("user@synnefo.org", password="password")
242

    
243
    def test_code_authorization(self):
244
        # missing response_type
245
        r = self.client.get(self.client.get_url(self.client.auth_url))
246
        self.assertEqual(r.status_code, 400)
247
        self.assertCount(AuthorizationCode, 0)
248

    
249
        # invalid response_type
250
        r = self.client.get(self.client.get_url(self.client.auth_url,
251
                                                response_type='invalid'))
252
        self.assertEqual(r.status_code, 400)
253
        self.assertCount(AuthorizationCode, 0)
254

    
255
        # unsupported response_type
256
        r = self.client.get(self.client.get_url(self.client.auth_url,
257
                                                response_type='token'))
258
        self.assertEqual(r.status_code, 400)
259
        self.assertCount(AuthorizationCode, 0)
260

    
261
        # missing client_id
262
        r = self.client.get(self.client.get_url(self.client.auth_url,
263
                                                response_type='code'))
264
        self.assertEqual(r.status_code, 400)
265
        self.assertCount(AuthorizationCode, 0)
266

    
267
        # fake client
268
        r = self.client.authorize_code('client-fake')
269
        self.assertEqual(r.status_code, 400)
270
        self.assertCount(AuthorizationCode, 0)
271

    
272
        # mixed up credentials/client_id's
273
        self.client.set_credentials('client1', 'secret')
274
        r = self.client.authorize_code('client2')
275
        self.assertEqual(r.status_code, 400)
276
        self.assertCount(AuthorizationCode, 0)
277

    
278
        # invalid credentials
279
        self.client.set_credentials('client2', '')
280
        r = self.client.authorize_code('client2')
281
        self.assertEqual(r.status_code, 400)
282
        self.assertCount(AuthorizationCode, 0)
283

    
284
        # invalid redirect_uri: not absolute URI
285
        self.client.set_credentials()
286
        params = {'redirect_uri':
287
                  urlparse.urlparse(self.client1_redirect_uri).path}
288
        r = self.client.authorize_code('client1', urlparams=params)
289
        self.assertEqual(r.status_code, 400)
290
        self.assertCount(AuthorizationCode, 0)
291

    
292
        # mismatching redirect uri
293
        self.client.set_credentials()
294
        params = {'redirect_uri': self.client1_redirect_uri[1:]}
295
        r = self.client.authorize_code('client1', urlparams=params)
296
        self.assertEqual(r.status_code, 400)
297
        self.assertCount(AuthorizationCode, 0)
298

    
299
        # valid request: untrusted client
300
        params = {'redirect_uri': self.client1_redirect_uri,
301
                  'scope': self.client1_redirect_uri,
302
                  'extra_param': 'γιουνικοντ'}
303
        self.client.set_credentials('client1', 'secret')
304
        r = self.client.authorize_code('client1', urlparams=params)
305
        self.assertEqual(r.status_code, 302)
306
        self.assertTrue('Location' in r)
307
        self.assertHost(r['Location'], "testserver:80")
308
        self.assertPath(r['Location'], reverse('login'))
309

    
310
        self.client.set_credentials('client1', 'secret')
311
        self.client.login(username="user@synnefo.org", password="password")
312
        r = self.client.authorize_code('client1', urlparams=params)
313
        self.assertEqual(r.status_code, 200)
314

    
315
        r = self.client.authorize_code('client1', urlparams=params,
316
                                       extraparams={'reject': 0})
317
        self.assertCount(AuthorizationCode, 1)
318

    
319
        # redirect is valid
320
        redirect = self.get_redirect_url(r)
321
        self.assertParam(redirect, "code")
322
        self.assertNoParam(redirect, "extra_param")
323
        self.assertHost(redirect, "server.com")
324
        self.assertPath(redirect, "/handle_code")
325

    
326
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
327
        #self.assertEqual(code.state, '')
328
        self.assertEqual(code.state, None)
329
        self.assertEqual(code.redirect_uri, self.client1_redirect_uri)
330

    
331
        params['state'] = 'csrfstate'
332
        params['scope'] = 'resource1'
333
        r = self.client.authorize_code('client1', urlparams=params)
334
        redirect = self.get_redirect_url(r)
335
        self.assertParamEqual(redirect, "state", 'csrfstate')
336
        self.assertCount(AuthorizationCode, 2)
337

    
338
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
339
        self.assertEqual(code.state, 'csrfstate')
340
        self.assertEqual(code.redirect_uri, self.client1_redirect_uri)
341

    
342
        # valid request: trusted client
343
        params = {'redirect_uri': self.client3_redirect_uri,
344
                  'scope': self.client3_redirect_uri,
345
                  'extra_param': '123'}
346
        self.client.set_credentials('client3', 'secret')
347
        r = self.client.authorize_code('client3', urlparams=params)
348
        self.assertEqual(r.status_code, 302)
349
        self.assertCount(AuthorizationCode, 3)
350

    
351
        # redirect is valid
352
        redirect = self.get_redirect_url(r)
353
        self.assertParam(redirect, "code")
354
        self.assertNoParam(redirect, "state")
355
        self.assertNoParam(redirect, "extra_param")
356
        self.assertHost(redirect, "server3.com")
357
        self.assertPath(redirect, "/handle_code")
358

    
359
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
360
        self.assertEqual(code.state, None)
361
        self.assertEqual(code.redirect_uri, self.client3_redirect_uri)
362

    
363
        # valid request: trusted client
364
        params['state'] = 'csrfstate'
365
        self.client.set_credentials('client3', 'secret')
366
        r = self.client.authorize_code('client3', urlparams=params)
367
        self.assertEqual(r.status_code, 302)
368
        self.assertCount(AuthorizationCode, 4)
369

    
370
        # redirect is valid
371
        redirect = self.get_redirect_url(r)
372
        self.assertParam(redirect, "code")
373
        self.assertParamEqual(redirect, "state", 'csrfstate')
374
        self.assertNoParam(redirect, "extra_param")
375
        self.assertHost(redirect, "server3.com")
376
        self.assertPath(redirect, "/handle_code")
377

    
378
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
379
        self.assertEqual(code.state, 'csrfstate')
380
        self.assertEqual(code.redirect_uri, self.client3_redirect_uri)
381

    
382
        # redirect uri startswith the client's registered redirect url
383
        params['redirect_uri'] = '%smore' % self.client3_redirect_uri
384
        self.client.set_credentials('client3', 'secret')
385
        r = self.client.authorize_code('client3', urlparams=params)
386
        self.assertEqual(r.status_code, 400)
387

    
388
        # redirect uri descendant
389
        redirect_uri = '%s/' % self.client3_redirect_uri
390
        rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
391
        redirect_uri = '%s%s' % (redirect_uri, 'a'*rest)
392
        params['redirect_uri'] = redirect_uri
393
        self.client.set_credentials('client3', 'secret')
394
        r = self.client.authorize_code('client3', urlparams=params)
395
        self.assertEqual(r.status_code, 302)
396
        self.assertCount(AuthorizationCode, 5)
397

    
398
        # redirect is valid
399
        redirect = self.get_redirect_url(r)
400
        self.assertParam(redirect, "code")
401
        self.assertParamEqual(redirect, "state", 'csrfstate')
402
        self.assertNoParam(redirect, "extra_param")
403
        self.assertHost(redirect, "server3.com")
404
        self.assertPath(redirect, urlparse.urlparse(redirect_uri).path)
405

    
406
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
407
        self.assertEqual(code.state, 'csrfstate')
408
        self.assertEqual(code.redirect_uri, redirect_uri)
409

    
410
        # too long redirect uri
411
        params['redirect_uri'] = '%sa' % redirect_uri
412
        self.client.set_credentials('client3', 'secret')
413
        r = self.client.authorize_code('client3', urlparams=params)
414
        self.assertEqual(r.status_code, 400)
415

    
416
    def test_get_token(self):
417
        # invalid method
418
        r = self.client.get(self.client.token_url)
419
        self.assertEqual(r.status_code, 405)
420
        self.assertTrue('Allow' in r)
421
        self.assertEqual(r['Allow'], 'POST')
422

    
423
        # invalid content type
424
        r = self.client.post(self.client.token_url)
425
        self.assertEqual(r.status_code, 400)
426

    
427
        # missing grant type
428
        r = self.client.post(self.client.token_url,
429
                             content_type='application/x-www-form-urlencoded')
430
        self.assertEqual(r.status_code, 400)
431

    
432
        # unsupported grant type: client_credentials
433
        r = self.client.post(self.client.token_url,
434
                             data='grant_type=client_credentials',
435
                             content_type='application/x-www-form-urlencoded')
436
        self.assertEqual(r.status_code, 400)
437

    
438
        # unsupported grant type: token
439
        r = self.client.post(self.client.token_url,
440
                             data='grant_type=token',
441
                             content_type='application/x-www-form-urlencoded')
442
        self.assertEqual(r.status_code, 400)
443

    
444
        # invalid grant type
445
        r = self.client.post(self.client.token_url,
446
                             data='grant_type=invalid',
447
                             content_type='application/x-www-form-urlencoded')
448
        self.assertEqual(r.status_code, 400)
449

    
450
        # generate authorization code: without redirect_uri
451
        self.client.login(username="user@synnefo.org", password="password")
452
        r = self.client.authorize_code('client3')
453
        self.assertCount(AuthorizationCode, 1)
454
        redirect = self.get_redirect_url(r)
455
        code_instance = AuthorizationCode.objects.get(
456
            code=redirect.params['code'][0])
457

    
458
        # no client_id & no client authorization
459
        r = self.client.access_token(code_instance.code)
460
        self.assertEqual(r.status_code, 400)
461

    
462
        # invalid client_id
463
        r = self.client.access_token(code_instance.code, client_id='client2')
464
        self.assertEqual(r.status_code, 400)
465

    
466
        # inexistent client_id
467
        r = self.client.access_token(code_instance.code, client_id='client42')
468
        self.assertEqual(r.status_code, 400)
469

    
470
        # no client authorization
471
        r = self.client.access_token(code_instance.code, client_id='client3')
472
        self.assertEqual(r.status_code, 400)
473

    
474
        # mixed up credentials/client_id's
475
        self.client.set_credentials('client1', 'secret')
476
        r = self.client.access_token(code_instance.code, client_id='client3')
477
        self.assertEqual(r.status_code, 400)
478

    
479
        # mixed up credentials/client_id's
480
        self.client.set_credentials('client3', 'secret')
481
        r = self.client.access_token(code_instance.code, client_id='client1')
482
        self.assertEqual(r.status_code, 400)
483

    
484
        # mismatching client
485
        self.client.set_credentials('client1', 'secret')
486
        r = self.client.access_token(code_instance.code, client_id='client1')
487
        self.assertEqual(r.status_code, 400)
488

    
489
        # invalid code
490
        self.client.set_credentials('client3', 'secret')
491
        r = self.client.access_token('invalid')
492
        self.assertEqual(r.status_code, 400)
493

    
494
        # valid request
495
        self.client.set_credentials('client3', 'secret')
496
        r = self.client.access_token(code_instance.code)
497
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
498
        self.assertCount(Token, 1)
499
        expected = {'redirect_uri': self.client3_redirect_uri,
500
                    'scope': self.client3_redirect_uri,
501
                    'state': None}
502
        self.assert_access_token_response(r, expected)
503

    
504
        # generate authorization code with too long redirect_uri
505
        redirect_uri = '%s/' % self.client3_redirect_uri
506
        rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
507
        redirect_uri = '%s%s' % (redirect_uri, 'a'*rest)
508
        params = {'redirect_uri': redirect_uri}
509
        r = self.client.authorize_code('client3', urlparams=params)
510
        self.assertCount(AuthorizationCode, 1)
511
        redirect = self.get_redirect_url(r)
512
        code_instance = AuthorizationCode.objects.get(
513
            code=redirect.params['code'][0])
514

    
515
        # valid request
516
        self.client.set_credentials('client3', 'secret')
517
        r = self.client.access_token(code_instance.code,
518
                                     redirect_uri='%sa' % redirect_uri)
519
        self.assertEqual(r.status_code, 400)
520

    
521
        r = self.client.access_token(code_instance.code,
522
                                     redirect_uri=redirect_uri)
523
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
524
        self.assertCount(Token, 2)
525
        expected = {'redirect_uri': redirect_uri,
526
                    'scope': redirect_uri,
527
                    'state': None}
528
        self.assert_access_token_response(r, expected)