Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ d2633501

History | View | Annotate | Download (17.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import datetime
35

    
36
from django.test import TestCase, Client
37
from django.conf import settings
38
from django.core import mail
39

    
40
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
41
from astakos.im.models import *
42
from astakos.im import functions
43
from astakos.im import settings as astakos_settings
44

    
45
from urllib import quote
46

    
47
class ShibbolethClient(Client):
48
    """
49
    A shibboleth agnostic client.
50
    """
51
    VALID_TOKENS = filter(lambda x: not x.startswith("_"), dir(ShibbolethTokens))
52

    
53
    def __init__(self, *args, **kwargs):
54
        self.tokens = kwargs.pop('tokens', {})
55
        super(ShibbolethClient, self).__init__(*args, **kwargs)
56

    
57
    def set_tokens(self, **kwargs):
58
        for key, value in kwargs.iteritems():
59
            key = 'SHIB_%s' % key.upper()
60
            if not key in self.VALID_TOKENS:
61
                raise Exception('Invalid shibboleth token')
62

    
63
            self.tokens[key] = value
64

    
65
    def unset_tokens(self, *keys):
66
        for key in keys:
67
            key = 'SHIB_%s' % param.upper()
68
            if key in self.tokens:
69
                del self.tokens[key]
70

    
71
    def reset_tokens(self):
72
        self.tokens = {}
73

    
74
    def get_http_token(self, key):
75
        http_header = getattr(ShibbolethTokens, key)
76
        return http_header
77

    
78
    def request(self, **request):
79
        """
80
        Transform valid shibboleth tokens to http headers
81
        """
82
        for token, value in self.tokens.iteritems():
83
            request[self.get_http_token(token)] = value
84

    
85
        for param in request.keys():
86
            key = 'SHIB_%s' % param.upper()
87
            if key in self.VALID_TOKENS:
88
                request[self.get_http_token(key)] = request[param]
89
                del request[param]
90

    
91
        return super(ShibbolethClient, self).request(**request)
92

    
93

    
94
def get_local_user(username, **kwargs):
95
        try:
96
            return AstakosUser.objects.get(email=username)
97
        except:
98
            user_params = {
99
                'username': username,
100
                'email': username,
101
                'is_active': True,
102
                'activation_sent': datetime.now(),
103
                'email_verified': True,
104
                'provider': 'local'
105
            }
106
            user_params.update(kwargs)
107
            user = AstakosUser(**user_params)
108
            user.set_password(kwargs.get('password', 'password'))
109
            user.save()
110
            user.add_auth_provider('local', auth_backend='astakos')
111
            if kwargs.get('is_active', True):
112
                user.is_active = True
113
            else:
114
                user.is_active = False
115
            user.save()
116
            return user
117

    
118

    
119
def get_mailbox(email):
120
    mails = []
121
    for sent_email in mail.outbox:
122
        for recipient in sent_email.recipients():
123
            if email in recipient:
124
                mails.append(sent_email)
125
    return mails
126

    
127

    
128
class ShibbolethTests(TestCase):
129
    """
130
    Testing shibboleth authentication.
131
    """
132

    
133
    fixtures = ['groups']
134

    
135
    def setUp(self):
136
        self.client = ShibbolethClient()
137
        settings.ASTAKOS_IM_MODULES = ['local', 'shibboleth']
138

    
139
    def test_create_account(self):
140
        client = ShibbolethClient()
141

    
142
        # shibboleth views validation
143
        # eepn required
144
        r = client.get('/im/login/shibboleth?', follow=True)
145
        self.assertContains(r, 'Missing provider token')
146
        client.set_tokens(eppn="kpapeppn")
147
        # shibboleth user info required
148
        r = client.get('/im/login/shibboleth?', follow=True)
149
        self.assertContains(r, 'Missing provider user information')
150

    
151
        # shibboleth logged us in
152
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn", cn="1", )
153
        r = client.get('/im/login/shibboleth?')
154

    
155
        # astakos asks if we want to add shibboleth
156
        self.assertContains(r, "Already have an account?")
157

    
158
        # a new pending user created
159
        pending_user = PendingThirdPartyUser.objects.get(
160
            third_party_identifier="kpapeppn")
161
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
162
        token = pending_user.token
163
        # from now on no shibboleth headers are sent to the server
164
        client.reset_tokens()
165

    
166
        # we choose to signup as a new user
167
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
168
        self.assertEqual(r.status_code, 404)
169

    
170
        r = client.get('/im/shibboleth/signup/%s' % token)
171
        form = r.context['form']
172
        post_data = {'email': 'kpap@grnet.gr',
173
                     'third_party_identifier': pending_user.third_party_identifier,
174
                     'first_name': 'Kostas',
175
                     'third_party_token': token,
176
                     'last_name': 'Mitroglou',
177
                     'additional_email': 'kpap@grnet.gr',
178
                     'provider': 'shibboleth'
179
                    }
180
        r = client.post('/im/signup', post_data)
181
        self.assertEqual(r.status_code, 200)
182
        self.assertEqual(AstakosUser.objects.count(), 1)
183
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
184
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
185

    
186

    
187
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn", cn="1", )
188
        r = client.get("/im/login/shibboleth?", follow=True)
189
        self.assertContains(r, "Your request is pending activation")
190
        r = client.get("/im/profile", follow=True)
191
        self.assertRedirects(r, 'http://testserver/im/?next=%2Fim%2Fprofile')
192

    
193
        u = AstakosUser.objects.get()
194
        functions.activate(u)
195
        self.assertEqual(u.is_active, True)
196

    
197
        r = client.get("/im/login/shibboleth?")
198
        self.assertRedirects(r, '/im/profile')
199

    
200
    def test_existing(self):
201
        existing_user = get_local_user('kpap@grnet.gr')
202

    
203
        client = ShibbolethClient()
204
        # shibboleth logged us in, notice that we use different email
205
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1", )
206
        r = client.get("/im/login/shibboleth?")
207
        # astakos asks if we want to switch a local account to shibboleth
208
        self.assertContains(r, "Already have an account?")
209

    
210
        # a new pending user created
211
        pending_user = PendingThirdPartyUser.objects.get()
212
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
213
        pending_key = pending_user.token
214
        client.reset_tokens()
215

    
216
        # we choose to add shibboleth to an our existing account
217
        # we get redirected to login page with the pending token set
218
        r = client.get('/im/login?key=%s' % pending_key)
219
        post_data = {'password': 'password',
220
                     'username': 'kpap@grnet.gr',
221
                     'key': pending_key}
222
        r = client.post('/im/local', post_data, follow=True)
223
        self.assertContains(r, "Your new login method has been added")
224

    
225
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
226
                                       email="kpap@grnet.gr")
227
        self.assertTrue(user.has_auth_provider('shibboleth'))
228
        self.assertTrue(user.has_auth_provider('local', auth_backend='astakos'))
229
        client.logout()
230

    
231
        # again ???? show her a message
232
        r = client.get('/im/login?key=%s' % pending_key)
233
        post_data = {'password': 'password',
234
                     'username': 'kpap@grnet.gr',
235
                     'key': pending_key}
236
        r = self.client.post('/im/local', post_data, follow=True)
237
        self.assertContains(r, "Account method assignment failed")
238
        self.client.logout()
239
        client.logout()
240

    
241
        # look Ma, i can login with both my shibboleth and local account
242
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1")
243
        r = client.get("/im/login/shibboleth?", follow=True)
244
        self.assertTrue(r.context['request'].user.is_authenticated())
245
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
246
        r = client.get("/im/profile")
247
        self.assertEquals(r.status_code,200)
248
        client.logout()
249
        client.reset_tokens()
250
        r = client.get("/im/profile", follow=True)
251
        self.assertFalse(r.context['request'].user.is_authenticated())
252

    
253
        post_data = {'password': 'password',
254
                     'username': 'kpap@grnet.gr'}
255
        r = self.client.post('/im/local', post_data, follow=True)
256
        self.assertTrue(r.context['request'].user.is_authenticated())
257
        r = self.client.get("/im/profile")
258
        self.assertEquals(r.status_code,200)
259

    
260
        r = client.post('/im/local', post_data, follow=True)
261
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn", cn="1", )
262
        r = client.get("/im/login/shibboleth?", follow=True)
263
        client.reset_tokens()
264

    
265
        client.logout()
266
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid", cn="1")
267
        r = client.get("/im/login/shibboleth?", follow=True)
268
        self.assertFalse(r.context['request'].user.is_authenticated())
269

    
270

    
271
class LocalUserTests(TestCase):
272

    
273
    fixtures = ['groups']
274

    
275
    def test_invitations(self):
276
        return
277

    
278
    def test_local_provider(self):
279
        r = self.client.get("/im/signup")
280
        self.assertEqual(r.status_code, 200)
281

    
282
        data = {'email':'kpap@grnet.gr', 'password1':'password',
283
                'password2':'password', 'first_name': 'Kostas',
284
                'last_name': 'Mitroglou', 'provider': 'local'}
285
        r = self.client.post("/im/signup", data)
286
        self.assertEqual(AstakosUser.objects.count(), 1)
287
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
288
                                       email="kpap@grnet.gr")
289
        self.assertEqual(user.username, 'kpap@grnet.gr')
290
        self.assertEqual(user.has_auth_provider('local'), True)
291
        self.assertFalse(user.is_active)
292

    
293
        # admin gets notified
294
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 1)
295
        # and sends user activation email
296
        functions.send_activation(user)
297

    
298
        # user activation fields updated
299
        user = AstakosUser.objects.get(pk=user.pk)
300
        self.assertTrue(user.activation_sent)
301
        self.assertFalse(user.email_verified)
302
        # email sent to user
303
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
304

    
305
        # user forgot she got registered and tries to submit registration
306
        # form. Notice the upper case in email
307
        data = {'email':'KPAP@grnet.gr', 'password1':'password',
308
                'password2':'password', 'first_name': 'Kostas',
309
                'last_name': 'Mitroglou', 'provider': 'local'}
310
        r = self.client.post("/im/signup", data)
311
        self.assertContains(r, "This email is already used")
312

    
313
        # hmmm, email exists; lets get the password
314
        r = self.client.get('/im/local/password_reset')
315
        self.assertEqual(r.status_code, 200)
316
        r = self.client.post('/im/local/password_reset', {'email':
317
                                                          'kpap@grnet.gr'})
318
        # she can't because account is not active yet
319
        self.assertContains(r, "doesn't have an associated user account")
320

    
321
        # moderation is enabled so no automatic activation can be send
322
        r = self.client.get('/im/send/activation/%d' % user.pk)
323
        self.assertEqual(r.status_code, 403)
324
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
325
        # also she cannot login
326
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
327
                                                 'password': 'password'})
328
        self.assertContains(r, 'You have not followed the activation link')
329
        self.assertNotContains(r, 'Resend activation')
330
        self.assertFalse(r.context['request'].user.is_authenticated())
331
        self.assertFalse('_pithos2_a' in self.client.cookies)
332

    
333
        # lets disable moderation
334
        astakos_settings.MODERATION_ENABLED = False
335
        r = self.client.post('/im/local/password_reset', {'email':
336
                                                          'kpap@grnet.gr'})
337
        self.assertContains(r, "doesn't have an associated user account")
338
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
339
                                                 'password': 'password'})
340
        self.assertContains(r, 'You have not followed the activation link')
341
        self.assertContains(r, 'Resend activation')
342
        self.assertFalse(r.context['request'].user.is_authenticated())
343
        self.assertFalse('_pithos2_a' in self.client.cookies)
344
        # user sees the message and resends activation
345
        r = self.client.get('/im/send/activation/%d' % user.pk)
346
        # email sent
347
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 2)
348

    
349
        # switch back moderation setting
350
        astakos_settings.MODERATION_ENABLED = True
351
        # lets activate the user
352
        r = self.client.get(user.get_activation_url(), follow=True)
353
        self.assertRedirects(r, "/im/profile")
354
        self.assertContains(r, "kpap@grnet.gr")
355
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 3)
356

    
357
        user = AstakosUser.objects.get(pk=user.pk)
358
        # user activated and logged in, token cookie set
359
        self.assertTrue(r.context['request'].user.is_authenticated())
360
        self.assertTrue('_pithos2_a' in self.client.cookies)
361
        cookies = self.client.cookies
362
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
363
        r = self.client.get('/im/logout', follow=True)
364
        r = self.client.get('/im/')
365
        # user logged out, token cookie removed
366
        self.assertFalse(r.context['request'].user.is_authenticated())
367
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
368
        # https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
369
        del self.client.cookies['_pithos2_a']
370

    
371
        # user can login
372
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
373
                                           'password': 'password'},
374
                                          follow=True)
375
        self.assertTrue(r.context['request'].user.is_authenticated())
376
        self.assertTrue('_pithos2_a' in self.client.cookies)
377
        cookies = self.client.cookies
378
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
379
        self.client.get('/im/logout', follow=True)
380

    
381
        # user forgot password
382
        old_pass = user.password
383
        r = self.client.get('/im/local/password_reset')
384
        self.assertEqual(r.status_code, 200)
385
        r = self.client.post('/im/local/password_reset', {'email':
386
                                                          'kpap@grnet.gr'})
387
        self.assertEqual(r.status_code, 302)
388
        # email sent
389
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 4)
390

    
391
        # user visits change password link
392
        r = self.client.get(user.get_password_reset_url())
393
        r = self.client.post(user.get_password_reset_url(),
394
                            {'new_password1':'newpass',
395
                             'new_password2':'newpass'})
396

    
397
        user = AstakosUser.objects.get(pk=user.pk)
398
        self.assertNotEqual(old_pass, user.password)
399

    
400
        # old pass is not usable
401
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
402
                                           'password': 'password'})
403
        self.assertContains(r, 'Please enter a correct username and password')
404
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
405
                                           'password': 'newpass'},
406
                                           follow=True)
407
        self.assertTrue(r.context['request'].user.is_authenticated())
408
        self.client.logout()
409

    
410
        # tests of special local backends
411
        user = AstakosUser.objects.get(pk=user.pk)
412
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
413
        user.save()
414

    
415
        # non astakos local backends do not support password reset
416
        r = self.client.get('/im/local/password_reset')
417
        self.assertEqual(r.status_code, 200)
418
        r = self.client.post('/im/local/password_reset', {'email':
419
                                                          'kpap@grnet.gr'})
420
        # she can't because account is not active yet
421
        self.assertContains(r, "Password change for this account is not"
422
                                " supported")
423