root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ e28a4841
History | View | Annotate | Download (8.3 kB)
1 |
import urllib |
---|---|
2 |
import urlparse |
3 |
import base64 |
4 |
|
5 |
from collections import namedtuple |
6 |
|
7 |
from django.test import TransactionTestCase as TestCase |
8 |
from django.test import Client as TestClient |
9 |
|
10 |
from django.core.urlresolvers import reverse |
11 |
from django.contrib.auth.models import User |
12 |
|
13 |
from astakos.oa2.models import Client, AuthorizationCode |
14 |
|
15 |
|
16 |
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params', |
17 |
'url'])
|
18 |
|
19 |
|
20 |
def parsed_url_wrapper(func): |
21 |
def wrapper(self, url, *args, **kwargs): |
22 |
url = self.parse_url(url)
|
23 |
return func(self, url, *args, **kwargs) |
24 |
return wrapper
|
25 |
|
26 |
|
27 |
class URLAssertionsMixin(object): |
28 |
|
29 |
def get_redirect_url(self, request): |
30 |
return self.parse_url(request['Location']) |
31 |
|
32 |
def parse_url(self, url): |
33 |
if isinstance(url, ParsedURL): |
34 |
return url
|
35 |
result = urlparse.urlparse(url) |
36 |
parsed = { |
37 |
'url': url,
|
38 |
'host': result.netloc,
|
39 |
'scheme': result.scheme,
|
40 |
'path': result.path,
|
41 |
} |
42 |
parsed['params'] = urlparse.parse_qs(result.query)
|
43 |
return ParsedURL(**parsed)
|
44 |
|
45 |
@parsed_url_wrapper
|
46 |
def assertParamEqual(self, url, key, value): |
47 |
self.assertParam(url, key)
|
48 |
self.assertEqual(url.params[key][0], value) |
49 |
|
50 |
@parsed_url_wrapper
|
51 |
def assertNoParam(self, url, key): |
52 |
self.assertFalse(key in url.params, |
53 |
"Url '%s' does contain '%s' parameter" % (url.url,
|
54 |
key)) |
55 |
|
56 |
@parsed_url_wrapper
|
57 |
def assertParam(self, url, key): |
58 |
self.assertTrue(key in url.params, |
59 |
"Url '%s' does not contain '%s' parameter" % (url.url,
|
60 |
key)) |
61 |
|
62 |
@parsed_url_wrapper
|
63 |
def assertHost(self, url, host): |
64 |
self.assertEqual(url.host, host)
|
65 |
|
66 |
@parsed_url_wrapper
|
67 |
def assertPath(self, url, path): |
68 |
self.assertEqual(url.path, path)
|
69 |
|
70 |
@parsed_url_wrapper
|
71 |
def assertSecure(self, url, key): |
72 |
self.assertEqual(url.scheme, "https") |
73 |
|
74 |
|
75 |
class OA2Client(TestClient): |
76 |
"""
|
77 |
An OAuth2 agnostic test client.
|
78 |
"""
|
79 |
def __init__(self, baseurl, *args, **kwargs): |
80 |
self.oa2_url = baseurl
|
81 |
self.token_url = self.oa2_url + 'token/' |
82 |
self.auth_url = self.oa2_url + 'auth/' |
83 |
self.credentials = kwargs.pop('credentials', ()) |
84 |
|
85 |
kwargs['wsgi.url_scheme'] = 'https' |
86 |
return super(OA2Client, self).__init__(*args, **kwargs) |
87 |
|
88 |
def request(self, *args, **kwargs): |
89 |
#print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
|
90 |
#kwargs.get('HTTP_AUTHORIZATION', None)
|
91 |
return super(OA2Client, self).request(*args, **kwargs) |
92 |
|
93 |
def get_url(self, token_or_auth, **params): |
94 |
return token_or_auth + '?' + urllib.urlencode(params) |
95 |
|
96 |
def grant(self, clientid, *args, **kwargs): |
97 |
"""
|
98 |
Do an authorization grant request.
|
99 |
"""
|
100 |
params = { |
101 |
'grant_type': 'authorization_code', |
102 |
'client_id': clientid
|
103 |
} |
104 |
urlparams = kwargs.pop('urlparams', {})
|
105 |
params.update(urlparams) |
106 |
self.set_auth_headers(kwargs)
|
107 |
return self.get(self.get_url(self.token_url, **params), *args, |
108 |
**kwargs) |
109 |
|
110 |
def authorize_code(self, clientid, *args, **kwargs): |
111 |
"""
|
112 |
Do an authorization code request.
|
113 |
"""
|
114 |
params = { |
115 |
'response_type': 'code', |
116 |
'client_id': clientid
|
117 |
} |
118 |
urlparams = kwargs.pop('urlparams', {})
|
119 |
urlparams.update(kwargs.pop('extraparams', {}))
|
120 |
params.update(urlparams) |
121 |
self.set_auth_headers(kwargs)
|
122 |
if 'reject' in params: |
123 |
return self.post(self.get_url(self.auth_url), data=params, |
124 |
**kwargs) |
125 |
return self.get(self.get_url(self.auth_url, **params), *args, **kwargs) |
126 |
|
127 |
def set_auth_headers(self, params): |
128 |
print 'self.credentials:', self.credentials |
129 |
if not self.credentials: |
130 |
return
|
131 |
credentials = base64.encodestring('%s:%s' % self.credentials).strip() |
132 |
params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials |
133 |
return params
|
134 |
|
135 |
def set_credentials(self, user=None, pwd=None): |
136 |
self.credentials = (user, pwd)
|
137 |
if not user and not pwd: |
138 |
self.credentials = ()
|
139 |
|
140 |
|
141 |
class TestOA2(TestCase, URLAssertionsMixin): |
142 |
|
143 |
def assertCount(self, model, count): |
144 |
self.assertEqual(model.objects.count(), count)
|
145 |
|
146 |
def setUp(self): |
147 |
baseurl = reverse('oa2_authenticate').replace('/auth', '/') |
148 |
self.client = OA2Client(baseurl)
|
149 |
client1 = Client.objects.create(identifier="client1", secret="secret") |
150 |
self.client1_redirect_uri = "https://server.com/handle_code" |
151 |
client1.redirecturl_set.create(url=self.client1_redirect_uri)
|
152 |
|
153 |
client2 = Client.objects.create(identifier="client2", type='public') |
154 |
self.client2_redirect_uri = "https://server2.com/handle_code" |
155 |
client2.redirecturl_set.create(url=self.client2_redirect_uri)
|
156 |
|
157 |
client3 = Client.objects.create(identifier="client3", secret='secret', |
158 |
is_trusted=True)
|
159 |
self.client3_redirect_uri = "https://server3.com/handle_code" |
160 |
client3.redirecturl_set.create(url=self.client3_redirect_uri)
|
161 |
|
162 |
u = User.objects.create(username="user@synnefo.org")
|
163 |
u.set_password("password")
|
164 |
u.save() |
165 |
|
166 |
def test_code_authorization(self): |
167 |
r = self.client.authorize_code('client-fake') |
168 |
self.assertEqual(r.status_code, 400) |
169 |
self.assertCount(AuthorizationCode, 0) |
170 |
|
171 |
# # no auth header, client is confidential
|
172 |
# r = self.client.authorize_code('client1')
|
173 |
# self.assertEqual(r.status_code, 400)
|
174 |
# self.assertCount(AuthorizationCode, 0)
|
175 |
|
176 |
# mixed up credentials/client_id's
|
177 |
self.client.set_credentials('client1', 'secret') |
178 |
r = self.client.authorize_code('client2') |
179 |
self.assertEqual(r.status_code, 400) |
180 |
self.assertCount(AuthorizationCode, 0) |
181 |
|
182 |
self.client.set_credentials('client2', '') |
183 |
r = self.client.authorize_code('client2') |
184 |
self.assertEqual(r.status_code, 400) |
185 |
self.assertCount(AuthorizationCode, 0) |
186 |
|
187 |
# self.client.set_credentials()
|
188 |
# r = self.client.authorize_code('client1')
|
189 |
# self.assertEqual(r.status_code, 400)
|
190 |
# self.assertCount(AuthorizationCode, 0)
|
191 |
|
192 |
# valid request
|
193 |
params = {'redirect_uri': self.client1_redirect_uri, |
194 |
'extra_param': '123'} |
195 |
self.client.set_credentials('client1', 'secret') |
196 |
r = self.client.authorize_code('client1', urlparams=params) |
197 |
self.assertEqual(r.status_code, 302) |
198 |
self.assertTrue('Location' in r) |
199 |
p = urlparse.urlparse(r['Location'])
|
200 |
self.assertEqual(p.netloc, 'testserver:80') |
201 |
self.assertEqual(p.path, reverse('login')) |
202 |
|
203 |
self.client.set_credentials('client1', 'secret') |
204 |
self.client.login(username="user@synnefo.org", password="password") |
205 |
r = self.client.authorize_code('client1', urlparams=params) |
206 |
self.assertEqual(r.status_code, 200) |
207 |
|
208 |
r = self.client.authorize_code('client1', urlparams=params, |
209 |
extraparams={'reject': 0}) |
210 |
self.assertCount(AuthorizationCode, 1) |
211 |
|
212 |
# redirect is valid
|
213 |
redirect1 = self.get_redirect_url(r)
|
214 |
self.assertParam(redirect1, "code") |
215 |
self.assertNoParam(redirect1, "extra_param") |
216 |
self.assertHost(redirect1, "server.com") |
217 |
self.assertPath(redirect1, "/handle_code") |
218 |
|
219 |
params['state'] = 'csrfstate' |
220 |
params['scope'] = 'resource1' |
221 |
r = self.client.authorize_code('client1', urlparams=params) |
222 |
redirect2 = self.get_redirect_url(r)
|
223 |
self.assertParamEqual(redirect2, "state", 'csrfstate') |
224 |
self.assertCount(AuthorizationCode, 2) |
225 |
|
226 |
code1 = AuthorizationCode.objects.get(code=redirect1.params['code'][0]) |
227 |
#self.assertEqual(code1.state, '')
|
228 |
self.assertEqual(code1.state, None) |
229 |
self.assertEqual(code1.redirect_uri, self.client1_redirect_uri) |
230 |
|
231 |
code2 = AuthorizationCode.objects.get(code=redirect2.params['code'][0]) |
232 |
self.assertEqual(code2.state, 'csrfstate') |
233 |
self.assertEqual(code2.redirect_uri, self.client1_redirect_uri) |