Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ f432088a

History | View | Annotate | Download (20.6 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import datetime
35

    
36
from django.test import TestCase, Client
37
from django.conf import settings
38
from django.core import mail
39

    
40
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
41
from astakos.im.models import *
42
from astakos.im import functions
43
from astakos.im import settings as astakos_settings
44

    
45
from urllib import quote
46

    
47
from astakos.im import messages
48

    
49
class ShibbolethClient(Client):
50
    """
51
    A shibboleth agnostic client.
52
    """
53
    VALID_TOKENS = filter(lambda x: not x.startswith("_"), dir(ShibbolethTokens))
54

    
55
    def __init__(self, *args, **kwargs):
56
        self.tokens = kwargs.pop('tokens', {})
57
        super(ShibbolethClient, self).__init__(*args, **kwargs)
58

    
59
    def set_tokens(self, **kwargs):
60
        for key, value in kwargs.iteritems():
61
            key = 'SHIB_%s' % key.upper()
62
            if not key in self.VALID_TOKENS:
63
                raise Exception('Invalid shibboleth token')
64

    
65
            self.tokens[key] = value
66

    
67
    def unset_tokens(self, *keys):
68
        for key in keys:
69
            key = 'SHIB_%s' % param.upper()
70
            if key in self.tokens:
71
                del self.tokens[key]
72

    
73
    def reset_tokens(self):
74
        self.tokens = {}
75

    
76
    def get_http_token(self, key):
77
        http_header = getattr(ShibbolethTokens, key)
78
        return http_header
79

    
80
    def request(self, **request):
81
        """
82
        Transform valid shibboleth tokens to http headers
83
        """
84
        for token, value in self.tokens.iteritems():
85
            request[self.get_http_token(token)] = value
86

    
87
        for param in request.keys():
88
            key = 'SHIB_%s' % param.upper()
89
            if key in self.VALID_TOKENS:
90
                request[self.get_http_token(key)] = request[param]
91
                del request[param]
92

    
93
        return super(ShibbolethClient, self).request(**request)
94

    
95

    
96
def get_local_user(username, **kwargs):
97
        try:
98
            return AstakosUser.objects.get(email=username)
99
        except:
100
            user_params = {
101
                'username': username,
102
                'email': username,
103
                'is_active': True,
104
                'activation_sent': datetime.now(),
105
                'email_verified': True,
106
                'provider': 'local'
107
            }
108
            user_params.update(kwargs)
109
            user = AstakosUser(**user_params)
110
            user.set_password(kwargs.get('password', 'password'))
111
            user.save()
112
            user.add_auth_provider('local', auth_backend='astakos')
113
            if kwargs.get('is_active', True):
114
                user.is_active = True
115
            else:
116
                user.is_active = False
117
            user.save()
118
            return user
119

    
120

    
121
def get_mailbox(email):
122
    mails = []
123
    for sent_email in mail.outbox:
124
        for recipient in sent_email.recipients():
125
            if email in recipient:
126
                mails.append(sent_email)
127
    return mails
128

    
129

    
130
class ShibbolethTests(TestCase):
131
    """
132
    Testing shibboleth authentication.
133
    """
134

    
135
    fixtures = ['groups']
136

    
137
    def setUp(self):
138
        self.client = ShibbolethClient()
139
        settings.ASTAKOS_IM_MODULES = ['local', 'shibboleth']
140

    
141
    def test_create_account(self):
142
        client = ShibbolethClient()
143

    
144
        # shibboleth views validation
145
        # eepn required
146
        r = client.get('/im/login/shibboleth?', follow=True)
147
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN)
148
        client.set_tokens(eppn="kpapeppn")
149
        # shibboleth user info required
150
        r = client.get('/im/login/shibboleth?', follow=True)
151
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
152

    
153
        # shibboleth logged us in
154
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn", cn="1", )
155
        r = client.get('/im/login/shibboleth?')
156

    
157
        # astakos asks if we want to add shibboleth
158
        self.assertContains(r, "Already have an account?")
159

    
160
        # a new pending user created
161
        pending_user = PendingThirdPartyUser.objects.get(
162
            third_party_identifier="kpapeppn")
163
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
164
        token = pending_user.token
165
        # from now on no shibboleth headers are sent to the server
166
        client.reset_tokens()
167

    
168
        # we choose to signup as a new user
169
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
170
        self.assertEqual(r.status_code, 404)
171

    
172
        r = client.get('/im/shibboleth/signup/%s' % token)
173
        form = r.context['form']
174
        post_data = {'email': 'kpap@grnet.gr',
175
                     'third_party_identifier': pending_user.third_party_identifier,
176
                     'first_name': 'Kostas',
177
                     'third_party_token': token,
178
                     'last_name': 'Mitroglou',
179
                     'additional_email': 'kpap@grnet.gr',
180
                     'provider': 'shibboleth'
181
                    }
182
        r = client.post('/im/signup', post_data)
183
        self.assertEqual(r.status_code, 200)
184
        self.assertEqual(AstakosUser.objects.count(), 1)
185
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
186
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
187

    
188

    
189
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn", cn="1", )
190
        r = client.get("/im/login/shibboleth?", follow=True)
191
        self.assertContains(r, "Your request is pending activation")
192
        r = client.get("/im/profile", follow=True)
193
        self.assertRedirects(r, 'http://testserver/im/?next=%2Fim%2Fprofile')
194

    
195
        u = AstakosUser.objects.get()
196
        functions.activate(u)
197
        self.assertEqual(u.is_active, True)
198

    
199
        r = client.get("/im/login/shibboleth?")
200
        self.assertRedirects(r, '/im/profile')
201

    
202
    def test_existing(self):
203
        existing_user = get_local_user('kpap@grnet.gr')
204

    
205
        client = ShibbolethClient()
206
        # shibboleth logged us in, notice that we use different email
207
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1", )
208
        r = client.get("/im/login/shibboleth?")
209
        # astakos asks if we want to switch a local account to shibboleth
210
        self.assertContains(r, "Already have an account?")
211

    
212
        # a new pending user created
213
        pending_user = PendingThirdPartyUser.objects.get()
214
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
215
        pending_key = pending_user.token
216
        client.reset_tokens()
217

    
218
        # we choose to add shibboleth to an our existing account
219
        # we get redirected to login page with the pending token set
220
        r = client.get('/im/login?key=%s' % pending_key)
221
        post_data = {'password': 'password',
222
                     'username': 'kpap@grnet.gr',
223
                     'key': pending_key}
224
        r = client.post('/im/local', post_data, follow=True)
225
        self.assertContains(r, "Your new login method has been added")
226

    
227
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
228
                                       email="kpap@grnet.gr")
229
        self.assertTrue(user.has_auth_provider('shibboleth'))
230
        self.assertTrue(user.has_auth_provider('local', auth_backend='astakos'))
231
        client.logout()
232

    
233
        # again ???? show her a message
234
        r = client.get('/im/login?key=%s' % pending_key)
235
        post_data = {'password': 'password',
236
                     'username': 'kpap@grnet.gr',
237
                     'key': pending_key}
238
        r = self.client.post('/im/local', post_data, follow=True)
239
        self.assertContains(r, "Account method assignment failed")
240
        self.client.logout()
241
        client.logout()
242

    
243
        # look Ma, i can login with both my shibboleth and local account
244
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1")
245
        r = client.get("/im/login/shibboleth?", follow=True)
246
        self.assertTrue(r.context['request'].user.is_authenticated())
247
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
248
        r = client.get("/im/profile")
249
        self.assertEquals(r.status_code,200)
250
        client.logout()
251
        client.reset_tokens()
252
        r = client.get("/im/profile", follow=True)
253
        self.assertFalse(r.context['request'].user.is_authenticated())
254

    
255
        post_data = {'password': 'password',
256
                     'username': 'kpap@grnet.gr'}
257
        r = self.client.post('/im/local', post_data, follow=True)
258
        self.assertTrue(r.context['request'].user.is_authenticated())
259
        r = self.client.get("/im/profile")
260
        self.assertEquals(r.status_code,200)
261

    
262
        r = client.post('/im/local', post_data, follow=True)
263
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn", cn="1", )
264
        r = client.get("/im/login/shibboleth?", follow=True)
265
        client.reset_tokens()
266

    
267
        client.logout()
268
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid", cn="1")
269
        r = client.get("/im/login/shibboleth?", follow=True)
270
        self.assertFalse(r.context['request'].user.is_authenticated())
271

    
272
        # lets remove local password
273
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
274
                                       email="kpap@grnet.gr")
275
        provider_pk = user.auth_providers.get(module='local').pk
276
        provider_shib_pk = user.auth_providers.get(module='shibboleth').pk
277
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1")
278
        r = client.get("/im/login/shibboleth?", follow=True)
279
        client.reset_tokens()
280
        r = client.get("/im/remove_auth_provider/%d" % provider_pk)
281
        self.assertEqual(user.auth_providers.count(), 1)
282
        r = client.get("/im/remove_auth_provider/%d" % provider_pk)
283
        self.assertEqual(r.status_code, 404)
284
        r = client.get("/im/remove_auth_provider/%d" % provider_shib_pk)
285
        self.assertEqual(r.status_code, 403)
286

    
287
        self.client.logout()
288
        post_data = {'password': 'password',
289
                     'username': 'kpap@grnet.gr'}
290
        r = self.client.post('/im/local', post_data, follow=True)
291
        self.assertFalse(r.context['request'].user.is_authenticated())
292

    
293
        r = client.get("/im/password_change", follow=True)
294
        r = client.post("/im/password_change", {'new_password1':'111',
295
                                                'new_password2': '111'},
296
                        follow=True)
297
        user = r.context['request'].user
298
        self.assertTrue(user.has_auth_provider('local'))
299
        self.assertTrue(user.has_auth_provider('shibboleth'))
300
        self.assertTrue(user.check_password('111'))
301
        self.assertTrue(user.has_usable_password())
302
        self.client.logout()
303
        post_data = {'password': '111',
304
                     'username': 'kpap@grnet.gr'}
305
        r = self.client.post('/im/local', post_data, follow=True)
306
        self.assertTrue(r.context['request'].user.is_authenticated())
307

    
308
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1")
309
        r = client.get("/im/login/shibboleth?", follow=True)
310
        r = client.get("/im/login/shibboleth?", follow=True)
311
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
312
                                       email="kpap@grnet.gr")
313

    
314
        user2 = get_local_user('another@grnet.gr')
315
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
316

    
317
        self.assertEqual(user.auth_providers.count(), 2) # local and 1 shibboleth
318
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="kpapeppn2", cn="1")
319
        r = client.get("/im/login/shibboleth?", follow=True)
320
        self.assertEqual(user.auth_providers.count(), 3) # local and 2 shibboleth
321

    
322
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="kpapeppn2", cn="1")
323
        r = client.get("/im/login/shibboleth?", follow=True)
324

    
325
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="existingeppn", cn="1")
326
        r = client.get("/im/login/shibboleth?", follow=True)
327
        self.assertContains(r, 'Account already exists')
328

    
329

    
330
class LocalUserTests(TestCase):
331

    
332
    fixtures = ['groups']
333

    
334
    def setUp(self):
335
        from django.conf import settings
336
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
337
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
338

    
339
    def test_invitations(self):
340
        return
341

    
342
    def test_local_provider(self):
343
        r = self.client.get("/im/signup")
344
        self.assertEqual(r.status_code, 200)
345

    
346
        data = {'email':'kpap@grnet.gr', 'password1':'password',
347
                'password2':'password', 'first_name': 'Kostas',
348
                'last_name': 'Mitroglou', 'provider': 'local'}
349
        r = self.client.post("/im/signup", data)
350
        self.assertEqual(AstakosUser.objects.count(), 1)
351
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
352
                                       email="kpap@grnet.gr")
353
        self.assertEqual(user.username, 'kpap@grnet.gr')
354
        self.assertEqual(user.has_auth_provider('local'), True)
355
        self.assertFalse(user.is_active)
356

    
357
        # admin gets notified
358
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 1)
359
        # and sends user activation email
360
        functions.send_activation(user)
361

    
362
        # user activation fields updated
363
        user = AstakosUser.objects.get(pk=user.pk)
364
        self.assertTrue(user.activation_sent)
365
        self.assertFalse(user.email_verified)
366
        # email sent to user
367
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
368

    
369
        # user forgot she got registered and tries to submit registration
370
        # form. Notice the upper case in email
371
        data = {'email':'KPAP@grnet.gr', 'password1':'password',
372
                'password2':'password', 'first_name': 'Kostas',
373
                'last_name': 'Mitroglou', 'provider': 'local'}
374
        r = self.client.post("/im/signup", data)
375
        self.assertContains(r, messages.EMAIL_USED)
376

    
377
        # hmmm, email exists; lets get the password
378
        r = self.client.get('/im/local/password_reset')
379
        self.assertEqual(r.status_code, 200)
380
        r = self.client.post('/im/local/password_reset', {'email':
381
                                                          'kpap@grnet.gr'})
382
        # she can't because account is not active yet
383
        self.assertContains(r, "doesn't have an associated user account")
384

    
385
        # moderation is enabled so no automatic activation can be send
386
        r = self.client.get('/im/send/activation/%d' % user.pk)
387
        self.assertEqual(r.status_code, 403)
388
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
389
        # also she cannot login
390
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
391
                                                 'password': 'password'})
392
        self.assertContains(r, 'You have not followed the activation link')
393
        self.assertNotContains(r, 'Resend activation')
394
        self.assertFalse(r.context['request'].user.is_authenticated())
395
        self.assertFalse('_pithos2_a' in self.client.cookies)
396

    
397
        # lets disable moderation
398
        astakos_settings.MODERATION_ENABLED = False
399
        r = self.client.post('/im/local/password_reset', {'email':
400
                                                          'kpap@grnet.gr'})
401
        self.assertContains(r, "doesn't have an associated user account")
402
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
403
                                                 'password': 'password'})
404
        self.assertContains(r, 'You have not followed the activation link')
405
        self.assertContains(r, 'Resend activation')
406
        self.assertFalse(r.context['request'].user.is_authenticated())
407
        self.assertFalse('_pithos2_a' in self.client.cookies)
408
        # user sees the message and resends activation
409
        r = self.client.get('/im/send/activation/%d' % user.pk)
410
        # email sent
411
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 2)
412

    
413
        # switch back moderation setting
414
        astakos_settings.MODERATION_ENABLED = True
415
        # lets activate the user
416
        r = self.client.get(user.get_activation_url(), follow=True)
417
        self.assertRedirects(r, "/im/profile")
418
        self.assertContains(r, "kpap@grnet.gr")
419
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 3)
420

    
421
        user = AstakosUser.objects.get(pk=user.pk)
422
        # user activated and logged in, token cookie set
423
        self.assertTrue(r.context['request'].user.is_authenticated())
424
        self.assertTrue('_pithos2_a' in self.client.cookies)
425
        cookies = self.client.cookies
426
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
427
        r = self.client.get('/im/logout', follow=True)
428
        r = self.client.get('/im/')
429
        # user logged out, token cookie removed
430
        self.assertFalse(r.context['request'].user.is_authenticated())
431
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
432
        # https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
433
        del self.client.cookies['_pithos2_a']
434

    
435
        # user can login
436
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
437
                                           'password': 'password'},
438
                                          follow=True)
439
        self.assertTrue(r.context['request'].user.is_authenticated())
440
        self.assertTrue('_pithos2_a' in self.client.cookies)
441
        cookies = self.client.cookies
442
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
443
        self.client.get('/im/logout', follow=True)
444

    
445
        # user forgot password
446
        old_pass = user.password
447
        r = self.client.get('/im/local/password_reset')
448
        self.assertEqual(r.status_code, 200)
449
        r = self.client.post('/im/local/password_reset', {'email':
450
                                                          'kpap@grnet.gr'})
451
        self.assertEqual(r.status_code, 302)
452
        # email sent
453
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 4)
454

    
455
        # user visits change password link
456
        r = self.client.get(user.get_password_reset_url())
457
        r = self.client.post(user.get_password_reset_url(),
458
                            {'new_password1':'newpass',
459
                             'new_password2':'newpass'})
460

    
461
        user = AstakosUser.objects.get(pk=user.pk)
462
        self.assertNotEqual(old_pass, user.password)
463

    
464
        # old pass is not usable
465
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
466
                                           'password': 'password'})
467
        self.assertContains(r, 'Please enter a correct username and password')
468
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
469
                                           'password': 'newpass'},
470
                                           follow=True)
471
        self.assertTrue(r.context['request'].user.is_authenticated())
472
        self.client.logout()
473

    
474
        # tests of special local backends
475
        user = AstakosUser.objects.get(pk=user.pk)
476
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
477
        user.save()
478

    
479
        # non astakos local backends do not support password reset
480
        r = self.client.get('/im/local/password_reset')
481
        self.assertEqual(r.status_code, 200)
482
        r = self.client.post('/im/local/password_reset', {'email':
483
                                                          'kpap@grnet.gr'})
484
        # she can't because account is not active yet
485
        self.assertContains(r, "Password change for this account is not"
486
                                " supported")
487