Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ dfdd413b

History | View | Annotate | Download (18.1 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import urllib
35
import urlparse
36
import base64
37
import datetime
38

    
39
from collections import namedtuple
40

    
41
from django.test import TransactionTestCase as TestCase
42
from django.test import Client as TestClient
43

    
44
from django.core.urlresolvers import reverse
45
from django.utils import simplejson as json
46

    
47
from astakos.oa2.models import Client, AuthorizationCode, Token
48
from astakos.im.models import AstakosUser
49

    
50

    
51
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params',
52
                                     'url'])
53

    
54

    
55
def parsed_url_wrapper(func):
56
    def wrapper(self, url, *args, **kwargs):
57
        url = self.parse_url(url)
58
        return func(self, url, *args, **kwargs)
59
    return wrapper
60

    
61

    
62
class URLAssertionsMixin(object):
63

    
64
    def get_redirect_url(self, request):
65
        return self.parse_url(request['Location'])
66

    
67
    def parse_url(self, url):
68
        if isinstance(url, ParsedURL):
69
            return url
70
        result = urlparse.urlparse(url)
71
        parsed = {
72
            'url': url,
73
            'host': result.netloc,
74
            'scheme': result.scheme,
75
            'path': result.path,
76
        }
77
        parsed['params'] = urlparse.parse_qs(result.query)
78
        return ParsedURL(**parsed)
79

    
80
    @parsed_url_wrapper
81
    def assertParamEqual(self, url, key, value):
82
        self.assertParam(url, key)
83
        self.assertEqual(url.params[key][0], value)
84

    
85
    @parsed_url_wrapper
86
    def assertNoParam(self, url, key):
87
        self.assertFalse(key in url.params,
88
                         "Url '%s' does contain '%s' parameter" % (url.url,
89
                                                                   key))
90

    
91
    @parsed_url_wrapper
92
    def assertParam(self, url, key):
93
        self.assertTrue(key in url.params,
94
                        "Url '%s' does not contain '%s' parameter" % (url.url,
95
                                                                      key))
96

    
97
    @parsed_url_wrapper
98
    def assertHost(self, url, host):
99
        self.assertEqual(url.host, host)
100

    
101
    @parsed_url_wrapper
102
    def assertPath(self, url, path):
103
        self.assertEqual(url.path, path)
104

    
105
    @parsed_url_wrapper
106
    def assertSecure(self, url, key):
107
        self.assertEqual(url.scheme, "https")
108

    
109

    
110
class OA2Client(TestClient):
111
    """
112
    An OAuth2 agnostic test client.
113
    """
114
    def __init__(self, baseurl, *args, **kwargs):
115
        self.oa2_url = baseurl
116
        self.token_url = self.oa2_url + 'token/'
117
        self.auth_url = self.oa2_url + 'auth/'
118
        self.credentials = kwargs.pop('credentials', ())
119

    
120
        kwargs['wsgi.url_scheme'] = 'https'
121
        return super(OA2Client, self).__init__(*args, **kwargs)
122

    
123
    def request(self, *args, **kwargs):
124
        #print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
125
            #kwargs.get('HTTP_AUTHORIZATION', None)
126
        return super(OA2Client, self).request(*args, **kwargs)
127

    
128
    def get_url(self, token_or_auth, **params):
129
        return token_or_auth + '?' + urllib.urlencode(params)
130

    
131
    def grant(self, clientid, *args, **kwargs):
132
        """
133
        Do an authorization grant request.
134
        """
135
        params = {
136
            'grant_type': 'authorization_code',
137
            'client_id': clientid
138
        }
139
        urlparams = kwargs.pop('urlparams', {})
140
        params.update(urlparams)
141
        self.set_auth_headers(kwargs)
142
        return self.get(self.get_url(self.token_url, **params), *args,
143
                        **kwargs)
144

    
145
    def authorize_code(self, clientid, *args, **kwargs):
146
        """
147
        Do an authorization code request.
148
        """
149
        params = {
150
            'response_type': 'code',
151
            'client_id': clientid
152
        }
153
        urlparams = kwargs.pop('urlparams', {})
154
        urlparams.update(kwargs.pop('extraparams', {}))
155
        params.update(urlparams)
156
        self.set_auth_headers(kwargs)
157
        if 'reject' in params:
158
            return self.post(self.get_url(self.auth_url), data=params,
159
                             **kwargs)
160
        return self.get(self.get_url(self.auth_url, **params), *args, **kwargs)
161

    
162
    def access_token(self, code,
163
                     content_type='application/x-www-form-urlencoded',
164
                     **kwargs):
165
        """
166
        Do an get token request.
167
        """
168
        params = {
169
            'grant_type': 'authorization_code',
170
            'code': code
171
        }
172
        params.update(kwargs)
173
        self.set_auth_headers(kwargs)
174
        return self.post(self.token_url, data=urllib.urlencode(params),
175
                         content_type=content_type, **kwargs)
176

    
177
    def set_auth_headers(self, params):
178
        if not self.credentials:
179
            return
180
        credentials = base64.encodestring('%s:%s' % self.credentials).strip()
181
        params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials
182
        return params
183

    
184
    def set_credentials(self, user=None, pwd=None):
185
        self.credentials = (user, pwd)
186
        if not user and not pwd:
187
            self.credentials = ()
188

    
189

    
190
class TestOA2(TestCase, URLAssertionsMixin):
191

    
192
    def assertCount(self, model, count):
193
        self.assertEqual(model.objects.count(), count)
194

    
195
    def assert_access_token_response(self, r, expected):
196
        self.assertEqual(r.status_code, 200)
197
        try:
198
            data = json.loads(r.content)
199
        except:
200
            self.fail("Unexpected response content")
201

    
202
        self.assertTrue('access_token' in data)
203
        access_token = data['access_token']
204
        self.assertTrue('token_type' in data)
205
        token_type = data['token_type']
206
        self.assertTrue('expires_in' in data)
207
        expires_in = data['expires_in']
208

    
209
        try:
210
            token = Token.objects.get(code=access_token)
211
            self.assertEqual(token.expires_at,
212
                             token.created_at +
213
                             datetime.timedelta(seconds=expires_in))
214
            self.assertEqual(token.token_type, token_type)
215
            self.assertEqual(token.grant_type, 'authorization_code')
216
            #self.assertEqual(token.user, expected.get('user'))
217
            self.assertEqual(token.redirect_uri, expected.get('redirect_uri'))
218
            self.assertEqual(token.scope, expected.get('scope'))
219
            self.assertEqual(token.state, expected.get('state'))
220
        except Token.DoesNotExist:
221
            self.fail("Invalid access_token")
222

    
223
    def setUp(self):
224
        baseurl = reverse('oauth2_authenticate').replace('/auth', '/')
225
        self.client = OA2Client(baseurl)
226
        client1 = Client.objects.create(identifier="client1", secret="secret")
227
        self.client1_redirect_uri = "https://server.com/handle_code"
228
        client1.redirecturl_set.create(url=self.client1_redirect_uri)
229

    
230
        client2 = Client.objects.create(identifier="client2", type='public')
231
        self.client2_redirect_uri = "https://server2.com/handle_code"
232
        client2.redirecturl_set.create(url=self.client2_redirect_uri)
233

    
234
        client3 = Client.objects.create(identifier="client3", secret='secret',
235
                                        is_trusted=True)
236
        self.client3_redirect_uri = "https://server3.com/handle_code"
237
        client3.redirecturl_set.create(url=self.client3_redirect_uri)
238

    
239
        u = AstakosUser.objects.create(username="user@synnefo.org")
240
        u.set_password("password")
241
        u.save()
242

    
243
    def test_code_authorization(self):
244
        # missing response_type
245
        r = self.client.get(self.client.get_url(self.client.auth_url))
246
        self.assertEqual(r.status_code, 400)
247
        self.assertCount(AuthorizationCode, 0)
248

    
249
        # invalid response_type
250
        r = self.client.get(self.client.get_url(self.client.auth_url,
251
                                                response_type='invalid'))
252
        self.assertEqual(r.status_code, 400)
253
        self.assertCount(AuthorizationCode, 0)
254

    
255
        # unsupported response_type
256
        r = self.client.get(self.client.get_url(self.client.auth_url,
257
                                                response_type='token'))
258
        self.assertEqual(r.status_code, 400)
259
        self.assertCount(AuthorizationCode, 0)
260

    
261
        # missing client_id
262
        r = self.client.get(self.client.get_url(self.client.auth_url,
263
                                                response_type='code'))
264
        self.assertEqual(r.status_code, 400)
265
        self.assertCount(AuthorizationCode, 0)
266

    
267
        # fake client
268
        r = self.client.authorize_code('client-fake')
269
        self.assertEqual(r.status_code, 400)
270
        self.assertCount(AuthorizationCode, 0)
271

    
272
        # mixed up credentials/client_id's
273
        self.client.set_credentials('client1', 'secret')
274
        r = self.client.authorize_code('client2')
275
        self.assertEqual(r.status_code, 400)
276
        self.assertCount(AuthorizationCode, 0)
277

    
278
        # invalid credentials
279
        self.client.set_credentials('client2', '')
280
        r = self.client.authorize_code('client2')
281
        self.assertEqual(r.status_code, 400)
282
        self.assertCount(AuthorizationCode, 0)
283

    
284
        # invalid redirect_uri: not absolute URI
285
        self.client.set_credentials()
286
        params = {'redirect_uri':
287
                  urlparse.urlparse(self.client1_redirect_uri).path}
288
        r = self.client.authorize_code('client1', urlparams=params)
289
        self.assertEqual(r.status_code, 400)
290
        self.assertCount(AuthorizationCode, 0)
291

    
292
        # mismatching redirect uri
293
        self.client.set_credentials()
294
        params = {'redirect_uri': self.client1_redirect_uri[1:]}
295
        r = self.client.authorize_code('client1', urlparams=params)
296
        self.assertEqual(r.status_code, 400)
297
        self.assertCount(AuthorizationCode, 0)
298

    
299
        # valid request: untrusted client
300
        params = {'redirect_uri': self.client1_redirect_uri,
301
                  'scope': self.client1_redirect_uri,
302
                  'extra_param': '123'}
303
        self.client.set_credentials('client1', 'secret')
304
        r = self.client.authorize_code('client1', urlparams=params)
305
        self.assertEqual(r.status_code, 302)
306
        self.assertTrue('Location' in r)
307
        self.assertHost(r['Location'], "testserver:80")
308
        self.assertPath(r['Location'], reverse('login'))
309

    
310
        self.client.set_credentials('client1', 'secret')
311
        self.client.login(username="user@synnefo.org", password="password")
312
        r = self.client.authorize_code('client1', urlparams=params)
313
        self.assertEqual(r.status_code, 200)
314

    
315
        r = self.client.authorize_code('client1', urlparams=params,
316
                                       extraparams={'reject': 0})
317
        self.assertCount(AuthorizationCode, 1)
318

    
319
        # redirect is valid
320
        redirect1 = self.get_redirect_url(r)
321
        self.assertParam(redirect1, "code")
322
        self.assertNoParam(redirect1, "extra_param")
323
        self.assertHost(redirect1, "server.com")
324
        self.assertPath(redirect1, "/handle_code")
325

    
326
        params['state'] = 'csrfstate'
327
        params['scope'] = 'resource1'
328
        r = self.client.authorize_code('client1', urlparams=params)
329
        redirect2 = self.get_redirect_url(r)
330
        self.assertParamEqual(redirect2, "state", 'csrfstate')
331
        self.assertCount(AuthorizationCode, 2)
332

    
333
        code1 = AuthorizationCode.objects.get(code=redirect1.params['code'][0])
334
        #self.assertEqual(code1.state, '')
335
        self.assertEqual(code1.state, None)
336
        self.assertEqual(code1.redirect_uri, self.client1_redirect_uri)
337

    
338
        code2 = AuthorizationCode.objects.get(code=redirect2.params['code'][0])
339
        self.assertEqual(code2.state, 'csrfstate')
340
        self.assertEqual(code2.redirect_uri, self.client1_redirect_uri)
341

    
342
        # valid request: trusted client
343
        params = {'redirect_uri': self.client3_redirect_uri,
344
                  'scope': self.client3_redirect_uri,
345
                  'extra_param': '123'}
346
        self.client.set_credentials('client3', 'secret')
347
        r = self.client.authorize_code('client3', urlparams=params)
348
        self.assertEqual(r.status_code, 302)
349
        self.assertCount(AuthorizationCode, 3)
350

    
351
        # redirect is valid
352
        redirect3 = self.get_redirect_url(r)
353
        self.assertParam(redirect1, "code")
354
        self.assertNoParam(redirect3, "state")
355
        self.assertNoParam(redirect3, "extra_param")
356
        self.assertHost(redirect3, "server3.com")
357
        self.assertPath(redirect3, "/handle_code")
358

    
359
        code3 = AuthorizationCode.objects.get(code=redirect3.params['code'][0])
360
        self.assertEqual(code3.state, None)
361
        self.assertEqual(code3.redirect_uri, self.client3_redirect_uri)
362

    
363
        # valid request: trusted client
364
        params['state'] = 'csrfstate'
365
        self.client.set_credentials('client3', 'secret')
366
        r = self.client.authorize_code('client3', urlparams=params)
367
        self.assertEqual(r.status_code, 302)
368
        self.assertCount(AuthorizationCode, 4)
369

    
370
        # redirect is valid
371
        redirect4 = self.get_redirect_url(r)
372
        self.assertParam(redirect4, "code")
373
        self.assertParamEqual(redirect4, "state", 'csrfstate')
374
        self.assertNoParam(redirect4, "extra_param")
375
        self.assertHost(redirect4, "server3.com")
376
        self.assertPath(redirect4, "/handle_code")
377

    
378
        code4 = AuthorizationCode.objects.get(code=redirect4.params['code'][0])
379
        self.assertEqual(code4.state, 'csrfstate')
380
        self.assertEqual(code4.redirect_uri, self.client3_redirect_uri)
381

    
382
    def test_get_token(self):
383
        # invalid method
384
        r = self.client.get(self.client.token_url)
385
        self.assertEqual(r.status_code, 405)
386
        self.assertTrue('Allow' in r)
387
        self.assertEqual(r['Allow'], 'POST')
388

    
389
        # invalid content type
390
        r = self.client.post(self.client.token_url)
391
        self.assertEqual(r.status_code, 400)
392

    
393
        # missing grant type
394
        r = self.client.post(self.client.token_url,
395
                             content_type='application/x-www-form-urlencoded')
396
        self.assertEqual(r.status_code, 400)
397

    
398
        # unsupported grant type: client_credentials
399
        r = self.client.post(self.client.token_url,
400
                             data='grant_type=client_credentials',
401
                             content_type='application/x-www-form-urlencoded')
402
        self.assertEqual(r.status_code, 400)
403

    
404
        # unsupported grant type: token
405
        r = self.client.post(self.client.token_url,
406
                             data='grant_type=token',
407
                             content_type='application/x-www-form-urlencoded')
408
        self.assertEqual(r.status_code, 400)
409

    
410
        # invalid grant type
411
        r = self.client.post(self.client.token_url,
412
                             data='grant_type=invalid',
413
                             content_type='application/x-www-form-urlencoded')
414
        self.assertEqual(r.status_code, 400)
415

    
416
        # generate authorization code: without redirect_uri
417
        self.client.login(username="user@synnefo.org", password="password")
418
        r = self.client.authorize_code('client3')
419
        self.assertCount(AuthorizationCode, 1)
420
        redirect = self.get_redirect_url(r)
421
        code_instance = AuthorizationCode.objects.get(
422
            code=redirect.params['code'][0])
423

    
424
        # no client_id & no client authorization
425
        r = self.client.access_token(code_instance.code)
426
        self.assertEqual(r.status_code, 400)
427

    
428
        # invalid client_id
429
        r = self.client.access_token(code_instance.code, client_id='client2')
430
        self.assertEqual(r.status_code, 400)
431

    
432
        # inexistent client_id
433
        r = self.client.access_token(code_instance.code, client_id='client42')
434
        self.assertEqual(r.status_code, 400)
435

    
436
        # no client authorization
437
        r = self.client.access_token(code_instance.code, client_id='client3')
438
        self.assertEqual(r.status_code, 400)
439

    
440
        # mixed up credentials/client_id's
441
        self.client.set_credentials('client1', 'secret')
442
        r = self.client.access_token(code_instance.code, client_id='client3')
443
        self.assertEqual(r.status_code, 400)
444

    
445
        # mixed up credentials/client_id's
446
        self.client.set_credentials('client3', 'secret')
447
        r = self.client.access_token(code_instance.code, client_id='client1')
448
        self.assertEqual(r.status_code, 400)
449

    
450
        # mismatching client
451
        self.client.set_credentials('client1', 'secret')
452
        r = self.client.access_token(code_instance.code, client_id='client1')
453
        self.assertEqual(r.status_code, 400)
454

    
455
        # invalid code
456
        self.client.set_credentials('client3', 'secret')
457
        r = self.client.access_token('invalid')
458
        self.assertEqual(r.status_code, 400)
459

    
460
        # valid request
461
        self.client.set_credentials('client3', 'secret')
462
        r = self.client.access_token(code_instance.code)
463
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
464
        self.assertCount(Token, 1)
465
        expected = {'redirect_uri': self.client3_redirect_uri,
466
                    'scope': self.client3_redirect_uri,
467
                    'state': None}
468
        self.assert_access_token_response(r, expected)