Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ b806a15a

History | View | Annotate | Download (20.5 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import urllib
35
import urlparse
36
import base64
37
import datetime
38

    
39
from collections import namedtuple
40

    
41
from django.test import TransactionTestCase as TestCase
42
from django.test import Client as TestClient
43

    
44
from django.core.urlresolvers import reverse
45
from django.utils import simplejson as json
46

    
47
from astakos.oa2.models import Client, AuthorizationCode, Token
48
from astakos.im.tests import common
49

    
50

    
51
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params',
52
                                     'url'])
53

    
54

    
55
def parsed_url_wrapper(func):
56
    def wrapper(self, url, *args, **kwargs):
57
        url = self.parse_url(url)
58
        return func(self, url, *args, **kwargs)
59
    return wrapper
60

    
61

    
62
class URLAssertionsMixin(object):
63

    
64
    def get_redirect_url(self, request):
65
        return self.parse_url(request['Location'])
66

    
67
    def parse_url(self, url):
68
        if isinstance(url, ParsedURL):
69
            return url
70
        result = urlparse.urlparse(url)
71
        parsed = {
72
            'url': url,
73
            'host': result.netloc,
74
            'scheme': result.scheme,
75
            'path': result.path,
76
        }
77
        parsed['params'] = urlparse.parse_qs(result.query)
78
        return ParsedURL(**parsed)
79

    
80
    @parsed_url_wrapper
81
    def assertParamEqual(self, url, key, value):
82
        self.assertParam(url, key)
83
        self.assertEqual(url.params[key][0], value)
84

    
85
    @parsed_url_wrapper
86
    def assertNoParam(self, url, key):
87
        self.assertFalse(key in url.params,
88
                         "Url '%s' does contain '%s' parameter" % (url.url,
89
                                                                   key))
90

    
91
    @parsed_url_wrapper
92
    def assertParam(self, url, key):
93
        self.assertTrue(key in url.params,
94
                        "Url '%s' does not contain '%s' parameter" % (url.url,
95
                                                                      key))
96

    
97
    @parsed_url_wrapper
98
    def assertHost(self, url, host):
99
        self.assertEqual(url.host, host)
100

    
101
    @parsed_url_wrapper
102
    def assertPath(self, url, path):
103
        self.assertEqual(url.path, path)
104

    
105
    @parsed_url_wrapper
106
    def assertSecure(self, url, key):
107
        self.assertEqual(url.scheme, "https")
108

    
109

    
110
class OA2Client(TestClient):
111
    """
112
    An OAuth2 agnostic test client.
113
    """
114
    def __init__(self, baseurl, *args, **kwargs):
115
        self.oa2_url = baseurl
116
        self.token_url = self.oa2_url + 'token/'
117
        self.auth_url = self.oa2_url + 'auth/'
118
        self.credentials = kwargs.pop('credentials', ())
119

    
120
        kwargs['wsgi.url_scheme'] = 'https'
121
        return super(OA2Client, self).__init__(*args, **kwargs)
122

    
123
    def request(self, *args, **kwargs):
124
        #print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
125
            #kwargs.get('HTTP_AUTHORIZATION', None)
126
        return super(OA2Client, self).request(*args, **kwargs)
127

    
128
    def get_url(self, token_or_auth, **params):
129
        return token_or_auth + '?' + urllib.urlencode(params)
130

    
131
    def grant(self, clientid, *args, **kwargs):
132
        """
133
        Do an authorization grant request.
134
        """
135
        params = {
136
            'grant_type': 'authorization_code',
137
            'client_id': clientid
138
        }
139
        urlparams = kwargs.pop('urlparams', {})
140
        params.update(urlparams)
141
        self.set_auth_headers(kwargs)
142
        return self.get(self.get_url(self.token_url, **params), *args,
143
                        **kwargs)
144

    
145
    def authorize_code(self, clientid, *args, **kwargs):
146
        """
147
        Do an authorization code request.
148
        """
149
        params = {
150
            'response_type': 'code',
151
            'client_id': clientid
152
        }
153
        urlparams = kwargs.pop('urlparams', {})
154
        urlparams.update(kwargs.pop('extraparams', {}))
155
        params.update(urlparams)
156
        self.set_auth_headers(kwargs)
157
        if 'reject' in params:
158
            return self.post(self.get_url(self.auth_url), data=params,
159
                             **kwargs)
160
        return self.get(self.get_url(self.auth_url, **params), *args, **kwargs)
161

    
162
    def access_token(self, code,
163
                     content_type='application/x-www-form-urlencoded',
164
                     **kwargs):
165
        """
166
        Do an get token request.
167
        """
168
        params = {
169
            'grant_type': 'authorization_code',
170
            'code': code
171
        }
172
        params.update(kwargs)
173
        self.set_auth_headers(kwargs)
174
        return self.post(self.token_url, data=urllib.urlencode(params),
175
                         content_type=content_type, **kwargs)
176

    
177
    def set_auth_headers(self, params):
178
        if not self.credentials:
179
            return
180
        credentials = base64.encodestring('%s:%s' % self.credentials).strip()
181
        params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials
182
        return params
183

    
184
    def set_credentials(self, user=None, pwd=None):
185
        self.credentials = (user, pwd)
186
        if not user and not pwd:
187
            self.credentials = ()
188

    
189

    
190
class TestOA2(TestCase, URLAssertionsMixin):
191

    
192
    def assertCount(self, model, count):
193
        self.assertEqual(model.objects.count(), count)
194

    
195
    def assert_access_token_response(self, r, expected):
196
        self.assertEqual(r.status_code, 200)
197
        try:
198
            data = json.loads(r.content)
199
        except:
200
            self.fail("Unexpected response content")
201

    
202
        self.assertTrue('access_token' in data)
203
        access_token = data['access_token']
204
        self.assertTrue('token_type' in data)
205
        token_type = data['token_type']
206
        self.assertTrue('expires_in' in data)
207
        expires_in = data['expires_in']
208

    
209
        try:
210
            token = Token.objects.get(code=access_token)
211
            self.assertEqual(token.expires_at,
212
                             token.created_at +
213
                             datetime.timedelta(seconds=expires_in))
214
            self.assertEqual(token.token_type, token_type)
215
            self.assertEqual(token.grant_type, 'authorization_code')
216
            #self.assertEqual(token.user, expected.get('user'))
217
            self.assertEqual(token.redirect_uri, expected.get('redirect_uri'))
218
            self.assertEqual(token.scope, expected.get('scope'))
219
            self.assertEqual(token.state, expected.get('state'))
220
        except Token.DoesNotExist:
221
            self.fail("Invalid access_token")
222

    
223
    def setUp(self):
224
        baseurl = reverse('oauth2_authenticate').replace('/auth', '/')
225
        self.client = OA2Client(baseurl)
226
        client1 = Client.objects.create(identifier="client1", secret="secret")
227
        self.client1_redirect_uri = "https://server.com/handle_code"
228
        client1.redirecturl_set.create(url=self.client1_redirect_uri)
229

    
230
        client2 = Client.objects.create(identifier="client2", type='public')
231
        self.client2_redirect_uri = "https://server2.com/handle_code"
232
        client2.redirecturl_set.create(url=self.client2_redirect_uri)
233

    
234
        client3 = Client.objects.create(identifier="client3", secret='secret',
235
                                        is_trusted=True)
236
        self.client3_redirect_uri = "https://server3.com/handle_code"
237
        client3.redirecturl_set.create(url=self.client3_redirect_uri)
238

    
239
        common.get_local_user("user@synnefo.org", password="password")
240

    
241
    def test_code_authorization(self):
242
        # missing response_type
243
        r = self.client.get(self.client.get_url(self.client.auth_url))
244
        self.assertEqual(r.status_code, 400)
245
        self.assertCount(AuthorizationCode, 0)
246

    
247
        # invalid response_type
248
        r = self.client.get(self.client.get_url(self.client.auth_url,
249
                                                response_type='invalid'))
250
        self.assertEqual(r.status_code, 400)
251
        self.assertCount(AuthorizationCode, 0)
252

    
253
        # unsupported response_type
254
        r = self.client.get(self.client.get_url(self.client.auth_url,
255
                                                response_type='token'))
256
        self.assertEqual(r.status_code, 400)
257
        self.assertCount(AuthorizationCode, 0)
258

    
259
        # missing client_id
260
        r = self.client.get(self.client.get_url(self.client.auth_url,
261
                                                response_type='code'))
262
        self.assertEqual(r.status_code, 400)
263
        self.assertCount(AuthorizationCode, 0)
264

    
265
        # fake client
266
        r = self.client.authorize_code('client-fake')
267
        self.assertEqual(r.status_code, 400)
268
        self.assertCount(AuthorizationCode, 0)
269

    
270
        # mixed up credentials/client_id's
271
        self.client.set_credentials('client1', 'secret')
272
        r = self.client.authorize_code('client2')
273
        self.assertEqual(r.status_code, 400)
274
        self.assertCount(AuthorizationCode, 0)
275

    
276
        # invalid credentials
277
        self.client.set_credentials('client2', '')
278
        r = self.client.authorize_code('client2')
279
        self.assertEqual(r.status_code, 400)
280
        self.assertCount(AuthorizationCode, 0)
281

    
282
        # invalid redirect_uri: not absolute URI
283
        self.client.set_credentials()
284
        params = {'redirect_uri':
285
                  urlparse.urlparse(self.client1_redirect_uri).path}
286
        r = self.client.authorize_code('client1', urlparams=params)
287
        self.assertEqual(r.status_code, 400)
288
        self.assertCount(AuthorizationCode, 0)
289

    
290
        # mismatching redirect uri
291
        self.client.set_credentials()
292
        params = {'redirect_uri': self.client1_redirect_uri[1:]}
293
        r = self.client.authorize_code('client1', urlparams=params)
294
        self.assertEqual(r.status_code, 400)
295
        self.assertCount(AuthorizationCode, 0)
296

    
297
        # valid request: untrusted client
298
        params = {'redirect_uri': self.client1_redirect_uri,
299
                  'scope': self.client1_redirect_uri,
300
                  'extra_param': '123'}
301
        self.client.set_credentials('client1', 'secret')
302
        r = self.client.authorize_code('client1', urlparams=params)
303
        self.assertEqual(r.status_code, 302)
304
        self.assertTrue('Location' in r)
305
        self.assertHost(r['Location'], "testserver:80")
306
        self.assertPath(r['Location'], reverse('login'))
307

    
308
        self.client.set_credentials('client1', 'secret')
309
        self.client.login(username="user@synnefo.org", password="password")
310
        r = self.client.authorize_code('client1', urlparams=params)
311
        self.assertEqual(r.status_code, 200)
312

    
313
        r = self.client.authorize_code('client1', urlparams=params,
314
                                       extraparams={'reject': 0})
315
        self.assertCount(AuthorizationCode, 1)
316

    
317
        # redirect is valid
318
        redirect = self.get_redirect_url(r)
319
        self.assertParam(redirect, "code")
320
        self.assertNoParam(redirect, "extra_param")
321
        self.assertHost(redirect, "server.com")
322
        self.assertPath(redirect, "/handle_code")
323

    
324
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
325
        #self.assertEqual(code.state, '')
326
        self.assertEqual(code.state, None)
327
        self.assertEqual(code.redirect_uri, self.client1_redirect_uri)
328

    
329
        params['state'] = 'csrfstate'
330
        params['scope'] = 'resource1'
331
        r = self.client.authorize_code('client1', urlparams=params)
332
        redirect = self.get_redirect_url(r)
333
        self.assertParamEqual(redirect, "state", 'csrfstate')
334
        self.assertCount(AuthorizationCode, 2)
335

    
336
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
337
        self.assertEqual(code.state, 'csrfstate')
338
        self.assertEqual(code.redirect_uri, self.client1_redirect_uri)
339

    
340
        # valid request: trusted client
341
        params = {'redirect_uri': self.client3_redirect_uri,
342
                  'scope': self.client3_redirect_uri,
343
                  'extra_param': '123'}
344
        self.client.set_credentials('client3', 'secret')
345
        r = self.client.authorize_code('client3', urlparams=params)
346
        self.assertEqual(r.status_code, 302)
347
        self.assertCount(AuthorizationCode, 3)
348

    
349
        # redirect is valid
350
        redirect = self.get_redirect_url(r)
351
        self.assertParam(redirect, "code")
352
        self.assertNoParam(redirect, "state")
353
        self.assertNoParam(redirect, "extra_param")
354
        self.assertHost(redirect, "server3.com")
355
        self.assertPath(redirect, "/handle_code")
356

    
357
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
358
        self.assertEqual(code.state, None)
359
        self.assertEqual(code.redirect_uri, self.client3_redirect_uri)
360

    
361
        # valid request: trusted client
362
        params['state'] = 'csrfstate'
363
        self.client.set_credentials('client3', 'secret')
364
        r = self.client.authorize_code('client3', urlparams=params)
365
        self.assertEqual(r.status_code, 302)
366
        self.assertCount(AuthorizationCode, 4)
367

    
368
        # redirect is valid
369
        redirect = self.get_redirect_url(r)
370
        self.assertParam(redirect, "code")
371
        self.assertParamEqual(redirect, "state", 'csrfstate')
372
        self.assertNoParam(redirect, "extra_param")
373
        self.assertHost(redirect, "server3.com")
374
        self.assertPath(redirect, "/handle_code")
375

    
376
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
377
        self.assertEqual(code.state, 'csrfstate')
378
        self.assertEqual(code.redirect_uri, self.client3_redirect_uri)
379

    
380
        # redirect uri startswith the client's registered redirect url
381
        params['redirect_uri'] = '%smore' % self.client3_redirect_uri
382
        self.client.set_credentials('client3', 'secret')
383
        r = self.client.authorize_code('client3', urlparams=params)
384
        self.assertEqual(r.status_code, 400)
385

    
386
        # redirect uri descendant
387
        redirect_uri = '%s/more' % self.client3_redirect_uri
388
        params['redirect_uri'] = redirect_uri
389
        self.client.set_credentials('client3', 'secret')
390
        r = self.client.authorize_code('client3', urlparams=params)
391
        self.assertEqual(r.status_code, 302)
392
        self.assertCount(AuthorizationCode, 5)
393

    
394
        # redirect is valid
395
        redirect = self.get_redirect_url(r)
396
        self.assertParam(redirect, "code")
397
        self.assertParamEqual(redirect, "state", 'csrfstate')
398
        self.assertNoParam(redirect, "extra_param")
399
        self.assertHost(redirect, "server3.com")
400
        self.assertPath(redirect, urlparse.urlparse(redirect_uri).path)
401

    
402
        code = AuthorizationCode.objects.get(code=redirect.params['code'][0])
403
        self.assertEqual(code.state, 'csrfstate')
404
        self.assertEqual(code.redirect_uri,
405
                         '%s/more' % self.client3_redirect_uri)
406

    
407
        # too long redirect uri
408
        redirect_uri = '%s?foo=%s' % (self.client3_redirect_uri, 'a'*10000)
409
        params['redirect_uri'] = redirect_uri
410
        self.client.set_credentials('client3', 'secret')
411
        r = self.client.authorize_code('client3', urlparams=params)
412
        self.assertEqual(r.status_code, 400)
413

    
414
    def test_get_token(self):
415
        # invalid method
416
        r = self.client.get(self.client.token_url)
417
        self.assertEqual(r.status_code, 405)
418
        self.assertTrue('Allow' in r)
419
        self.assertEqual(r['Allow'], 'POST')
420

    
421
        # invalid content type
422
        r = self.client.post(self.client.token_url)
423
        self.assertEqual(r.status_code, 400)
424

    
425
        # missing grant type
426
        r = self.client.post(self.client.token_url,
427
                             content_type='application/x-www-form-urlencoded')
428
        self.assertEqual(r.status_code, 400)
429

    
430
        # unsupported grant type: client_credentials
431
        r = self.client.post(self.client.token_url,
432
                             data='grant_type=client_credentials',
433
                             content_type='application/x-www-form-urlencoded')
434
        self.assertEqual(r.status_code, 400)
435

    
436
        # unsupported grant type: token
437
        r = self.client.post(self.client.token_url,
438
                             data='grant_type=token',
439
                             content_type='application/x-www-form-urlencoded')
440
        self.assertEqual(r.status_code, 400)
441

    
442
        # invalid grant type
443
        r = self.client.post(self.client.token_url,
444
                             data='grant_type=invalid',
445
                             content_type='application/x-www-form-urlencoded')
446
        self.assertEqual(r.status_code, 400)
447

    
448
        # generate authorization code: without redirect_uri
449
        self.client.login(username="user@synnefo.org", password="password")
450
        r = self.client.authorize_code('client3')
451
        self.assertCount(AuthorizationCode, 1)
452
        redirect = self.get_redirect_url(r)
453
        code_instance = AuthorizationCode.objects.get(
454
            code=redirect.params['code'][0])
455

    
456
        # no client_id & no client authorization
457
        r = self.client.access_token(code_instance.code)
458
        self.assertEqual(r.status_code, 400)
459

    
460
        # invalid client_id
461
        r = self.client.access_token(code_instance.code, client_id='client2')
462
        self.assertEqual(r.status_code, 400)
463

    
464
        # inexistent client_id
465
        r = self.client.access_token(code_instance.code, client_id='client42')
466
        self.assertEqual(r.status_code, 400)
467

    
468
        # no client authorization
469
        r = self.client.access_token(code_instance.code, client_id='client3')
470
        self.assertEqual(r.status_code, 400)
471

    
472
        # mixed up credentials/client_id's
473
        self.client.set_credentials('client1', 'secret')
474
        r = self.client.access_token(code_instance.code, client_id='client3')
475
        self.assertEqual(r.status_code, 400)
476

    
477
        # mixed up credentials/client_id's
478
        self.client.set_credentials('client3', 'secret')
479
        r = self.client.access_token(code_instance.code, client_id='client1')
480
        self.assertEqual(r.status_code, 400)
481

    
482
        # mismatching client
483
        self.client.set_credentials('client1', 'secret')
484
        r = self.client.access_token(code_instance.code, client_id='client1')
485
        self.assertEqual(r.status_code, 400)
486

    
487
        # invalid code
488
        self.client.set_credentials('client3', 'secret')
489
        r = self.client.access_token('invalid')
490
        self.assertEqual(r.status_code, 400)
491

    
492
        # valid request
493
        self.client.set_credentials('client3', 'secret')
494
        r = self.client.access_token(code_instance.code)
495
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
496
        self.assertCount(Token, 1)
497
        expected = {'redirect_uri': self.client3_redirect_uri,
498
                    'scope': self.client3_redirect_uri,
499
                    'state': None}
500
        self.assert_access_token_response(r, expected)
501

    
502
        # generate authorization code with too long redirect_uri
503
        redirect_uri = '%s/%s' % (self.client3_redirect_uri, 'a'*2000)
504
        params = {'redirect_uri': redirect_uri}
505
        r = self.client.authorize_code('client3', urlparams=params)
506
        self.assertCount(AuthorizationCode, 1)
507
        redirect = self.get_redirect_url(r)
508
        code_instance = AuthorizationCode.objects.get(
509
            code=redirect.params['code'][0])
510

    
511
        # valid request
512
        self.client.set_credentials('client3', 'secret')
513
        r = self.client.access_token(code_instance.code,
514
                                     redirect_uri=redirect_uri)
515
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
516
        self.assertCount(Token, 2)
517
        expected = {'redirect_uri': redirect_uri,
518
                    'scope': redirect_uri,
519
                    'state': None}
520
        self.assert_access_token_response(r, expected)