Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ c985de5c

History | View | Annotate | Download (20.8 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import urllib
35
import urlparse
36
import base64
37
import datetime
38

    
39
from collections import namedtuple
40

    
41
from django.test import TransactionTestCase as TestCase
42
from django.test import Client as TestClient
43

    
44
from django.core.urlresolvers import reverse
45
from django.utils import simplejson as json
46

    
47
from astakos.oa2 import settings
48
from astakos.oa2.models import Client, AuthorizationCode, Token
49
from astakos.im.tests import common
50

    
51

    
52
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params',
53
                                     'url'])
54

    
55

    
56
def parsed_url_wrapper(func):
57
    def wrapper(self, url, *args, **kwargs):
58
        url = self.parse_url(url)
59
        return func(self, url, *args, **kwargs)
60
    return wrapper
61

    
62

    
63
class URLAssertionsMixin(object):
64

    
65
    def get_redirect_url(self, request):
66
        return self.parse_url(request['Location'])
67

    
68
    def parse_url(self, url):
69
        if isinstance(url, ParsedURL):
70
            return url
71
        result = urlparse.urlparse(url)
72
        parsed = {
73
            'url': url,
74
            'host': result.netloc,
75
            'scheme': result.scheme,
76
            'path': result.path,
77
        }
78
        parsed['params'] = urlparse.parse_qs(result.query)
79
        return ParsedURL(**parsed)
80

    
81
    @parsed_url_wrapper
82
    def assertParamEqual(self, url, key, value):
83
        self.assertParam(url, key)
84
        self.assertEqual(url.params[key][0], value)
85

    
86
    @parsed_url_wrapper
87
    def assertNoParam(self, url, key):
88
        self.assertFalse(key in url.params,
89
                         "Url '%s' does contain '%s' parameter" % (url.url,
90
                                                                   key))
91

    
92
    @parsed_url_wrapper
93
    def assertParam(self, url, key):
94
        self.assertTrue(key in url.params,
95
                        "Url '%s' does not contain '%s' parameter" % (url.url,
96
                                                                      key))
97

    
98
    @parsed_url_wrapper
99
    def assertHost(self, url, host):
100
        self.assertEqual(url.host, host)
101

    
102
    @parsed_url_wrapper
103
    def assertPath(self, url, path):
104
        self.assertEqual(url.path, path)
105

    
106
    @parsed_url_wrapper
107
    def assertSecure(self, url, key):
108
        self.assertEqual(url.scheme, "https")
109

    
110

    
111
class OA2Client(TestClient):
112
    """
113
    An OAuth2 agnostic test client.
114
    """
115
    def __init__(self, baseurl, *args, **kwargs):
116
        self.oa2_url = baseurl
117
        self.token_url = self.oa2_url + 'token/'
118
        self.auth_url = self.oa2_url + 'auth/'
119
        self.credentials = kwargs.pop('credentials', ())
120

    
121
        kwargs['wsgi.url_scheme'] = 'https'
122
        return super(OA2Client, self).__init__(*args, **kwargs)
123

    
124
    def request(self, *args, **kwargs):
125
        #print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
126
            #kwargs.get('HTTP_AUTHORIZATION', None)
127
        return super(OA2Client, self).request(*args, **kwargs)
128

    
129
    def get_url(self, token_or_auth, **params):
130
        return token_or_auth + '?' + urllib.urlencode(params)
131

    
132
    def grant(self, clientid, *args, **kwargs):
133
        """
134
        Do an authorization grant request.
135
        """
136
        params = {
137
            'grant_type': 'authorization_code',
138
            'client_id': clientid
139
        }
140
        urlparams = kwargs.pop('urlparams', {})
141
        params.update(urlparams)
142
        self.set_auth_headers(kwargs)
143
        return self.get(self.get_url(self.token_url, **params), *args,
144
                        **kwargs)
145

    
146
    def authorize_code(self, clientid, *args, **kwargs):
147
        """
148
        Do an authorization code request.
149
        """
150
        params = {
151
            'response_type': 'code',
152
            'client_id': clientid
153
        }
154
        urlparams = kwargs.pop('urlparams', {})
155
        urlparams.update(kwargs.pop('extraparams', {}))
156
        params.update(urlparams)
157
        self.set_auth_headers(kwargs)
158
        if 'reject' in params:
159
            return self.post(self.get_url(self.auth_url), data=params,
160
                             **kwargs)
161
        return self.get(self.get_url(self.auth_url, **params), *args, **kwargs)
162

    
163
    def access_token(self, code,
164
                     content_type='application/x-www-form-urlencoded',
165
                     **kwargs):
166
        """
167
        Do an get token request.
168
        """
169
        params = {
170
            'grant_type': 'authorization_code',
171
            'code': code
172
        }
173
        params.update(kwargs)
174
        self.set_auth_headers(kwargs)
175
        return self.post(self.token_url, data=urllib.urlencode(params),
176
                         content_type=content_type, **kwargs)
177

    
178
    def set_auth_headers(self, params):
179
        if not self.credentials:
180
            return
181
        credentials = base64.encodestring('%s:%s' % self.credentials).strip()
182
        params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials
183
        return params
184

    
185
    def set_credentials(self, user=None, pwd=None):
186
        self.credentials = (user, pwd)
187
        if not user and not pwd:
188
            self.credentials = ()
189

    
190

    
191
class TestOA2(TestCase, URLAssertionsMixin):
192

    
193
    def assertCount(self, model, count):
194
        self.assertEqual(model.objects.count(), count)
195

    
196
    def assert_access_token_response(self, r, expected):
197
        self.assertEqual(r.status_code, 200)
198
        try:
199
            data = json.loads(r.content)
200
        except:
201
            self.fail("Unexpected response content")
202

    
203
        self.assertTrue('access_token' in data)
204
        access_token = data['access_token']
205
        self.assertTrue('token_type' in data)
206
        token_type = data['token_type']
207
        self.assertTrue('expires_in' in data)
208
        expires_in = data['expires_in']
209

    
210
        try:
211
            token = Token.objects.get(code=access_token)
212
            self.assertEqual(token.expires_at,
213
                             token.created_at +
214
                             datetime.timedelta(seconds=expires_in))
215
            self.assertEqual(token.token_type, token_type)
216
            self.assertEqual(token.grant_type, 'authorization_code')
217
            #self.assertEqual(token.user, expected.get('user'))
218
            self.assertEqual(token.redirect_uri, expected.get('redirect_uri'))
219
            self.assertEqual(token.scope, expected.get('scope'))
220
            self.assertEqual(token.state, expected.get('state'))
221
        except Token.DoesNotExist:
222
            self.fail("Invalid access_token")
223

    
224
    def setUp(self):
225
        baseurl = reverse('oauth2_authenticate').replace('/auth', '/')
226
        self.client = OA2Client(baseurl)
227
        client1 = Client.objects.create(identifier="client1", secret="secret")
228
        self.client1_redirect_uri = "https://server.com/handle_code"
229
        client1.redirecturl_set.create(url=self.client1_redirect_uri)
230

    
231
        client2 = Client.objects.create(identifier="client2", type='public')
232
        self.client2_redirect_uri = "https://server2.com/handle_code"
233
        client2.redirecturl_set.create(url=self.client2_redirect_uri)
234

    
235
        client3 = Client.objects.create(identifier="client3", secret='secret',
236
                                        is_trusted=True)
237
        self.client3_redirect_uri = "https://server3.com/handle_code"
238
        client3.redirecturl_set.create(url=self.client3_redirect_uri)
239

    
240
        common.get_local_user("user@synnefo.org", password="password")
241

    
242
    def test_code_authorization(self):
243
        # missing response_type
244
        r = self.client.get(self.client.get_url(self.client.auth_url))
245
        self.assertEqual(r.status_code, 400)
246
        self.assertCount(AuthorizationCode, 0)
247

    
248
        # invalid response_type
249
        r = self.client.get(self.client.get_url(self.client.auth_url,
250
                                                response_type='invalid'))
251
        self.assertEqual(r.status_code, 400)
252
        self.assertCount(AuthorizationCode, 0)
253

    
254
        # unsupported response_type
255
        r = self.client.get(self.client.get_url(self.client.auth_url,
256
                                                response_type='token'))
257
        self.assertEqual(r.status_code, 400)
258
        self.assertCount(AuthorizationCode, 0)
259

    
260
        # missing client_id
261
        r = self.client.get(self.client.get_url(self.client.auth_url,
262
                                                response_type='code'))
263
        self.assertEqual(r.status_code, 400)
264
        self.assertCount(AuthorizationCode, 0)
265

    
266
        # fake client
267
        r = self.client.authorize_code('client-fake')
268
        self.assertEqual(r.status_code, 400)
269
        self.assertCount(AuthorizationCode, 0)
270

    
271
        # mixed up credentials/client_id's
272
        self.client.set_credentials('client1', 'secret')
273
        r = self.client.authorize_code('client2')
274
        self.assertEqual(r.status_code, 400)
275
        self.assertCount(AuthorizationCode, 0)
276

    
277
        # invalid credentials
278
        self.client.set_credentials('client2', '')
279
        r = self.client.authorize_code('client2')
280
        self.assertEqual(r.status_code, 400)
281
        self.assertCount(AuthorizationCode, 0)
282

    
283
        # invalid redirect_uri: not absolute URI
284
        self.client.set_credentials()
285
        params = {'redirect_uri':
286
                  urlparse.urlparse(self.client1_redirect_uri).path}
287
        r = self.client.authorize_code('client1', urlparams=params)
288
        self.assertEqual(r.status_code, 400)
289
        self.assertCount(AuthorizationCode, 0)
290

    
291
        # mismatching redirect uri
292
        self.client.set_credentials()
293
        params = {'redirect_uri': self.client1_redirect_uri[1:]}
294
        r = self.client.authorize_code('client1', urlparams=params)
295
        self.assertEqual(r.status_code, 400)
296
        self.assertCount(AuthorizationCode, 0)
297

    
298
        # valid request: untrusted client
299
        params = {'redirect_uri': self.client1_redirect_uri,
300
                  'scope': self.client1_redirect_uri,
301
                  'extra_param': '123'}
302
        self.client.set_credentials('client1', 'secret')
303
        r = self.client.authorize_code('client1', urlparams=params)
304
        self.assertEqual(r.status_code, 302)
305
        self.assertTrue('Location' in r)
306
        self.assertHost(r['Location'], "testserver:80")
307
        self.assertPath(r['Location'], reverse('login'))
308

    
309
        self.client.set_credentials('client1', 'secret')
310
        self.client.login(username="user@synnefo.org", password="password")
311
        r = self.client.authorize_code('client1', urlparams=params)
312
        self.assertEqual(r.status_code, 200)
313

    
314
        r = self.client.authorize_code('client1', urlparams=params,
315
                                       extraparams={'reject': 0})
316
        self.assertCount(AuthorizationCode, 1)
317

    
318
        # redirect is valid
319
        redirect = self.get_redirect_url(r)
320
        self.assertParam(redirect, "code")
321
        self.assertNoParam(redirect, "extra_param")
322
        self.assertHost(redirect, "server.com")
323
        self.assertPath(redirect, "/handle_code")
324

    
325
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
326
        #self.assertEqual(code.state, '')
327
        self.assertEqual(code.state, None)
328
        self.assertEqual(code.redirect_uri, self.client1_redirect_uri)
329

    
330
        params['state'] = 'csrfstate'
331
        params['scope'] = 'resource1'
332
        r = self.client.authorize_code('client1', urlparams=params)
333
        redirect = self.get_redirect_url(r)
334
        self.assertParamEqual(redirect, "state", 'csrfstate')
335
        self.assertCount(AuthorizationCode, 2)
336

    
337
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
338
        self.assertEqual(code.state, 'csrfstate')
339
        self.assertEqual(code.redirect_uri, self.client1_redirect_uri)
340

    
341
        # valid request: trusted client
342
        params = {'redirect_uri': self.client3_redirect_uri,
343
                  'scope': self.client3_redirect_uri,
344
                  'extra_param': '123'}
345
        self.client.set_credentials('client3', 'secret')
346
        r = self.client.authorize_code('client3', urlparams=params)
347
        self.assertEqual(r.status_code, 302)
348
        self.assertCount(AuthorizationCode, 3)
349

    
350
        # redirect is valid
351
        redirect = self.get_redirect_url(r)
352
        self.assertParam(redirect, "code")
353
        self.assertNoParam(redirect, "state")
354
        self.assertNoParam(redirect, "extra_param")
355
        self.assertHost(redirect, "server3.com")
356
        self.assertPath(redirect, "/handle_code")
357

    
358
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
359
        self.assertEqual(code.state, None)
360
        self.assertEqual(code.redirect_uri, self.client3_redirect_uri)
361

    
362
        # valid request: trusted client
363
        params['state'] = 'csrfstate'
364
        self.client.set_credentials('client3', 'secret')
365
        r = self.client.authorize_code('client3', urlparams=params)
366
        self.assertEqual(r.status_code, 302)
367
        self.assertCount(AuthorizationCode, 4)
368

    
369
        # redirect is valid
370
        redirect = self.get_redirect_url(r)
371
        self.assertParam(redirect, "code")
372
        self.assertParamEqual(redirect, "state", 'csrfstate')
373
        self.assertNoParam(redirect, "extra_param")
374
        self.assertHost(redirect, "server3.com")
375
        self.assertPath(redirect, "/handle_code")
376

    
377
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
378
        self.assertEqual(code.state, 'csrfstate')
379
        self.assertEqual(code.redirect_uri, self.client3_redirect_uri)
380

    
381
        # redirect uri startswith the client's registered redirect url
382
        params['redirect_uri'] = '%smore' % self.client3_redirect_uri
383
        self.client.set_credentials('client3', 'secret')
384
        r = self.client.authorize_code('client3', urlparams=params)
385
        self.assertEqual(r.status_code, 400)
386

    
387
        # redirect uri descendant
388
        redirect_uri = '%s/' % self.client3_redirect_uri
389
        rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
390
        redirect_uri = '%s%s' % (redirect_uri, 'a'*rest)
391
        params['redirect_uri'] = redirect_uri
392
        self.client.set_credentials('client3', 'secret')
393
        r = self.client.authorize_code('client3', urlparams=params)
394
        self.assertEqual(r.status_code, 302)
395
        self.assertCount(AuthorizationCode, 5)
396

    
397
        # redirect is valid
398
        redirect = self.get_redirect_url(r)
399
        self.assertParam(redirect, "code")
400
        self.assertParamEqual(redirect, "state", 'csrfstate')
401
        self.assertNoParam(redirect, "extra_param")
402
        self.assertHost(redirect, "server3.com")
403
        self.assertPath(redirect, urlparse.urlparse(redirect_uri).path)
404

    
405
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
406
        self.assertEqual(code.state, 'csrfstate')
407
        self.assertEqual(code.redirect_uri, redirect_uri)
408

    
409
        # too long redirect uri
410
        params['redirect_uri'] = '%sa' % redirect_uri
411
        self.client.set_credentials('client3', 'secret')
412
        r = self.client.authorize_code('client3', urlparams=params)
413
        self.assertEqual(r.status_code, 400)
414

    
415
    def test_get_token(self):
416
        # invalid method
417
        r = self.client.get(self.client.token_url)
418
        self.assertEqual(r.status_code, 405)
419
        self.assertTrue('Allow' in r)
420
        self.assertEqual(r['Allow'], 'POST')
421

    
422
        # invalid content type
423
        r = self.client.post(self.client.token_url)
424
        self.assertEqual(r.status_code, 400)
425

    
426
        # missing grant type
427
        r = self.client.post(self.client.token_url,
428
                             content_type='application/x-www-form-urlencoded')
429
        self.assertEqual(r.status_code, 400)
430

    
431
        # unsupported grant type: client_credentials
432
        r = self.client.post(self.client.token_url,
433
                             data='grant_type=client_credentials',
434
                             content_type='application/x-www-form-urlencoded')
435
        self.assertEqual(r.status_code, 400)
436

    
437
        # unsupported grant type: token
438
        r = self.client.post(self.client.token_url,
439
                             data='grant_type=token',
440
                             content_type='application/x-www-form-urlencoded')
441
        self.assertEqual(r.status_code, 400)
442

    
443
        # invalid grant type
444
        r = self.client.post(self.client.token_url,
445
                             data='grant_type=invalid',
446
                             content_type='application/x-www-form-urlencoded')
447
        self.assertEqual(r.status_code, 400)
448

    
449
        # generate authorization code: without redirect_uri
450
        self.client.login(username="user@synnefo.org", password="password")
451
        r = self.client.authorize_code('client3')
452
        self.assertCount(AuthorizationCode, 1)
453
        redirect = self.get_redirect_url(r)
454
        code_instance = AuthorizationCode.objects.get(
455
            code=redirect.params['code'][0])
456

    
457
        # no client_id & no client authorization
458
        r = self.client.access_token(code_instance.code)
459
        self.assertEqual(r.status_code, 400)
460

    
461
        # invalid client_id
462
        r = self.client.access_token(code_instance.code, client_id='client2')
463
        self.assertEqual(r.status_code, 400)
464

    
465
        # inexistent client_id
466
        r = self.client.access_token(code_instance.code, client_id='client42')
467
        self.assertEqual(r.status_code, 400)
468

    
469
        # no client authorization
470
        r = self.client.access_token(code_instance.code, client_id='client3')
471
        self.assertEqual(r.status_code, 400)
472

    
473
        # mixed up credentials/client_id's
474
        self.client.set_credentials('client1', 'secret')
475
        r = self.client.access_token(code_instance.code, client_id='client3')
476
        self.assertEqual(r.status_code, 400)
477

    
478
        # mixed up credentials/client_id's
479
        self.client.set_credentials('client3', 'secret')
480
        r = self.client.access_token(code_instance.code, client_id='client1')
481
        self.assertEqual(r.status_code, 400)
482

    
483
        # mismatching client
484
        self.client.set_credentials('client1', 'secret')
485
        r = self.client.access_token(code_instance.code, client_id='client1')
486
        self.assertEqual(r.status_code, 400)
487

    
488
        # invalid code
489
        self.client.set_credentials('client3', 'secret')
490
        r = self.client.access_token('invalid')
491
        self.assertEqual(r.status_code, 400)
492

    
493
        # valid request
494
        self.client.set_credentials('client3', 'secret')
495
        r = self.client.access_token(code_instance.code)
496
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
497
        self.assertCount(Token, 1)
498
        expected = {'redirect_uri': self.client3_redirect_uri,
499
                    'scope': self.client3_redirect_uri,
500
                    'state': None}
501
        self.assert_access_token_response(r, expected)
502

    
503
        # generate authorization code with too long redirect_uri
504
        redirect_uri = '%s/' % self.client3_redirect_uri
505
        rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
506
        redirect_uri = '%s%s' % (redirect_uri, 'a'*rest)
507
        params = {'redirect_uri': redirect_uri}
508
        r = self.client.authorize_code('client3', urlparams=params)
509
        self.assertCount(AuthorizationCode, 1)
510
        redirect = self.get_redirect_url(r)
511
        code_instance = AuthorizationCode.objects.get(
512
            code=redirect.params['code'][0])
513

    
514
        # valid request
515
        self.client.set_credentials('client3', 'secret')
516
        r = self.client.access_token(code_instance.code,
517
                                     redirect_uri='%sa' % redirect_uri)
518
        self.assertEqual(r.status_code, 400)
519

    
520
        r = self.client.access_token(code_instance.code,
521
                                     redirect_uri=redirect_uri)
522
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
523
        self.assertCount(Token, 2)
524
        expected = {'redirect_uri': redirect_uri,
525
                    'scope': redirect_uri,
526
                    'state': None}
527
        self.assert_access_token_response(r, expected)