root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ 0d9523c3
History | View | Annotate | Download (18.1 kB)
1 |
# Copyright 2013 GRNET S.A. All rights reserved.
|
---|---|
2 |
#
|
3 |
# Redistribution and use in source and binary forms, with or
|
4 |
# without modification, are permitted provided that the following
|
5 |
# conditions are met:
|
6 |
#
|
7 |
# 1. Redistributions of source code must retain the above
|
8 |
# copyright notice, this list of conditions and the following
|
9 |
# disclaimer.
|
10 |
#
|
11 |
# 2. Redistributions in binary form must reproduce the above
|
12 |
# copyright notice, this list of conditions and the following
|
13 |
# disclaimer in the documentation and/or other materials
|
14 |
# provided with the distribution.
|
15 |
#
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
# POSSIBILITY OF SUCH DAMAGE.
|
28 |
#
|
29 |
# The views and conclusions contained in the software and
|
30 |
# documentation are those of the authors and should not be
|
31 |
# interpreted as representing official policies, either expressed
|
32 |
# or implied, of GRNET S.A.
|
33 |
|
34 |
import urllib |
35 |
import urlparse |
36 |
import base64 |
37 |
import datetime |
38 |
|
39 |
from collections import namedtuple |
40 |
|
41 |
from django.test import TransactionTestCase as TestCase |
42 |
from django.test import Client as TestClient |
43 |
|
44 |
from django.core.urlresolvers import reverse |
45 |
from django.utils import simplejson as json |
46 |
|
47 |
from astakos.oa2.models import Client, AuthorizationCode, Token |
48 |
from astakos.im.tests import common |
49 |
|
50 |
|
51 |
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params', |
52 |
'url'])
|
53 |
|
54 |
|
55 |
def parsed_url_wrapper(func): |
56 |
def wrapper(self, url, *args, **kwargs): |
57 |
url = self.parse_url(url)
|
58 |
return func(self, url, *args, **kwargs) |
59 |
return wrapper
|
60 |
|
61 |
|
62 |
class URLAssertionsMixin(object): |
63 |
|
64 |
def get_redirect_url(self, request): |
65 |
return self.parse_url(request['Location']) |
66 |
|
67 |
def parse_url(self, url): |
68 |
if isinstance(url, ParsedURL): |
69 |
return url
|
70 |
result = urlparse.urlparse(url) |
71 |
parsed = { |
72 |
'url': url,
|
73 |
'host': result.netloc,
|
74 |
'scheme': result.scheme,
|
75 |
'path': result.path,
|
76 |
} |
77 |
parsed['params'] = urlparse.parse_qs(result.query)
|
78 |
return ParsedURL(**parsed)
|
79 |
|
80 |
@parsed_url_wrapper
|
81 |
def assertParamEqual(self, url, key, value): |
82 |
self.assertParam(url, key)
|
83 |
self.assertEqual(url.params[key][0], value) |
84 |
|
85 |
@parsed_url_wrapper
|
86 |
def assertNoParam(self, url, key): |
87 |
self.assertFalse(key in url.params, |
88 |
"Url '%s' does contain '%s' parameter" % (url.url,
|
89 |
key)) |
90 |
|
91 |
@parsed_url_wrapper
|
92 |
def assertParam(self, url, key): |
93 |
self.assertTrue(key in url.params, |
94 |
"Url '%s' does not contain '%s' parameter" % (url.url,
|
95 |
key)) |
96 |
|
97 |
@parsed_url_wrapper
|
98 |
def assertHost(self, url, host): |
99 |
self.assertEqual(url.host, host)
|
100 |
|
101 |
@parsed_url_wrapper
|
102 |
def assertPath(self, url, path): |
103 |
self.assertEqual(url.path, path)
|
104 |
|
105 |
@parsed_url_wrapper
|
106 |
def assertSecure(self, url, key): |
107 |
self.assertEqual(url.scheme, "https") |
108 |
|
109 |
|
110 |
class OA2Client(TestClient): |
111 |
"""
|
112 |
An OAuth2 agnostic test client.
|
113 |
"""
|
114 |
def __init__(self, baseurl, *args, **kwargs): |
115 |
self.oa2_url = baseurl
|
116 |
self.token_url = self.oa2_url + 'token/' |
117 |
self.auth_url = self.oa2_url + 'auth/' |
118 |
self.credentials = kwargs.pop('credentials', ()) |
119 |
|
120 |
kwargs['wsgi.url_scheme'] = 'https' |
121 |
return super(OA2Client, self).__init__(*args, **kwargs) |
122 |
|
123 |
def request(self, *args, **kwargs): |
124 |
#print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
|
125 |
#kwargs.get('HTTP_AUTHORIZATION', None)
|
126 |
return super(OA2Client, self).request(*args, **kwargs) |
127 |
|
128 |
def get_url(self, token_or_auth, **params): |
129 |
return token_or_auth + '?' + urllib.urlencode(params) |
130 |
|
131 |
def grant(self, clientid, *args, **kwargs): |
132 |
"""
|
133 |
Do an authorization grant request.
|
134 |
"""
|
135 |
params = { |
136 |
'grant_type': 'authorization_code', |
137 |
'client_id': clientid
|
138 |
} |
139 |
urlparams = kwargs.pop('urlparams', {})
|
140 |
params.update(urlparams) |
141 |
self.set_auth_headers(kwargs)
|
142 |
return self.get(self.get_url(self.token_url, **params), *args, |
143 |
**kwargs) |
144 |
|
145 |
def authorize_code(self, clientid, *args, **kwargs): |
146 |
"""
|
147 |
Do an authorization code request.
|
148 |
"""
|
149 |
params = { |
150 |
'response_type': 'code', |
151 |
'client_id': clientid
|
152 |
} |
153 |
urlparams = kwargs.pop('urlparams', {})
|
154 |
urlparams.update(kwargs.pop('extraparams', {}))
|
155 |
params.update(urlparams) |
156 |
self.set_auth_headers(kwargs)
|
157 |
if 'reject' in params: |
158 |
return self.post(self.get_url(self.auth_url), data=params, |
159 |
**kwargs) |
160 |
return self.get(self.get_url(self.auth_url, **params), *args, **kwargs) |
161 |
|
162 |
def access_token(self, code, |
163 |
content_type='application/x-www-form-urlencoded',
|
164 |
**kwargs): |
165 |
"""
|
166 |
Do an get token request.
|
167 |
"""
|
168 |
params = { |
169 |
'grant_type': 'authorization_code', |
170 |
'code': code
|
171 |
} |
172 |
params.update(kwargs) |
173 |
self.set_auth_headers(kwargs)
|
174 |
return self.post(self.token_url, data=urllib.urlencode(params), |
175 |
content_type=content_type, **kwargs) |
176 |
|
177 |
def set_auth_headers(self, params): |
178 |
if not self.credentials: |
179 |
return
|
180 |
credentials = base64.encodestring('%s:%s' % self.credentials).strip() |
181 |
params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials |
182 |
return params
|
183 |
|
184 |
def set_credentials(self, user=None, pwd=None): |
185 |
self.credentials = (user, pwd)
|
186 |
if not user and not pwd: |
187 |
self.credentials = ()
|
188 |
|
189 |
|
190 |
class TestOA2(TestCase, URLAssertionsMixin): |
191 |
|
192 |
def assertCount(self, model, count): |
193 |
self.assertEqual(model.objects.count(), count)
|
194 |
|
195 |
def assert_access_token_response(self, r, expected): |
196 |
self.assertEqual(r.status_code, 200) |
197 |
try:
|
198 |
data = json.loads(r.content) |
199 |
except:
|
200 |
self.fail("Unexpected response content") |
201 |
|
202 |
self.assertTrue('access_token' in data) |
203 |
access_token = data['access_token']
|
204 |
self.assertTrue('token_type' in data) |
205 |
token_type = data['token_type']
|
206 |
self.assertTrue('expires_in' in data) |
207 |
expires_in = data['expires_in']
|
208 |
|
209 |
try:
|
210 |
token = Token.objects.get(code=access_token) |
211 |
self.assertEqual(token.expires_at,
|
212 |
token.created_at + |
213 |
datetime.timedelta(seconds=expires_in)) |
214 |
self.assertEqual(token.token_type, token_type)
|
215 |
self.assertEqual(token.grant_type, 'authorization_code') |
216 |
#self.assertEqual(token.user, expected.get('user'))
|
217 |
self.assertEqual(token.redirect_uri, expected.get('redirect_uri')) |
218 |
self.assertEqual(token.scope, expected.get('scope')) |
219 |
self.assertEqual(token.state, expected.get('state')) |
220 |
except Token.DoesNotExist:
|
221 |
self.fail("Invalid access_token") |
222 |
|
223 |
def setUp(self): |
224 |
baseurl = reverse('oauth2_authenticate').replace('/auth', '/') |
225 |
self.client = OA2Client(baseurl)
|
226 |
client1 = Client.objects.create(identifier="client1", secret="secret") |
227 |
self.client1_redirect_uri = "https://server.com/handle_code" |
228 |
client1.redirecturl_set.create(url=self.client1_redirect_uri)
|
229 |
|
230 |
client2 = Client.objects.create(identifier="client2", type='public') |
231 |
self.client2_redirect_uri = "https://server2.com/handle_code" |
232 |
client2.redirecturl_set.create(url=self.client2_redirect_uri)
|
233 |
|
234 |
client3 = Client.objects.create(identifier="client3", secret='secret', |
235 |
is_trusted=True)
|
236 |
self.client3_redirect_uri = "https://server3.com/handle_code" |
237 |
client3.redirecturl_set.create(url=self.client3_redirect_uri)
|
238 |
|
239 |
common.get_local_user("user@synnefo.org", password="password") |
240 |
|
241 |
def test_code_authorization(self): |
242 |
# missing response_type
|
243 |
r = self.client.get(self.client.get_url(self.client.auth_url)) |
244 |
self.assertEqual(r.status_code, 400) |
245 |
self.assertCount(AuthorizationCode, 0) |
246 |
|
247 |
# invalid response_type
|
248 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
249 |
response_type='invalid'))
|
250 |
self.assertEqual(r.status_code, 400) |
251 |
self.assertCount(AuthorizationCode, 0) |
252 |
|
253 |
# unsupported response_type
|
254 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
255 |
response_type='token'))
|
256 |
self.assertEqual(r.status_code, 400) |
257 |
self.assertCount(AuthorizationCode, 0) |
258 |
|
259 |
# missing client_id
|
260 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
261 |
response_type='code'))
|
262 |
self.assertEqual(r.status_code, 400) |
263 |
self.assertCount(AuthorizationCode, 0) |
264 |
|
265 |
# fake client
|
266 |
r = self.client.authorize_code('client-fake') |
267 |
self.assertEqual(r.status_code, 400) |
268 |
self.assertCount(AuthorizationCode, 0) |
269 |
|
270 |
# mixed up credentials/client_id's
|
271 |
self.client.set_credentials('client1', 'secret') |
272 |
r = self.client.authorize_code('client2') |
273 |
self.assertEqual(r.status_code, 400) |
274 |
self.assertCount(AuthorizationCode, 0) |
275 |
|
276 |
# invalid credentials
|
277 |
self.client.set_credentials('client2', '') |
278 |
r = self.client.authorize_code('client2') |
279 |
self.assertEqual(r.status_code, 400) |
280 |
self.assertCount(AuthorizationCode, 0) |
281 |
|
282 |
# invalid redirect_uri: not absolute URI
|
283 |
self.client.set_credentials()
|
284 |
params = {'redirect_uri':
|
285 |
urlparse.urlparse(self.client1_redirect_uri).path}
|
286 |
r = self.client.authorize_code('client1', urlparams=params) |
287 |
self.assertEqual(r.status_code, 400) |
288 |
self.assertCount(AuthorizationCode, 0) |
289 |
|
290 |
# mismatching redirect uri
|
291 |
self.client.set_credentials()
|
292 |
params = {'redirect_uri': self.client1_redirect_uri[1:]} |
293 |
r = self.client.authorize_code('client1', urlparams=params) |
294 |
self.assertEqual(r.status_code, 400) |
295 |
self.assertCount(AuthorizationCode, 0) |
296 |
|
297 |
# valid request: untrusted client
|
298 |
params = {'redirect_uri': self.client1_redirect_uri, |
299 |
'scope': self.client1_redirect_uri, |
300 |
'extra_param': '123'} |
301 |
self.client.set_credentials('client1', 'secret') |
302 |
r = self.client.authorize_code('client1', urlparams=params) |
303 |
self.assertEqual(r.status_code, 302) |
304 |
self.assertTrue('Location' in r) |
305 |
self.assertHost(r['Location'], "testserver:80") |
306 |
self.assertPath(r['Location'], reverse('login')) |
307 |
|
308 |
self.client.set_credentials('client1', 'secret') |
309 |
self.client.login(username="user@synnefo.org", password="password") |
310 |
r = self.client.authorize_code('client1', urlparams=params) |
311 |
self.assertEqual(r.status_code, 200) |
312 |
|
313 |
r = self.client.authorize_code('client1', urlparams=params, |
314 |
extraparams={'reject': 0}) |
315 |
self.assertCount(AuthorizationCode, 1) |
316 |
|
317 |
# redirect is valid
|
318 |
redirect1 = self.get_redirect_url(r)
|
319 |
self.assertParam(redirect1, "code") |
320 |
self.assertNoParam(redirect1, "extra_param") |
321 |
self.assertHost(redirect1, "server.com") |
322 |
self.assertPath(redirect1, "/handle_code") |
323 |
|
324 |
params['state'] = 'csrfstate' |
325 |
params['scope'] = 'resource1' |
326 |
r = self.client.authorize_code('client1', urlparams=params) |
327 |
redirect2 = self.get_redirect_url(r)
|
328 |
self.assertParamEqual(redirect2, "state", 'csrfstate') |
329 |
self.assertCount(AuthorizationCode, 2) |
330 |
|
331 |
code1 = AuthorizationCode.objects.get(code=redirect1.params['code'][0]) |
332 |
#self.assertEqual(code1.state, '')
|
333 |
self.assertEqual(code1.state, None) |
334 |
self.assertEqual(code1.redirect_uri, self.client1_redirect_uri) |
335 |
|
336 |
code2 = AuthorizationCode.objects.get(code=redirect2.params['code'][0]) |
337 |
self.assertEqual(code2.state, 'csrfstate') |
338 |
self.assertEqual(code2.redirect_uri, self.client1_redirect_uri) |
339 |
|
340 |
# valid request: trusted client
|
341 |
params = {'redirect_uri': self.client3_redirect_uri, |
342 |
'scope': self.client3_redirect_uri, |
343 |
'extra_param': '123'} |
344 |
self.client.set_credentials('client3', 'secret') |
345 |
r = self.client.authorize_code('client3', urlparams=params) |
346 |
self.assertEqual(r.status_code, 302) |
347 |
self.assertCount(AuthorizationCode, 3) |
348 |
|
349 |
# redirect is valid
|
350 |
redirect3 = self.get_redirect_url(r)
|
351 |
self.assertParam(redirect1, "code") |
352 |
self.assertNoParam(redirect3, "state") |
353 |
self.assertNoParam(redirect3, "extra_param") |
354 |
self.assertHost(redirect3, "server3.com") |
355 |
self.assertPath(redirect3, "/handle_code") |
356 |
|
357 |
code3 = AuthorizationCode.objects.get(code=redirect3.params['code'][0]) |
358 |
self.assertEqual(code3.state, None) |
359 |
self.assertEqual(code3.redirect_uri, self.client3_redirect_uri) |
360 |
|
361 |
# valid request: trusted client
|
362 |
params['state'] = 'csrfstate' |
363 |
self.client.set_credentials('client3', 'secret') |
364 |
r = self.client.authorize_code('client3', urlparams=params) |
365 |
self.assertEqual(r.status_code, 302) |
366 |
self.assertCount(AuthorizationCode, 4) |
367 |
|
368 |
# redirect is valid
|
369 |
redirect4 = self.get_redirect_url(r)
|
370 |
self.assertParam(redirect4, "code") |
371 |
self.assertParamEqual(redirect4, "state", 'csrfstate') |
372 |
self.assertNoParam(redirect4, "extra_param") |
373 |
self.assertHost(redirect4, "server3.com") |
374 |
self.assertPath(redirect4, "/handle_code") |
375 |
|
376 |
code4 = AuthorizationCode.objects.get(code=redirect4.params['code'][0]) |
377 |
self.assertEqual(code4.state, 'csrfstate') |
378 |
self.assertEqual(code4.redirect_uri, self.client3_redirect_uri) |
379 |
|
380 |
def test_get_token(self): |
381 |
# invalid method
|
382 |
r = self.client.get(self.client.token_url) |
383 |
self.assertEqual(r.status_code, 405) |
384 |
self.assertTrue('Allow' in r) |
385 |
self.assertEqual(r['Allow'], 'POST') |
386 |
|
387 |
# invalid content type
|
388 |
r = self.client.post(self.client.token_url) |
389 |
self.assertEqual(r.status_code, 400) |
390 |
|
391 |
# missing grant type
|
392 |
r = self.client.post(self.client.token_url, |
393 |
content_type='application/x-www-form-urlencoded')
|
394 |
self.assertEqual(r.status_code, 400) |
395 |
|
396 |
# unsupported grant type: client_credentials
|
397 |
r = self.client.post(self.client.token_url, |
398 |
data='grant_type=client_credentials',
|
399 |
content_type='application/x-www-form-urlencoded')
|
400 |
self.assertEqual(r.status_code, 400) |
401 |
|
402 |
# unsupported grant type: token
|
403 |
r = self.client.post(self.client.token_url, |
404 |
data='grant_type=token',
|
405 |
content_type='application/x-www-form-urlencoded')
|
406 |
self.assertEqual(r.status_code, 400) |
407 |
|
408 |
# invalid grant type
|
409 |
r = self.client.post(self.client.token_url, |
410 |
data='grant_type=invalid',
|
411 |
content_type='application/x-www-form-urlencoded')
|
412 |
self.assertEqual(r.status_code, 400) |
413 |
|
414 |
# generate authorization code: without redirect_uri
|
415 |
self.client.login(username="user@synnefo.org", password="password") |
416 |
r = self.client.authorize_code('client3') |
417 |
self.assertCount(AuthorizationCode, 1) |
418 |
redirect = self.get_redirect_url(r)
|
419 |
code_instance = AuthorizationCode.objects.get( |
420 |
code=redirect.params['code'][0]) |
421 |
|
422 |
# no client_id & no client authorization
|
423 |
r = self.client.access_token(code_instance.code)
|
424 |
self.assertEqual(r.status_code, 400) |
425 |
|
426 |
# invalid client_id
|
427 |
r = self.client.access_token(code_instance.code, client_id='client2') |
428 |
self.assertEqual(r.status_code, 400) |
429 |
|
430 |
# inexistent client_id
|
431 |
r = self.client.access_token(code_instance.code, client_id='client42') |
432 |
self.assertEqual(r.status_code, 400) |
433 |
|
434 |
# no client authorization
|
435 |
r = self.client.access_token(code_instance.code, client_id='client3') |
436 |
self.assertEqual(r.status_code, 400) |
437 |
|
438 |
# mixed up credentials/client_id's
|
439 |
self.client.set_credentials('client1', 'secret') |
440 |
r = self.client.access_token(code_instance.code, client_id='client3') |
441 |
self.assertEqual(r.status_code, 400) |
442 |
|
443 |
# mixed up credentials/client_id's
|
444 |
self.client.set_credentials('client3', 'secret') |
445 |
r = self.client.access_token(code_instance.code, client_id='client1') |
446 |
self.assertEqual(r.status_code, 400) |
447 |
|
448 |
# mismatching client
|
449 |
self.client.set_credentials('client1', 'secret') |
450 |
r = self.client.access_token(code_instance.code, client_id='client1') |
451 |
self.assertEqual(r.status_code, 400) |
452 |
|
453 |
# invalid code
|
454 |
self.client.set_credentials('client3', 'secret') |
455 |
r = self.client.access_token('invalid') |
456 |
self.assertEqual(r.status_code, 400) |
457 |
|
458 |
# valid request
|
459 |
self.client.set_credentials('client3', 'secret') |
460 |
r = self.client.access_token(code_instance.code)
|
461 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
462 |
self.assertCount(Token, 1) |
463 |
expected = {'redirect_uri': self.client3_redirect_uri, |
464 |
'scope': self.client3_redirect_uri, |
465 |
'state': None} |
466 |
self.assert_access_token_response(r, expected)
|