Revision c1e4d459

b/snf-astakos-app/astakos/oa2/tests/djangobackend.py
1 1
import urllib
2 2
import urlparse
3 3
import base64
4
import datetime
4 5

  
5 6
from collections import namedtuple
6 7

  
......
9 10

  
10 11
from django.core.urlresolvers import reverse
11 12
from django.contrib.auth.models import User
13
from django.utils import simplejson as json
12 14

  
13
from astakos.oa2.models import Client, AuthorizationCode
15
from astakos.oa2.models import Client, AuthorizationCode, Token
14 16

  
15 17

  
16 18
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params',
......
124 126
                             **kwargs)
125 127
        return self.get(self.get_url(self.auth_url, **params), *args, **kwargs)
126 128

  
129
    def access_token(self, code,
130
                     content_type='application/x-www-form-urlencoded',
131
                     **kwargs):
132
        """
133
        Do an get token request.
134
        """
135
        params = {
136
            'grant_type': 'authorization_code',
137
            'code': code
138
        }
139
        params.update(kwargs)
140
        self.set_auth_headers(kwargs)
141
        return self.post(self.token_url, data=urllib.urlencode(params),
142
                         content_type=content_type, **kwargs)
143

  
127 144
    def set_auth_headers(self, params):
128
        print 'self.credentials:', self.credentials
129 145
        if not self.credentials:
130 146
            return
131 147
        credentials = base64.encodestring('%s:%s' % self.credentials).strip()
......
143 159
    def assertCount(self, model, count):
144 160
        self.assertEqual(model.objects.count(), count)
145 161

  
162
    def assert_access_token_response(self, r, expected):
163
        self.assertEqual(r.status_code, 200)
164
        try:
165
            data = json.loads(r.content)
166
        except:
167
            self.fail("Unexpected response content")
168

  
169
        self.assertTrue('access_token' in data)
170
        access_token = data['access_token']
171
        self.assertTrue('token_type' in data)
172
        token_type = data['token_type']
173
        self.assertTrue('expires_in' in data)
174
        expires_in = data['expires_in']
175

  
176
        try:
177
            token = Token.objects.get(code=access_token)
178
            self.assertEqual(token.expires_at,
179
                             token.created_at +
180
                             datetime.timedelta(seconds=expires_in))
181
            self.assertEqual(token.token_type, token_type)
182
            self.assertEqual(token.grant_type, 'authorization_code')
183
            #self.assertEqual(token.user, expected.get('user'))
184
            self.assertEqual(token.redirect_uri, expected.get('redirect_uri'))
185
            self.assertEqual(token.scope, expected.get('scope'))
186
            self.assertEqual(token.state, expected.get('state'))
187
        except Token.DoesNotExist:
188
            self.fail("Invalid access_token")
189

  
146 190
    def setUp(self):
147 191
        baseurl = reverse('oa2_authenticate').replace('/auth', '/')
148 192
        self.client = OA2Client(baseurl)
......
164 208
        u.save()
165 209

  
166 210
    def test_code_authorization(self):
167
        r = self.client.authorize_code('client-fake')
211
        # missing response_type
212
        r = self.client.get(self.client.get_url(self.client.auth_url))
213
        self.assertEqual(r.status_code, 400)
214
        self.assertCount(AuthorizationCode, 0)
215

  
216
        # invalid response_type
217
        r = self.client.get(self.client.get_url(self.client.auth_url,
218
                                                response_type='invalid'))
219
        self.assertEqual(r.status_code, 400)
220
        self.assertCount(AuthorizationCode, 0)
221

  
222
        # unsupported response_type
223
        r = self.client.get(self.client.get_url(self.client.auth_url,
224
                                                response_type='token'))
225
        self.assertEqual(r.status_code, 400)
226
        self.assertCount(AuthorizationCode, 0)
227

  
228
        # missing client_id
229
        r = self.client.get(self.client.get_url(self.client.auth_url,
230
                                                response_type='code'))
168 231
        self.assertEqual(r.status_code, 400)
169 232
        self.assertCount(AuthorizationCode, 0)
170 233

  
171
#        # no auth header, client is confidential
172
#        r = self.client.authorize_code('client1')
173
#        self.assertEqual(r.status_code, 400)
174
#        self.assertCount(AuthorizationCode, 0)
234
        # fake client
235
        r = self.client.authorize_code('client-fake')
236
        self.assertEqual(r.status_code, 400)
237
        self.assertCount(AuthorizationCode, 0)
175 238

  
176 239
        # mixed up credentials/client_id's
177 240
        self.client.set_credentials('client1', 'secret')
......
179 242
        self.assertEqual(r.status_code, 400)
180 243
        self.assertCount(AuthorizationCode, 0)
181 244

  
245
        # invalid credentials
182 246
        self.client.set_credentials('client2', '')
183 247
        r = self.client.authorize_code('client2')
184 248
        self.assertEqual(r.status_code, 400)
185 249
        self.assertCount(AuthorizationCode, 0)
186 250

  
187
#        self.client.set_credentials()
188
#        r = self.client.authorize_code('client1')
189
#        self.assertEqual(r.status_code, 400)
190
#        self.assertCount(AuthorizationCode, 0)
251
        # invalid redirect_uri: not absolute URI
252
        self.client.set_credentials()
253
        params = {'redirect_uri':
254
                  urlparse.urlparse(self.client1_redirect_uri).path}
255
        r = self.client.authorize_code('client1', urlparams=params)
256
        self.assertEqual(r.status_code, 400)
257
        self.assertCount(AuthorizationCode, 0)
191 258

  
192
        # valid request
259
        # mismatching redirect uri
260
        self.client.set_credentials()
261
        params = {'redirect_uri': self.client1_redirect_uri[1:]}
262
        r = self.client.authorize_code('client1', urlparams=params)
263
        self.assertEqual(r.status_code, 400)
264
        self.assertCount(AuthorizationCode, 0)
265

  
266
        # valid request: untrusted client
193 267
        params = {'redirect_uri': self.client1_redirect_uri,
268
                  'scope': self.client1_redirect_uri,
194 269
                  'extra_param': '123'}
195 270
        self.client.set_credentials('client1', 'secret')
196 271
        r = self.client.authorize_code('client1', urlparams=params)
197 272
        self.assertEqual(r.status_code, 302)
198 273
        self.assertTrue('Location' in r)
199
        p = urlparse.urlparse(r['Location'])
200
        self.assertEqual(p.netloc, 'testserver:80')
201
        self.assertEqual(p.path, reverse('login'))
274
        self.assertHost(r['Location'], "testserver:80")
275
        self.assertPath(r['Location'], reverse('login'))
202 276

  
203 277
        self.client.set_credentials('client1', 'secret')
204 278
        self.client.login(username="user@synnefo.org", password="password")
......
231 305
        code2 = AuthorizationCode.objects.get(code=redirect2.params['code'][0])
232 306
        self.assertEqual(code2.state, 'csrfstate')
233 307
        self.assertEqual(code2.redirect_uri, self.client1_redirect_uri)
308

  
309
        # valid request: trusted client
310
        params = {'redirect_uri': self.client3_redirect_uri,
311
                  'scope': self.client3_redirect_uri,
312
                  'extra_param': '123'}
313
        self.client.set_credentials('client3', 'secret')
314
        r = self.client.authorize_code('client3', urlparams=params)
315
        self.assertEqual(r.status_code, 302)
316
        self.assertCount(AuthorizationCode, 3)
317

  
318
        # redirect is valid
319
        redirect3 = self.get_redirect_url(r)
320
        self.assertParam(redirect1, "code")
321
        self.assertNoParam(redirect3, "state")
322
        self.assertNoParam(redirect3, "extra_param")
323
        self.assertHost(redirect3, "server3.com")
324
        self.assertPath(redirect3, "/handle_code")
325

  
326
        code3 = AuthorizationCode.objects.get(code=redirect3.params['code'][0])
327
        self.assertEqual(code3.state, None)
328
        self.assertEqual(code3.redirect_uri, self.client3_redirect_uri)
329

  
330
        # valid request: trusted client
331
        params['state'] = 'csrfstate'
332
        self.client.set_credentials('client3', 'secret')
333
        r = self.client.authorize_code('client3', urlparams=params)
334
        self.assertEqual(r.status_code, 302)
335
        self.assertCount(AuthorizationCode, 4)
336

  
337
        # redirect is valid
338
        redirect4 = self.get_redirect_url(r)
339
        self.assertParam(redirect4, "code")
340
        self.assertParamEqual(redirect4, "state", 'csrfstate')
341
        self.assertNoParam(redirect4, "extra_param")
342
        self.assertHost(redirect4, "server3.com")
343
        self.assertPath(redirect4, "/handle_code")
344

  
345
        code4 = AuthorizationCode.objects.get(code=redirect4.params['code'][0])
346
        self.assertEqual(code4.state, 'csrfstate')
347
        self.assertEqual(code4.redirect_uri, self.client3_redirect_uri)
348

  
349
    def test_get_token(self):
350
        # invalid method
351
        r = self.client.get(self.client.token_url)
352
        self.assertEqual(r.status_code, 405)
353
        self.assertTrue('Allow' in r)
354
        self.assertEqual(r['Allow'], 'POST')
355

  
356
        # invalid content type
357
        r = self.client.post(self.client.token_url)
358
        self.assertEqual(r.status_code, 400)
359

  
360
        # missing grant type
361
        r = self.client.post(self.client.token_url,
362
                             content_type='application/x-www-form-urlencoded')
363
        self.assertEqual(r.status_code, 400)
364

  
365
        # unsupported grant type: client_credentials
366
        r = self.client.post(self.client.token_url,
367
                             data='grant_type=client_credentials',
368
                             content_type='application/x-www-form-urlencoded')
369
        self.assertEqual(r.status_code, 400)
370

  
371
        # unsupported grant type: token
372
        r = self.client.post(self.client.token_url,
373
                             data='grant_type=token',
374
                             content_type='application/x-www-form-urlencoded')
375
        self.assertEqual(r.status_code, 400)
376

  
377
        # invalid grant type
378
        r = self.client.post(self.client.token_url,
379
                             data='grant_type=invalid',
380
                             content_type='application/x-www-form-urlencoded')
381
        self.assertEqual(r.status_code, 400)
382

  
383
        # generate authorization code: without redirect_uri
384
        self.client.login(username="user@synnefo.org", password="password")
385
        r = self.client.authorize_code('client3')
386
        self.assertCount(AuthorizationCode, 1)
387
        redirect = self.get_redirect_url(r)
388
        code_instance = AuthorizationCode.objects.get(
389
            code=redirect.params['code'][0])
390

  
391
        # no client_id & no client authorization
392
        r = self.client.access_token(code_instance.code)
393
        self.assertEqual(r.status_code, 400)
394

  
395
        # invalid client_id
396
        r = self.client.access_token(code_instance.code, client_id='client2')
397
        self.assertEqual(r.status_code, 400)
398

  
399
        # inexistent client_id
400
        r = self.client.access_token(code_instance.code, client_id='client42')
401
        self.assertEqual(r.status_code, 400)
402

  
403
        # no client authorization
404
        r = self.client.access_token(code_instance.code, client_id='client3')
405
        self.assertEqual(r.status_code, 400)
406

  
407
        # mixed up credentials/client_id's
408
        self.client.set_credentials('client1', 'secret')
409
        r = self.client.access_token(code_instance.code, client_id='client3')
410
        self.assertEqual(r.status_code, 400)
411

  
412
        # mixed up credentials/client_id's
413
        self.client.set_credentials('client3', 'secret')
414
        r = self.client.access_token(code_instance.code, client_id='client1')
415
        self.assertEqual(r.status_code, 400)
416

  
417
        # mismatching client
418
        self.client.set_credentials('client1', 'secret')
419
        r = self.client.access_token(code_instance.code, client_id='client1')
420
        self.assertEqual(r.status_code, 400)
421

  
422
        # invalid code
423
        self.client.set_credentials('client3', 'secret')
424
        r = self.client.access_token('invalid')
425
        self.assertEqual(r.status_code, 400)
426

  
427
        # valid request
428
        self.client.set_credentials('client3', 'secret')
429
        r = self.client.access_token(code_instance.code)
430
        self.assertCount(AuthorizationCode, 0)  # assert code is consumed
431
        self.assertCount(Token, 1)
432
        expected = {'redirect_uri': self.client3_redirect_uri,
433
                    'scope': self.client3_redirect_uri,
434
                    'state': None}
435
        self.assert_access_token_response(r, expected)

Also available in: Unified diff