Revision c1e4d459 snf-astakos-app/astakos/oa2/tests/djangobackend.py
b/snf-astakos-app/astakos/oa2/tests/djangobackend.py | ||
---|---|---|
1 | 1 |
import urllib |
2 | 2 |
import urlparse |
3 | 3 |
import base64 |
4 |
import datetime |
|
4 | 5 |
|
5 | 6 |
from collections import namedtuple |
6 | 7 |
|
... | ... | |
9 | 10 |
|
10 | 11 |
from django.core.urlresolvers import reverse |
11 | 12 |
from django.contrib.auth.models import User |
13 |
from django.utils import simplejson as json |
|
12 | 14 |
|
13 |
from astakos.oa2.models import Client, AuthorizationCode |
|
15 |
from astakos.oa2.models import Client, AuthorizationCode, Token
|
|
14 | 16 |
|
15 | 17 |
|
16 | 18 |
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params', |
... | ... | |
124 | 126 |
**kwargs) |
125 | 127 |
return self.get(self.get_url(self.auth_url, **params), *args, **kwargs) |
126 | 128 |
|
129 |
def access_token(self, code, |
|
130 |
content_type='application/x-www-form-urlencoded', |
|
131 |
**kwargs): |
|
132 |
""" |
|
133 |
Do an get token request. |
|
134 |
""" |
|
135 |
params = { |
|
136 |
'grant_type': 'authorization_code', |
|
137 |
'code': code |
|
138 |
} |
|
139 |
params.update(kwargs) |
|
140 |
self.set_auth_headers(kwargs) |
|
141 |
return self.post(self.token_url, data=urllib.urlencode(params), |
|
142 |
content_type=content_type, **kwargs) |
|
143 |
|
|
127 | 144 |
def set_auth_headers(self, params): |
128 |
print 'self.credentials:', self.credentials |
|
129 | 145 |
if not self.credentials: |
130 | 146 |
return |
131 | 147 |
credentials = base64.encodestring('%s:%s' % self.credentials).strip() |
... | ... | |
143 | 159 |
def assertCount(self, model, count): |
144 | 160 |
self.assertEqual(model.objects.count(), count) |
145 | 161 |
|
162 |
def assert_access_token_response(self, r, expected): |
|
163 |
self.assertEqual(r.status_code, 200) |
|
164 |
try: |
|
165 |
data = json.loads(r.content) |
|
166 |
except: |
|
167 |
self.fail("Unexpected response content") |
|
168 |
|
|
169 |
self.assertTrue('access_token' in data) |
|
170 |
access_token = data['access_token'] |
|
171 |
self.assertTrue('token_type' in data) |
|
172 |
token_type = data['token_type'] |
|
173 |
self.assertTrue('expires_in' in data) |
|
174 |
expires_in = data['expires_in'] |
|
175 |
|
|
176 |
try: |
|
177 |
token = Token.objects.get(code=access_token) |
|
178 |
self.assertEqual(token.expires_at, |
|
179 |
token.created_at + |
|
180 |
datetime.timedelta(seconds=expires_in)) |
|
181 |
self.assertEqual(token.token_type, token_type) |
|
182 |
self.assertEqual(token.grant_type, 'authorization_code') |
|
183 |
#self.assertEqual(token.user, expected.get('user')) |
|
184 |
self.assertEqual(token.redirect_uri, expected.get('redirect_uri')) |
|
185 |
self.assertEqual(token.scope, expected.get('scope')) |
|
186 |
self.assertEqual(token.state, expected.get('state')) |
|
187 |
except Token.DoesNotExist: |
|
188 |
self.fail("Invalid access_token") |
|
189 |
|
|
146 | 190 |
def setUp(self): |
147 | 191 |
baseurl = reverse('oa2_authenticate').replace('/auth', '/') |
148 | 192 |
self.client = OA2Client(baseurl) |
... | ... | |
164 | 208 |
u.save() |
165 | 209 |
|
166 | 210 |
def test_code_authorization(self): |
167 |
r = self.client.authorize_code('client-fake') |
|
211 |
# missing response_type |
|
212 |
r = self.client.get(self.client.get_url(self.client.auth_url)) |
|
213 |
self.assertEqual(r.status_code, 400) |
|
214 |
self.assertCount(AuthorizationCode, 0) |
|
215 |
|
|
216 |
# invalid response_type |
|
217 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
|
218 |
response_type='invalid')) |
|
219 |
self.assertEqual(r.status_code, 400) |
|
220 |
self.assertCount(AuthorizationCode, 0) |
|
221 |
|
|
222 |
# unsupported response_type |
|
223 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
|
224 |
response_type='token')) |
|
225 |
self.assertEqual(r.status_code, 400) |
|
226 |
self.assertCount(AuthorizationCode, 0) |
|
227 |
|
|
228 |
# missing client_id |
|
229 |
r = self.client.get(self.client.get_url(self.client.auth_url, |
|
230 |
response_type='code')) |
|
168 | 231 |
self.assertEqual(r.status_code, 400) |
169 | 232 |
self.assertCount(AuthorizationCode, 0) |
170 | 233 |
|
171 |
# # no auth header, client is confidential
|
|
172 |
# r = self.client.authorize_code('client1')
|
|
173 |
# self.assertEqual(r.status_code, 400)
|
|
174 |
# self.assertCount(AuthorizationCode, 0)
|
|
234 |
# fake client
|
|
235 |
r = self.client.authorize_code('client-fake')
|
|
236 |
self.assertEqual(r.status_code, 400) |
|
237 |
self.assertCount(AuthorizationCode, 0) |
|
175 | 238 |
|
176 | 239 |
# mixed up credentials/client_id's |
177 | 240 |
self.client.set_credentials('client1', 'secret') |
... | ... | |
179 | 242 |
self.assertEqual(r.status_code, 400) |
180 | 243 |
self.assertCount(AuthorizationCode, 0) |
181 | 244 |
|
245 |
# invalid credentials |
|
182 | 246 |
self.client.set_credentials('client2', '') |
183 | 247 |
r = self.client.authorize_code('client2') |
184 | 248 |
self.assertEqual(r.status_code, 400) |
185 | 249 |
self.assertCount(AuthorizationCode, 0) |
186 | 250 |
|
187 |
# self.client.set_credentials() |
|
188 |
# r = self.client.authorize_code('client1') |
|
189 |
# self.assertEqual(r.status_code, 400) |
|
190 |
# self.assertCount(AuthorizationCode, 0) |
|
251 |
# invalid redirect_uri: not absolute URI |
|
252 |
self.client.set_credentials() |
|
253 |
params = {'redirect_uri': |
|
254 |
urlparse.urlparse(self.client1_redirect_uri).path} |
|
255 |
r = self.client.authorize_code('client1', urlparams=params) |
|
256 |
self.assertEqual(r.status_code, 400) |
|
257 |
self.assertCount(AuthorizationCode, 0) |
|
191 | 258 |
|
192 |
# valid request |
|
259 |
# mismatching redirect uri |
|
260 |
self.client.set_credentials() |
|
261 |
params = {'redirect_uri': self.client1_redirect_uri[1:]} |
|
262 |
r = self.client.authorize_code('client1', urlparams=params) |
|
263 |
self.assertEqual(r.status_code, 400) |
|
264 |
self.assertCount(AuthorizationCode, 0) |
|
265 |
|
|
266 |
# valid request: untrusted client |
|
193 | 267 |
params = {'redirect_uri': self.client1_redirect_uri, |
268 |
'scope': self.client1_redirect_uri, |
|
194 | 269 |
'extra_param': '123'} |
195 | 270 |
self.client.set_credentials('client1', 'secret') |
196 | 271 |
r = self.client.authorize_code('client1', urlparams=params) |
197 | 272 |
self.assertEqual(r.status_code, 302) |
198 | 273 |
self.assertTrue('Location' in r) |
199 |
p = urlparse.urlparse(r['Location']) |
|
200 |
self.assertEqual(p.netloc, 'testserver:80') |
|
201 |
self.assertEqual(p.path, reverse('login')) |
|
274 |
self.assertHost(r['Location'], "testserver:80") |
|
275 |
self.assertPath(r['Location'], reverse('login')) |
|
202 | 276 |
|
203 | 277 |
self.client.set_credentials('client1', 'secret') |
204 | 278 |
self.client.login(username="user@synnefo.org", password="password") |
... | ... | |
231 | 305 |
code2 = AuthorizationCode.objects.get(code=redirect2.params['code'][0]) |
232 | 306 |
self.assertEqual(code2.state, 'csrfstate') |
233 | 307 |
self.assertEqual(code2.redirect_uri, self.client1_redirect_uri) |
308 |
|
|
309 |
# valid request: trusted client |
|
310 |
params = {'redirect_uri': self.client3_redirect_uri, |
|
311 |
'scope': self.client3_redirect_uri, |
|
312 |
'extra_param': '123'} |
|
313 |
self.client.set_credentials('client3', 'secret') |
|
314 |
r = self.client.authorize_code('client3', urlparams=params) |
|
315 |
self.assertEqual(r.status_code, 302) |
|
316 |
self.assertCount(AuthorizationCode, 3) |
|
317 |
|
|
318 |
# redirect is valid |
|
319 |
redirect3 = self.get_redirect_url(r) |
|
320 |
self.assertParam(redirect1, "code") |
|
321 |
self.assertNoParam(redirect3, "state") |
|
322 |
self.assertNoParam(redirect3, "extra_param") |
|
323 |
self.assertHost(redirect3, "server3.com") |
|
324 |
self.assertPath(redirect3, "/handle_code") |
|
325 |
|
|
326 |
code3 = AuthorizationCode.objects.get(code=redirect3.params['code'][0]) |
|
327 |
self.assertEqual(code3.state, None) |
|
328 |
self.assertEqual(code3.redirect_uri, self.client3_redirect_uri) |
|
329 |
|
|
330 |
# valid request: trusted client |
|
331 |
params['state'] = 'csrfstate' |
|
332 |
self.client.set_credentials('client3', 'secret') |
|
333 |
r = self.client.authorize_code('client3', urlparams=params) |
|
334 |
self.assertEqual(r.status_code, 302) |
|
335 |
self.assertCount(AuthorizationCode, 4) |
|
336 |
|
|
337 |
# redirect is valid |
|
338 |
redirect4 = self.get_redirect_url(r) |
|
339 |
self.assertParam(redirect4, "code") |
|
340 |
self.assertParamEqual(redirect4, "state", 'csrfstate') |
|
341 |
self.assertNoParam(redirect4, "extra_param") |
|
342 |
self.assertHost(redirect4, "server3.com") |
|
343 |
self.assertPath(redirect4, "/handle_code") |
|
344 |
|
|
345 |
code4 = AuthorizationCode.objects.get(code=redirect4.params['code'][0]) |
|
346 |
self.assertEqual(code4.state, 'csrfstate') |
|
347 |
self.assertEqual(code4.redirect_uri, self.client3_redirect_uri) |
|
348 |
|
|
349 |
def test_get_token(self): |
|
350 |
# invalid method |
|
351 |
r = self.client.get(self.client.token_url) |
|
352 |
self.assertEqual(r.status_code, 405) |
|
353 |
self.assertTrue('Allow' in r) |
|
354 |
self.assertEqual(r['Allow'], 'POST') |
|
355 |
|
|
356 |
# invalid content type |
|
357 |
r = self.client.post(self.client.token_url) |
|
358 |
self.assertEqual(r.status_code, 400) |
|
359 |
|
|
360 |
# missing grant type |
|
361 |
r = self.client.post(self.client.token_url, |
|
362 |
content_type='application/x-www-form-urlencoded') |
|
363 |
self.assertEqual(r.status_code, 400) |
|
364 |
|
|
365 |
# unsupported grant type: client_credentials |
|
366 |
r = self.client.post(self.client.token_url, |
|
367 |
data='grant_type=client_credentials', |
|
368 |
content_type='application/x-www-form-urlencoded') |
|
369 |
self.assertEqual(r.status_code, 400) |
|
370 |
|
|
371 |
# unsupported grant type: token |
|
372 |
r = self.client.post(self.client.token_url, |
|
373 |
data='grant_type=token', |
|
374 |
content_type='application/x-www-form-urlencoded') |
|
375 |
self.assertEqual(r.status_code, 400) |
|
376 |
|
|
377 |
# invalid grant type |
|
378 |
r = self.client.post(self.client.token_url, |
|
379 |
data='grant_type=invalid', |
|
380 |
content_type='application/x-www-form-urlencoded') |
|
381 |
self.assertEqual(r.status_code, 400) |
|
382 |
|
|
383 |
# generate authorization code: without redirect_uri |
|
384 |
self.client.login(username="user@synnefo.org", password="password") |
|
385 |
r = self.client.authorize_code('client3') |
|
386 |
self.assertCount(AuthorizationCode, 1) |
|
387 |
redirect = self.get_redirect_url(r) |
|
388 |
code_instance = AuthorizationCode.objects.get( |
|
389 |
code=redirect.params['code'][0]) |
|
390 |
|
|
391 |
# no client_id & no client authorization |
|
392 |
r = self.client.access_token(code_instance.code) |
|
393 |
self.assertEqual(r.status_code, 400) |
|
394 |
|
|
395 |
# invalid client_id |
|
396 |
r = self.client.access_token(code_instance.code, client_id='client2') |
|
397 |
self.assertEqual(r.status_code, 400) |
|
398 |
|
|
399 |
# inexistent client_id |
|
400 |
r = self.client.access_token(code_instance.code, client_id='client42') |
|
401 |
self.assertEqual(r.status_code, 400) |
|
402 |
|
|
403 |
# no client authorization |
|
404 |
r = self.client.access_token(code_instance.code, client_id='client3') |
|
405 |
self.assertEqual(r.status_code, 400) |
|
406 |
|
|
407 |
# mixed up credentials/client_id's |
|
408 |
self.client.set_credentials('client1', 'secret') |
|
409 |
r = self.client.access_token(code_instance.code, client_id='client3') |
|
410 |
self.assertEqual(r.status_code, 400) |
|
411 |
|
|
412 |
# mixed up credentials/client_id's |
|
413 |
self.client.set_credentials('client3', 'secret') |
|
414 |
r = self.client.access_token(code_instance.code, client_id='client1') |
|
415 |
self.assertEqual(r.status_code, 400) |
|
416 |
|
|
417 |
# mismatching client |
|
418 |
self.client.set_credentials('client1', 'secret') |
|
419 |
r = self.client.access_token(code_instance.code, client_id='client1') |
|
420 |
self.assertEqual(r.status_code, 400) |
|
421 |
|
|
422 |
# invalid code |
|
423 |
self.client.set_credentials('client3', 'secret') |
|
424 |
r = self.client.access_token('invalid') |
|
425 |
self.assertEqual(r.status_code, 400) |
|
426 |
|
|
427 |
# valid request |
|
428 |
self.client.set_credentials('client3', 'secret') |
|
429 |
r = self.client.access_token(code_instance.code) |
|
430 |
self.assertCount(AuthorizationCode, 0) # assert code is consumed |
|
431 |
self.assertCount(Token, 1) |
|
432 |
expected = {'redirect_uri': self.client3_redirect_uri, |
|
433 |
'scope': self.client3_redirect_uri, |
|
434 |
'state': None} |
|
435 |
self.assert_access_token_response(r, expected) |
Also available in: Unified diff