root / snf-astakos-app / astakos / oa2 / tests / simple.py @ 3fc7fd80
History | View | Annotate | Download (7.1 kB)
1 |
import unittest |
---|---|
2 |
import base64 |
3 |
import urllib |
4 |
|
5 |
from astakos.oa2.backends import SimpleBackend |
6 |
from astakos.oa2.backends.base import Token, Client, AuthorizationCode, User |
7 |
from astakos.oa2.backends.base import Request, Response, OA2Error |
8 |
|
9 |
|
10 |
# Test helpers
|
11 |
class OA2TestCase(unittest.TestCase): |
12 |
|
13 |
def _get_request(self, method, get, post, meta, secure): |
14 |
return Request(method=method, GET=get, POST=post, META=meta,
|
15 |
secure=secure) |
16 |
|
17 |
def auth_request(self, request, username, secret): |
18 |
username = urllib.quote_plus(username) |
19 |
secret = urllib.quote_plus(secret) |
20 |
b64 = base64.encodestring('%s:%s' % (username, secret))[:-1] |
21 |
request.META['Authorization'] = 'Basic %s' % b64 |
22 |
|
23 |
def build_authorization_request(self, type, client_id, scope=None, |
24 |
state=None, uri=None, **extra): |
25 |
method = extra.get('method', 'GET') |
26 |
if method == 'POST': |
27 |
extra_params = extra.get('post', {})
|
28 |
params = {} |
29 |
post = params |
30 |
get = extra.get('get', {})
|
31 |
else:
|
32 |
extra_params = extra.get('get', {})
|
33 |
params = {} |
34 |
get = params |
35 |
post = extra.get('post', {})
|
36 |
|
37 |
params.update({'response_type': type, 'client_id': client_id}) |
38 |
if not state is None: |
39 |
params['state'] = state
|
40 |
if not scope is None: |
41 |
params['scope'] = scope
|
42 |
if not uri is None: |
43 |
params['redirect_uri'] = uri
|
44 |
|
45 |
params.update(extra_params) |
46 |
|
47 |
secure = extra.get('secure', True) |
48 |
meta = extra.get('meta', None) |
49 |
return self._get_request(method, get, post, meta, secure) |
50 |
|
51 |
def build_token_request(self, grant_type, client_id, scope=None, |
52 |
state=None, uri=None, **extra): |
53 |
method = extra.get('method', 'POST') |
54 |
params = {'grant_type': grant_type, 'client_id': client_id} |
55 |
if method == 'POST': |
56 |
extra_params = extra.get('post', {})
|
57 |
post = params |
58 |
get = extra.get('get', {})
|
59 |
else:
|
60 |
extra_params = extra.get('get', {})
|
61 |
get = params |
62 |
post = extra.get('post', {})
|
63 |
|
64 |
if scope:
|
65 |
params['scope'] = scope
|
66 |
if state:
|
67 |
params['state'] = state
|
68 |
if uri:
|
69 |
params['redirect_uri'] = uri
|
70 |
|
71 |
params.update(extra_params) |
72 |
|
73 |
secure = extra.get('secure', True) |
74 |
meta = extra.get('meta', None) |
75 |
return self._get_request(method, get, post, meta, secure) |
76 |
|
77 |
def assertRaisesOA2(self, *args, **kwargs): |
78 |
return self.assertRaises(OA2Error, *args, **kwargs) |
79 |
|
80 |
def assertResponseRedirect(self, response, url=None): |
81 |
self.assertResponseStatus(response, status=302) |
82 |
self.assertResponseContains(response, 'Location') |
83 |
if not url is None: |
84 |
self.assertEqual(response.headers.get('Location'), url) |
85 |
|
86 |
def assertResponseStatus(self, response, status=200): |
87 |
self.assertEqual(response.status, status)
|
88 |
|
89 |
def assertResponseContains(self, response, header, value=None): |
90 |
if not header in response.headers: |
91 |
raise AssertionError("Response does not contain '%s'" % header) |
92 |
if value is None: |
93 |
return
|
94 |
self.assertEqual(response.headers.get(header), value)
|
95 |
|
96 |
|
97 |
class TestClient(OA2TestCase): |
98 |
|
99 |
def _cleanup(self): |
100 |
Client.ENTRIES = {} |
101 |
Token.ENTRIES = {} |
102 |
User.ENTRIES = {} |
103 |
AuthorizationCode.ENTRIES = {} |
104 |
|
105 |
def setUp(self): |
106 |
self._cleanup()
|
107 |
uris = ['http://client1.synnefo.org/oauth2_callback']
|
108 |
self.client_public = Client.create('client_public', uris=uris, |
109 |
client_type='public')
|
110 |
self.client_conf = Client.create('client_conf', secret='pass', |
111 |
uris=uris, client_type='confidential')
|
112 |
self.backend = SimpleBackend(errors_to_http=False) |
113 |
|
114 |
def test_authorization(self): |
115 |
client_id = self.client_public.get_id()
|
116 |
auth_request = self.build_authorization_request
|
117 |
token_request = self.build_token_request
|
118 |
User.create("kpap@grnet.gr", name='kpap') |
119 |
|
120 |
def assert_codes_len(check=0): |
121 |
self.assertEqual(len(AuthorizationCode.ENTRIES.keys()), check) |
122 |
|
123 |
# plain http code request
|
124 |
req = auth_request('code', client_id, secure=False) |
125 |
self.assertRaisesOA2(self.backend.authorize, req) |
126 |
assert_codes_len(0)
|
127 |
|
128 |
# wrong method
|
129 |
req = auth_request('code', client_id, method='POST') |
130 |
self.assertRaisesOA2(self.backend.authorize, req) |
131 |
assert_codes_len(0)
|
132 |
|
133 |
# invalid client id
|
134 |
req = auth_request('code', 'client123') |
135 |
self.assertRaisesOA2(self.backend.authorize, req) |
136 |
assert_codes_len(0)
|
137 |
|
138 |
# invalid redirect uri
|
139 |
invalid_uri = 'http://client1.synnefo.org/oauth2_callback?invalid'
|
140 |
req = auth_request('code', client_id, uri=invalid_uri)
|
141 |
self.assertRaisesOA2(self.backend.authorize, req) |
142 |
assert_codes_len(0)
|
143 |
|
144 |
# code request
|
145 |
req = auth_request('code', client_id, scope="scope1 scope2") |
146 |
res = self.backend.authorize(req)
|
147 |
self.assertResponseRedirect(res)
|
148 |
assert_codes_len(1)
|
149 |
|
150 |
# authorize grant
|
151 |
auth_code = AuthorizationCode.ENTRIES.keys()[0]
|
152 |
req = token_request('authorization_code', client_id,
|
153 |
scope="scope1 scope2", post={'code': auth_code}) |
154 |
|
155 |
# invalid code
|
156 |
req.POST['code'] = "123" |
157 |
self.assertRaisesOA2(self.backend.grant, req) |
158 |
|
159 |
# valid code
|
160 |
req.POST['code'] = auth_code
|
161 |
res = self.backend.grant(req)
|
162 |
|
163 |
# code consumed
|
164 |
assert_codes_len(0)
|
165 |
|
166 |
# code reuse fails
|
167 |
self.assertRaisesOA2(self.backend.grant, req) |
168 |
|
169 |
# valid token scope
|
170 |
token = Token.ENTRIES.keys()[0]
|
171 |
token_obj = Token.get(token) |
172 |
self.assertEqual(token_obj.scope, "scope1 scope2") |
173 |
|
174 |
def test_authenticated_client(self): |
175 |
client_id = self.client_conf.get_id()
|
176 |
client_secret = self.client_conf.secret
|
177 |
auth_request = self.build_authorization_request
|
178 |
token_request = self.build_token_request
|
179 |
|
180 |
req = auth_request('code', client_id, scope="scope1 scope2") |
181 |
self.auth_request(req, client_id, client_secret)
|
182 |
|
183 |
def test_invalid_client(self): |
184 |
client_id = self.client_public.get_id()
|
185 |
auth_request = self.build_authorization_request
|
186 |
token_request = self.build_token_request
|
187 |
|
188 |
# code request
|
189 |
req = auth_request('code', 'client5', scope="scope1 scope2") |
190 |
self.assertRaisesOA2(self.backend.authorize, req) |
191 |
|
192 |
req = auth_request('code', client_id, scope="scope1 scope2") |
193 |
self.backend.authorize(req)
|
194 |
|
195 |
auth_code = AuthorizationCode.ENTRIES.keys()[0]
|
196 |
req = token_request('authorization_code', 'fakeclient', |
197 |
scope="scope1 scope2", post={'code': auth_code}) |
198 |
self.assertRaisesOA2(self.backend.grant, req) |
199 |
|
200 |
req.POST['client_id'] = client_id
|
201 |
self.backend.grant(req)
|
202 |
|
203 |
|
204 |
if __name__ == '__main__': |
205 |
unittest.main() |