root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ c985de5c
History | View | Annotate | Download (20.8 kB)
1 |
# Copyright 2013 GRNET S.A. All rights reserved.
|
---|---|
2 |
#
|
3 |
# Redistribution and use in source and binary forms, with or
|
4 |
# without modification, are permitted provided that the following
|
5 |
# conditions are met:
|
6 |
#
|
7 |
# 1. Redistributions of source code must retain the above
|
8 |
# copyright notice, this list of conditions and the following
|
9 |
# disclaimer.
|
10 |
#
|
11 |
# 2. Redistributions in binary form must reproduce the above
|
12 |
# copyright notice, this list of conditions and the following
|
13 |
# disclaimer in the documentation and/or other materials
|
14 |
# provided with the distribution.
|
15 |
#
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
# POSSIBILITY OF SUCH DAMAGE.
|
28 |
#
|
29 |
# The views and conclusions contained in the software and
|
30 |
# documentation are those of the authors and should not be
|
31 |
# interpreted as representing official policies, either expressed
|
32 |
# or implied, of GRNET S.A.
|
33 |
|
34 |
import urllib |
35 |
import urlparse |
36 |
import base64 |
37 |
import datetime |
38 |
|
39 |
from collections import namedtuple |
40 |
|
41 |
from django.test import TransactionTestCase as TestCase |
42 |
from django.test import Client as TestClient |
43 |
|
44 |
from django.core.urlresolvers import reverse |
45 |
from django.utils import simplejson as json |
46 |
|
47 |
from astakos.oa2 import settings |
48 |
from astakos.oa2.models import Client, AuthorizationCode, Token |
49 |
from astakos.im.tests import common |
50 |
|
51 |
|
52 |
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params', |
53 |
'url'])
|
54 |
|
55 |
|
56 |
def parsed_url_wrapper(func): |
57 |
def wrapper(self, url, *args, **kwargs): |
58 |
url = self.parse_url(url)
|
59 |
return func(self, url, *args, **kwargs) |
60 |
return wrapper
|
61 |
|
62 |
|
63 |
class URLAssertionsMixin(object): |
64 |
|
65 |
def get_redirect_url(self, request): |
66 |
return self.parse_url(request['Location']) |
67 |
|
68 |
def parse_url(self, url): |
69 |
if isinstance(url, ParsedURL): |
70 |
return url
|
71 |
result = urlparse.urlparse(url) |
72 |
parsed = { |
73 |
'url': url,
|
74 |
'host': result.netloc,
|
75 |
'scheme': result.scheme,
|
76 |
'path': result.path,
|
77 |
} |
78 |
parsed['params'] = urlparse.parse_qs(result.query)
|
79 |
return ParsedURL(**parsed)
|
80 |
|
81 |
@parsed_url_wrapper
|
82 |
def assertParamEqual(self, url, key, value): |
83 |
self.assertParam(url, key)
|
84 |
self.assertEqual(url.params[key][0], value) |
85 |
|
86 |
@parsed_url_wrapper
|
87 |
def assertNoParam(self, url, key): |
88 |
self.assertFalse(key in url.params, |
89 |
"Url '%s' does contain '%s' parameter" % (url.url,
|
90 |
key)) |
91 |
|
92 |
@parsed_url_wrapper
|
93 |
def assertParam(self, url, key): |
94 |
self.assertTrue(key in url.params, |
95 |
"Url '%s' does not contain '%s' parameter" % (url.url,
|
96 |
key)) |
97 |
|
98 |
@parsed_url_wrapper
|
99 |
def assertHost(self, url, host): |
100 |
self.assertEqual(url.host, host)
|
101 |
|
102 |
@parsed_url_wrapper
|
103 |
def assertPath(self, url, path): |
104 |
self.assertEqual(url.path, path)
|
105 |
|
106 |
@parsed_url_wrapper
|
107 |
def assertSecure(self, url, key): |
108 |
self.assertEqual(url.scheme, "https") |
109 |
|
110 |
|
111 |
class OA2Client(TestClient): |
112 |
"""
|
113 |
An OAuth2 agnostic test client.
|
114 |
"""
|
115 |
def __init__(self, baseurl, *args, **kwargs): |
116 |
self.oa2_url = baseurl
|
117 |
self.token_url = self.oa2_url + 'token/' |
118 |
self.auth_url = self.oa2_url + 'auth/' |
119 |
self.credentials = kwargs.pop('credentials', ()) |
120 |
|
121 |
kwargs['wsgi.url_scheme'] = 'https' |
122 |
return super(OA2Client, self).__init__(*args, **kwargs) |
123 |
|
124 |
def request(self, *args, **kwargs): |
125 |
#print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
|
126 |
#kwargs.get('HTTP_AUTHORIZATION', None)
|
127 |
return super(OA2Client, self).request(*args, **kwargs) |
128 |
|
129 |
def get_url(self, token_or_auth, **params): |
130 |
return token_or_auth + '?' + urllib.urlencode(params) |
131 |
|
132 |
def grant(self, clientid, *args, **kwargs): |
133 |
"""
|
134 |
Do an authorization grant request.
|
135 |
"""
|
136 |
params = { |
137 |
'grant_type': 'authorization_code', |
138 |
'client_id': clientid
|
139 |
} |
140 |
urlparams = kwargs.pop('urlparams', {})
|
141 |
params.update(urlparams) |
142 |
self.set_auth_headers(kwargs)
|
143 |
return self.get(self.get_url(self.token_url, **params), *args, |
144 |
**kwargs) |
145 |
|
146 |
def authorize_code(self, clientid, *args, **kwargs): |
147 |
"""
|
148 |
Do an authorization code request.
|
149 |
"""
|
150 |
params = { |
151 |
'response_type': 'code', |
152 |
'client_id': clientid
|
153 |
} |
154 |
urlparams = kwargs.pop('urlparams', {})
|
155 |
urlparams.update(kwargs.pop('extraparams', {}))
|
156 |
params.update(urlparams) |
157 |
self.set_auth_headers(kwargs)
|
158 |
if 'reject' in params: |
159 |
return self.post(self.get_url(self.auth_url), data=params, |
160 |
**kwargs) |
161 |
return self.get(self.get_url(self.auth_url, **params), *args, **kwargs) |
162 |
|
163 |
def access_token(self, code, |
164 |
content_type='application/x-www-form-urlencoded',
|
165 |
**kwargs): |
166 |
"""
|
167 |
Do an get token request.
|
168 |
"""
|
169 |
params = { |
170 |
'grant_type': 'authorization_code', |
171 |
'code': code
|
172 |
} |
173 |
params.update(kwargs) |
174 |
self.set_auth_headers(kwargs)
|
175 |
return self.post(self.token_url, data=urllib.urlencode(params), |
176 |
content_type=content_type, **kwargs) |
177 |
|
178 |
def set_auth_headers(self, params): |
179 |
if not self.credentials: |
180 |
return
|
181 |
credentials = base64.encodestring('%s:%s' % self.credentials).strip() |
182 |
params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials |
183 |
return params
|
184 |
|
185 |
def set_credentials(self, user=None, pwd=None): |
186 |
self.credentials = (user, pwd)
|
187 |
if not user and not pwd: |
188 |
self.credentials = ()
|
189 |
|
190 |
|
191 |
class TestOA2(TestCase, URLAssertionsMixin): |
192 |
|
193 |
def assertCount(self, model, count): |
194 |
self.assertEqual(model.objects.count(), count)
|
195 |
|
196 |
def assert_access_token_response(self, r, expected): |
197 |
self.assertEqual(r.status_code, 200) |
198 |
try:
|
199 |
data = json.loads(r.content) |
200 |
except:
|
201 |
self.fail("Unexpected response content") |
202 |
|
203 |
self.assertTrue('access_token' in data) |
204 |
access_token = data['access_token']
|
205 |
self.assertTrue('token_type' in data) |
206 |
token_type = data['token_type']
|
207 |
self.assertTrue('expires_in' in data) |
208 |
expires_in = data['expires_in']
|
209 |
|
210 |
try:
|
211 |
token = Token.objects.get(code=access_token) |
212 |
self.assertEqual(token.expires_at,
|
213 |
token.created_at + |
214 |
datetime.timedelta(seconds=expires_in)) |
215 |
self.assertEqual(token.token_type, token_type)
|
216 |
self.assertEqual(token.grant_type, 'authorization_code') |
217 |
#self.assertEqual(token.user, expected.get('user'))
|
218 |
self.assertEqual(token.redirect_uri, expected.get('redirect_uri')) |
219 |
self.assertEqual(token.scope, expected.get('scope')) |
220 |
self.assertEqual(token.state, expected.get('state')) |
221 |
except Token.DoesNotExist:
|
222 |
self.fail("Invalid access_token") |
223 |
|
224 |
def setUp(self): |
225 |
baseurl = reverse('oauth2_authenticate').replace('/auth', '/') |
226 |
self.client = OA2Client(baseurl)
|
227 |
client1 = Client.objects.create(identifier="client1", secret="secret") |
228 |
self.client1_redirect_uri = "https://server.com/handle_code" |
229 |
client1.redirecturl_set.create(url=self.client1_redirect_uri)
|
230 |
|
231 |
client2 = Client.objects.create(identifier="client2", type='public') |
232 |
self.client2_redirect_uri = "https://server2.com/handle_code" |
233 |
client2.redirecturl_set.create(url=self.client2_redirect_uri)
|
234 |
|
235 |
client3 = Client.objects.create(identifier="client3", secret='secret', |
236 |
is_trusted=True)
|
237 |
self.client3_redirect_uri = "https://server3.com/handle_code" |
238 |
client3.redirecturl_set.create(url=self.client3_redirect_uri)
|
239 |
|
240 |
common.get_local_user("user@synnefo.org", password="password") |
241 |
|
242 |
def test_code_authorization(self): |
243 |
# missing response_type
|
244 |
r = self.client.get(self.client.get_url(self.client.auth_url)) |
245 |
self.assertEqual(r.status_code, 400) |
246 |
self.assertCount(AuthorizationCode, 0) |
247 |
|
248 |
# invalid response_type
|
249 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
250 |
response_type='invalid'))
|
251 |
self.assertEqual(r.status_code, 400) |
252 |
self.assertCount(AuthorizationCode, 0) |
253 |
|
254 |
# unsupported response_type
|
255 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
256 |
response_type='token'))
|
257 |
self.assertEqual(r.status_code, 400) |
258 |
self.assertCount(AuthorizationCode, 0) |
259 |
|
260 |
# missing client_id
|
261 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
262 |
response_type='code'))
|
263 |
self.assertEqual(r.status_code, 400) |
264 |
self.assertCount(AuthorizationCode, 0) |
265 |
|
266 |
# fake client
|
267 |
r = self.client.authorize_code('client-fake') |
268 |
self.assertEqual(r.status_code, 400) |
269 |
self.assertCount(AuthorizationCode, 0) |
270 |
|
271 |
# mixed up credentials/client_id's
|
272 |
self.client.set_credentials('client1', 'secret') |
273 |
r = self.client.authorize_code('client2') |
274 |
self.assertEqual(r.status_code, 400) |
275 |
self.assertCount(AuthorizationCode, 0) |
276 |
|
277 |
# invalid credentials
|
278 |
self.client.set_credentials('client2', '') |
279 |
r = self.client.authorize_code('client2') |
280 |
self.assertEqual(r.status_code, 400) |
281 |
self.assertCount(AuthorizationCode, 0) |
282 |
|
283 |
# invalid redirect_uri: not absolute URI
|
284 |
self.client.set_credentials()
|
285 |
params = {'redirect_uri':
|
286 |
urlparse.urlparse(self.client1_redirect_uri).path}
|
287 |
r = self.client.authorize_code('client1', urlparams=params) |
288 |
self.assertEqual(r.status_code, 400) |
289 |
self.assertCount(AuthorizationCode, 0) |
290 |
|
291 |
# mismatching redirect uri
|
292 |
self.client.set_credentials()
|
293 |
params = {'redirect_uri': self.client1_redirect_uri[1:]} |
294 |
r = self.client.authorize_code('client1', urlparams=params) |
295 |
self.assertEqual(r.status_code, 400) |
296 |
self.assertCount(AuthorizationCode, 0) |
297 |
|
298 |
# valid request: untrusted client
|
299 |
params = {'redirect_uri': self.client1_redirect_uri, |
300 |
'scope': self.client1_redirect_uri, |
301 |
'extra_param': '123'} |
302 |
self.client.set_credentials('client1', 'secret') |
303 |
r = self.client.authorize_code('client1', urlparams=params) |
304 |
self.assertEqual(r.status_code, 302) |
305 |
self.assertTrue('Location' in r) |
306 |
self.assertHost(r['Location'], "testserver:80") |
307 |
self.assertPath(r['Location'], reverse('login')) |
308 |
|
309 |
self.client.set_credentials('client1', 'secret') |
310 |
self.client.login(username="user@synnefo.org", password="password") |
311 |
r = self.client.authorize_code('client1', urlparams=params) |
312 |
self.assertEqual(r.status_code, 200) |
313 |
|
314 |
r = self.client.authorize_code('client1', urlparams=params, |
315 |
extraparams={'reject': 0}) |
316 |
self.assertCount(AuthorizationCode, 1) |
317 |
|
318 |
# redirect is valid
|
319 |
redirect = self.get_redirect_url(r)
|
320 |
self.assertParam(redirect, "code") |
321 |
self.assertNoParam(redirect, "extra_param") |
322 |
self.assertHost(redirect, "server.com") |
323 |
self.assertPath(redirect, "/handle_code") |
324 |
|
325 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
326 |
#self.assertEqual(code.state, '')
|
327 |
self.assertEqual(code.state, None) |
328 |
self.assertEqual(code.redirect_uri, self.client1_redirect_uri) |
329 |
|
330 |
params['state'] = 'csrfstate' |
331 |
params['scope'] = 'resource1' |
332 |
r = self.client.authorize_code('client1', urlparams=params) |
333 |
redirect = self.get_redirect_url(r)
|
334 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
335 |
self.assertCount(AuthorizationCode, 2) |
336 |
|
337 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
338 |
self.assertEqual(code.state, 'csrfstate') |
339 |
self.assertEqual(code.redirect_uri, self.client1_redirect_uri) |
340 |
|
341 |
# valid request: trusted client
|
342 |
params = {'redirect_uri': self.client3_redirect_uri, |
343 |
'scope': self.client3_redirect_uri, |
344 |
'extra_param': '123'} |
345 |
self.client.set_credentials('client3', 'secret') |
346 |
r = self.client.authorize_code('client3', urlparams=params) |
347 |
self.assertEqual(r.status_code, 302) |
348 |
self.assertCount(AuthorizationCode, 3) |
349 |
|
350 |
# redirect is valid
|
351 |
redirect = self.get_redirect_url(r)
|
352 |
self.assertParam(redirect, "code") |
353 |
self.assertNoParam(redirect, "state") |
354 |
self.assertNoParam(redirect, "extra_param") |
355 |
self.assertHost(redirect, "server3.com") |
356 |
self.assertPath(redirect, "/handle_code") |
357 |
|
358 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
359 |
self.assertEqual(code.state, None) |
360 |
self.assertEqual(code.redirect_uri, self.client3_redirect_uri) |
361 |
|
362 |
# valid request: trusted client
|
363 |
params['state'] = 'csrfstate' |
364 |
self.client.set_credentials('client3', 'secret') |
365 |
r = self.client.authorize_code('client3', urlparams=params) |
366 |
self.assertEqual(r.status_code, 302) |
367 |
self.assertCount(AuthorizationCode, 4) |
368 |
|
369 |
# redirect is valid
|
370 |
redirect = self.get_redirect_url(r)
|
371 |
self.assertParam(redirect, "code") |
372 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
373 |
self.assertNoParam(redirect, "extra_param") |
374 |
self.assertHost(redirect, "server3.com") |
375 |
self.assertPath(redirect, "/handle_code") |
376 |
|
377 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
378 |
self.assertEqual(code.state, 'csrfstate') |
379 |
self.assertEqual(code.redirect_uri, self.client3_redirect_uri) |
380 |
|
381 |
# redirect uri startswith the client's registered redirect url
|
382 |
params['redirect_uri'] = '%smore' % self.client3_redirect_uri |
383 |
self.client.set_credentials('client3', 'secret') |
384 |
r = self.client.authorize_code('client3', urlparams=params) |
385 |
self.assertEqual(r.status_code, 400) |
386 |
|
387 |
# redirect uri descendant
|
388 |
redirect_uri = '%s/' % self.client3_redirect_uri |
389 |
rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
|
390 |
redirect_uri = '%s%s' % (redirect_uri, 'a'*rest) |
391 |
params['redirect_uri'] = redirect_uri
|
392 |
self.client.set_credentials('client3', 'secret') |
393 |
r = self.client.authorize_code('client3', urlparams=params) |
394 |
self.assertEqual(r.status_code, 302) |
395 |
self.assertCount(AuthorizationCode, 5) |
396 |
|
397 |
# redirect is valid
|
398 |
redirect = self.get_redirect_url(r)
|
399 |
self.assertParam(redirect, "code") |
400 |
self.assertParamEqual(redirect, "state", 'csrfstate') |
401 |
self.assertNoParam(redirect, "extra_param") |
402 |
self.assertHost(redirect, "server3.com") |
403 |
self.assertPath(redirect, urlparse.urlparse(redirect_uri).path)
|
404 |
|
405 |
code = AuthorizationCode.objects.get(code=redirect.params['code'][0]) |
406 |
self.assertEqual(code.state, 'csrfstate') |
407 |
self.assertEqual(code.redirect_uri, redirect_uri)
|
408 |
|
409 |
# too long redirect uri
|
410 |
params['redirect_uri'] = '%sa' % redirect_uri |
411 |
self.client.set_credentials('client3', 'secret') |
412 |
r = self.client.authorize_code('client3', urlparams=params) |
413 |
self.assertEqual(r.status_code, 400) |
414 |
|
415 |
def test_get_token(self): |
416 |
# invalid method
|
417 |
r = self.client.get(self.client.token_url) |
418 |
self.assertEqual(r.status_code, 405) |
419 |
self.assertTrue('Allow' in r) |
420 |
self.assertEqual(r['Allow'], 'POST') |
421 |
|
422 |
# invalid content type
|
423 |
r = self.client.post(self.client.token_url) |
424 |
self.assertEqual(r.status_code, 400) |
425 |
|
426 |
# missing grant type
|
427 |
r = self.client.post(self.client.token_url, |
428 |
content_type='application/x-www-form-urlencoded')
|
429 |
self.assertEqual(r.status_code, 400) |
430 |
|
431 |
# unsupported grant type: client_credentials
|
432 |
r = self.client.post(self.client.token_url, |
433 |
data='grant_type=client_credentials',
|
434 |
content_type='application/x-www-form-urlencoded')
|
435 |
self.assertEqual(r.status_code, 400) |
436 |
|
437 |
# unsupported grant type: token
|
438 |
r = self.client.post(self.client.token_url, |
439 |
data='grant_type=token',
|
440 |
content_type='application/x-www-form-urlencoded')
|
441 |
self.assertEqual(r.status_code, 400) |
442 |
|
443 |
# invalid grant type
|
444 |
r = self.client.post(self.client.token_url, |
445 |
data='grant_type=invalid',
|
446 |
content_type='application/x-www-form-urlencoded')
|
447 |
self.assertEqual(r.status_code, 400) |
448 |
|
449 |
# generate authorization code: without redirect_uri
|
450 |
self.client.login(username="user@synnefo.org", password="password") |
451 |
r = self.client.authorize_code('client3') |
452 |
self.assertCount(AuthorizationCode, 1) |
453 |
redirect = self.get_redirect_url(r)
|
454 |
code_instance = AuthorizationCode.objects.get( |
455 |
code=redirect.params['code'][0]) |
456 |
|
457 |
# no client_id & no client authorization
|
458 |
r = self.client.access_token(code_instance.code)
|
459 |
self.assertEqual(r.status_code, 400) |
460 |
|
461 |
# invalid client_id
|
462 |
r = self.client.access_token(code_instance.code, client_id='client2') |
463 |
self.assertEqual(r.status_code, 400) |
464 |
|
465 |
# inexistent client_id
|
466 |
r = self.client.access_token(code_instance.code, client_id='client42') |
467 |
self.assertEqual(r.status_code, 400) |
468 |
|
469 |
# no client authorization
|
470 |
r = self.client.access_token(code_instance.code, client_id='client3') |
471 |
self.assertEqual(r.status_code, 400) |
472 |
|
473 |
# mixed up credentials/client_id's
|
474 |
self.client.set_credentials('client1', 'secret') |
475 |
r = self.client.access_token(code_instance.code, client_id='client3') |
476 |
self.assertEqual(r.status_code, 400) |
477 |
|
478 |
# mixed up credentials/client_id's
|
479 |
self.client.set_credentials('client3', 'secret') |
480 |
r = self.client.access_token(code_instance.code, client_id='client1') |
481 |
self.assertEqual(r.status_code, 400) |
482 |
|
483 |
# mismatching client
|
484 |
self.client.set_credentials('client1', 'secret') |
485 |
r = self.client.access_token(code_instance.code, client_id='client1') |
486 |
self.assertEqual(r.status_code, 400) |
487 |
|
488 |
# invalid code
|
489 |
self.client.set_credentials('client3', 'secret') |
490 |
r = self.client.access_token('invalid') |
491 |
self.assertEqual(r.status_code, 400) |
492 |
|
493 |
# valid request
|
494 |
self.client.set_credentials('client3', 'secret') |
495 |
r = self.client.access_token(code_instance.code)
|
496 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
497 |
self.assertCount(Token, 1) |
498 |
expected = {'redirect_uri': self.client3_redirect_uri, |
499 |
'scope': self.client3_redirect_uri, |
500 |
'state': None} |
501 |
self.assert_access_token_response(r, expected)
|
502 |
|
503 |
# generate authorization code with too long redirect_uri
|
504 |
redirect_uri = '%s/' % self.client3_redirect_uri |
505 |
rest = settings.MAXIMUM_ALLOWED_REDIRECT_URI_LENGTH - len(redirect_uri)
|
506 |
redirect_uri = '%s%s' % (redirect_uri, 'a'*rest) |
507 |
params = {'redirect_uri': redirect_uri}
|
508 |
r = self.client.authorize_code('client3', urlparams=params) |
509 |
self.assertCount(AuthorizationCode, 1) |
510 |
redirect = self.get_redirect_url(r)
|
511 |
code_instance = AuthorizationCode.objects.get( |
512 |
code=redirect.params['code'][0]) |
513 |
|
514 |
# valid request
|
515 |
self.client.set_credentials('client3', 'secret') |
516 |
r = self.client.access_token(code_instance.code,
|
517 |
redirect_uri='%sa' % redirect_uri)
|
518 |
self.assertEqual(r.status_code, 400) |
519 |
|
520 |
r = self.client.access_token(code_instance.code,
|
521 |
redirect_uri=redirect_uri) |
522 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
523 |
self.assertCount(Token, 2) |
524 |
expected = {'redirect_uri': redirect_uri,
|
525 |
'scope': redirect_uri,
|
526 |
'state': None} |
527 |
self.assert_access_token_response(r, expected)
|