root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ 68122bae
History | View | Annotate | Download (20.9 kB)
1 |
# -*- coding: utf8 -*-
|
---|---|
2 |
# Copyright 2013 GRNET S.A. All rights reserved.
|
3 |
#
|
4 |
# Redistribution and use in source and binary forms, with or
|
5 |
# without modification, are permitted provided that the following
|
6 |
# conditions are met:
|
7 |
#
|
8 |
# 1. Redistributions of source code must retain the above
|
9 |
# copyright notice, this list of conditions and the following
|
10 |
# disclaimer.
|
11 |
#
|
12 |
# 2. Redistributions in binary form must reproduce the above
|
13 |
# copyright notice, this list of conditions and the following
|
14 |
# disclaimer in the documentation and/or other materials
|
15 |
# provided with the distribution.
|
16 |
#
|
17 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
18 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
19 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
20 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
21 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
22 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
23 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
24 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
25 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
26 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
27 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
28 |
# POSSIBILITY OF SUCH DAMAGE.
|
29 |
#
|
30 |
# The views and conclusions contained in the software and
|
31 |
# documentation are those of the authors and should not be
|
32 |
# interpreted as representing official policies, either expressed
|
33 |
# or implied, of GRNET S.A.
|
34 |
|
35 |
import urllib |
36 |
import urlparse |
37 |
import base64 |
38 |
import datetime |
39 |
|
40 |
from collections import namedtuple |
41 |
|
42 |
from django.test import TransactionTestCase as TestCase |
43 |
from django.test import Client as TestClient |
44 |
|
45 |
from django.core.urlresolvers import reverse |
46 |
from django.utils import simplejson as json |
47 |
|
48 |
from astakos.oa2 import settings |
49 |
from astakos.oa2.models import Client, AuthorizationCode, Token |
50 |
from astakos.im.tests import common |
51 |
|
52 |
|
53 |
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params', |
54 |
'url'])
|
55 |
|
56 |
|
57 |
def parsed_url_wrapper(func): |
58 |
def wrapper(self, url, *args, **kwargs): |
59 |
url = self.parse_url(url)
|
60 |
return func(self, url, *args, **kwargs) |
61 |
return wrapper
|
62 |
|
63 |
|
64 |
class URLAssertionsMixin(object): |
65 |
|
66 |
def get_redirect_url(self, request): |
67 |
return self.parse_url(request['Location']) |
68 |
|
69 |
def parse_url(self, url): |
70 |
if isinstance(url, ParsedURL): |
71 |
return url
|
72 |
result = urlparse.urlparse(url) |
73 |
parsed = { |
74 |
'url': url,
|
75 |
'host': result.netloc,
|
76 |
'scheme': result.scheme,
|
77 |
'path': result.path,
|
78 |
} |
79 |
parsed['params'] = urlparse.parse_qs(result.query)
|
80 |
return ParsedURL(**parsed)
|
81 |
|
82 |
@parsed_url_wrapper
|
83 |
def assertParamEqual(self, url, key, value): |
84 |
self.assertParam(url, key)
|
85 |
self.assertEqual(url.params[key][0], value) |
86 |
|
87 |
@parsed_url_wrapper
|
88 |
def assertNoParam(self, url, key): |
89 |
self.assertFalse(key in url.params, |
90 |
"Url '%s' does contain '%s' parameter" % (url.url,
|
91 |
key)) |
92 |
|
93 |
@parsed_url_wrapper
|
94 |
def assertParam(self, url, key): |
95 |
self.assertTrue(key in url.params, |
96 |
"Url '%s' does not contain '%s' parameter" % (url.url,
|
97 |
key)) |
98 |
|
99 |
@parsed_url_wrapper
|
100 |
def assertHost(self, url, host): |
101 |
self.assertEqual(url.host, host)
|
102 |
|
103 |
@parsed_url_wrapper
|
104 |
def assertPath(self, url, path): |
105 |
self.assertEqual(url.path, path)
|
106 |
|
107 |
@parsed_url_wrapper
|
108 |
def assertSecure(self, url, key): |
109 |
self.assertEqual(url.scheme, "https") |
110 |
|
111 |
|
112 |
class OA2Client(TestClient): |
113 |
"""
|
114 |
An OAuth2 agnostic test client.
|
115 |
"""
|
116 |
def __init__(self, baseurl, *args, **kwargs): |
117 |
self.oa2_url = baseurl
|
118 |
self.token_url = self.oa2_url + 'token/' |
119 |
self.auth_url = self.oa2_url + 'auth/' |
120 |
self.credentials = kwargs.pop('credentials', ()) |
121 |
|
122 |
kwargs['wsgi.url_scheme'] = 'https' |
123 |
return super(OA2Client, self).__init__(*args, **kwargs) |
124 |
|
125 |
def request(self, *args, **kwargs): |
126 |
#print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
|
127 |
#kwargs.get('HTTP_AUTHORIZATION', None)
|
128 |
return super(OA2Client, self).request(*args, **kwargs) |
129 |
|
130 |
def get_url(self, token_or_auth, **params): |
131 |
return token_or_auth + '?' + urllib.urlencode(params) |
132 |
|
133 |
def grant(self, clientid, *args, **kwargs): |
134 |
"""
|
135 |
Do an authorization grant request.
|
136 |
"""
|
137 |
params = { |
138 |
'grant_type': 'authorization_code', |
139 |
'client_id': clientid
|
140 |
} |
141 |
urlparams = kwargs.pop('urlparams', {})
|
142 |
params.update(urlparams) |
143 |
self.set_auth_headers(kwargs)
|
144 |
return self.get(self.get_url(self.token_url, **params), *args, |
145 |
**kwargs) |
146 |
|
147 |
def authorize_code(self, clientid, *args, **kwargs): |
148 |
"""
|
149 |
Do an authorization code request.
|
150 |
"""
|
151 |
params = { |
152 |
'response_type': 'code', |
153 |
'client_id': clientid
|
154 |
} |
155 |
urlparams = kwargs.pop('urlparams', {})
|
156 |
urlparams.update(kwargs.pop('extraparams', {}))
|
157 |
params.update(urlparams) |
158 |
self.set_auth_headers(kwargs)
|
159 |
if 'reject' in params: |
160 |
return self.post(self.get_url(self.auth_url), data=params, |
161 |
**kwargs) |
162 |
return self.get(self.get_url(self.auth_url, **params), *args, **kwargs) |
163 |
|
164 |
def access_token(self, code, |
165 |
content_type='application/x-www-form-urlencoded',
|
166 |
**kwargs): |
167 |
"""
|
168 |
Do an get token request.
|
169 |
"""
|
170 |
params = { |
171 |
'grant_type': 'authorization_code', |
172 |
'code': code
|
173 |
} |
174 |
params.update(kwargs) |
175 |
self.set_auth_headers(kwargs)
|
176 |
return self.post(self.token_url, data=urllib.urlencode(params), |
177 |
content_type=content_type, **kwargs) |
178 |
|
179 |
def set_auth_headers(self, params): |
180 |
if not self.credentials: |
181 |
return
|
182 |
credentials = base64.encodestring('%s:%s' % self.credentials).strip() |
183 |
params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials |
184 |
return params
|
185 |
|
186 |
def set_credentials(self, user=None, pwd=None): |
187 |
self.credentials = (user, pwd)
|
188 |
if not user and not pwd: |
189 |
self.credentials = ()
|
190 |
|
191 |
|
192 |
class TestOA2(TestCase, URLAssertionsMixin): |
193 |
|
194 |
def assertCount(self, model, count): |
195 |
self.assertEqual(model.objects.count(), count)
|
196 |
|
197 |
def assert_access_token_response(self, r, expected): |
198 |
self.assertEqual(r.status_code, 200) |
199 |
try:
|
200 |
data = json.loads(r.content) |
201 |
except:
|
202 |
self.fail("Unexpected response content") |
203 |
|
204 |
self.assertTrue('access_token' in data) |
205 |
access_token = data['access_token']
|
206 |
self.assertTrue('token_type' in data) |
207 |
token_type = data['token_type']
|
208 |
self.assertTrue('expires_in' in data) |
209 |
expires_in = data['expires_in']
|
210 |
|
211 |
try:
|
212 |
token = Token.objects.get(code=access_token) |
213 |
self.assertEqual(token.expires_at,
|
214 |
token.created_at + |
215 |
datetime.timedelta(seconds=expires_in)) |
216 |
self.assertEqual(token.token_type, token_type)
|
217 |
self.assertEqual(token.grant_type, 'authorization_code') |
218 |
#self.assertEqual(token.user, expected.get('user'))
|
219 |
self.assertEqual(token.redirect_uri, expected.get('redirect_uri')) |
220 |
self.assertEqual(token.scope, expected.get('scope')) |
221 |
self.assertEqual(token.state, expected.get('state')) |
222 |
except Token.DoesNotExist:
|
223 |
self.fail("Invalid access_token") |
224 |
|
225 |
def setUp(self): |
226 |
baseurl = reverse('oauth2_authenticate').replace('/auth', '/') |
227 |
self.client = OA2Client(baseurl)
|
228 |
client1 = Client.objects.create(identifier="client1", secret="secret") |
229 |
self.client1_redirect_uri = "https://server.com/handle_code" |
230 |
client1.redirecturl_set.create(url=self.client1_redirect_uri)
|
231 |
|
232 |
client2 = Client.objects.create(identifier="client2", type='public') |
233 |
self.client2_redirect_uri = "https://server2.com/handle_code" |
234 |
client2.redirecturl_set.create(url=self.client2_redirect_uri)
|
235 |
|
236 |
client3 = Client.objects.create(identifier="client3", secret='secret', |
237 |
is_trusted=True)
|
238 |
self.client3_redirect_uri = "https://server3.com/handle_code" |
239 |
client3.redirecturl_set.create(url=self.client3_redirect_uri)
|
240 |
|
241 |
common.get_local_user("user@synnefo.org", password="password") |
242 |
|
243 |
def test_code_authorization(self): |
244 |
# missing response_type
|
245 |
r = self.client.get(self.client.get_url(self.client.auth_url)) |
246 |
self.assertEqual(r.status_code, 400) |
247 |
self.assertCount(AuthorizationCode, 0) |
248 |
|
249 |
# invalid response_type
|
250 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
251 |
response_type='invalid'))
|
252 |
self.assertEqual(r.status_code, 400) |
253 |
self.assertCount(AuthorizationCode, 0) |
254 |
|
255 |
# unsupported response_type
|
256 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
257 |
response_type='token'))
|
258 |
self.assertEqual(r.status_code, 400) |
259 |
self.assertCount(AuthorizationCode, 0) |
260 |
|
261 |
# missing client_id
|
262 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
263 |
response_type='code'))
|
264 |
self.assertEqual(r.status_code, 400) |
265 |
self.assertCount(AuthorizationCode, 0) |
266 |
|
267 |
# fake client
|
268 |
r = self.client.authorize_code('client-fake') |
269 |
self.assertEqual(r.status_code, 400) |
270 |
self.assertCount(AuthorizationCode, 0) |
271 |
|
272 |
# mixed up credentials/client_id's
|
273 |
self.client.set_credentials('client1', 'secret') |
274 |
r = self.client.authorize_code('client2') |
275 |
self.assertEqual(r.status_code, 400) |
276 |
self.assertCount(AuthorizationCode, 0) |
277 |
|
278 |
# invalid credentials
|
279 |
self.client.set_credentials('client2', '') |
280 |
r = self.client.authorize_code('client2') |
281 |
self.assertEqual(r.status_code, 400) |
282 |
self.assertCount(AuthorizationCode, 0) |
283 |
|
284 |
# invalid redirect_uri: not absolute URI
|
285 |
self.client.set_credentials()
|
286 |
params = {'redirect_uri':
|
287 |
urlparse.urlparse(self.client1_redirect_uri).path}
|
288 |
r = self.client.authorize_code('client1', urlparams=params) |
289 |
self.assertEqual(r.status_code, 400) |
290 |
self.assertCount(AuthorizationCode, 0) |
291 |
|
292 |
# mismatching redirect uri
|
293 |
self.client.set_credentials()
|
294 |
params = {'redirect_uri': self.client1_redirect_uri[1:]} |
295 |
r = self.client.authorize_code('client1', urlparams=params) |
296 |
self.assertEqual(r.status_code, 400) |
297 |
self.assertCount(AuthorizationCode, 0) |
298 |
|
299 |
# valid request: untrusted client
|
300 |
params = {'redirect_uri': self.client1_redirect_uri, |
301 |
'scope': self.client1_redirect_uri, |
302 |
'extra_param': 'γιουνικοντ'} |
303 |
self.client.set_credentials('client1', 'secret') |
304 |
r = self.client.authorize_code('client1', urlparams=params) |
305 |
self.assertEqual(r.status_code, 302) |
306 |
self.assertTrue('Location' in r) |
307 |
self.assertHost(r['Location'], "testserver:80") |
308 |
self.assertPath(r['Location'], reverse('login')) |
309 |
|
310 |
self.client.set_credentials('client1', 'secret') |
311 |
self.client.login(username="user@synnefo.org", password="password") |
312 |
r = self.client.authorize_code('client1', urlparams=params) |
313 |
self.assertEqual(r.status_code, 200) |
314 |
|
315 |
r = self.client.authorize_code('client1', urlparams=params, |
316 |
extraparams={'reject': 0}) |
317 |
self.assertCount(AuthorizationCode, 1) |
318 |
|
319 |
# redirect is valid
|
320 |
redirect = self.get_redirect_url(r)
|
321 |
self.assertParam(redirect, "code") |
322 |
self.assertNoParam(redirect, "extra_param") |
323 |
self.assertHost(redirect, "server.com") |
324 |
self.assertPath(redirect, "/handle_code") |
325 |
|
326 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
327 |
#self.assertEqual(code.state, '')
|
328 |
self.assertEqual(code.state, None) |
329 |
self.assertEqual(code.redirect_uri, self.client1_redirect_uri) |
330 |
|
331 |
params['state'] = 'csrfstate' |
332 |
params['scope'] = 'resource1' |
333 |
r = self.client.authorize_code('client1', urlparams=params) |
334 |
redirect = self.get_redirect_url(r)
|
335 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
336 |
self.assertCount(AuthorizationCode, 2) |
337 |
|
338 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
339 |
self.assertEqual(code.state, 'csrfstate') |
340 |
self.assertEqual(code.redirect_uri, self.client1_redirect_uri) |
341 |
|
342 |
# valid request: trusted client
|
343 |
params = {'redirect_uri': self.client3_redirect_uri, |
344 |
'scope': self.client3_redirect_uri, |
345 |
'extra_param': '123'} |
346 |
self.client.set_credentials('client3', 'secret') |
347 |
r = self.client.authorize_code('client3', urlparams=params) |
348 |
self.assertEqual(r.status_code, 302) |
349 |
self.assertCount(AuthorizationCode, 3) |
350 |
|
351 |
# redirect is valid
|
352 |
redirect = self.get_redirect_url(r)
|
353 |
self.assertParam(redirect, "code") |
354 |
self.assertNoParam(redirect, "state") |
355 |
self.assertNoParam(redirect, "extra_param") |
356 |
self.assertHost(redirect, "server3.com") |
357 |
self.assertPath(redirect, "/handle_code") |
358 |
|
359 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
360 |
self.assertEqual(code.state, None) |
361 |
self.assertEqual(code.redirect_uri, self.client3_redirect_uri) |
362 |
|
363 |
# valid request: trusted client
|
364 |
params['state'] = 'csrfstate' |
365 |
self.client.set_credentials('client3', 'secret') |
366 |
r = self.client.authorize_code('client3', urlparams=params) |
367 |
self.assertEqual(r.status_code, 302) |
368 |
self.assertCount(AuthorizationCode, 4) |
369 |
|
370 |
# redirect is valid
|
371 |
redirect = self.get_redirect_url(r)
|
372 |
self.assertParam(redirect, "code") |
373 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
374 |
self.assertNoParam(redirect, "extra_param") |
375 |
self.assertHost(redirect, "server3.com") |
376 |
self.assertPath(redirect, "/handle_code") |
377 |
|
378 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
379 |
self.assertEqual(code.state, 'csrfstate') |
380 |
self.assertEqual(code.redirect_uri, self.client3_redirect_uri) |
381 |
|
382 |
# redirect uri startswith the client's registered redirect url
|
383 |
params['redirect_uri'] = '%smore' % self.client3_redirect_uri |
384 |
self.client.set_credentials('client3', 'secret') |
385 |
r = self.client.authorize_code('client3', urlparams=params) |
386 |
self.assertEqual(r.status_code, 400) |
387 |
|
388 |
# redirect uri descendant
|
389 |
redirect_uri = '%s/' % self.client3_redirect_uri |
390 |
rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
|
391 |
redirect_uri = '%s%s' % (redirect_uri, 'a'*rest) |
392 |
params['redirect_uri'] = redirect_uri
|
393 |
self.client.set_credentials('client3', 'secret') |
394 |
r = self.client.authorize_code('client3', urlparams=params) |
395 |
self.assertEqual(r.status_code, 302) |
396 |
self.assertCount(AuthorizationCode, 5) |
397 |
|
398 |
# redirect is valid
|
399 |
redirect = self.get_redirect_url(r)
|
400 |
self.assertParam(redirect, "code") |
401 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
402 |
self.assertNoParam(redirect, "extra_param") |
403 |
self.assertHost(redirect, "server3.com") |
404 |
self.assertPath(redirect, urlparse.urlparse(redirect_uri).path)
|
405 |
|
406 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
407 |
self.assertEqual(code.state, 'csrfstate') |
408 |
self.assertEqual(code.redirect_uri, redirect_uri)
|
409 |
|
410 |
# too long redirect uri
|
411 |
params['redirect_uri'] = '%sa' % redirect_uri |
412 |
self.client.set_credentials('client3', 'secret') |
413 |
r = self.client.authorize_code('client3', urlparams=params) |
414 |
self.assertEqual(r.status_code, 400) |
415 |
|
416 |
def test_get_token(self): |
417 |
# invalid method
|
418 |
r = self.client.get(self.client.token_url) |
419 |
self.assertEqual(r.status_code, 405) |
420 |
self.assertTrue('Allow' in r) |
421 |
self.assertEqual(r['Allow'], 'POST') |
422 |
|
423 |
# invalid content type
|
424 |
r = self.client.post(self.client.token_url) |
425 |
self.assertEqual(r.status_code, 400) |
426 |
|
427 |
# missing grant type
|
428 |
r = self.client.post(self.client.token_url, |
429 |
content_type='application/x-www-form-urlencoded')
|
430 |
self.assertEqual(r.status_code, 400) |
431 |
|
432 |
# unsupported grant type: client_credentials
|
433 |
r = self.client.post(self.client.token_url, |
434 |
data='grant_type=client_credentials',
|
435 |
content_type='application/x-www-form-urlencoded')
|
436 |
self.assertEqual(r.status_code, 400) |
437 |
|
438 |
# unsupported grant type: token
|
439 |
r = self.client.post(self.client.token_url, |
440 |
data='grant_type=token',
|
441 |
content_type='application/x-www-form-urlencoded')
|
442 |
self.assertEqual(r.status_code, 400) |
443 |
|
444 |
# invalid grant type
|
445 |
r = self.client.post(self.client.token_url, |
446 |
data='grant_type=invalid',
|
447 |
content_type='application/x-www-form-urlencoded')
|
448 |
self.assertEqual(r.status_code, 400) |
449 |
|
450 |
# generate authorization code: without redirect_uri
|
451 |
self.client.login(username="user@synnefo.org", password="password") |
452 |
r = self.client.authorize_code('client3') |
453 |
self.assertCount(AuthorizationCode, 1) |
454 |
redirect = self.get_redirect_url(r)
|
455 |
code_instance = AuthorizationCode.objects.get( |
456 |
code=redirect.params['code'][0]) |
457 |
|
458 |
# no client_id & no client authorization
|
459 |
r = self.client.access_token(code_instance.code)
|
460 |
self.assertEqual(r.status_code, 400) |
461 |
|
462 |
# invalid client_id
|
463 |
r = self.client.access_token(code_instance.code, client_id='client2') |
464 |
self.assertEqual(r.status_code, 400) |
465 |
|
466 |
# inexistent client_id
|
467 |
r = self.client.access_token(code_instance.code, client_id='client42') |
468 |
self.assertEqual(r.status_code, 400) |
469 |
|
470 |
# no client authorization
|
471 |
r = self.client.access_token(code_instance.code, client_id='client3') |
472 |
self.assertEqual(r.status_code, 400) |
473 |
|
474 |
# mixed up credentials/client_id's
|
475 |
self.client.set_credentials('client1', 'secret') |
476 |
r = self.client.access_token(code_instance.code, client_id='client3') |
477 |
self.assertEqual(r.status_code, 400) |
478 |
|
479 |
# mixed up credentials/client_id's
|
480 |
self.client.set_credentials('client3', 'secret') |
481 |
r = self.client.access_token(code_instance.code, client_id='client1') |
482 |
self.assertEqual(r.status_code, 400) |
483 |
|
484 |
# mismatching client
|
485 |
self.client.set_credentials('client1', 'secret') |
486 |
r = self.client.access_token(code_instance.code, client_id='client1') |
487 |
self.assertEqual(r.status_code, 400) |
488 |
|
489 |
# invalid code
|
490 |
self.client.set_credentials('client3', 'secret') |
491 |
r = self.client.access_token('invalid') |
492 |
self.assertEqual(r.status_code, 400) |
493 |
|
494 |
# valid request
|
495 |
self.client.set_credentials('client3', 'secret') |
496 |
r = self.client.access_token(code_instance.code)
|
497 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
498 |
self.assertCount(Token, 1) |
499 |
expected = {'redirect_uri': self.client3_redirect_uri, |
500 |
'scope': self.client3_redirect_uri, |
501 |
'state': None} |
502 |
self.assert_access_token_response(r, expected)
|
503 |
|
504 |
# generate authorization code with too long redirect_uri
|
505 |
redirect_uri = '%s/' % self.client3_redirect_uri |
506 |
rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
|
507 |
redirect_uri = '%s%s' % (redirect_uri, 'a'*rest) |
508 |
params = {'redirect_uri': redirect_uri}
|
509 |
r = self.client.authorize_code('client3', urlparams=params) |
510 |
self.assertCount(AuthorizationCode, 1) |
511 |
redirect = self.get_redirect_url(r)
|
512 |
code_instance = AuthorizationCode.objects.get( |
513 |
code=redirect.params['code'][0]) |
514 |
|
515 |
# valid request
|
516 |
self.client.set_credentials('client3', 'secret') |
517 |
r = self.client.access_token(code_instance.code,
|
518 |
redirect_uri='%sa' % redirect_uri)
|
519 |
self.assertEqual(r.status_code, 400) |
520 |
|
521 |
r = self.client.access_token(code_instance.code,
|
522 |
redirect_uri=redirect_uri) |
523 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
524 |
self.assertCount(Token, 2) |
525 |
expected = {'redirect_uri': redirect_uri,
|
526 |
'scope': redirect_uri,
|
527 |
'state': None} |
528 |
self.assert_access_token_response(r, expected)
|