Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / oa2 / tests / djangobackend.py @ e28a4841

History | View | Annotate | Download (8.3 kB)

1
import urllib
2
import urlparse
3
import base64
4

    
5
from collections import namedtuple
6

    
7
from django.test import TransactionTestCase as TestCase
8
from django.test import Client as TestClient
9

    
10
from django.core.urlresolvers import reverse
11
from django.contrib.auth.models import User
12

    
13
from astakos.oa2.models import Client, AuthorizationCode
14

    
15

    
16
ParsedURL = namedtuple('ParsedURL', ['host', 'scheme', 'path', 'params',
17
                                     'url'])
18

    
19

    
20
def parsed_url_wrapper(func):
21
    def wrapper(self, url, *args, **kwargs):
22
        url = self.parse_url(url)
23
        return func(self, url, *args, **kwargs)
24
    return wrapper
25

    
26

    
27
class URLAssertionsMixin(object):
28

    
29
    def get_redirect_url(self, request):
30
        return self.parse_url(request['Location'])
31

    
32
    def parse_url(self, url):
33
        if isinstance(url, ParsedURL):
34
            return url
35
        result = urlparse.urlparse(url)
36
        parsed = {
37
            'url': url,
38
            'host': result.netloc,
39
            'scheme': result.scheme,
40
            'path': result.path,
41
        }
42
        parsed['params'] = urlparse.parse_qs(result.query)
43
        return ParsedURL(**parsed)
44

    
45
    @parsed_url_wrapper
46
    def assertParamEqual(self, url, key, value):
47
        self.assertParam(url, key)
48
        self.assertEqual(url.params[key][0], value)
49

    
50
    @parsed_url_wrapper
51
    def assertNoParam(self, url, key):
52
        self.assertFalse(key in url.params,
53
                         "Url '%s' does contain '%s' parameter" % (url.url,
54
                                                                   key))
55

    
56
    @parsed_url_wrapper
57
    def assertParam(self, url, key):
58
        self.assertTrue(key in url.params,
59
                        "Url '%s' does not contain '%s' parameter" % (url.url,
60
                                                                      key))
61

    
62
    @parsed_url_wrapper
63
    def assertHost(self, url, host):
64
        self.assertEqual(url.host, host)
65

    
66
    @parsed_url_wrapper
67
    def assertPath(self, url, path):
68
        self.assertEqual(url.path, path)
69

    
70
    @parsed_url_wrapper
71
    def assertSecure(self, url, key):
72
        self.assertEqual(url.scheme, "https")
73

    
74

    
75
class OA2Client(TestClient):
76
    """
77
    An OAuth2 agnostic test client.
78
    """
79
    def __init__(self, baseurl, *args, **kwargs):
80
        self.oa2_url = baseurl
81
        self.token_url = self.oa2_url + 'token/'
82
        self.auth_url = self.oa2_url + 'auth/'
83
        self.credentials = kwargs.pop('credentials', ())
84

    
85
        kwargs['wsgi.url_scheme'] = 'https'
86
        return super(OA2Client, self).__init__(*args, **kwargs)
87

    
88
    def request(self, *args, **kwargs):
89
        #print kwargs.get('PATH_INFO') + '?' + kwargs.get('QUERY_STRING'), \
90
            #kwargs.get('HTTP_AUTHORIZATION', None)
91
        return super(OA2Client, self).request(*args, **kwargs)
92

    
93
    def get_url(self, token_or_auth, **params):
94
        return token_or_auth + '?' + urllib.urlencode(params)
95

    
96
    def grant(self, clientid, *args, **kwargs):
97
        """
98
        Do an authorization grant request.
99
        """
100
        params = {
101
            'grant_type': 'authorization_code',
102
            'client_id': clientid
103
        }
104
        urlparams = kwargs.pop('urlparams', {})
105
        params.update(urlparams)
106
        self.set_auth_headers(kwargs)
107
        return self.get(self.get_url(self.token_url, **params), *args,
108
                        **kwargs)
109

    
110
    def authorize_code(self, clientid, *args, **kwargs):
111
        """
112
        Do an authorization code request.
113
        """
114
        params = {
115
            'response_type': 'code',
116
            'client_id': clientid
117
        }
118
        urlparams = kwargs.pop('urlparams', {})
119
        urlparams.update(kwargs.pop('extraparams', {}))
120
        params.update(urlparams)
121
        self.set_auth_headers(kwargs)
122
        if 'reject' in params:
123
            return self.post(self.get_url(self.auth_url), data=params,
124
                             **kwargs)
125
        return self.get(self.get_url(self.auth_url, **params), *args, **kwargs)
126

    
127
    def set_auth_headers(self, params):
128
        print 'self.credentials:', self.credentials
129
        if not self.credentials:
130
            return
131
        credentials = base64.encodestring('%s:%s' % self.credentials).strip()
132
        params['HTTP_AUTHORIZATION'] = 'Basic %s' % credentials
133
        return params
134

    
135
    def set_credentials(self, user=None, pwd=None):
136
        self.credentials = (user, pwd)
137
        if not user and not pwd:
138
            self.credentials = ()
139

    
140

    
141
class TestOA2(TestCase, URLAssertionsMixin):
142

    
143
    def assertCount(self, model, count):
144
        self.assertEqual(model.objects.count(), count)
145

    
146
    def setUp(self):
147
        baseurl = reverse('oa2_authenticate').replace('/auth', '/')
148
        self.client = OA2Client(baseurl)
149
        client1 = Client.objects.create(identifier="client1", secret="secret")
150
        self.client1_redirect_uri = "https://server.com/handle_code"
151
        client1.redirecturl_set.create(url=self.client1_redirect_uri)
152

    
153
        client2 = Client.objects.create(identifier="client2", type='public')
154
        self.client2_redirect_uri = "https://server2.com/handle_code"
155
        client2.redirecturl_set.create(url=self.client2_redirect_uri)
156

    
157
        client3 = Client.objects.create(identifier="client3", secret='secret',
158
                                        is_trusted=True)
159
        self.client3_redirect_uri = "https://server3.com/handle_code"
160
        client3.redirecturl_set.create(url=self.client3_redirect_uri)
161

    
162
        u = User.objects.create(username="user@synnefo.org")
163
        u.set_password("password")
164
        u.save()
165

    
166
    def test_code_authorization(self):
167
        r = self.client.authorize_code('client-fake')
168
        self.assertEqual(r.status_code, 400)
169
        self.assertCount(AuthorizationCode, 0)
170

    
171
#        # no auth header, client is confidential
172
#        r = self.client.authorize_code('client1')
173
#        self.assertEqual(r.status_code, 400)
174
#        self.assertCount(AuthorizationCode, 0)
175

    
176
        # mixed up credentials/client_id's
177
        self.client.set_credentials('client1', 'secret')
178
        r = self.client.authorize_code('client2')
179
        self.assertEqual(r.status_code, 400)
180
        self.assertCount(AuthorizationCode, 0)
181

    
182
        self.client.set_credentials('client2', '')
183
        r = self.client.authorize_code('client2')
184
        self.assertEqual(r.status_code, 400)
185
        self.assertCount(AuthorizationCode, 0)
186

    
187
#        self.client.set_credentials()
188
#        r = self.client.authorize_code('client1')
189
#        self.assertEqual(r.status_code, 400)
190
#        self.assertCount(AuthorizationCode, 0)
191

    
192
        # valid request
193
        params = {'redirect_uri': self.client1_redirect_uri,
194
                  'extra_param': '123'}
195
        self.client.set_credentials('client1', 'secret')
196
        r = self.client.authorize_code('client1', urlparams=params)
197
        self.assertEqual(r.status_code, 302)
198
        self.assertTrue('Location' in r)
199
        p = urlparse.urlparse(r['Location'])
200
        self.assertEqual(p.netloc, 'testserver:80')
201
        self.assertEqual(p.path, reverse('login'))
202

    
203
        self.client.set_credentials('client1', 'secret')
204
        self.client.login(username="user@synnefo.org", password="password")
205
        r = self.client.authorize_code('client1', urlparams=params)
206
        self.assertEqual(r.status_code, 200)
207

    
208
        r = self.client.authorize_code('client1', urlparams=params,
209
                                       extraparams={'reject': 0})
210
        self.assertCount(AuthorizationCode, 1)
211

    
212
        # redirect is valid
213
        redirect1 = self.get_redirect_url(r)
214
        self.assertParam(redirect1, "code")
215
        self.assertNoParam(redirect1, "extra_param")
216
        self.assertHost(redirect1, "server.com")
217
        self.assertPath(redirect1, "/handle_code")
218

    
219
        params['state'] = 'csrfstate'
220
        params['scope'] = 'resource1'
221
        r = self.client.authorize_code('client1', urlparams=params)
222
        redirect2 = self.get_redirect_url(r)
223
        self.assertParamEqual(redirect2, "state", 'csrfstate')
224
        self.assertCount(AuthorizationCode, 2)
225

    
226
        code1 = AuthorizationCode.objects.get(code=redirect1.params['code'][0])
227
        #self.assertEqual(code1.state, '')
228
        self.assertEqual(code1.state, None)
229
        self.assertEqual(code1.redirect_uri, self.client1_redirect_uri)
230

    
231
        code2 = AuthorizationCode.objects.get(code=redirect2.params['code'][0])
232
        self.assertEqual(code2.state, 'csrfstate')
233
        self.assertEqual(code2.redirect_uri, self.client1_redirect_uri)