root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ 0a3ff8a6
History | View | Annotate | Download (23.1 kB)
1 |
# -*- coding: utf8 -*-
|
---|---|
2 |
# Copyright 2013 GRNET S.A. All rights reserved.
|
3 |
#
|
4 |
# Redistribution and use in source and binary forms, with or
|
5 |
# without modification, are permitted provided that the following
|
6 |
# conditions are met:
|
7 |
#
|
8 |
# 1. Redistributions of source code must retain the above
|
9 |
# copyright notice, this list of conditions and the following
|
10 |
# disclaimer.
|
11 |
#
|
12 |
# 2. Redistributions in binary form must reproduce the above
|
13 |
# copyright notice, this list of conditions and the following
|
14 |
# disclaimer in the documentation and/or other materials
|
15 |
# provided with the distribution.
|
16 |
#
|
17 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
18 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
19 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
20 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
21 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
22 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
23 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
24 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
25 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
26 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
27 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
28 |
# POSSIBILITY OF SUCH DAMAGE.
|
29 |
#
|
30 |
# The views and conclusions contained in the software and
|
31 |
# documentation are those of the authors and should not be
|
32 |
# interpreted as representing official policies, either expressed
|
33 |
# or implied, of GRNET S.A.
|
34 |
|
35 |
import urllib |
36 |
import urlparse |
37 |
import base64 |
38 |
import datetime |
39 |
|
40 |
from collections import namedtuple |
41 |
from urltools import normalize |
42 |
|
43 |
from django.test import TransactionTestCase as TestCase |
44 |
from django.test import Client as TestClient |
45 |
|
46 |
from django.core.urlresolvers import reverse |
47 |
from django.utils import simplejson as json |
48 |
|
49 |
from astakos.oa2 import settings |
50 |
from astakos.oa2.models import Client, AuthorizationCode, Token |
51 |
from astakos.im.tests import common |
52 |
|
53 |
from synnefo.util.text import uenc |
54 |
|
55 |
|
56 |
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params', |
57 |
'url'])
|
58 |
|
59 |
|
60 |
def parsed_url_wrapper(func): |
61 |
def wrapper(self, url, *args, **kwargs): |
62 |
url = self.parse_url(url)
|
63 |
return func(self, url, *args, **kwargs) |
64 |
return wrapper
|
65 |
|
66 |
|
67 |
class URLAssertionsMixin(object): |
68 |
|
69 |
def get_redirect_url(self, request): |
70 |
return self.parse_url(request['Location']) |
71 |
|
72 |
def parse_url(self, url): |
73 |
if isinstance(url, ParsedURL): |
74 |
return url
|
75 |
result = urlparse.urlparse(url) |
76 |
parsed = { |
77 |
'url': url,
|
78 |
'host': result.netloc,
|
79 |
'scheme': result.scheme,
|
80 |
'path': result.path,
|
81 |
} |
82 |
parsed['params'] = urlparse.parse_qs(result.query)
|
83 |
return ParsedURL(**parsed)
|
84 |
|
85 |
@parsed_url_wrapper
|
86 |
def assertParamEqual(self, url, key, value): |
87 |
self.assertParam(url, key)
|
88 |
self.assertEqual(url.params[key][0], value) |
89 |
|
90 |
@parsed_url_wrapper
|
91 |
def assertNoParam(self, url, key): |
92 |
self.assertFalse(key in url.params, |
93 |
"Url '%s' does contain '%s' parameter" % (url.url,
|
94 |
key)) |
95 |
|
96 |
@parsed_url_wrapper
|
97 |
def assertParam(self, url, key): |
98 |
self.assertTrue(key in url.params, |
99 |
"Url '%s' does not contain '%s' parameter" % (url.url,
|
100 |
key)) |
101 |
|
102 |
@parsed_url_wrapper
|
103 |
def assertHost(self, url, host): |
104 |
self.assertEqual(url.host, host)
|
105 |
|
106 |
@parsed_url_wrapper
|
107 |
def assertPath(self, url, path): |
108 |
self.assertEqual(url.path, path)
|
109 |
|
110 |
@parsed_url_wrapper
|
111 |
def assertSecure(self, url, key): |
112 |
self.assertEqual(url.scheme, "https") |
113 |
|
114 |
|
115 |
class OA2Client(TestClient): |
116 |
"""
|
117 |
An OAuth2 agnostic test client.
|
118 |
"""
|
119 |
def __init__(self, baseurl, *args, **kwargs): |
120 |
self.oa2_url = baseurl
|
121 |
self.token_url = self.oa2_url + 'token/' |
122 |
self.auth_url = self.oa2_url + 'auth/' |
123 |
self.credentials = kwargs.pop('credentials', ()) |
124 |
|
125 |
kwargs['wsgi.url_scheme'] = 'https' |
126 |
return super(OA2Client, self).__init__(*args, **kwargs) |
127 |
|
128 |
def request(self, *args, **kwargs): |
129 |
#print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
|
130 |
#kwargs.get('HTTP_AUTHORIZATION', None)
|
131 |
return super(OA2Client, self).request(*args, **kwargs) |
132 |
|
133 |
def get_url(self, token_or_auth, **params): |
134 |
return token_or_auth + '?' + urllib.urlencode(params) |
135 |
|
136 |
def grant(self, clientid, *args, **kwargs): |
137 |
"""
|
138 |
Do an authorization grant request.
|
139 |
"""
|
140 |
params = { |
141 |
'grant_type': 'authorization_code', |
142 |
'client_id': clientid
|
143 |
} |
144 |
urlparams = kwargs.pop('urlparams', {})
|
145 |
params.update(urlparams) |
146 |
self.set_auth_headers(kwargs)
|
147 |
return self.get(self.get_url(self.token_url, **params), *args, |
148 |
**kwargs) |
149 |
|
150 |
def authorize_code(self, clientid, *args, **kwargs): |
151 |
"""
|
152 |
Do an authorization code request.
|
153 |
"""
|
154 |
params = { |
155 |
'response_type': 'code', |
156 |
'client_id': clientid
|
157 |
} |
158 |
urlparams = kwargs.pop('urlparams', {})
|
159 |
urlparams.update(kwargs.pop('extraparams', {}))
|
160 |
params.update(urlparams) |
161 |
self.set_auth_headers(kwargs)
|
162 |
if 'reject' in params: |
163 |
return self.post(self.get_url(self.auth_url), data=params, |
164 |
**kwargs) |
165 |
return self.get(self.get_url(self.auth_url, **params), *args, **kwargs) |
166 |
|
167 |
def access_token(self, code, |
168 |
content_type='application/x-www-form-urlencoded',
|
169 |
**kwargs): |
170 |
"""
|
171 |
Do an get token request.
|
172 |
"""
|
173 |
params = { |
174 |
'grant_type': 'authorization_code', |
175 |
'code': code
|
176 |
} |
177 |
params.update(kwargs) |
178 |
self.set_auth_headers(kwargs)
|
179 |
return self.post(self.token_url, data=urllib.urlencode(params), |
180 |
content_type=content_type, **kwargs) |
181 |
|
182 |
def set_auth_headers(self, params): |
183 |
if not self.credentials: |
184 |
return
|
185 |
credentials = base64.encodestring('%s:%s' % self.credentials).strip() |
186 |
params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials |
187 |
return params
|
188 |
|
189 |
def set_credentials(self, user=None, pwd=None): |
190 |
self.credentials = (user, pwd)
|
191 |
if not user and not pwd: |
192 |
self.credentials = ()
|
193 |
|
194 |
|
195 |
class TestOA2(TestCase, URLAssertionsMixin): |
196 |
|
197 |
def assertCount(self, model, count): |
198 |
self.assertEqual(model.objects.count(), count)
|
199 |
|
200 |
def assert_access_token_response(self, r, expected): |
201 |
self.assertEqual(r.status_code, 200) |
202 |
try:
|
203 |
data = json.loads(r.content) |
204 |
except:
|
205 |
self.fail("Unexpected response content") |
206 |
|
207 |
self.assertTrue('access_token' in data) |
208 |
access_token = data['access_token']
|
209 |
self.assertTrue('token_type' in data) |
210 |
token_type = data['token_type']
|
211 |
self.assertTrue('expires_in' in data) |
212 |
expires_in = data['expires_in']
|
213 |
|
214 |
try:
|
215 |
token = Token.objects.get(code=access_token) |
216 |
self.assertEqual(token.expires_at,
|
217 |
token.created_at + |
218 |
datetime.timedelta(seconds=expires_in)) |
219 |
self.assertEqual(token.token_type, token_type)
|
220 |
self.assertEqual(token.grant_type, 'authorization_code') |
221 |
#self.assertEqual(token.user, expected.get('user'))
|
222 |
self.assertEqual(normalize(uenc(token.redirect_uri)),
|
223 |
normalize(uenc(expected.get('redirect_uri'))))
|
224 |
self.assertEqual(normalize(uenc(token.scope)),
|
225 |
normalize(uenc(expected.get('scope'))))
|
226 |
self.assertEqual(token.state, expected.get('state')) |
227 |
except Token.DoesNotExist:
|
228 |
self.fail("Invalid access_token") |
229 |
|
230 |
def setUp(self): |
231 |
baseurl = reverse('oauth2_authenticate').replace('/auth', '/') |
232 |
self.client = OA2Client(baseurl)
|
233 |
client1 = Client.objects.create(identifier="client1", secret="secret") |
234 |
self.client1_redirect_uri = "https://server.com/handle_code" |
235 |
client1.redirecturl_set.create(url=self.client1_redirect_uri)
|
236 |
|
237 |
client2 = Client.objects.create(identifier="client2", type='public') |
238 |
self.client2_redirect_uri = "https://server2.com/handle_code" |
239 |
client2.redirecturl_set.create(url=self.client2_redirect_uri)
|
240 |
|
241 |
client3 = Client.objects.create(identifier="client3", secret='secret', |
242 |
is_trusted=True)
|
243 |
self.client3_redirect_uri = "https://server3.com/handle_code" |
244 |
client3.redirecturl_set.create(url=self.client3_redirect_uri)
|
245 |
|
246 |
common.get_local_user("user@synnefo.org", password="password") |
247 |
|
248 |
def test_code_authorization(self): |
249 |
# missing response_type
|
250 |
r = self.client.get(self.client.get_url(self.client.auth_url)) |
251 |
self.assertEqual(r.status_code, 400) |
252 |
self.assertCount(AuthorizationCode, 0) |
253 |
|
254 |
# invalid response_type
|
255 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
256 |
response_type='invalid'))
|
257 |
self.assertEqual(r.status_code, 400) |
258 |
self.assertCount(AuthorizationCode, 0) |
259 |
|
260 |
# unsupported response_type
|
261 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
262 |
response_type='token'))
|
263 |
self.assertEqual(r.status_code, 400) |
264 |
self.assertCount(AuthorizationCode, 0) |
265 |
|
266 |
# missing client_id
|
267 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
268 |
response_type='code'))
|
269 |
self.assertEqual(r.status_code, 400) |
270 |
self.assertCount(AuthorizationCode, 0) |
271 |
|
272 |
# fake client
|
273 |
r = self.client.authorize_code('client-fake') |
274 |
self.assertEqual(r.status_code, 400) |
275 |
self.assertCount(AuthorizationCode, 0) |
276 |
|
277 |
# mixed up credentials/client_id's
|
278 |
self.client.set_credentials('client1', 'secret') |
279 |
r = self.client.authorize_code('client3') |
280 |
self.assertEqual(r.status_code, 400) |
281 |
self.assertCount(AuthorizationCode, 0) |
282 |
|
283 |
# invalid credentials
|
284 |
self.client.set_credentials('client2', '') |
285 |
r = self.client.authorize_code('client2') |
286 |
self.assertEqual(r.status_code, 400) |
287 |
self.assertCount(AuthorizationCode, 0) |
288 |
|
289 |
# invalid redirect_uri: not absolute URI
|
290 |
self.client.set_credentials()
|
291 |
params = {'redirect_uri':
|
292 |
urlparse.urlparse(self.client1_redirect_uri).path}
|
293 |
r = self.client.authorize_code('client1', urlparams=params) |
294 |
self.assertEqual(r.status_code, 400) |
295 |
self.assertCount(AuthorizationCode, 0) |
296 |
|
297 |
# mismatching redirect uri
|
298 |
self.client.set_credentials()
|
299 |
params = {'redirect_uri': self.client1_redirect_uri[1:]} |
300 |
r = self.client.authorize_code('client1', urlparams=params) |
301 |
self.assertEqual(r.status_code, 400) |
302 |
self.assertCount(AuthorizationCode, 0) |
303 |
|
304 |
# valid request: untrusted client
|
305 |
params = {'redirect_uri': self.client1_redirect_uri, |
306 |
'scope': self.client1_redirect_uri, |
307 |
'extra_param': 'γιουνικοντ'} |
308 |
self.client.set_credentials('client1', 'secret') |
309 |
r = self.client.authorize_code('client1', urlparams=params) |
310 |
self.assertEqual(r.status_code, 302) |
311 |
self.assertTrue('Location' in r) |
312 |
self.assertHost(r['Location'], "testserver:80") |
313 |
self.assertPath(r['Location'], reverse('login')) |
314 |
|
315 |
self.client.set_credentials('client1', 'secret') |
316 |
self.client.login(username="user@synnefo.org", password="password") |
317 |
r = self.client.authorize_code('client1', urlparams=params) |
318 |
self.assertEqual(r.status_code, 200) |
319 |
|
320 |
r = self.client.authorize_code('client1', urlparams=params, |
321 |
extraparams={'reject': 0}) |
322 |
self.assertCount(AuthorizationCode, 1) |
323 |
|
324 |
# redirect is valid
|
325 |
redirect = self.get_redirect_url(r)
|
326 |
self.assertParam(redirect, "code") |
327 |
self.assertNoParam(redirect, "extra_param") |
328 |
self.assertHost(redirect, "server.com") |
329 |
self.assertPath(redirect, "/handle_code") |
330 |
|
331 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
332 |
#self.assertEqual(code.state, '')
|
333 |
self.assertEqual(code.state, None) |
334 |
self.assertEqual(code.redirect_uri, self.client1_redirect_uri) |
335 |
|
336 |
params['state'] = 'csrfstate' |
337 |
params['scope'] = 'resource1' |
338 |
r = self.client.authorize_code('client1', urlparams=params) |
339 |
redirect = self.get_redirect_url(r)
|
340 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
341 |
self.assertCount(AuthorizationCode, 2) |
342 |
|
343 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
344 |
self.assertEqual(code.state, 'csrfstate') |
345 |
self.assertEqual(code.redirect_uri, self.client1_redirect_uri) |
346 |
|
347 |
# valid request: trusted client
|
348 |
params = {'redirect_uri': self.client3_redirect_uri, |
349 |
'scope': self.client3_redirect_uri, |
350 |
'extra_param': 'γιουνικοντ'} |
351 |
self.client.set_credentials('client3', 'secret') |
352 |
r = self.client.authorize_code('client3', urlparams=params) |
353 |
self.assertEqual(r.status_code, 302) |
354 |
self.assertCount(AuthorizationCode, 3) |
355 |
|
356 |
# redirect is valid
|
357 |
redirect = self.get_redirect_url(r)
|
358 |
self.assertParam(redirect, "code") |
359 |
self.assertNoParam(redirect, "state") |
360 |
self.assertNoParam(redirect, "extra_param") |
361 |
self.assertHost(redirect, "server3.com") |
362 |
self.assertPath(redirect, "/handle_code") |
363 |
|
364 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
365 |
self.assertEqual(code.state, None) |
366 |
self.assertEqual(code.redirect_uri, self.client3_redirect_uri) |
367 |
|
368 |
# valid request: trusted client
|
369 |
params['state'] = 'csrfstate' |
370 |
self.client.set_credentials('client3', 'secret') |
371 |
r = self.client.authorize_code('client3', urlparams=params) |
372 |
self.assertEqual(r.status_code, 302) |
373 |
self.assertCount(AuthorizationCode, 4) |
374 |
|
375 |
# redirect is valid
|
376 |
redirect = self.get_redirect_url(r)
|
377 |
self.assertParam(redirect, "code") |
378 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
379 |
self.assertNoParam(redirect, "extra_param") |
380 |
self.assertHost(redirect, "server3.com") |
381 |
self.assertPath(redirect, "/handle_code") |
382 |
|
383 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
384 |
self.assertEqual(code.state, 'csrfstate') |
385 |
self.assertEqual(code.redirect_uri, self.client3_redirect_uri) |
386 |
|
387 |
# redirect uri startswith the client's registered redirect url
|
388 |
params['redirect_uri'] = '%smore' % self.client3_redirect_uri |
389 |
self.client.set_credentials('client3', 'secret') |
390 |
r = self.client.authorize_code('client3', urlparams=params) |
391 |
self.assertEqual(r.status_code, 400) |
392 |
|
393 |
# redirect uri descendant
|
394 |
redirect_uri = '%s/' % self.client3_redirect_uri |
395 |
rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
|
396 |
redirect_uri = '%s%s' % (redirect_uri, 'a'*rest) |
397 |
params['redirect_uri'] = redirect_uri
|
398 |
self.client.set_credentials('client3', 'secret') |
399 |
r = self.client.authorize_code('client3', urlparams=params) |
400 |
self.assertEqual(r.status_code, 302) |
401 |
self.assertCount(AuthorizationCode, 5) |
402 |
|
403 |
# redirect is valid
|
404 |
redirect = self.get_redirect_url(r)
|
405 |
self.assertParam(redirect, "code") |
406 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
407 |
self.assertNoParam(redirect, "extra_param") |
408 |
self.assertHost(redirect, "server3.com") |
409 |
self.assertPath(redirect, urlparse.urlparse(redirect_uri).path)
|
410 |
|
411 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
412 |
self.assertEqual(code.state, 'csrfstate') |
413 |
self.assertEqual(code.redirect_uri, redirect_uri)
|
414 |
|
415 |
# too long redirect uri
|
416 |
params['redirect_uri'] = '%sa' % redirect_uri |
417 |
self.client.set_credentials('client3', 'secret') |
418 |
r = self.client.authorize_code('client3', urlparams=params) |
419 |
self.assertEqual(r.status_code, 400) |
420 |
|
421 |
# redirect uri descendant
|
422 |
redirect_uri = '%s/more?α=γιουνικοντ' % self.client3_redirect_uri |
423 |
params['redirect_uri'] = redirect_uri
|
424 |
self.client.set_credentials('client3', 'secret') |
425 |
r = self.client.authorize_code('client3', urlparams=params) |
426 |
self.assertEqual(r.status_code, 302) |
427 |
self.assertCount(AuthorizationCode, 6) |
428 |
|
429 |
# redirect is valid
|
430 |
redirect = self.get_redirect_url(r)
|
431 |
self.assertParam(redirect, "code") |
432 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
433 |
self.assertNoParam(redirect, "extra_param") |
434 |
self.assertHost(redirect, "server3.com") |
435 |
self.assertPath(redirect, urlparse.urlparse(redirect_uri).path)
|
436 |
|
437 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
438 |
self.assertEqual(code.state, 'csrfstate') |
439 |
self.assertEqual(normalize(uenc(code.redirect_uri)),
|
440 |
normalize(uenc(redirect_uri))) |
441 |
|
442 |
def test_get_token(self): |
443 |
# invalid method
|
444 |
r = self.client.get(self.client.token_url) |
445 |
self.assertEqual(r.status_code, 405) |
446 |
self.assertTrue('Allow' in r) |
447 |
self.assertEqual(r['Allow'], 'POST') |
448 |
|
449 |
# invalid content type
|
450 |
r = self.client.post(self.client.token_url) |
451 |
self.assertEqual(r.status_code, 400) |
452 |
|
453 |
# missing grant type
|
454 |
r = self.client.post(self.client.token_url, |
455 |
content_type='application/x-www-form-urlencoded')
|
456 |
self.assertEqual(r.status_code, 400) |
457 |
|
458 |
# unsupported grant type: client_credentials
|
459 |
r = self.client.post(self.client.token_url, |
460 |
data='grant_type=client_credentials',
|
461 |
content_type='application/x-www-form-urlencoded')
|
462 |
self.assertEqual(r.status_code, 400) |
463 |
|
464 |
# unsupported grant type: token
|
465 |
r = self.client.post(self.client.token_url, |
466 |
data='grant_type=token',
|
467 |
content_type='application/x-www-form-urlencoded')
|
468 |
self.assertEqual(r.status_code, 400) |
469 |
|
470 |
# invalid grant type
|
471 |
r = self.client.post(self.client.token_url, |
472 |
data='grant_type=invalid',
|
473 |
content_type='application/x-www-form-urlencoded')
|
474 |
self.assertEqual(r.status_code, 400) |
475 |
|
476 |
# generate authorization code: without redirect_uri
|
477 |
self.client.login(username="user@synnefo.org", password="password") |
478 |
r = self.client.authorize_code('client3') |
479 |
self.assertCount(AuthorizationCode, 1) |
480 |
redirect = self.get_redirect_url(r)
|
481 |
code_instance = AuthorizationCode.objects.get( |
482 |
code=redirect.params['code'][0]) |
483 |
|
484 |
# no client_id & no client authorization
|
485 |
r = self.client.access_token(code_instance.code)
|
486 |
self.assertEqual(r.status_code, 400) |
487 |
|
488 |
# invalid client_id
|
489 |
r = self.client.access_token(code_instance.code, client_id='client2') |
490 |
self.assertEqual(r.status_code, 400) |
491 |
|
492 |
# inexistent client_id
|
493 |
r = self.client.access_token(code_instance.code, client_id='client42') |
494 |
self.assertEqual(r.status_code, 400) |
495 |
|
496 |
# no client authorization
|
497 |
r = self.client.access_token(code_instance.code, client_id='client3') |
498 |
self.assertEqual(r.status_code, 400) |
499 |
|
500 |
# mixed up credentials/client_id's
|
501 |
self.client.set_credentials('client1', 'secret') |
502 |
r = self.client.access_token(code_instance.code, client_id='client3') |
503 |
self.assertEqual(r.status_code, 400) |
504 |
|
505 |
# mixed up credentials/client_id's
|
506 |
self.client.set_credentials('client3', 'secret') |
507 |
r = self.client.access_token(code_instance.code, client_id='client1') |
508 |
self.assertEqual(r.status_code, 400) |
509 |
|
510 |
# mismatching client
|
511 |
self.client.set_credentials('client1', 'secret') |
512 |
r = self.client.access_token(code_instance.code, client_id='client1') |
513 |
self.assertEqual(r.status_code, 400) |
514 |
|
515 |
# invalid code
|
516 |
self.client.set_credentials('client3', 'secret') |
517 |
r = self.client.access_token('invalid') |
518 |
self.assertEqual(r.status_code, 400) |
519 |
|
520 |
# valid request
|
521 |
self.client.set_credentials('client3', 'secret') |
522 |
r = self.client.access_token(code_instance.code)
|
523 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
524 |
self.assertCount(Token, 1) |
525 |
expected = {'redirect_uri': self.client3_redirect_uri, |
526 |
'scope': self.client3_redirect_uri, |
527 |
'state': None} |
528 |
self.assert_access_token_response(r, expected)
|
529 |
|
530 |
# generate authorization code with too long redirect_uri
|
531 |
redirect_uri = '%s/' % self.client3_redirect_uri |
532 |
rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
|
533 |
redirect_uri = '%s%s' % (redirect_uri, 'a'*rest) |
534 |
params = {'redirect_uri': redirect_uri}
|
535 |
r = self.client.authorize_code('client3', urlparams=params) |
536 |
self.assertCount(AuthorizationCode, 1) |
537 |
redirect = self.get_redirect_url(r)
|
538 |
code_instance = AuthorizationCode.objects.get( |
539 |
code=redirect.params['code'][0]) |
540 |
|
541 |
# valid request
|
542 |
self.client.set_credentials('client3', 'secret') |
543 |
r = self.client.access_token(code_instance.code,
|
544 |
redirect_uri='%sa' % redirect_uri)
|
545 |
self.assertEqual(r.status_code, 400) |
546 |
|
547 |
r = self.client.access_token(code_instance.code,
|
548 |
redirect_uri=redirect_uri) |
549 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
550 |
self.assertCount(Token, 2) |
551 |
expected = {'redirect_uri': redirect_uri,
|
552 |
'scope': redirect_uri,
|
553 |
'state': None} |
554 |
self.assert_access_token_response(r, expected)
|
555 |
|
556 |
redirect_uri = '%s/more?α=γιουνικοντ' % self.client3_redirect_uri |
557 |
params = {'redirect_uri': redirect_uri}
|
558 |
r = self.client.authorize_code('client3', urlparams=params) |
559 |
self.assertCount(AuthorizationCode, 1) |
560 |
redirect = self.get_redirect_url(r)
|
561 |
code_instance = AuthorizationCode.objects.get( |
562 |
code=redirect.params['code'][0]) |
563 |
|
564 |
# valid request
|
565 |
self.client.set_credentials('client3', 'secret') |
566 |
r = self.client.access_token(code_instance.code,
|
567 |
redirect_uri='%sa' % redirect_uri)
|
568 |
self.assertEqual(r.status_code, 400) |
569 |
|
570 |
r = self.client.access_token(code_instance.code,
|
571 |
redirect_uri=redirect_uri) |
572 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
573 |
self.assertCount(Token, 3) |
574 |
expected = {'redirect_uri': redirect_uri,
|
575 |
'scope': redirect_uri,
|
576 |
'state': None} |
577 |
self.assert_access_token_response(r, expected)
|