Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / oa2 / tests / simple.py @ 3fc7fd80

History | View | Annotate | Download (7.1 kB)

1
import unittest
2
import base64
3
import urllib
4

    
5
from astakos.oa2.backends import SimpleBackend
6
from astakos.oa2.backends.base import Token, Client, AuthorizationCode, User
7
from astakos.oa2.backends.base import Request, Response, OA2Error
8

    
9

    
10
# Test helpers
11
class OA2TestCase(unittest.TestCase):
12

    
13
    def _get_request(self, method, get, post, meta, secure):
14
        return Request(method=method, GET=get, POST=post, META=meta,
15
                       secure=secure)
16

    
17
    def auth_request(self, request, username, secret):
18
        username = urllib.quote_plus(username)
19
        secret = urllib.quote_plus(secret)
20
        b64 = base64.encodestring('%s:%s' % (username, secret))[:-1]
21
        request.META['Authorization'] = 'Basic %s' % b64
22

    
23
    def build_authorization_request(self, type, client_id, scope=None,
24
                                    state=None, uri=None, **extra):
25
        method = extra.get('method', 'GET')
26
        if method == 'POST':
27
            extra_params = extra.get('post', {})
28
            params = {}
29
            post = params
30
            get = extra.get('get', {})
31
        else:
32
            extra_params = extra.get('get', {})
33
            params = {}
34
            get = params
35
            post = extra.get('post', {})
36

    
37
        params.update({'response_type': type, 'client_id': client_id})
38
        if not state is None:
39
            params['state'] = state
40
        if not scope is None:
41
            params['scope'] = scope
42
        if not uri is None:
43
            params['redirect_uri'] = uri
44

    
45
        params.update(extra_params)
46

    
47
        secure = extra.get('secure', True)
48
        meta = extra.get('meta', None)
49
        return self._get_request(method, get, post, meta, secure)
50

    
51
    def build_token_request(self, grant_type, client_id, scope=None,
52
                            state=None, uri=None, **extra):
53
        method = extra.get('method', 'POST')
54
        params = {'grant_type': grant_type, 'client_id': client_id}
55
        if method == 'POST':
56
            extra_params = extra.get('post', {})
57
            post = params
58
            get = extra.get('get', {})
59
        else:
60
            extra_params = extra.get('get', {})
61
            get = params
62
            post = extra.get('post', {})
63

    
64
        if scope:
65
            params['scope'] = scope
66
        if state:
67
            params['state'] = state
68
        if uri:
69
            params['redirect_uri'] = uri
70

    
71
        params.update(extra_params)
72

    
73
        secure = extra.get('secure', True)
74
        meta = extra.get('meta', None)
75
        return self._get_request(method, get, post, meta, secure)
76

    
77
    def assertRaisesOA2(self, *args, **kwargs):
78
        return self.assertRaises(OA2Error, *args, **kwargs)
79

    
80
    def assertResponseRedirect(self, response, url=None):
81
        self.assertResponseStatus(response, status=302)
82
        self.assertResponseContains(response, 'Location')
83
        if not url is None:
84
            self.assertEqual(response.headers.get('Location'), url)
85

    
86
    def assertResponseStatus(self, response, status=200):
87
        self.assertEqual(response.status, status)
88

    
89
    def assertResponseContains(self, response, header, value=None):
90
        if not header in response.headers:
91
            raise AssertionError("Response does not contain '%s'" % header)
92
        if value is None:
93
            return
94
        self.assertEqual(response.headers.get(header), value)
95

    
96

    
97
class TestClient(OA2TestCase):
98

    
99
    def _cleanup(self):
100
        Client.ENTRIES = {}
101
        Token.ENTRIES = {}
102
        User.ENTRIES = {}
103
        AuthorizationCode.ENTRIES = {}
104

    
105
    def setUp(self):
106
        self._cleanup()
107
        uris = ['http://client1.synnefo.org/oauth2_callback']
108
        self.client_public = Client.create('client_public', uris=uris,
109
                                           client_type='public')
110
        self.client_conf = Client.create('client_conf', secret='pass',
111
                                         uris=uris, client_type='confidential')
112
        self.backend = SimpleBackend(errors_to_http=False)
113

    
114
    def test_authorization(self):
115
        client_id = self.client_public.get_id()
116
        auth_request = self.build_authorization_request
117
        token_request = self.build_token_request
118
        User.create("kpap@grnet.gr", name='kpap')
119

    
120
        def assert_codes_len(check=0):
121
            self.assertEqual(len(AuthorizationCode.ENTRIES.keys()), check)
122

    
123
        # plain http code request
124
        req = auth_request('code', client_id, secure=False)
125
        self.assertRaisesOA2(self.backend.authorize, req)
126
        assert_codes_len(0)
127

    
128
        # wrong method
129
        req = auth_request('code', client_id, method='POST')
130
        self.assertRaisesOA2(self.backend.authorize, req)
131
        assert_codes_len(0)
132

    
133
        # invalid client id
134
        req = auth_request('code', 'client123')
135
        self.assertRaisesOA2(self.backend.authorize, req)
136
        assert_codes_len(0)
137

    
138
        # invalid redirect uri
139
        invalid_uri = 'http://client1.synnefo.org/oauth2_callback?invalid'
140
        req = auth_request('code', client_id, uri=invalid_uri)
141
        self.assertRaisesOA2(self.backend.authorize, req)
142
        assert_codes_len(0)
143

    
144
        # code request
145
        req = auth_request('code', client_id, scope="scope1 scope2")
146
        res = self.backend.authorize(req)
147
        self.assertResponseRedirect(res)
148
        assert_codes_len(1)
149

    
150
        # authorize grant
151
        auth_code = AuthorizationCode.ENTRIES.keys()[0]
152
        req = token_request('authorization_code', client_id,
153
                            scope="scope1 scope2", post={'code': auth_code})
154

    
155
        # invalid code
156
        req.POST['code'] = "123"
157
        self.assertRaisesOA2(self.backend.grant, req)
158

    
159
        # valid code
160
        req.POST['code'] = auth_code
161
        res = self.backend.grant(req)
162

    
163
        # code consumed
164
        assert_codes_len(0)
165

    
166
        # code reuse fails
167
        self.assertRaisesOA2(self.backend.grant, req)
168

    
169
        # valid token scope
170
        token = Token.ENTRIES.keys()[0]
171
        token_obj = Token.get(token)
172
        self.assertEqual(token_obj.scope, "scope1 scope2")
173

    
174
    def test_authenticated_client(self):
175
        client_id = self.client_conf.get_id()
176
        client_secret = self.client_conf.secret
177
        auth_request = self.build_authorization_request
178
        token_request = self.build_token_request
179

    
180
        req = auth_request('code', client_id, scope="scope1 scope2")
181
        self.auth_request(req, client_id, client_secret)
182

    
183
    def test_invalid_client(self):
184
        client_id = self.client_public.get_id()
185
        auth_request = self.build_authorization_request
186
        token_request = self.build_token_request
187

    
188
        # code request
189
        req = auth_request('code', 'client5', scope="scope1 scope2")
190
        self.assertRaisesOA2(self.backend.authorize, req)
191

    
192
        req = auth_request('code', client_id, scope="scope1 scope2")
193
        self.backend.authorize(req)
194

    
195
        auth_code = AuthorizationCode.ENTRIES.keys()[0]
196
        req = token_request('authorization_code', 'fakeclient',
197
                            scope="scope1 scope2", post={'code': auth_code})
198
        self.assertRaisesOA2(self.backend.grant, req)
199

    
200
        req.POST['client_id'] = client_id
201
        self.backend.grant(req)
202

    
203

    
204
if __name__ == '__main__':
205
    unittest.main()